Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix worker event loop ref/unref + leak #4114

Merged
merged 13 commits into from
Aug 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
worker changes
  • Loading branch information
paperclover committed Aug 12, 2023
commit 8c4fece6ceb140a98849450319d35e54ec0181ea
25 changes: 25 additions & 0 deletions src/bun.js/base.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3285,6 +3285,15 @@ pub const PollRef = struct {
this.status = .inactive;
vm.uws_event_loop.?.unref();
}

/// From another thread, Prevent a poll from keeping the process alive.
pub fn unrefConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
return;
this.status = .inactive;
vm.uws_event_loop.?.unrefConcurrently();
}

/// Prevent a poll from keeping the process alive on the next tick.
pub fn unrefOnNextTick(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
Expand All @@ -3293,13 +3302,29 @@ pub const PollRef = struct {
vm.pending_unref_counter +|= 1;
}

/// From another thread, prevent a poll from keeping the process alive on the next tick.
pub fn unrefOnNextTickConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .active)
return;
this.status = .inactive;
_ = @atomicRmw(@TypeOf(vm.pending_unref_counter), &vm.pending_unref_counter, .Add, 1, .Monotonic);
}

/// Allow a poll to keep the process alive.
pub fn ref(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
return;
this.status = .active;
vm.uws_event_loop.?.ref();
}

/// Allow a poll to keep the process alive.
pub fn refConcurrently(this: *PollRef, vm: *JSC.VirtualMachine) void {
if (this.status != .inactive)
return;
this.status = .active;
vm.uws_event_loop.?.refConcurrently();
}
};

const KQueueGenerationNumber = if (Environment.isMac and Environment.allow_assert) usize else u0;
Expand Down
2 changes: 1 addition & 1 deletion src/bun.js/bindings/webcore/JSWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSWorkerDOMConstructor::const
EnsureStillAliveScope argument1 = callFrame->argument(1);

auto options = WorkerOptions {};
options.bun.unref = false;
options.bun.unref = true;

if (JSObject* optionsObject = JSC::jsDynamicCast<JSC::JSObject*>(argument1.value())) {
if (auto nameValue = optionsObject->getIfPropertyExists(lexicalGlobalObject, Identifier::fromString(vm, "name"_s))) {
Expand Down
10 changes: 5 additions & 5 deletions src/bun.js/module_loader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2021,11 +2021,11 @@ pub const ModuleLoader = struct {
} else if (HardcodedModule.Map.getWithEql(specifier, bun.String.eqlComptime)) |hardcoded| {
switch (hardcoded) {
.@"bun:main" => {
defer {
if (jsc_vm.worker) |worker| {
worker.queueInitialTask();
}
}
// defer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this intentional?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worker.queueInitialTask was removing an eventloop ref way too early. since the ref was now based on the worker's event loop, the initial task to unref isn't needed i think. did not mean to leave as commented code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code here was removed instead of commented

// if (jsc_vm.worker) |worker| {
// worker.queueInitialTask();
// }
// }

return ResolvedSource{
.allocator = null,
Expand Down
83 changes: 31 additions & 52 deletions src/bun.js/web_worker.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ pub const WebWorker = struct {
cpp_worker: *anyopaque,
allowed_to_exit: bool = false,
mini: bool = false,
parent_poll_ref: JSC.PollRef = .{},
initial_poll_ref: JSC.PollRef = .{},
did_send_initial_task: bool = false,
user_poll_ref: JSC.PollRef = .{},
eventloop_poll_ref: JSC.PollRef = .{},
paperclover marked this conversation as resolved.
Show resolved Hide resolved

extern fn WebWorker__dispatchExit(?*JSC.JSGlobalObject, *anyopaque, i32) void;
extern fn WebWorker__dispatchOnline(this: *anyopaque, *JSC.JSGlobalObject) void;
Expand All @@ -43,9 +42,6 @@ pub const WebWorker = struct {
.{worker},
) catch {
worker.deinit();
worker.parent_poll_ref.unref(worker.parent);
worker.initial_poll_ref.unref(worker.parent);
bun.default_allocator.destroy(worker);
return false;
};
thread.detach();
Expand Down Expand Up @@ -80,6 +76,7 @@ pub const WebWorker = struct {
default_unref: bool,
) callconv(.C) ?*WebWorker {
JSC.markBinding(@src());
log("init WebWorker id={d}", .{this_context_id});
var spec_slice = specifier_str.toUTF8(bun.default_allocator);
defer spec_slice.deinit();
var prev_log = parent.bundler.log;
Expand Down Expand Up @@ -117,33 +114,16 @@ pub const WebWorker = struct {
},
};

worker.initial_poll_ref.ref(parent);
worker.eventloop_poll_ref.ref(parent);

if (!default_unref) {
worker.allowed_to_exit = false;
worker.parent_poll_ref.ref(parent);
worker.user_poll_ref.ref(parent);
}

return worker;
}

pub fn queueInitialTask(this: *WebWorker) void {
if (this.did_send_initial_task) return;
this.did_send_initial_task = true;

const Unref = struct {
pub fn unref(worker: *WebWorker) void {
worker.initial_poll_ref.unref(worker.parent);
}
};

const AnyTask = JSC.AnyTask.New(WebWorker, Unref.unref);
var any_task = bun.default_allocator.create(JSC.AnyTask) catch @panic("OOM");
any_task.* = AnyTask.init(this);
var concurrent_task = bun.default_allocator.create(JSC.ConcurrentTask) catch @panic("OOM");
this.parent.eventLoop().enqueueTaskConcurrent(concurrent_task.from(any_task, .auto_deinit));
}

pub fn startWithErrorHandling(
this: *WebWorker,
) void {
Expand All @@ -162,7 +142,6 @@ pub const WebWorker = struct {
}

if (this.requested_terminate) {
this.queueInitialTask();
this.deinit();
return;
}
Expand Down Expand Up @@ -206,7 +185,11 @@ pub const WebWorker = struct {
}

fn deinit(this: *WebWorker) void {
log("deinit WebWorker id={d}", .{this.execution_context_id});
this.user_poll_ref.unref(this.parent);
this.eventloop_poll_ref.unref(this.parent);
bun.default_allocator.free(this.specifier);
// bun.default_allocator.destroy(this);
}

fn flushLogs(this: *WebWorker) void {
Expand Down Expand Up @@ -269,16 +252,16 @@ pub const WebWorker = struct {
var promise = vm.loadEntryPointForWebWorker(this.specifier) catch {
this.flushLogs();
this.onTerminate();
this.deinit();
return;
};

this.queueInitialTask();

if (promise.status(vm.global.vm()) == .Rejected) {
vm.onUnhandledError(vm.global, promise.result(vm.global.vm()));

vm.exit_handler.exit_code = 1;
this.onTerminate();
this.deinit();

return;
}
Expand All @@ -303,30 +286,29 @@ pub const WebWorker = struct {
// always doing a first tick so we call CppTask without delay after dispatchOnline
vm.tick();

{
while (true) {
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
vm.tick();

vm.eventLoop().autoTickActive();
while (true) {
while (vm.eventLoop().tasks.count > 0 or vm.active_tasks > 0 or vm.uws_event_loop.?.active > 0) {
vm.tick();
// When the worker is done, the VM is cleared, but we don't free
// the entire worker object, because this loop is still active and UAF will happen.
if (this.vm == null) {
this.deinit();
return;
}

if (!this.allowed_to_exit) {
this.flushLogs();
vm.eventLoop().tickPossiblyForever();
continue;
}

vm.onBeforeExit();

if (!this.allowed_to_exit)
continue;
vm.eventLoop().autoTickActive();
}

break;
if (this.vm == null) {
this.deinit();
return;
}

this.flushLogs();
this.onTerminate();
this.eventloop_poll_ref.unrefConcurrently(this.parent);
vm.eventLoop().tickPossiblyForever();
this.parent.eventLoop().wakeup();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a race condition here where the parent event loop is unref'd

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the case where the parent event loop is not the main thread

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i did not think about workers starting other workers

this.eventloop_poll_ref.refConcurrently(this.parent);
}
}

Expand All @@ -346,15 +328,15 @@ pub const WebWorker = struct {

pub fn setRef(this: *WebWorker, value: bool) callconv(.C) void {
if (this.requested_terminate and !value) {
this.parent_poll_ref.unref(this.parent);
this.user_poll_ref.unref(this.parent);
return;
}

this.allowed_to_exit = !value;
if (this.allowed_to_exit) {
this.parent_poll_ref.unref(this.parent);
this.user_poll_ref.unref(this.parent);
} else {
this.parent_poll_ref.ref(this.parent);
this.user_poll_ref.ref(this.parent);
}

if (this.vm) |vm| {
Expand All @@ -364,7 +346,6 @@ pub const WebWorker = struct {

fn onTerminate(this: *WebWorker) void {
log("onTerminate", .{});

this.reallyExit();
}

Expand Down Expand Up @@ -397,8 +378,6 @@ pub const WebWorker = struct {
this.arena.deinit();
}
WebWorker__dispatchExit(globalObject, cpp_worker, exit_code);

this.deinit();
}

fn requestTerminate(this: *WebWorker) bool {
Expand Down
5 changes: 2 additions & 3 deletions src/js/node/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,8 @@ class Worker extends EventEmitter {
this.emit("exit", e.code);
}

#onError(event: ErrorEvent) {
// TODO: is this right?
this.emit("error", event.error);
#onError(error: ErrorEvent) {
this.emit("error", error);
}

#onMessage(event: MessageEvent) {
Expand Down
Loading