Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Let the PVF host kill the worker on timeout (#6381)
Browse files Browse the repository at this point in the history
* Let the PVF host kill the worker on timeout

* Fix comment

* Fix inaccurate comments; add missing return statement

* Fix a comment

* Fix comment
  • Loading branch information
mrcnski authored Dec 6, 2022
1 parent 9cf336a commit d57049f
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 15 deletions.
4 changes: 3 additions & 1 deletion node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ pub enum Outcome {

/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// NOTE: Returning the `HardTimeout` or `IoErr` errors will cause the child process to be killed.
pub async fn start_work(
worker: IdleWorker,
artifact: ArtifactPathId,
Expand Down Expand Up @@ -148,7 +150,7 @@ pub async fn start_work(
target: LOG_TARGET,
worker_pid = %pid,
validation_code_hash = ?artifact.id.code_hash,
"execution worker exceeded alloted time for execution",
"execution worker exceeded allotted time for execution",
);
// TODO: This case is not really a hard timeout as the timeout here in the host is
// lenient. Should fix this as part of
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ async fn run(
from_prepare_queue = from_prepare_queue_rx.next() => {
let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

// Note that preparation always succeeds.
// Note that the preparation outcome is always reported as concluded.
//
// That's because the error conditions are written into the artifact and will be
// reported at the time of the execution. It potentially, but not necessarily, can
Expand Down
5 changes: 4 additions & 1 deletion node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,15 @@ fn handle_mux(
Ok(())
},
PoolEvent::StartWork(worker, outcome) => {
// If we receive any outcome other than `Concluded`, we attempt to kill the worker
// process.
match outcome {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
// relevant.
// relevant. We already send `Rip` when purging if we detect that the
// worker is dead.
return Ok(())
},
Some(data) => data,
Expand Down
10 changes: 7 additions & 3 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ enum Selected {

/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
///
/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will cause the child process to be
/// killed.
pub async fn start_work(
worker: IdleWorker,
code: Arc<Vec<u8>>,
Expand Down Expand Up @@ -149,6 +152,7 @@ pub async fn start_work(
},
};

// NOTE: A `TimedOut` or `DidNotMakeIt` error causes the child process to be killed.
match selected {
// Timed out on the child. This should already be logged by the child.
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
Expand All @@ -162,6 +166,9 @@ pub async fn start_work(
}

/// Handles the case where we successfully received response bytes on the host from the child.
///
/// NOTE: Here we know the artifact exists, but it is still located in a temporary file which will
/// be cleared by `with_tmp_file`.
async fn handle_response_bytes(
response_bytes: Vec<u8>,
pid: u32,
Expand Down Expand Up @@ -201,9 +208,6 @@ async fn handle_response_bytes(
);

// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Selected::Deadline
}

Expand Down
19 changes: 10 additions & 9 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,8 @@ where
}

/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// from sleeping and then either sleeps for the remaining CPU time, or kills the process if we
/// exceed the CPU timeout.
///
/// NOTE: Killed processes are detected and cleaned up in `purge_dead`.
/// from sleeping and then either sleeps for the remaining CPU time, or sends back a timeout error
/// if we exceed the CPU timeout.
///
/// NOTE: If the job completes and this thread is still sleeping, it will continue sleeping in the
/// background. When it wakes, it will see that the flag has been set and return.
Expand Down Expand Up @@ -233,7 +231,11 @@ pub async fn cpu_time_monitor_loop(
timeout.as_millis(),
);

// Send back a TimedOut error on timeout.
// Send back a `TimedOut` error.
//
// NOTE: This will cause the worker, whether preparation or execution, to be killed by
// the host. We do not kill the process here because it would interfere with the proper
// handling of this error.
let encoded_result = match job_kind {
JobKind::Prepare => {
let result: Result<(), PrepareError> = Err(PrepareError::TimedOut);
Expand All @@ -244,8 +246,8 @@ pub async fn cpu_time_monitor_loop(
result.encode()
},
};
// If we error there is nothing else we can do here, and we are killing the process,
// anyway. The receiving side will just have to time out.
// If we error here there is nothing we can do apart from logging it. The receiving side
// will just have to time out.
if let Err(err) = framed_send(&mut stream, encoded_result.as_slice()).await {
gum::warn!(
target: LOG_TARGET,
Expand All @@ -255,8 +257,7 @@ pub async fn cpu_time_monitor_loop(
);
}

// Kill the process.
std::process::exit(1);
return
}

// Sleep for the remaining CPU time, plus a bit to account for overhead. Note that the sleep
Expand Down

0 comments on commit d57049f

Please sign in to comment.