Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Refactor handling response bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
mrcnski committed Nov 20, 2022
1 parent 80d2cf6 commit 8e5139f
Showing 1 changed file with 91 additions and 70 deletions.
161 changes: 91 additions & 70 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ pub enum Outcome {
DidNotMakeIt,
}

#[derive(Debug)]
enum Selected {
Done(PrepareResult),
IoErr,
Deadline,
}

/// Given the idle token of a worker and parameters of work, communicates with the worker and
/// returns the outcome.
pub async fn start_work(
Expand Down Expand Up @@ -105,17 +112,9 @@ pub async fn start_work(
}

// Wait for the result from the worker, keeping in mind that there may be a timeout, the
// worker may get killed, or something along these lines.
// worker may get killed, or something along these lines. In that case we should propagate
// the error to the pool.
//
// In that case we should propagate the error to the pool.

#[derive(Debug)]
enum Selected {
Done(PrepareResult),
IoErr,
Deadline,
}

// We use a generous timeout here. This is in addition to the one in the child process, in
// case the child stalls. We have a wall clock timeout here in the host, but a CPU timeout
// in the child. We want to use CPU time because it varies less than wall clock time under
Expand All @@ -125,64 +124,15 @@ pub async fn start_work(
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;

let selected = match result {
// TODO: This case is really long, refactor.
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
if let Ok(cpu_time_elapsed) = result {
if cpu_time_elapsed > preparation_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis()
);

// Return a timeout error. The artifact exists, but is located in a
// temporary file which will be cleared by `with_tmp_file`.
Selected::Deadline
} else {
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);

async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}
} else {
Selected::Done(result)
}
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
}
},
Ok(Ok(response_bytes)) =>
handle_response_bytes(
response_bytes,
pid,
tmp_file,
artifact_path,
preparation_timeout,
)
.await,
Ok(Err(err)) => {
// Communication error within the time limit.
gum::warn!(
Expand All @@ -194,7 +144,7 @@ pub async fn start_work(
Selected::IoErr
},
Err(_) => {
// Timed out.
// Timed out here on the host.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
Expand All @@ -205,7 +155,7 @@ pub async fn start_work(
};

match selected {
// Timed out. This should already be logged by the child.
// Timed out on the child. This should already be logged by the child.
Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
Selected::Done(result) =>
Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
Expand All @@ -216,6 +166,77 @@ pub async fn start_work(
.await
}

/// Handles receiving response bytes on the host from the child.
async fn handle_response_bytes(
response_bytes: Vec<u8>,
pid: u32,
tmp_file: PathBuf,
artifact_path: PathBuf,
preparation_timeout: Duration,
) -> Selected {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
Ok(result) => result,
Err(_) => {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
return Selected::IoErr
},
};
let cpu_time_elapsed = match result {
Ok(result) => result,
Err(_) => return Selected::Done(result),
};

if cpu_time_elapsed > preparation_timeout {
// The job didn't complete within the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"prepare job took {}ms cpu time, exceeded preparation timeout {}ms. Clearing WIP artifact {}",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
tmp_file.display(),
);

// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Selected::Deadline
}

gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"promoting WIP artifact {} to {}",
tmp_file.display(),
artifact_path.display(),
);

async_std::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Selected::Done(result))
.unwrap_or_else(|err| {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to rename the artifact from {} to {}: {:?}",
tmp_file.display(),
artifact_path.display(),
err,
);
Selected::IoErr
})
}

/// Create a temporary file for an artifact at the given cache path and execute the given
/// future/closure passing the file path in.
///
Expand Down

0 comments on commit 8e5139f

Please sign in to comment.