Skip to content

Commit

Permalink
new waking mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
kprotty committed Feb 19, 2021
1 parent 07f691c commit ec9bc14
Showing 1 changed file with 49 additions and 58 deletions.
107 changes: 49 additions & 58 deletions src/runtime/Pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -140,32 +140,29 @@ pub const Runnable = struct {
const Counter = struct {
idle: u16 = 0,
spawned: u16 = 0,
notified: bool = false,
state: State = .pending,
waking: bool = false,
polling: bool = false,

const State = enum(u2) {
pending = 0,
waking,
notified,
signalled,
shutdown,
};

fn pack(self: Counter) u32 {
return ((@as(u32, @boolToInt(self.waking)) << 0) |
(@as(u32, @boolToInt(self.polling)) << 1) |
(@as(u32, @enumToInt(self.state)) << 2) |
(@as(u32, @intCast(u14, self.idle)) << 4) |
(@as(u32, @intCast(u14, self.spawned)) << 18));
return ((@as(u32, @boolToInt(self.notified)) << 0) |
(@as(u32, @enumToInt(self.state)) << 1) |
(@as(u32, @intCast(u14, self.idle)) << 3) |
(@as(u32, @intCast(u14, self.spawned)) << (3+14)));
}

fn unpack(value: u32) Counter {
return .{
.waking = (value & (1 << 0)) != 0,
.polling = (value & (1 << 1)) != 0,
.state = @intToEnum(State, @truncate(u2, value >> 2)),
.idle = @truncate(u14, value >> 4),
.spawned = @truncate(u14, value >> 18),
.notified = value & (1 << 0) != 0,
.state = @intToEnum(State, @truncate(u2, value >> 1)),
.idle = @truncate(u14, value >> 3),
.spawned = @truncate(u14, value >> (3+14)),
};
}
};
Expand All @@ -185,40 +182,37 @@ fn notifyWorkers(self: *Pool, _is_waking: bool) void {
return;
}

if (is_waking and !counter.waking)
std.debug.panic("is_waking when counter isnt", .{});
var do_wake = false;
var do_spawn = false;

var new_counter = counter;
new_counter.notified = true;
if (is_waking) {
if (attempts > 0 and did_spawn) {
new_counter.state = .notified;
new_counter.waking = true;
} else if (attempts > 0 and counter.idle > 0) {
new_counter.state = .signalled;
new_counter.waking = true;
new_counter.idle -= 1;
} else if (attempts > 0 and counter.spawned < max_threads) {
if (counter.state != .waking)
std.debug.panic("notifyWorkers(waking) when counter is not", .{});
if (attempts > 0 and counter.idle > 0) {
if (did_spawn)
new_counter.spawned -= 1;
new_counter.state = .notified;
new_counter.waking = true;
new_counter.spawned += 1;
do_wake = true;
} else if (attempts > 0 and (did_spawn or counter.spawned < max_threads)) {
if (!did_spawn)
new_counter.spawned += 1;
do_spawn = true;
} else {
new_counter.state = .notified;
new_counter.waking = false;
if (did_spawn)
new_counter.spawned -= 1;
new_counter.state = .pending;
}
} else {
if (!counter.waking and counter.idle > 0) {
new_counter.state = .signalled;
new_counter.waking = true;
new_counter.idle -= 1;
} else if (!counter.waking and counter.spawned < max_threads) {
if (counter.state == .pending and counter.idle > 0) {
new_counter.state = .notified;
new_counter.waking = true;
do_wake = true;
} else if (counter.state == .pending and counter.spawned < max_threads) {
new_counter.state = .waking;
new_counter.spawned += 1;
} else if (counter.state == .pending) {
new_counter.state = .notified;
} else {
do_spawn = true;
} else if (counter.notified) {
return;
}
}
Expand All @@ -237,17 +231,15 @@ fn notifyWorkers(self: *Pool, _is_waking: bool) void {
}

is_waking = true;
if (new_counter.state == .signalled) {
if (do_wake) {
Futex.wake(&self.counter, 1);
return;
}

did_spawn = did_spawn or (new_counter.spawned > counter.spawned);
if (did_spawn) {
if (Thread.spawn(Worker.entry, @ptrToInt(self))) {
return;
}
} else {
did_spawn = true;
if (do_spawn and Thread.spawn(Worker.entry, @ptrToInt(self))) {
return;
} else if (!do_spawn) {
return;
}

Expand All @@ -268,23 +260,22 @@ fn suspendWorker(self: *Pool, worker: *Worker) ?bool {
return null;
}

if (!is_suspended or counter.state == .signalled) {
if (counter.notified or !is_suspended) {
var new_counter = counter;
if (counter.state == .signalled) {
if (is_waking)
std.debug.panic("signalled when suspend(waking)", .{});
new_counter.state = .pending;
new_counter.waking = true;
} else if (counter.state == .notified) {
new_counter.state = .pending;
new_counter.notified = false;
if (counter.state == .notified) {
if (is_suspended)
new_counter.idle -= 1;
new_counter.state = .waking;
} else if (counter.notified) {
if (is_suspended)
new_counter.idle -= 1;
if (is_waking)
new_counter.waking = true;
new_counter.state = .waking;
} else {
if (is_suspended)
std.debug.panic("trying to suspend when already suspended", .{});
new_counter.idle += 1;
if (is_waking)
new_counter.waking = false;
new_counter.state = .pending;
new_counter.idle += 1;
}

if (@cmpxchgWeak(
Expand All @@ -300,9 +291,9 @@ fn suspendWorker(self: *Pool, worker: *Worker) ?bool {
continue;
}

if (counter.state == .signalled)
return true;
if (counter.state == .notified)
return true;
if (counter.notified)
return is_waking;

is_waking = false;
Expand Down

0 comments on commit ec9bc14

Please sign in to comment.