Skip to content

Commit

Permalink
WorkerProcess: make crash on start not hang.
Browse files Browse the repository at this point in the history
Summary:
If a worker_tool crashes on startup (due to invalid args, for example),
it can cause a hang:
 1) WorkerPool is initialized, semaphore = 1
 2) a process is requested from the pool, semaphore = 0
 3) WorkerProcess fails to perform handshake. The code in WorkerShellStep
    does _not_ call returnWorkerProcess (which would normally increment the
    semaphore).
 4) the WorkerPool's semaphore count is now 0 and will never be incremented
 5) when another WorkerShellStep tries to run the worker, it waits forever.

Test Plan: added new test

Reviewed By: illicitonion

fbshipit-source-id: 084b943
  • Loading branch information
mkosiba authored and Facebook Github Bot committed Nov 15, 2016
1 parent 49a8316 commit 454545c
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 16 deletions.
64 changes: 51 additions & 13 deletions src/com/facebook/buck/shell/WorkerProcessPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,78 @@

package com.facebook.buck.shell;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashCode;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

public abstract class WorkerProcessPool {

// NOTE(review): this listing is a rendered diff without +/- markers, so declarations from two
// versions of the class appear side by side. `available` and `workerProcesses` are touched only
// by the semaphore-based lines; `availableWorkers` and `createdWorkers` only by the queue-based
// lines. Presumably the former pair is the removed implementation -- confirm against the repo.

// Semaphore created with `capacity` permits and fairness enabled (see constructor).
private final Semaphore available;
// Maximum number of workers; createNewWorkerIfPossible() refuses to start more than this.
private final int capacity;
// Queue polled by the semaphore-based borrowWorkerProcess() lines.
private final LinkedBlockingQueue<WorkerProcess> workerProcesses;
// Workers that have been returned via returnWorkerProcess() and are waiting to be borrowed.
private final BlockingQueue<WorkerProcess> availableWorkers;
// Every worker started by createNewWorkerIfPossible(); the list itself is used as the lock.
@GuardedBy("createdWorkers")
private final List<WorkerProcess> createdWorkers;
// Stored from the constructor argument; not read in the visible part of the class.
private final HashCode poolHash;

/**
 * Creates an empty pool; no worker process is started here.
 *
 * <p>NOTE(review): rendered diff without +/- markers. The unqualified assignments
 * ({@code capacity}, {@code available}, {@code workerProcesses}) and the {@code this.}-qualified
 * ones are presumably the before/after of the same constructor -- confirm against the repo.
 *
 * @param maxWorkers stored as {@code capacity}, the limit checked before starting a new worker
 * @param poolHash stored as-is in {@code poolHash}
 */
public WorkerProcessPool(int maxWorkers, HashCode poolHash) {
capacity = maxWorkers;
// Second argument `true` requests a fair (FIFO) semaphore.
available = new Semaphore(capacity, true);
workerProcesses = new LinkedBlockingQueue<>();
this.capacity = maxWorkers;
this.availableWorkers = new LinkedBlockingQueue<>();
this.createdWorkers = new ArrayList<>();
this.poolHash = poolHash;
}

/**
 * Hands a worker process to the caller.
 *
 * <p>Queue-based lines: take an idle worker from {@code availableWorkers} if one is already
 * queued; otherwise ask {@code createNewWorkerIfPossible()} for a fresh one; if that returns
 * {@code null} (pool at capacity), block in {@code take()} until a worker is put back.
 *
 * <p>NOTE(review): rendered diff without +/- markers. The three lines using {@code available}
 * and {@code workerProcesses} are presumably the removed semaphore-based implementation, in
 * which a permit acquired here was only given back by {@code returnWorkerProcess} -- confirm.
 *
 * @return a worker process taken from the queue or newly started
 * @throws IOException propagated from starting a worker process
 * @throws InterruptedException if interrupted while waiting in acquire/poll/take
 */
public WorkerProcess borrowWorkerProcess()
throws IOException, InterruptedException {
available.acquire();
WorkerProcess workerProcess = workerProcesses.poll();
return workerProcess != null ? workerProcess : startWorkerProcess();
// Zero timeout: returns immediately with null when no idle worker is queued.
WorkerProcess workerProcess = availableWorkers.poll(0, TimeUnit.SECONDS);
if (workerProcess == null) {
workerProcess = createNewWorkerIfPossible();
}
if (workerProcess != null) {
return workerProcess;
}
// At capacity and nothing idle: wait for returnWorkerProcess() to enqueue one.
return availableWorkers.take();
}

// NOTE(review): rendered diff without +/- markers. The next three lines (method header, put,
// release) are presumably the removed semaphore-based returnWorkerProcess -- confirm.
public void returnWorkerProcess(WorkerProcess workerProcess) throws InterruptedException {
workerProcesses.put(workerProcess);
available.release();
/**
 * Starts a new worker process unless {@code capacity} workers have already been created.
 *
 * <p>The size check, the start, and the bookkeeping all happen while holding the
 * {@code createdWorkers} monitor. A worker is recorded in {@code createdWorkers} only after
 * {@code startWorkerProcess()} has returned a non-null value, so an exception thrown by the
 * start leaves the list unchanged.
 *
 * @return the newly started worker, or {@code null} when the pool is at capacity
 * @throws IOException declared here; presumably raised by {@code startWorkerProcess()},
 *     which is defined outside this view -- confirm
 */
private @Nullable WorkerProcess createNewWorkerIfPossible() throws IOException {
synchronized (createdWorkers) {
if (createdWorkers.size() == capacity) {
return null;
}
WorkerProcess process = Preconditions.checkNotNull(startWorkerProcess());
createdWorkers.add(process);
return process;
}
}

/**
 * Puts a previously borrowed worker back on {@code availableWorkers} so it can be borrowed again.
 *
 * @param workerProcess the worker to return; must be one recorded in {@code createdWorkers}
 * @throws IllegalArgumentException (from {@code Preconditions.checkArgument}) if the worker was
 *     not created by this pool
 * @throws InterruptedException declared because {@code BlockingQueue.put} may be interrupted
 */
public void returnWorkerProcess(WorkerProcess workerProcess)
throws InterruptedException {
// Membership is checked under the same monitor that guards additions to createdWorkers.
synchronized (createdWorkers) {
Preconditions.checkArgument(
createdWorkers.contains(workerProcess),
"Trying to return a foreign WorkerProcess to the pool");
}
availableWorkers.put(workerProcess);
}

/**
 * Closes every worker process recorded in {@code createdWorkers}.
 *
 * <p>A snapshot of {@code createdWorkers} is taken under its monitor, and
 * {@code Preconditions.checkState} throws {@code IllegalStateException} if the number of idle
 * workers in {@code availableWorkers} differs from the number created, i.e. some worker has
 * not been returned. The {@code process.close()} calls run after the monitor is released.
 *
 * <p>NOTE(review): rendered diff without +/- markers. The {@code for} header iterating
 * {@code workerProcesses} is presumably the removed line that the second {@code for} header
 * replaces, which is why the braces look unbalanced here -- confirm.
 */
public void close() {
for (WorkerProcess process : workerProcesses) {
ImmutableSet<WorkerProcess> processesToClose;
synchronized (createdWorkers) {
processesToClose = ImmutableSet.copyOf(createdWorkers);
Preconditions.checkState(
availableWorkers.size() == createdWorkers.size(),
"WorkerProcessPool was still running when shutdown was called.");
}

for (WorkerProcess process : processesToClose) {
process.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,11 @@ public void testPersistentWorkerToolReusesProcessOnlyIfUnchanged() throws Except
ImmutableSet<String> processIDs = ImmutableSet.copyOf(contents.trim().split("\\s+"));
assertThat(processIDs.size(), Matchers.equalTo(2));
}

/**
 * Runs a Buck build of {@code //:test10} and calls {@code assertFailure()} on the result.
 *
 * <p>The test can only complete if the build returns at all, so a hang in the build would
 * show up as this test never finishing.
 */
@Test
public void testWorkerCrashDoesNotHang() throws Exception {
workspace.runBuckBuild("//:test10").assertFailure();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ worker_tool(

# Persistent worker tool wrapping :concurrent_tool.
# NOTE(review): rendered diff without +/- markers -- both `args` lines below are shown; the
# plain-string form and the list form are presumably the before/after of the same attribute.
worker_tool(
name = 'worker3',
args = '--num-jobs 2',
args = ['--num-jobs 2'],
exe = ':concurrent_tool',
persistent = True,
)

# Worker tool referenced by the 'test10' genrule. It passes a --loc path that is presumably
# absent on any machine, so the tool is expected to fail while starting up -- confirm that
# :external_tool validates --loc.
worker_tool(
  name = 'worker_crash_on_start',
  exe = ':external_tool',
  args = ['--num-jobs 2', '--loc /doesnotexsist'],
)

genrule(
name = 'test1',
srcs = [],
Expand Down Expand Up @@ -81,3 +87,9 @@ genrule(
out = 'output.txt',
cmd = '$(worker :worker3) $OUT',
)
# Genrule whose command runs through the :worker_crash_on_start worker tool and writes to
# output.txt. Built as //:test10 by testWorkerCrashDoesNotHang.
genrule(
  name = 'test10',
  out = 'output.txt',
  srcs = [],
  cmd = '$(worker :worker_crash_on_start) $OUT',
)
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
#!/bin/bash

# Parse the startup arguments into $num_jobs and $loc.
# Every case branch shifts exactly once per iteration, so $1 always tracks the loop
# variable $i and "$2" is the argument that follows the current flag.
# NOTE(review): rendered diff without +/- markers -- the `num_jobs=$(echo ... | sed ...)` line
# is presumably the removed one-liner that the case statement replaces; confirm.
for i in "$@"; do
# Extract the "--num-jobs <n>" value from the args. This value is the number of
# jobs this script should expect to be sent to it from buck.
num_jobs=$(echo "$@" | sed 's/--num-jobs \([0-9]*\)/\1/')
case $i in
--num-jobs)
num_jobs="$2"
shift
;;
# Extract the "--loc <file>" value from the args. This value is a path that
# should exist.
--loc)
loc="$2"
shift
;;
*)
shift
;;
esac
done

# Build the normalized argument string that is later written to the output file.
# If --loc was given and the path does not exist, report it on stderr and exit 1 before the
# handshake is read; if it exists, record only the literal "--loc exists", not the path.
# NOTE(review): $loc is unquoted in the -e test, so a path containing whitespace would be
# word-split -- consider quoting it.
ARGS="--num-jobs $num_jobs"
if [ "$loc" != "" -a ! -e $loc ]; then
echo "$loc does not exist" 1>&2
exit 1
elif [ "$loc" != "" ]; then
ARGS="$ARGS --loc exists"
fi

# Read in the handshake JSON (stdin up to the first "}" delimiter).
read -d "}" handshake_json
Expand All @@ -23,7 +47,7 @@ do
# contains the path for the output file.
output_path=$(cat "$args_path")
# Write to the output file.
echo "the startup arguments were: $@" > $output_path
echo "the startup arguments were: $ARGS" > $output_path
# Send the job result reply.
printf ",{\"id\":%s, \"type\":\"result\", \"exit_code\":0}" "$message_id"
done
Expand Down

0 comments on commit 454545c

Please sign in to comment.