Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Use CPU clock timeout for PVF jobs #6282

Merged
merged 23 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
aad7de7
Put in skeleton logic for CPU-time-preparation
mrcnski Nov 10, 2022
c62a8e1
Continue filling in logic for prepare worker CPU time changes
mrcnski Nov 13, 2022
9c68422
Fix compiler errors
mrcnski Nov 13, 2022
9fac3a4
Update lenience factor
mrcnski Nov 14, 2022
9f67150
Fix some clippy lints for PVF module
mrcnski Nov 14, 2022
e34825a
Fix compilation errors
mrcnski Nov 15, 2022
fe139e1
Address some review comments
mrcnski Nov 16, 2022
7b7886c
Add logging
mrcnski Nov 16, 2022
6006fe0
Add another log
mrcnski Nov 16, 2022
ebc9825
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 20, 2022
80d2cf6
Address some review comments; change Mutex to AtomicBool
mrcnski Nov 20, 2022
8e5139f
Refactor handling response bytes
mrcnski Nov 20, 2022
d8add29
Add CPU clock timeout logic for execute jobs
mrcnski Nov 20, 2022
c50a08f
Properly handle AtomicBool flag
mrcnski Nov 20, 2022
bc6172e
Use `Ordering::Relaxed`
mrcnski Nov 20, 2022
eb9372c
Refactor thread coordination logic
mrcnski Nov 21, 2022
6576ea1
Fix bug
mrcnski Nov 22, 2022
ec60713
Add some timing information to execute tests
mrcnski Nov 22, 2022
efd50fe
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 23, 2022
09560ee
Add section about the mitigation to the IG
mrcnski Nov 23, 2022
7c859d6
minor: Change more `Ordering`s to `Relaxed`
mrcnski Nov 25, 2022
fb6c5e9
candidate-validation: Fix build errors
mrcnski Nov 28, 2022
3e98ee6
Merge branch 'master' into m-cat/pvf-preparation-cpu-time
mrcnski Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix compilation errors
  • Loading branch information
mrcnski committed Nov 15, 2022
commit e34825aa290d2b05f51f62c3afc6f4c7f95f4bcc
9 changes: 7 additions & 2 deletions node/core/pvf/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,16 @@ impl Artifacts {
/// This function must be used only for brand-new artifacts and should never be used for
/// replacing existing ones.
#[cfg(test)]
pub fn insert_prepared(&mut self, artifact_id: ArtifactId, last_time_needed: SystemTime) {
pub fn insert_prepared(
&mut self,
artifact_id: ArtifactId,
last_time_needed: SystemTime,
cpu_time_elapsed: Duration,
) {
// See the precondition.
always!(self
.artifacts
.insert(artifact_id, ArtifactState::Prepared { last_time_needed })
.insert(artifact_id, ArtifactState::Prepared { last_time_needed, cpu_time_elapsed })
.is_none());
}

Expand Down
33 changes: 24 additions & 9 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,8 +1017,8 @@ mod tests {
let mut builder = Builder::default();
builder.cleanup_pulse_interval = Duration::from_millis(100);
builder.artifact_ttl = Duration::from_millis(500);
builder.artifacts.insert_prepared(artifact_id(1), mock_now);
builder.artifacts.insert_prepared(artifact_id(2), mock_now);
builder.artifacts.insert_prepared(artifact_id(1), mock_now, Duration::default());
builder.artifacts.insert_prepared(artifact_id(2), mock_now, Duration::default());
let mut test = builder.build();
let mut host = test.host_handle();

Expand Down Expand Up @@ -1088,7 +1088,10 @@ mod tests {
);

test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
Expand All @@ -1101,7 +1104,10 @@ mod tests {
);

test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
})
.await
.unwrap();
let result_tx_pvf_2 = assert_matches!(
Expand Down Expand Up @@ -1150,13 +1156,16 @@ mod tests {
);
// Send `Ok` right away and poll the host.
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();
// No pending execute requests.
test.poll_ensure_to_execute_queue_is_empty().await;
// Received the precheck result.
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));

// Send multiple requests for the same PVF.
let mut precheck_receivers = Vec::new();
Expand Down Expand Up @@ -1254,7 +1263,10 @@ mod tests {
prepare::ToQueue::Enqueue { .. }
);
test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(2),
result: Ok(Duration::default()),
})
.await
.unwrap();
// The execute queue receives a new request, prechecking is finished and we can
Expand All @@ -1264,7 +1276,7 @@ mod tests {
execute::ToQueue::Enqueue { .. }
);
for result_rx in precheck_receivers {
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(()));
assert_matches!(result_rx.now_or_never().unwrap().unwrap(), Ok(_));
}
}

Expand Down Expand Up @@ -1512,7 +1524,10 @@ mod tests {
);

test.from_prepare_queue_tx
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.send(prepare::FromQueue {
artifact_id: artifact_id(1),
result: Ok(Duration::default()),
})
.await
.unwrap();

Expand Down
24 changes: 20 additions & 4 deletions node/core/pvf/src/prepare/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,11 @@ mod tests {

let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w,
rip: false,
result: Ok(Duration::default()),
});

assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}
Expand Down Expand Up @@ -645,7 +649,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(Duration::default()),
});

assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

Expand Down Expand Up @@ -691,7 +699,11 @@ mod tests {
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: false,
result: Ok(Duration::default()),
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}

Expand All @@ -717,7 +729,11 @@ mod tests {
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

// Conclude worker 1 and rip it.
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
test.send_from_pool(pool::FromPool::Concluded {
worker: w1,
rip: true,
result: Ok(Duration::default()),
});

// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
Expand Down
2 changes: 1 addition & 1 deletion node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub fn worker_entrypoint(socket_path: &str) {
"worker: preparing artifact",
);

// Create a static Mutex. We lock it when either thread finishes and set a flag.
// Create a shared Mutex. We lock it when either thread finishes and set the flag.
let mutex = Arc::new(Mutex::new(false));

let cpu_time_start = ProcessTime::now();
Expand Down