-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Add support for async worker processes #2547
Changes from 1 commit
15ea869
f9b38ca
2486caf
b2f3ebd
11ca5fd
c767aba
fa1acbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
No users yet, but next up is the process pool.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,12 +25,20 @@ | |
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Joiner; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.base.Throwables; | ||
import com.google.common.util.concurrent.ListenableFuture; | ||
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.common.util.concurrent.SettableFuture; | ||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.HashSet; | ||
import java.util.Optional; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import javax.annotation.Nullable; | ||
|
||
|
@@ -45,8 +53,14 @@ public class WorkerProcess implements Closeable { | |
private final Path stdErr; | ||
private final AtomicInteger currentMessageID = new AtomicInteger(); | ||
private boolean handshakePerformed = false; | ||
private final ConcurrentHashMap<Integer, SettableFuture<Integer>> commandExitCodes = | ||
new ConcurrentHashMap<>(); | ||
@Nullable private WorkerProcessProtocol.CommandSender protocol; | ||
@Nullable private ProcessExecutor.LaunchedProcess launchedProcess; | ||
private final Thread readerThread; | ||
private final CountDownLatch readerShutdownSignal = new CountDownLatch(1); | ||
private final Thread watchdogThread; | ||
private final CountDownLatch watchdogShutdownSignal = new CountDownLatch(1); | ||
|
||
/** | ||
* Worker process is a process that stays alive and receives commands which describe jobs. Worker | ||
|
@@ -73,18 +87,26 @@ public WorkerProcess( | |
processParams.withRedirectError(ProcessBuilder.Redirect.to(stdErr.toFile())); | ||
this.filesystem = filesystem; | ||
this.tmpPath = tmpPath; | ||
this.readerThread = new Thread(this::readerLoop); | ||
this.readerThread.setDaemon(true); | ||
this.readerThread.setName( | ||
"Worker Process IO Thread: " + Joiner.on(' ').join(processParams.getCommand())); | ||
this.watchdogThread = new Thread(this::watchdogLoop); | ||
this.watchdogThread.setDaemon(true); | ||
this.watchdogThread.setName( | ||
"Worker Process Watchdog Thread: " + Joiner.on(' ').join(processParams.getCommand())); | ||
} | ||
|
||
public boolean isAlive() { | ||
return launchedProcess != null && launchedProcess.isAlive(); | ||
return launchedProcess != null && launchedProcess.isAlive() && this.readerThread.isAlive(); | ||
} | ||
|
||
public synchronized void ensureLaunchAndHandshake() throws IOException { | ||
if (handshakePerformed) { | ||
return; | ||
} | ||
LOG.debug( | ||
"Starting up process %d using command: \'%s\'", | ||
"Starting up process %d using command: '%s'", | ||
this.hashCode(), Joiner.on(' ').join(processParams.getCommand())); | ||
launchedProcess = executor.launchProcess(processParams); | ||
protocol = | ||
|
@@ -101,10 +123,12 @@ public synchronized void ensureLaunchAndHandshake() throws IOException { | |
|
||
LOG.debug("Handshaking with process %d", this.hashCode()); | ||
protocol.handshake(currentMessageID.getAndIncrement()); | ||
this.readerThread.start(); | ||
this.watchdogThread.start(); | ||
handshakePerformed = true; | ||
} | ||
|
||
public synchronized WorkerJobResult submitAndWaitForJob(String jobArgs) throws IOException { | ||
public ListenableFuture<WorkerJobResult> submitJob(String jobArgs) throws IOException { | ||
Preconditions.checkState( | ||
protocol != null, | ||
"Tried to submit a job to the worker process before the handshake was performed."); | ||
|
@@ -117,31 +141,60 @@ public synchronized WorkerJobResult submitAndWaitForJob(String jobArgs) throws I | |
filesystem.deleteFileAtPathIfExists(stderrPath); | ||
filesystem.writeContentsToPath(jobArgs, argsPath); | ||
|
||
LOG.debug( | ||
"Sending job %d to process %d \n" + " job arguments: \'%s\'", | ||
messageID, this.hashCode(), jobArgs); | ||
protocol.send(messageID, ImmutableWorkerProcessCommand.of(argsPath, stdoutPath, stderrPath)); | ||
LOG.debug("Receiving response for job %d from process %d", messageID, this.hashCode()); | ||
int exitCode = protocol.receiveCommandResponse(messageID); | ||
Optional<String> stdout = filesystem.readFileIfItExists(stdoutPath); | ||
Optional<String> stderr = filesystem.readFileIfItExists(stderrPath); | ||
LOG.debug( | ||
"Job %d for process %d finished \n" | ||
+ " exit code: %d \n" | ||
+ " stdout: %s \n" | ||
+ " stderr: %s", | ||
messageID, this.hashCode(), exitCode, stdout.orElse(""), stderr.orElse("")); | ||
SettableFuture<Integer> exitCodeFuture = SettableFuture.create(); | ||
commandExitCodes.put(messageID, exitCodeFuture); | ||
|
||
try { | ||
synchronized (this) { | ||
Preconditions.checkState( | ||
protocol != null && readerThread.isAlive(), "Worker job died somehow"); | ||
|
||
LOG.debug( | ||
"Sending job %d to process %d \n" + " job arguments: '%s'", | ||
messageID, this.hashCode(), jobArgs); | ||
protocol.send( | ||
messageID, ImmutableWorkerProcessCommand.of(argsPath, stdoutPath, stderrPath)); | ||
} | ||
} catch (Throwable t) { | ||
commandExitCodes.remove(messageID); | ||
throw t; | ||
} | ||
|
||
synchronized (commandExitCodes) { | ||
commandExitCodes.notify(); | ||
} | ||
|
||
return exitCodeFuture.transform( | ||
(exitCode) -> { | ||
LOG.debug( | ||
"Receiving response for job %d from process %d - %d", | ||
messageID, this.hashCode(), exitCode); | ||
Optional<String> stdout = filesystem.readFileIfItExists(stdoutPath); | ||
Optional<String> stderr = filesystem.readFileIfItExists(stderrPath); | ||
LOG.debug( | ||
"Job %d for process %d finished \n" | ||
+ " exit code: %d \n" | ||
+ " stdout: %s \n" | ||
+ " stderr: %s", | ||
messageID, this.hashCode(), exitCode, stdout.orElse(""), stderr.orElse("")); | ||
|
||
return WorkerJobResult.of(exitCode, stdout, stderr); | ||
return WorkerJobResult.of(exitCode, stdout, stderr); | ||
}, | ||
MoreExecutors.directExecutor()); | ||
} | ||
|
||
@Override | ||
public synchronized void close() { | ||
LOG.debug("Closing process %d", this.hashCode()); | ||
try { | ||
readerShutdownSignal.countDown(); | ||
synchronized (commandExitCodes) { | ||
commandExitCodes.notifyAll(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why notifyAll? We have only one thread waiting. |
||
} | ||
if (protocol != null) { | ||
protocol.close(); | ||
} | ||
readerThread.join(5000); | ||
Files.deleteIfExists(stdErr); | ||
} catch (Exception e) { | ||
LOG.debug(e, "Error closing worker process %s.", processParams.getCommand()); | ||
|
@@ -166,11 +219,87 @@ public synchronized void close() { | |
e, | ||
"Error while trying to close the worker process %s.", | ||
Joiner.on(' ').join(processParams.getCommand())); | ||
} finally { | ||
watchdogShutdownSignal.countDown(); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
void setProtocol(WorkerProcessProtocol.CommandSender protocolMock) { | ||
void launchForTesting(WorkerProcessProtocol.CommandSender protocolMock) { | ||
this.protocol = protocolMock; | ||
this.readerThread.start(); | ||
this.watchdogThread.start(); | ||
handshakePerformed = true; | ||
} | ||
|
||
private void processNextCommandResponse() throws Throwable { | ||
Preconditions.checkState( | ||
protocol != null, | ||
"Tried to submit a job to the worker process before the handshake was performed."); | ||
|
||
WorkerProcessProtocol.CommandResponse commandResponse = | ||
this.protocol.receiveNextCommandResponse(); | ||
SettableFuture<Integer> result = commandExitCodes.remove(commandResponse.getCommandId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is called in thread readerLoop, a different thread from submitJob, and both threads access commandExitCodes. Should we protect it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The locking is still a bit more complex than I'd prefer, so I'm open to suggestions here - there's a weird internal vs external locking of data - externally There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's Ok. |
||
Preconditions.checkState( | ||
result != null, | ||
"Received message id %d with no corresponding waiter! (result was %d)", | ||
commandResponse.getCommandId(), | ||
commandResponse.getExitCode()); | ||
|
||
result.set(commandResponse.getExitCode()); | ||
} | ||
|
||
private void readerLoop() { | ||
try { | ||
while (readerShutdownSignal.getCount() > 0) { | ||
// A dance here to avoid calling `processNextCommandResponse` if we're not waiting for any | ||
// commands. In the best case scenario, where we don't call `close()` until all commands are | ||
// done, this will avoid painful exceptions in `close()`. | ||
synchronized (commandExitCodes) { | ||
while (readerShutdownSignal.getCount() > 0 && commandExitCodes.isEmpty()) { | ||
commandExitCodes.wait(); | ||
} | ||
} | ||
if (readerShutdownSignal.getCount() > 0 && !commandExitCodes.isEmpty()) { | ||
processNextCommandResponse(); | ||
} | ||
} | ||
} catch (Throwable t) { | ||
// The WorkerProcessProtocol isn't really conducive to concurrency, so we just assume | ||
// that shutdowns will cause some kind of exception. | ||
// TODO(mikekap): Refactor the protocol enough to allow interrupting reads on a shutdown | ||
// signal. | ||
if (readerShutdownSignal.getCount() == 0) { | ||
return; | ||
} | ||
LOG.error(t, "Worker pool reader process failed"); | ||
Throwables.throwIfUnchecked(t); | ||
throw new RuntimeException(t); | ||
} | ||
} | ||
|
||
private void watchdogLoop() { | ||
try { | ||
while (!watchdogShutdownSignal.await(5, TimeUnit.SECONDS)) { | ||
if (!readerThread.isAlive()) { | ||
HashSet<Integer> keys = new HashSet<>(commandExitCodes.keySet()); | ||
for (Integer key : keys) { | ||
SettableFuture<Integer> result = commandExitCodes.remove(key); | ||
if (result != null) { | ||
result.setException(new RuntimeException("Worker process error")); | ||
} | ||
} | ||
} | ||
} | ||
} catch (Throwable t) { | ||
LOG.error(t, "Worker Process Watchdog thread error!"); | ||
Throwables.throwIfUnchecked(t); | ||
throw new RuntimeException(t); | ||
} finally { | ||
// Kill it with fire. | ||
while (readerThread.isAlive()) { | ||
readerThread.interrupt(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variables readerShutdownSignal and watchdogShutdownSignal are used only as flags; should we make them volatile booleans instead? That way we would have two fewer objects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not quite - the latches are `.await`-able. Realistically these objects probably aren't memory-intensive for buck - there's a `WorkerProcess` object per long-lived process. You would rarely have more than ncpus of these processes running -- though I guess it depends how many worker tools you use!

Update: I got rid of these by killing the watchdog and went with your suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you