Skip to content

Commit

Permalink
fix: beforeunload event didn't trigger on the early drop state (#455)
Browse files Browse the repository at this point in the history
* fix: `beforeunload` event didn't trigger on the early drop state

* stamp: polishing

* stamp: rid comments
  • Loading branch information
nyannyacha authored Nov 29, 2024
1 parent 18d8f69 commit 5707665
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 71 deletions.
1 change: 1 addition & 0 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ pub enum WillTerminateReason {
CPU,
Memory,
WallClock,
EarlyDrop,
}

pub struct DenoRuntime<RuntimeContext = DefaultRuntimeContext> {
Expand Down
89 changes: 54 additions & 35 deletions crates/base/src/rt_worker/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,6 @@ use crate::{

use super::{worker_ctx::TerminationToken, worker_pool::SupervisorPolicy};

#[repr(C)]
pub struct V8HandleTerminationData {
pub should_terminate: bool,
pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
}

pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
let mut boxed_data: Box<V8HandleTerminationData>;

unsafe {
boxed_data = Box::from_raw(data as *mut V8HandleTerminationData);
}

// log memory usage
let mut heap_stats = v8::HeapStatistics::default();

isolate.get_heap_statistics(&mut heap_stats);

let usage = IsolateMemoryStats {
used_heap_size: heap_stats.used_heap_size(),
external_memory: heap_stats.external_memory(),
};

if let Some(usage_tx) = boxed_data.isolate_memory_usage_tx.take() {
if usage_tx.send(usage).is_err() {
error!("failed to send isolate memory usage - receiver may have been dropped");
}
}

if boxed_data.should_terminate {
isolate.terminate_execution();
}
}

#[repr(C)]
pub struct IsolateMemoryStats {
pub used_heap_size: usize,
Expand Down Expand Up @@ -172,20 +138,73 @@ async fn create_wall_clock_beforeunload_alert(wall_clock_limit_ms: u64, pct: Opt
}
}

#[repr(C)]
pub struct V8HandleTerminationData {
pub should_terminate: bool,
pub isolate_memory_usage_tx: Option<oneshot::Sender<IsolateMemoryStats>>,
}

pub extern "C" fn v8_handle_termination(isolate: &mut v8::Isolate, data: *mut std::ffi::c_void) {
let mut data = unsafe { Box::from_raw(data as *mut V8HandleTerminationData) };

// log memory usage
let mut heap_stats = v8::HeapStatistics::default();

isolate.get_heap_statistics(&mut heap_stats);

let usage = IsolateMemoryStats {
used_heap_size: heap_stats.used_heap_size(),
external_memory: heap_stats.external_memory(),
};

if let Some(usage_tx) = data.isolate_memory_usage_tx.take() {
if usage_tx.send(usage).is_err() {
error!("failed to send isolate memory usage - receiver may have been dropped");
}
}

if data.should_terminate {
isolate.terminate_execution();
}
}

extern "C" fn v8_handle_wall_clock_beforeunload(
isolate: &mut v8::Isolate,
_data: *mut std::ffi::c_void,
) {
if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
.dispatch_beforeunload_event(WillTerminateReason::WallClock)
{
warn!(
error!(
"found an error while dispatching the beforeunload event: {}",
err
);
}
}

#[repr(C)]
pub struct V8HandleEarlyRetireData {
token: CancellationToken,
}

extern "C" fn v8_handle_early_drop_beforeunload(
isolate: &mut v8::Isolate,
data: *mut std::ffi::c_void,
) {
let data = unsafe { Box::from_raw(data as *mut V8HandleEarlyRetireData) };

if let Err(err) = MaybeDenoRuntime::<()>::Isolate(isolate)
.dispatch_beforeunload_event(WillTerminateReason::EarlyDrop)
{
error!(
"found an error while dispatching the beforeunload event: {}",
err
);
} else {
data.token.cancel();
}
}

#[instrument(level = "debug", skip_all)]
extern "C" fn v8_handle_early_retire(isolate: &mut v8::Isolate, _data: *mut std::ffi::c_void) {
isolate.low_memory_notification();
Expand Down
108 changes: 73 additions & 35 deletions crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use std::{future::pending, sync::atomic::Ordering, time::Duration};
use std::thread::ThreadId;

use event_worker::events::ShutdownReason;
use log::error;
use log::{error, info};
use sb_workers::context::{Timing, TimingStatus, UserWorkerMsgs};
use tokio_util::sync::CancellationToken;

use crate::rt_worker::supervisor::{
create_wall_clock_beforeunload_alert, v8_handle_early_retire,
v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
create_wall_clock_beforeunload_alert, v8_handle_early_drop_beforeunload,
v8_handle_early_retire, v8_handle_wall_clock_beforeunload, wait_cpu_alarm, CPUUsage, Tokens,
V8HandleEarlyRetireData,
};

use super::{v8_handle_termination, Arguments, CPUUsageMetrics, V8HandleTerminationData};
Expand Down Expand Up @@ -57,12 +59,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
let mut is_worker_entered = false;
let mut is_wall_clock_beforeunload_armed = false;
let mut is_cpu_time_soft_limit_reached = false;
let mut is_termination_requested = false;
let mut is_waiting_for_termination = false;
let mut have_all_reqs_been_acknowledged = false;

let mut cpu_usage_metrics_rx = cpu_usage_metrics_rx.unwrap();
let mut cpu_usage_ms = 0i64;

let mut complete_reason = None::<ShutdownReason>;
let mut wall_clock_alerts = 0;
let mut req_ack_count = 0usize;

Expand Down Expand Up @@ -97,6 +100,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
guard.raise();
};

let early_drop_token = CancellationToken::new();
let early_drop_fut = early_drop_token.cancelled();

let mut dispatch_early_drop_beforeunload_fn = Some({
let token = early_drop_token.clone();
|| {
let data_ptr_mut = Box::into_raw(Box::new(V8HandleEarlyRetireData { token }));

if !thread_safe_handle.request_interrupt(
v8_handle_early_drop_beforeunload,
data_ptr_mut as *mut std::ffi::c_void,
) {
drop(unsafe { Box::from_raw(data_ptr_mut) });
} else {
waker.wake();
}
}
});

let terminate_fn = {
let thread_safe_handle = thread_safe_handle.clone();
move || {
Expand All @@ -115,23 +137,25 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

tokio::pin!(wall_clock_duration_alert);
tokio::pin!(wall_clock_beforeunload_alert);
tokio::pin!(early_drop_fut);

let result = 'scope: loop {
loop {
tokio::select! {
_ = supervise.cancelled() => {
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
complete_reason = Some(ShutdownReason::TerminationRequested);
}

_ = async {
match termination.as_ref() {
Some(token) => token.inbound.cancelled().await,
None => pending().await,
}
}, if !is_termination_requested => {
is_termination_requested = true;
}, if !is_waiting_for_termination => {
is_waiting_for_termination = true;
if promise_metrics.have_all_promises_been_resolved() {
terminate_fn();
break 'scope (ShutdownReason::TerminationRequested, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
}
}

Expand Down Expand Up @@ -164,9 +188,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {

if !cpu_timer_param.is_disabled() {
if cpu_usage_ms >= hard_limit_ms as i64 {
terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::CPUTime);
} else if cpu_usage_ms >= soft_limit_ms as i64 && !is_cpu_time_soft_limit_reached {
early_retire_fn();
error!("CPU time soft limit reached: isolate: {:?}", key);
Expand All @@ -177,17 +200,18 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
}

} else if is_cpu_time_soft_limit_reached
&& have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
}
}
}
Expand All @@ -206,14 +230,13 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
if have_all_reqs_been_acknowledged
&& promise_metrics.have_all_promises_been_resolved()
{
terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
}
} else {
terminate_fn();
error!("CPU time hard limit reached: isolate: {:?}", key);
break 'scope (ShutdownReason::CPUTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::CPUTime);
}
}
}
Expand All @@ -237,9 +260,9 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
continue;
}

terminate_fn();
error!("early termination due to the last request being completed: isolate: {:?}", key);
break 'scope (ShutdownReason::EarlyDrop, cpu_usage_ms);
if let Some(func) = dispatch_early_drop_beforeunload_fn.take() {
func();
}
}

_ = wall_clock_duration_alert.tick(), if !is_wall_clock_limit_disabled => {
Expand All @@ -253,9 +276,8 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
} else {
let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);

terminate_fn();
error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
break 'scope (ShutdownReason::WallClockTime, cpu_usage_ms);
complete_reason = Some(ShutdownReason::WallClockTime);
}
}

Expand All @@ -273,18 +295,34 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
}

Some(_) = memory_limit_rx.recv() => {
terminate_fn();
error!("memory limit reached for the worker: isolate: {:?}", key);
break 'scope (ShutdownReason::Memory, cpu_usage_ms);
complete_reason = Some(ShutdownReason::Memory);
}
}
};

match result {
(ShutdownReason::EarlyDrop, cpu_usage_ms) if is_termination_requested => {
(ShutdownReason::TerminationRequested, cpu_usage_ms)
_ = &mut early_drop_fut => {
info!("early termination has been triggered: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::EarlyDrop);
}
}

result => result,
match complete_reason.take() {
Some(ShutdownReason::EarlyDrop) => {
terminate_fn();
return (
if is_waiting_for_termination {
ShutdownReason::TerminationRequested
} else {
ShutdownReason::EarlyDrop
},
cpu_usage_ms,
);
}

Some(result) => {
terminate_fn();
return (result, cpu_usage_ms);
}
None => continue,
}
}
}
1 change: 0 additions & 1 deletion crates/base/test_cases/main/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ Deno.serve((req: Request) => {
const workerTimeoutMs = parseIntFromHeadersOrDefault(req, "x-worker-timeout-ms", 10 * 60 * 1000);
const cpuTimeSoftLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-soft-limit-ms", 10 * 60 * 1000);
const cpuTimeHardLimitMs = parseIntFromHeadersOrDefault(req, "x-cpu-time-hard-limit-ms", 10 * 60 * 1000);
console.log(cpuTimeSoftLimitMs);
const noModuleCache = false;
const importMapPath = null;
const envVarsObj = Deno.env.toObject();
Expand Down

0 comments on commit 5707665

Please sign in to comment.