Skip to content
This repository has been archived by the owner on Nov 10, 2023. It is now read-only.

Add support for async worker processes #2547

Merged
merged 7 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Allow WorkerProcess to be used for multiple jobs at the same time.
No users yet, but next up is the process pool.
  • Loading branch information
mikekap committed Sep 25, 2020
commit 15ea869d76e7de3770e464f233beea976691da05
7 changes: 6 additions & 1 deletion src/com/facebook/buck/shell/WorkerShellStep.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import com.facebook.buck.worker.WorkerProcessPool.BorrowedWorkerProcess;
import com.facebook.buck.worker.WorkerProcessPoolFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class WorkerShellStep implements Step {
Expand Down Expand Up @@ -80,7 +82,10 @@ public StepExecutionResult execute(ExecutionContext context)
factory.getWorkerProcessPool(context, paramsToUse.getWorkerProcessParams());
WorkerJobResult result;
try (BorrowedWorkerProcess process = pool.borrowWorkerProcess()) {
result = process.submitAndWaitForJob(getExpandedJobArgs(context));
result = process.submitJob(getExpandedJobArgs(context)).get();
} catch (ExecutionException e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}

Verbosity verbosity = context.getVerbosity();
Expand Down
167 changes: 148 additions & 19 deletions src/com/facebook/buck/worker/WorkerProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;

Expand All @@ -45,8 +53,14 @@ public class WorkerProcess implements Closeable {
private final Path stdErr;
private final AtomicInteger currentMessageID = new AtomicInteger();
private boolean handshakePerformed = false;
private final ConcurrentHashMap<Integer, SettableFuture<Integer>> commandExitCodes =
new ConcurrentHashMap<>();
@Nullable private WorkerProcessProtocol.CommandSender protocol;
@Nullable private ProcessExecutor.LaunchedProcess launchedProcess;
private final Thread readerThread;
private final CountDownLatch readerShutdownSignal = new CountDownLatch(1);
private final Thread watchdogThread;
private final CountDownLatch watchdogShutdownSignal = new CountDownLatch(1);
Copy link
Contributor

@v-jizhang v-jizhang Oct 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variables readerShutdownSignal and watchdogShutdownSignal are used as flags; should we make them volatile booleans? That way we will have two fewer objects.

Copy link
Contributor Author

@mikekap mikekap Oct 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite - the latches are .await-able. Realistically these objects probably aren't memory-intensive for buck - there's a WorkerProcess object per long-lived process. You would rarely have more than ncpus of these processes running -- though I guess it depends how many worker tools you use!

Update: I got rid of these by killing the watchdog and went with your suggestion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you


/**
* Worker process is a process that stays alive and receives commands which describe jobs. Worker
Expand All @@ -73,18 +87,26 @@ public WorkerProcess(
processParams.withRedirectError(ProcessBuilder.Redirect.to(stdErr.toFile()));
this.filesystem = filesystem;
this.tmpPath = tmpPath;
this.readerThread = new Thread(this::readerLoop);
this.readerThread.setDaemon(true);
this.readerThread.setName(
"Worker Process IO Thread: " + Joiner.on(' ').join(processParams.getCommand()));
this.watchdogThread = new Thread(this::watchdogLoop);
this.watchdogThread.setDaemon(true);
this.watchdogThread.setName(
"Worker Process Watchdog Thread: " + Joiner.on(' ').join(processParams.getCommand()));
}

public boolean isAlive() {
return launchedProcess != null && launchedProcess.isAlive();
return launchedProcess != null && launchedProcess.isAlive() && this.readerThread.isAlive();
}

public synchronized void ensureLaunchAndHandshake() throws IOException {
if (handshakePerformed) {
return;
}
LOG.debug(
"Starting up process %d using command: \'%s\'",
"Starting up process %d using command: '%s'",
this.hashCode(), Joiner.on(' ').join(processParams.getCommand()));
launchedProcess = executor.launchProcess(processParams);
protocol =
Expand All @@ -101,10 +123,12 @@ public synchronized void ensureLaunchAndHandshake() throws IOException {

LOG.debug("Handshaking with process %d", this.hashCode());
protocol.handshake(currentMessageID.getAndIncrement());
this.readerThread.start();
this.watchdogThread.start();
handshakePerformed = true;
}

public synchronized WorkerJobResult submitAndWaitForJob(String jobArgs) throws IOException {
public ListenableFuture<WorkerJobResult> submitJob(String jobArgs) throws IOException {
Preconditions.checkState(
protocol != null,
"Tried to submit a job to the worker process before the handshake was performed.");
Expand All @@ -117,31 +141,60 @@ public synchronized WorkerJobResult submitAndWaitForJob(String jobArgs) throws I
filesystem.deleteFileAtPathIfExists(stderrPath);
filesystem.writeContentsToPath(jobArgs, argsPath);

LOG.debug(
"Sending job %d to process %d \n" + " job arguments: \'%s\'",
messageID, this.hashCode(), jobArgs);
protocol.send(messageID, ImmutableWorkerProcessCommand.of(argsPath, stdoutPath, stderrPath));
LOG.debug("Receiving response for job %d from process %d", messageID, this.hashCode());
int exitCode = protocol.receiveCommandResponse(messageID);
Optional<String> stdout = filesystem.readFileIfItExists(stdoutPath);
Optional<String> stderr = filesystem.readFileIfItExists(stderrPath);
LOG.debug(
"Job %d for process %d finished \n"
+ " exit code: %d \n"
+ " stdout: %s \n"
+ " stderr: %s",
messageID, this.hashCode(), exitCode, stdout.orElse(""), stderr.orElse(""));
SettableFuture<Integer> exitCodeFuture = SettableFuture.create();
commandExitCodes.put(messageID, exitCodeFuture);

try {
synchronized (this) {
Preconditions.checkState(
protocol != null && readerThread.isAlive(), "Worker job died somehow");

LOG.debug(
"Sending job %d to process %d \n" + " job arguments: '%s'",
messageID, this.hashCode(), jobArgs);
protocol.send(
messageID, ImmutableWorkerProcessCommand.of(argsPath, stdoutPath, stderrPath));
}
} catch (Throwable t) {
commandExitCodes.remove(messageID);
throw t;
}

synchronized (commandExitCodes) {
commandExitCodes.notify();
}

return exitCodeFuture.transform(
(exitCode) -> {
LOG.debug(
"Receiving response for job %d from process %d - %d",
messageID, this.hashCode(), exitCode);
Optional<String> stdout = filesystem.readFileIfItExists(stdoutPath);
Optional<String> stderr = filesystem.readFileIfItExists(stderrPath);
LOG.debug(
"Job %d for process %d finished \n"
+ " exit code: %d \n"
+ " stdout: %s \n"
+ " stderr: %s",
messageID, this.hashCode(), exitCode, stdout.orElse(""), stderr.orElse(""));

return WorkerJobResult.of(exitCode, stdout, stderr);
return WorkerJobResult.of(exitCode, stdout, stderr);
},
MoreExecutors.directExecutor());
}

@Override
public synchronized void close() {
LOG.debug("Closing process %d", this.hashCode());
try {
readerShutdownSignal.countDown();
synchronized (commandExitCodes) {
commandExitCodes.notifyAll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why notifyAll? We have only one thread waiting.

}
if (protocol != null) {
protocol.close();
}
readerThread.join(5000);
Files.deleteIfExists(stdErr);
} catch (Exception e) {
LOG.debug(e, "Error closing worker process %s.", processParams.getCommand());
Expand All @@ -166,11 +219,87 @@ public synchronized void close() {
e,
"Error while trying to close the worker process %s.",
Joiner.on(' ').join(processParams.getCommand()));
} finally {
watchdogShutdownSignal.countDown();
}
}

@VisibleForTesting
void setProtocol(WorkerProcessProtocol.CommandSender protocolMock) {
void launchForTesting(WorkerProcessProtocol.CommandSender protocolMock) {
this.protocol = protocolMock;
this.readerThread.start();
this.watchdogThread.start();
handshakePerformed = true;
}

private void processNextCommandResponse() throws Throwable {
Preconditions.checkState(
protocol != null,
"Tried to submit a job to the worker process before the handshake was performed.");

WorkerProcessProtocol.CommandResponse commandResponse =
this.protocol.receiveNextCommandResponse();
SettableFuture<Integer> result = commandExitCodes.remove(commandResponse.getCommandId());
Copy link
Contributor

@v-jizhang v-jizhang Oct 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is called in thread readerLoop, a different thread from submitJob, and both threads access commandExitCodes. Should we protect it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commandExitCodes is a ConcurrentHashMap so there's no need. Though honestly the threading here is a bit complex because of the watchdog. I updated the implementation to get rid of the watchdog (it was mostly for paranoia) and add a volatile boolean for the shutdown signal as you suggested. Also added a test for worker death.

The locking is still a bit more complex than I'd prefer, so I'm open to suggestions here - there's a weird internal vs external locking of data - externally this is used to lock submitJob & close, but the reader thread uses readerThread to be woken up when there is something to do.

Copy link
Contributor

@v-jizhang v-jizhang Oct 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's Ok.

Preconditions.checkState(
result != null,
"Received message id %d with no corresponding waiter! (result was %d)",
commandResponse.getCommandId(),
commandResponse.getExitCode());

result.set(commandResponse.getExitCode());
}

/**
 * Body of {@code readerThread}: reads command responses from the worker and completes the
 * matching futures in {@code commandExitCodes}.
 *
 * <p>The loop runs until {@code readerShutdownSignal} has been counted down (done by
 * {@code close()}). While no jobs are pending it parks on the {@code commandExitCodes} monitor;
 * {@code submitJob} wakes it with {@code notify()} after registering a job, and {@code close()}
 * wakes it with {@code notifyAll()} after signalling shutdown.
 *
 * <p>Any throwable raised while shutdown is already signalled is swallowed and the thread exits
 * quietly. Otherwise the failure is logged and rethrown unchecked, which terminates this thread;
 * {@code watchdogLoop} then notices the dead reader and fails the futures still pending.
 */
private void readerLoop() {
try {
while (readerShutdownSignal.getCount() > 0) {
// A dance here to avoid calling `processNextCommandResponse` if we're not waiting for any
// commands. In the best case scenario, where we don't call `close()` until all commands are
// done, this will avoid painful exceptions in `close()`.
synchronized (commandExitCodes) {
// Re-check both conditions after every wake-up: the notification may come from either a new
// job being registered or from shutdown.
while (readerShutdownSignal.getCount() > 0 && commandExitCodes.isEmpty()) {
commandExitCodes.wait();
}
}
// The monitor has been released, so state may have changed; only read when a job is still
// pending and shutdown has not been requested in the meantime.
if (readerShutdownSignal.getCount() > 0 && !commandExitCodes.isEmpty()) {
processNextCommandResponse();
}
}
} catch (Throwable t) {
// The WorkerProcessProtocol isn't really conducive to concurrency, so we just assume
// that shutdowns will cause some kind of exception.
// TODO(mikekap): Refactor the protocol enough to allow interrupting reads on a shutdown
// signal.
if (readerShutdownSignal.getCount() == 0) {
// Expected fallout from close(); nothing to report.
return;
}
LOG.error(t, "Worker pool reader process failed");
// Rethrow unchecked throwables as-is; wrap checked ones so the thread dies with the cause.
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
}
}

/**
 * Body of {@code watchdogThread}: periodically checks that the reader thread is still alive and,
 * if it is not, fails every job still registered in {@code commandExitCodes} so callers blocked
 * on those futures are released.
 *
 * <p>The loop polls every 5 seconds until {@code watchdogShutdownSignal} is counted down (done
 * in the {@code finally} of {@code close()}). On the way out, whether through normal shutdown or
 * an error, it keeps interrupting the reader thread until that thread has terminated.
 *
 * @throws RuntimeException if the watchdog itself fails; the original throwable is logged and
 *     either rethrown (unchecked) or wrapped (checked)
 */
private void watchdogLoop() {
  try {
    // await() returns false on timeout, i.e. "not shut down yet": run another health check.
    while (!watchdogShutdownSignal.await(5, TimeUnit.SECONDS)) {
      if (!readerThread.isAlive()) {
        // Snapshot the keys so each pending job is handled exactly once even if the map changes.
        HashSet<Integer> keys = new HashSet<>(commandExitCodes.keySet());
        for (Integer key : keys) {
          // remove() may return null if the job was completed or withdrawn concurrently.
          SettableFuture<Integer> result = commandExitCodes.remove(key);
          if (result != null) {
            result.setException(new RuntimeException("Worker process error"));
          }
        }
      }
    }
  } catch (Throwable t) {
    LOG.error(t, "Worker Process Watchdog thread error!");
    Throwables.throwIfUnchecked(t);
    throw new RuntimeException(t);
  } finally {
    // Kill it with fire.
    // The reader may be parked in a blocking read that ignores interrupts, so keep retrying until
    // it is gone -- but wait between attempts instead of spinning, which would pin a CPU core
    // for as long as the read stays blocked.
    boolean interrupted = false;
    while (readerThread.isAlive()) {
      readerThread.interrupt();
      try {
        readerThread.join(100);
      } catch (InterruptedException e) {
        // Remember the interrupt but keep going: the reader must not outlive the watchdog.
        interrupted = true;
      }
    }
    if (interrupted) {
      // Restore the interrupt status that join() cleared.
      Thread.currentThread().interrupt();
    }
  }
}
}
5 changes: 3 additions & 2 deletions src/com/facebook/buck/worker/WorkerProcessPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -207,8 +208,8 @@ public void close() {
*
* @throws IOException
*/
public WorkerJobResult submitAndWaitForJob(String expandedJobArgs) throws IOException {
return get().submitAndWaitForJob(expandedJobArgs);
public ListenableFuture<WorkerJobResult> submitJob(String expandedJobArgs) throws IOException {
return get().submitJob(expandedJobArgs);
}

@VisibleForTesting
Expand Down
19 changes: 18 additions & 1 deletion src/com/facebook/buck/worker/WorkerProcessProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,30 @@
import java.io.IOException;

public interface WorkerProcessProtocol {
/**
 * Immutable pairing of a command's message id with the exit code carried by the response for
 * that command.
 */
class CommandResponse {
  private final int commandId;
  private final int exitCode;

  /**
   * @param commandId id of the command this response belongs to
   * @param exitCode exit code carried by the response
   */
  public CommandResponse(int commandId, int exitCode) {
    this.exitCode = exitCode;
    this.commandId = commandId;
  }

  /** Returns the id of the command this response belongs to. */
  public int getCommandId() {
    return this.commandId;
  }

  /** Returns the exit code carried by the response. */
  public int getExitCode() {
    return this.exitCode;
  }
}

interface CommandSender extends Closeable {
void handshake(int messageId) throws IOException;

void send(int messageId, WorkerProcessCommand command) throws IOException;

int receiveCommandResponse(int messageID) throws IOException;
CommandResponse receiveNextCommandResponse() throws IOException;

/** Instructs the CommandReceiver to shut itself down. */
@Override
Expand Down
10 changes: 2 additions & 8 deletions src/com/facebook/buck/worker/WorkerProcessProtocolZero.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void send(int messageId, WorkerProcessCommand command) throws IOException
}
*/
@Override
public int receiveCommandResponse(int messageID) throws IOException {
public WorkerProcessProtocol.CommandResponse receiveNextCommandResponse() throws IOException {
int id = -1;
int exitCode = -1;
String type = "";
Expand Down Expand Up @@ -157,20 +157,14 @@ public int receiveCommandResponse(int messageID) throws IOException {
getStdErrorOutput(stdErr));
}

if (id != messageID) {
throw new HumanReadableException(
String.format(
"Expected response's \"id\" value to be " + "\"%d\", got \"%d\" instead.",
messageID, id));
}
if (!type.equals(TYPE_RESULT) && !type.equals(TYPE_ERROR)) {
throw new HumanReadableException(
String.format(
"Expected response's \"type\" "
+ "to be one of [\"%s\",\"%s\"], got \"%s\" instead.",
TYPE_RESULT, TYPE_ERROR, type));
}
return exitCode;
return new WorkerProcessProtocol.CommandResponse(id, exitCode);
}

@Override
Expand Down
9 changes: 6 additions & 3 deletions test/com/facebook/buck/worker/FakeWorkerProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -41,7 +43,6 @@ public FakeWorkerProcess(ImmutableMap<String, WorkerJobResult> jobArgsToJobResul
Paths.get("tmp").toAbsolutePath().normalize());
this.jobArgsToJobResultMap = jobArgsToJobResultMap;
this.isAlive = false;
this.setProtocol(new FakeWorkerProcessProtocol.FakeCommandSender());
}

@Override
Expand All @@ -55,13 +56,15 @@ public synchronized void ensureLaunchAndHandshake() {
}

@Override
public synchronized WorkerJobResult submitAndWaitForJob(String jobArgs) {
public synchronized ListenableFuture<WorkerJobResult> submitJob(String jobArgs) {
WorkerJobResult result = this.jobArgsToJobResultMap.get(jobArgs);
if (result == null) {
throw new IllegalArgumentException(
String.format("No fake WorkerJobResult found for job arguments '%s'", jobArgs));
}
return result;
SettableFuture<WorkerJobResult> out = SettableFuture.create();
out.set(result);
return out;
}

@Override
Expand Down
Loading