Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF timeouts follow-up #6151

Merged
merged 6 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ use std::{
time::{Duration, SystemTime},
};

/// The time period after which the precheck preparation worker is considered unresponsive and will
/// be killed.
/// For prechecking requests, the time period after which the preparation worker is considered
/// unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
pub const PRECHECK_PREPARATION_TIMEOUT: Duration = Duration::from_secs(60);

/// The time period after which the execute preparation worker is considered unresponsive and will
/// be killed.
/// For execution and heads-up requests, the time period after which the preparation worker is
/// considered unresponsive and will be killed. More lenient than the timeout for prechecking to
/// prevent honest validators from timing out on valid PVFs.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180);
pub const LENIENT_PREPARATION_TIMEOUT: Duration = Duration::from_secs(360);

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;
Expand Down Expand Up @@ -429,9 +430,10 @@ async fn handle_to_host(
Ok(())
}

/// Handles PVF prechecking.
/// Handles PVF prechecking requests.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout
/// ([`PRECHECK_PREPARATION_TIMEOUT`]).
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
Expand Down Expand Up @@ -459,7 +461,7 @@ async fn handle_precheck_pvf(
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf,
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
},
)
.await?;
Expand All @@ -469,9 +471,10 @@ async fn handle_precheck_pvf(

/// Handles PVF execution.
///
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout
/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking.
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there
/// is already a preparation job, we coalesce the two preparation jobs. When preparing for
/// execution, we use a more lenient timeout ([`LENIENT_PREPARATION_TIMEOUT`]) than when
/// prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
Expand Down Expand Up @@ -518,7 +521,7 @@ async fn handle_execute_pvf(
prepare::ToQueue::Enqueue {
priority,
pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
Expand Down Expand Up @@ -557,7 +560,7 @@ async fn handle_heads_up(
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
Expand Down
9 changes: 6 additions & 3 deletions node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ impl metrics::Metrics for Metrics {
"Time spent in preparing PVF artifacts in seconds",
)
.buckets(vec![
// This is synchronized with the PRECHECK_COMPILATION_TIMEOUT=60s
// and EXECUTE_COMPILATION_TIMEOUT=180s constants found in
// This is synchronized with the PRECHECK_PREPARATION_TIMEOUT=60s
// and LENIENT_PREPARATION_TIMEOUT=360s constants found in
// src/prepare/worker.rs
0.1,
0.5,
Expand All @@ -167,7 +167,10 @@ impl metrics::Metrics for Metrics {
20.0,
30.0,
60.0,
180.0,
120.0,
240.0,
360.0,
480.0,
]),
)?,
registry,
Expand Down
10 changes: 5 additions & 5 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub enum ToPool {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
compilation_timeout: Duration,
preparation_timeout: Duration,
},
}

Expand Down Expand Up @@ -210,7 +210,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => {
ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -221,7 +221,7 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
compilation_timeout,
preparation_timeout,
preparation_timer,
)
.boxed(),
Expand Down Expand Up @@ -269,11 +269,11 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
compilation_timeout: Duration,
preparation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await;
worker::start_work(idle, code, &cache_path, artifact_path, preparation_timeout).await;
PoolEvent::StartWork(worker, outcome)
}

Expand Down
48 changes: 24 additions & 24 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum ToQueue {
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration },
Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration },
}

/// A response from queue.
Expand Down Expand Up @@ -80,7 +80,7 @@ struct JobData {
priority: Priority,
pvf: Pvf,
/// The timeout for the preparation job.
compilation_timeout: Duration,
preparation_timeout: Duration,
worker: Option<Worker>,
}

Expand Down Expand Up @@ -208,8 +208,8 @@ impl Queue {

async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue {
ToQueue::Enqueue { priority, pvf, compilation_timeout } => {
handle_enqueue(queue, priority, pvf, compilation_timeout).await?;
ToQueue::Enqueue { priority, pvf, preparation_timeout } => {
handle_enqueue(queue, priority, pvf, preparation_timeout).await?;
},
}
Ok(())
Expand All @@ -219,13 +219,13 @@ async fn handle_enqueue(
queue: &mut Queue,
priority: Priority,
pvf: Pvf,
compilation_timeout: Duration,
preparation_timeout: Duration,
) -> Result<(), Fatal> {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash,
?priority,
?compilation_timeout,
?preparation_timeout,
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
Expand All @@ -247,7 +247,7 @@ async fn handle_enqueue(
return Ok(())
}

let job = queue.jobs.insert(JobData { priority, pvf, compilation_timeout, worker: None });
let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None });
queue.artifact_id_to_job.insert(artifact_id, job);

if let Some(available) = find_idle_worker(queue) {
Expand Down Expand Up @@ -439,7 +439,7 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
worker,
code: job_data.pvf.code.clone(),
artifact_path,
compilation_timeout: job_data.compilation_timeout,
preparation_timeout: job_data.preparation_timeout,
},
)
.await?;
Expand Down Expand Up @@ -494,7 +494,7 @@ pub fn start(
#[cfg(test)]
mod tests {
use super::*;
use crate::{error::PrepareError, host::PRECHECK_COMPILATION_TIMEOUT};
use crate::{error::PrepareError, host::PRECHECK_PREPARATION_TIMEOUT};
use assert_matches::assert_matches;
use futures::{future::BoxFuture, FutureExt};
use slotmap::SlotMap;
Expand Down Expand Up @@ -612,7 +612,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

Expand All @@ -626,12 +626,12 @@ mod tests {
#[async_std::test]
async fn dont_spawn_over_soft_limit_unless_critical() {
let mut test = Test::new(2, 3);
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;

let priority = Priority::Normal;
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });

// Receive only two spawns.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand All @@ -655,7 +655,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(4),
compilation_timeout,
preparation_timeout,
});

// 2 out of 2 are working, but there is a critical job incoming. That means that spawning
Expand All @@ -666,12 +666,12 @@ mod tests {
#[async_std::test]
async fn cull_unwanted() {
let mut test = Test::new(1, 2);
let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;

test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout,
preparation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
let w1 = test.workers.insert(());
Expand All @@ -682,7 +682,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
pvf: pvf(2),
compilation_timeout,
preparation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);

Expand All @@ -701,10 +701,10 @@ mod tests {
async fn worker_mass_die_out_doesnt_stall_queue() {
let mut test = Test::new(2, 2);

let (priority, compilation_timeout) = (Priority::Normal, PRECHECK_COMPILATION_TIMEOUT);
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout });
let (priority, preparation_timeout) = (Priority::Normal, PRECHECK_PREPARATION_TIMEOUT);
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), preparation_timeout });

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand Down Expand Up @@ -734,7 +734,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand All @@ -759,7 +759,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
pvf: pvf(1),
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});

assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
Expand Down
4 changes: 2 additions & 2 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
compilation_timeout: Duration,
preparation_timeout: Duration,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;

Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn start_work(
}

let selected =
match async_std::future::timeout(compilation_timeout, framed_recv(&mut stream)).await {
match async_std::future::timeout(preparation_timeout, framed_recv(&mut stream)).await {
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
Expand Down
Loading