Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use CPU clock timeout for PVF jobs #6282

Merged
merged 23 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
aad7de7
Put in skeleton logic for CPU-time-preparation
mrcnski Nov 10, 2022
c62a8e1
Continue filling in logic for prepare worker CPU time changes
mrcnski Nov 13, 2022
9c68422
Fix compiler errors
mrcnski Nov 13, 2022
9fac3a4
Update lenience factor
mrcnski Nov 14, 2022
9f67150
Fix some clippy lints for PVF module
mrcnski Nov 14, 2022
e34825a
Fix compilation errors
mrcnski Nov 15, 2022
fe139e1
Address some review comments
mrcnski Nov 16, 2022
7b7886c
Add logging
mrcnski Nov 16, 2022
6006fe0
Add another log
mrcnski Nov 16, 2022
ebc9825
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 20, 2022
80d2cf6
Address some review comments; change Mutex to AtomicBool
mrcnski Nov 20, 2022
8e5139f
Refactor handling response bytes
mrcnski Nov 20, 2022
d8add29
Add CPU clock timeout logic for execute jobs
mrcnski Nov 20, 2022
c50a08f
Properly handle AtomicBool flag
mrcnski Nov 20, 2022
bc6172e
Use `Ordering::Relaxed`
mrcnski Nov 20, 2022
eb9372c
Refactor thread coordination logic
mrcnski Nov 21, 2022
6576ea1
Fix bug
mrcnski Nov 22, 2022
ec60713
Add some timing information to execute tests
mrcnski Nov 22, 2022
efd50fe
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 23, 2022
09560ee
Add section about the mitigation to the IG
mrcnski Nov 23, 2022
7c859d6
minor: Change more `Ordering`s to `Relaxed`
mrcnski Nov 25, 2022
fb6c5e9
candidate-validation: Fix build errors
mrcnski Nov 28, 2022
3e98ee6
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Put in skeleton logic for CPU-time-preparation
Still needed:
- Flesh out logic
- Refactor some spots
- Tests
  • Loading branch information
mrcnski committed Nov 13, 2022
commit aad7de732add696e513c0a1a39eba83a323d8b09
5 changes: 3 additions & 2 deletions node/core/pvf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
use parity_scale_codec::{Decode, Encode};
use std::any::Any;

/// Result of PVF preparation performed by the validation host.
pub type PrepareResult = Result<(), PrepareError>;
/// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
/// successful.
pub type PrepareResult = Result<Duration, PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
Expand Down
4 changes: 2 additions & 2 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ async fn handle_to_host(

/// Handles PVF prechecking requests.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_PREPARATION_TIMEOUT`]).
///
/// If the prepare job failed previously, we may retry it under certain conditions.
async fn handle_precheck_pvf(
Expand Down Expand Up @@ -490,7 +490,7 @@ async fn handle_precheck_pvf(
///
/// If the prepare job failed previously, we may retry it under certain conditions.
///
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`])
/// When preparing for execution, we use a more lenient timeout ([`EXECUTE_PREPARATION_TIMEOUT`])
/// than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
Expand Down
159 changes: 104 additions & 55 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,36 @@ pub async fn start_work(
Deadline,
}

let selected =
match async_std::future::timeout(preparation_timeout, framed_recv(&mut stream)).await {
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
if result.is_ok() {
// We use a generous timeout here. We use a wall clock timeout here in the host, but a CPU
// timeout in the child. This is because wall clock time can vary more under load, whereas
// CPU time more closely
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
// corresponds to the actual amount of work performed. The child should already terminate
// itself after `preparation_timeout` duration in CPU time, but we have this simple wall
// clock timeout in case the child stalls.
let timeout = preparation_timeout * PREPARATION_TIMEOUT_STALLED_FACTOR;
let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;

let selected = match result {
// TODO: This case is really long, refactor.
slumber marked this conversation as resolved.
Show resolved Hide resolved
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
if let Ok(result) = PrepareResult::decode(&mut response_bytes.as_slice()) {
if let Ok(cpu_time_elapsed) = result {
let given_cpu_time =
preparation_timeout + PREPARATION_TIMEOUT_MINIMUM_INTERVAL;
if cpu_time_elapsed > given_cpu_time {
// Sanity check: we expect the preparation to complete within the
// timeout, plus the minimum polling interval.
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
"child took {} cpu_time, exceeding given time {}",
cpu_time_elapsed,
given_cpu_time
);

Selected::Deadline
} else {
gum::debug!(
target: LOG_TARGET,
worker_pid = %pid,
Expand All @@ -128,36 +151,46 @@ pub async fn start_work(
);
Selected::IoErr
})
} else {
Selected::Done(result)
}
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
Selected::Done(result)
}
},
Ok(Err(err)) => {
// Communication error within the time limit.
} else {
// We received invalid bytes from the worker.
let bound_bytes = &response_bytes[..response_bytes.len().min(4)];
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
"received unexpected response from the prepare worker: {}",
HexDisplay::from(&bound_bytes),
);
Selected::IoErr
},
Err(_) => {
// Timed out.
Selected::Deadline
},
};
}
},
Ok(Err(PrepareError::TimedOut)) => {
// Timed out. This should already be logged by the child.
Selected::Deadline
},
Ok(Err(err)) => {
// Communication error within the time limit.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"failed to recv a prepare response: {:?}",
err,
);
Selected::IoErr
},
Err(_) => {
// Timed out.
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
"did not recv a prepare response within the time limit",
);
Selected::Deadline
},
};

match selected {
Selected::Done(result) =>
Expand Down Expand Up @@ -218,13 +251,15 @@ async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
tmp_file: &Path,
preparation_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &*code).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
framed_send(stream, duration_to_bytes(preparation_timeout)).await?;
Ok(())
}

async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)> {
async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, Duration)> {
let code = framed_recv(stream).await?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
Expand All @@ -233,46 +268,60 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf)>
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
Ok((code, tmp_file))
let preparation_timeout = framed_recv(stream).await?;
let preparation_timeout = bytes_to_duration(&preparation_timeout).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"prepare pvf recv_request: invalid duration".to_string(),
)
})?;
Ok((code, tmp_file, preparation_timeout))
}

/// The entrypoint that the spawned prepare worker should start with. The `socket_path` specifies
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |mut stream| async move {
loop {
let (code, dest) = recv_request(&mut stream).await?;
let (code, dest, timeout) = recv_request(&mut stream).await?;

gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: preparing artifact",
);

let result = match prepare_artifact(&code) {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
},
Ok(compiled_artifact) => {
// Write the serialized artifact into a temp file.
// PVF host only keeps artifacts statuses in its memory,
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare
// worker is only required to send an empty `Ok` to the pool
// to indicate the success.

gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: writing artifact to {}",
dest.display(),
);
async_std::fs::write(&dest, &compiled_artifact).await?;
// TODO: Spawn a new thread that wakes up periodically to check the elapsed CPU time and
// terminates the process if we exceed the CPU timeout.
thread::spawn(|| {
// TODO: Treat the timeout as CPU time, which is less subject to variance due to load.

// TODO: Log if we exceed the timeout.

// TODO: Send back a PrepareError::TimedOut on timeout.
});

// Serialized error will be written into the socket.
let result = prepare_artifact(&code).map(|compiled_artifact| {
// Write the serialized artifact into a temp file.
// PVF host only keeps artifact statuses in its memory;
// successfully compiled code gets stored on the disk (and
// consequently deserialized by execute-workers). The prepare
// worker is only required to send `Ok` with the elapsed CPU
// time to the pool to indicate the success.

gum::debug!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"worker: writing artifact to {}",
dest.display(),
);
async_std::fs::write(&dest, &compiled_artifact).await?;

Ok(())
},
};
// TODO: We are now sending the CPU time back, changing the expected interface. Do
// we need to account for this breaking change in any way?
cpu_time_elapsed
});

framed_send(&mut stream, result.encode().as_slice()).await?;
}
Expand Down