-
-
Save TheWaWaR/3624c5e8f92f4207613fcae7b111bf2e to your computer and use it in GitHub Desktop.
"Ghost" timer callback in macos
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const mem = std.mem; | |
const math = std.math; | |
const Allocator = mem.Allocator; | |
const assert = std.debug.assert; | |
/// Double-ended queue ported from Rust's standard library, which is provided under MIT License. | |
/// It can be found at https://github.com/rust-lang/rust/blob/master/LICENSE-MIT | |
pub fn Deque(comptime T: type) type { | |
return struct { | |
/// tail and head are pointers into the buffer. Tail always points | |
/// to the first element that could be read, Head always points | |
/// to where data should be written. | |
/// If tail == head the buffer is empty. The length of the ringbuffer | |
/// is defined as the distance between the two. | |
tail: usize, | |
head: usize, | |
/// Users should **NOT** use this field directly. | |
/// In order to access an item with an index, use `get` method. | |
/// If you want to iterate over the items, call `iterator` method to get an iterator. | |
buf: []T, | |
allocator: Allocator, | |
const Self = @This(); | |
const INITIAL_CAPACITY = 7; // 2^3 - 1 | |
const MINIMUM_CAPACITY = 1; // 2 - 1 | |
/// Creates an empty deque. | |
/// Deinitialize with `deinit`. | |
pub fn init(allocator: Allocator) Allocator.Error!Self { | |
return initCapacity(allocator, INITIAL_CAPACITY); | |
} | |
/// Creates an empty deque with space for at least `capacity` elements. | |
/// | |
/// Note that there is no guarantee that the created Deque has the specified capacity. | |
/// If it is too large, this method gives up meeting the capacity requirement. | |
/// In that case, it will instead create a Deque with the default capacity anyway. | |
/// | |
/// Deinitialize with `deinit`. | |
pub fn initCapacity(allocator: Allocator, capacity: usize) Allocator.Error!Self { | |
const effective_cap = | |
math.ceilPowerOfTwo(usize, @max(capacity +| 1, MINIMUM_CAPACITY + 1)) catch | |
math.ceilPowerOfTwoAssert(usize, INITIAL_CAPACITY + 1); | |
const buf = try allocator.alloc(T, effective_cap); | |
return Self{ | |
.tail = 0, | |
.head = 0, | |
.buf = buf, | |
.allocator = allocator, | |
}; | |
} | |
/// Release all allocated memory. | |
pub fn deinit(self: Self) void { | |
self.allocator.free(self.buf); | |
} | |
/// Returns the length of the already-allocated buffer. | |
pub fn cap(self: Self) usize { | |
return self.buf.len; | |
} | |
/// Returns the number of elements in the deque. | |
pub fn len(self: Self) usize { | |
return count(self.tail, self.head, self.cap()); | |
} | |
/// Gets the pointer to the element with the given index, if any. | |
/// Otherwise it returns `null`. | |
pub fn get(self: Self, index: usize) ?*T { | |
if (index >= self.len()) return null; | |
const idx = self.wrapAdd(self.tail, index); | |
return &self.buf[idx]; | |
} | |
/// Gets the pointer to the first element, if any. | |
pub fn front(self: Self) ?*T { | |
return self.get(0); | |
} | |
/// Gets the pointer to the last element, if any. | |
pub fn back(self: Self) ?*T { | |
const last_idx = math.sub(usize, self.len(), 1) catch return null; | |
return self.get(last_idx); | |
} | |
/// Adds the given element to the back of the deque. | |
pub fn pushBack(self: *Self, item: T) Allocator.Error!void { | |
if (self.isFull()) { | |
try self.grow(); | |
} | |
const head = self.head; | |
self.head = self.wrapAdd(self.head, 1); | |
self.buf[head] = item; | |
} | |
/// Adds the given element to the front of the deque. | |
pub fn pushFront(self: *Self, item: T) Allocator.Error!void { | |
if (self.isFull()) { | |
try self.grow(); | |
} | |
self.tail = self.wrapSub(self.tail, 1); | |
const tail = self.tail; | |
self.buf[tail] = item; | |
} | |
/// Pops and returns the last element of the deque. | |
pub fn popBack(self: *Self) ?T { | |
if (self.len() == 0) return null; | |
self.head = self.wrapSub(self.head, 1); | |
const head = self.head; | |
const item = self.buf[head]; | |
self.buf[head] = undefined; | |
return item; | |
} | |
/// Pops and returns the first element of the deque. | |
pub fn popFront(self: *Self) ?T { | |
if (self.len() == 0) return null; | |
const tail = self.tail; | |
self.tail = self.wrapAdd(self.tail, 1); | |
const item = self.buf[tail]; | |
self.buf[tail] = undefined; | |
return item; | |
} | |
/// Adds all the elements in the given slice to the back of the deque. | |
pub fn appendSlice(self: *Self, items: []const T) Allocator.Error!void { | |
for (items) |item| { | |
try self.pushBack(item); | |
} | |
} | |
/// Adds all the elements in the given slice to the front of the deque. | |
pub fn prependSlice(self: *Self, items: []const T) Allocator.Error!void { | |
if (items.len == 0) return; | |
var i: usize = items.len - 1; | |
while (true) : (i -= 1) { | |
const item = items[i]; | |
try self.pushFront(item); | |
if (i == 0) break; | |
} | |
} | |
/// Returns an iterator over the deque. | |
/// Modifying the deque may invalidate this iterator. | |
pub fn iterator(self: Self) Iterator { | |
return .{ | |
.head = self.head, | |
.tail = self.tail, | |
.ring = self.buf, | |
}; | |
} | |
pub fn format(self: *const Self, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void { | |
try writer.writeAll("Deque("); | |
try std.fmt.format(writer, "{}", .{T}); | |
try writer.writeAll(") { .buf = ["); | |
var it = self.iterator(); | |
if (it.next()) |val| try writer.print("{any}", .{val}); | |
while (it.next()) |val| try writer.print(", {any}", .{val}); | |
try writer.writeAll("], .head = "); | |
try std.fmt.format(writer, "{}", .{self.head}); | |
try writer.writeAll(", .tail = "); | |
try std.fmt.format(writer, "{}", .{self.tail}); | |
try writer.writeAll(", .len = "); | |
try std.fmt.format(writer, "{}", .{self.len()}); | |
try writer.writeAll(" }"); | |
} | |
pub const Iterator = struct { | |
head: usize, | |
tail: usize, | |
ring: []T, | |
pub fn next(it: *Iterator) ?*T { | |
if (it.head == it.tail) return null; | |
const tail = it.tail; | |
it.tail = wrapIndex(it.tail +% 1, it.ring.len); | |
return &it.ring[tail]; | |
} | |
pub fn nextBack(it: *Iterator) ?*T { | |
if (it.head == it.tail) return null; | |
it.head = wrapIndex(it.head -% 1, it.ring.len); | |
return &it.ring[it.head]; | |
} | |
}; | |
/// Returns `true` if the buffer is at full capacity. | |
fn isFull(self: Self) bool { | |
return self.cap() - self.len() == 1; | |
} | |
fn grow(self: *Self) Allocator.Error!void { | |
assert(self.isFull()); | |
const old_cap = self.cap(); | |
// Reserve additional space to accomodate more items | |
self.buf = try self.allocator.realloc(self.buf, old_cap * 2); | |
// Update `tail` and `head` pointers accordingly | |
self.handleCapacityIncrease(old_cap); | |
assert(self.cap() >= old_cap * 2); | |
assert(!self.isFull()); | |
} | |
/// Updates `tail` and `head` values to handle the fact that we just reallocated the internal buffer. | |
fn handleCapacityIncrease(self: *Self, old_capacity: usize) void { | |
const new_capacity = self.cap(); | |
// Move the shortest contiguous section of the ring buffer. | |
// There are three cases to consider: | |
// | |
// (A) No need to update | |
// T H | |
// before: [o o o o o o o . ] | |
// | |
// after : [o o o o o o o . . . . . . . . . ] | |
// T H | |
// | |
// | |
// (B) [..H] needs to be moved | |
// H T | |
// before: [o o . o o o o o ] | |
// --- | |
// |_______________. | |
// | | |
// v | |
// --- | |
// after : [. . . o o o o o o o . . . . . . ] | |
// T H | |
// | |
// | |
// (C) [T..old_capacity] needs to be moved | |
// H T | |
// before: [o o o o o . o o ] | |
// --- | |
// |_______________. | |
// | | |
// v | |
// --- | |
// after : [o o o o o . . . . . . . . . o o ] | |
// H T | |
if (self.tail <= self.head) { | |
// (A), Nop | |
} else if (self.head < old_capacity - self.tail) { | |
// (B) | |
self.copyNonOverlapping(old_capacity, 0, self.head); | |
self.head += old_capacity; | |
assert(self.head > self.tail); | |
} else { | |
// (C) | |
const new_tail = new_capacity - (old_capacity - self.tail); | |
self.copyNonOverlapping(new_tail, self.tail, old_capacity - self.tail); | |
self.tail = new_tail; | |
assert(self.head < self.tail); | |
} | |
assert(self.head < self.cap()); | |
assert(self.tail < self.cap()); | |
} | |
fn copyNonOverlapping(self: *Self, dest: usize, src: usize, length: usize) void { | |
assert(dest + length <= self.cap()); | |
assert(src + length <= self.cap()); | |
@memcpy(self.buf[dest .. dest + length], self.buf[src .. src + length]); | |
} | |
fn wrapAdd(self: Self, idx: usize, addend: usize) usize { | |
return wrapIndex(idx +% addend, self.cap()); | |
} | |
fn wrapSub(self: Self, idx: usize, subtrahend: usize) usize { | |
return wrapIndex(idx -% subtrahend, self.cap()); | |
} | |
}; | |
} | |
fn count(tail: usize, head: usize, size: usize) usize { | |
assert(math.isPowerOfTwo(size)); | |
return (head -% tail) & (size - 1); | |
} | |
fn wrapIndex(index: usize, size: usize) usize { | |
assert(math.isPowerOfTwo(size)); | |
return index & (size - 1); | |
} | |
test "Deque works" { | |
const testing = std.testing; | |
var deque = try Deque(usize).init(testing.allocator); | |
defer deque.deinit(); | |
// empty deque | |
try testing.expectEqual(@as(usize, 0), deque.len()); | |
try testing.expect(deque.get(0) == null); | |
try testing.expect(deque.front() == null); | |
try testing.expect(deque.back() == null); | |
try testing.expect(deque.popBack() == null); | |
try testing.expect(deque.popFront() == null); | |
// pushBack | |
try deque.pushBack(101); | |
try testing.expectEqual(@as(usize, 1), deque.len()); | |
try testing.expectEqual(@as(usize, 101), deque.get(0).?.*); | |
try testing.expectEqual(@as(usize, 101), deque.front().?.*); | |
try testing.expectEqual(@as(usize, 101), deque.back().?.*); | |
// pushFront | |
try deque.pushFront(100); | |
try testing.expectEqual(@as(usize, 2), deque.len()); | |
try testing.expectEqual(@as(usize, 100), deque.get(0).?.*); | |
try testing.expectEqual(@as(usize, 100), deque.front().?.*); | |
try testing.expectEqual(@as(usize, 101), deque.get(1).?.*); | |
try testing.expectEqual(@as(usize, 101), deque.back().?.*); | |
// more items | |
{ | |
var i: usize = 99; | |
while (true) : (i -= 1) { | |
try deque.pushFront(i); | |
if (i == 0) break; | |
} | |
} | |
{ | |
var i: usize = 102; | |
while (i < 200) : (i += 1) { | |
try deque.pushBack(i); | |
} | |
} | |
try testing.expectEqual(@as(usize, 200), deque.len()); | |
{ | |
var i: usize = 0; | |
while (i < deque.len()) : (i += 1) { | |
try testing.expectEqual(i, deque.get(i).?.*); | |
} | |
} | |
{ | |
var i: usize = 0; | |
var it = deque.iterator(); | |
while (it.next()) |val| : (i += 1) { | |
try testing.expectEqual(i, val.*); | |
} | |
try testing.expectEqual(@as(usize, 200), i); | |
} | |
} | |
test "initCapacity with too large capacity" { | |
const testing = std.testing; | |
var deque = try Deque(i32).initCapacity(testing.allocator, math.maxInt(usize)); | |
defer deque.deinit(); | |
// The specified capacity `math.maxInt(usize)` was too large. | |
// Internally this is just ignored, and the default capacity is used instead. | |
try testing.expectEqual(@as(usize, 8), deque.buf.len); | |
} | |
test "appendSlice and prependSlice" { | |
const testing = std.testing; | |
var deque = try Deque(usize).init(testing.allocator); | |
defer deque.deinit(); | |
try deque.prependSlice(&[_]usize{ 1, 2, 3, 4, 5, 6 }); | |
try deque.appendSlice(&[_]usize{ 7, 8, 9 }); | |
try deque.prependSlice(&[_]usize{0}); | |
try deque.appendSlice(&[_]usize{ 10, 11, 12, 13, 14 }); | |
{ | |
var i: usize = 0; | |
while (i <= 14) : (i += 1) { | |
try testing.expectEqual(i, deque.get(i).?.*); | |
} | |
} | |
} | |
test "format" { | |
const testing = std.testing; | |
var deque = try Deque(usize).init(testing.allocator); | |
defer deque.deinit(); | |
try deque.pushBack(69); | |
try deque.pushBack(420); | |
std.debug.print("{}\n", .{deque}); | |
} | |
test "nextBack" { | |
const testing = std.testing; | |
var deque = try Deque(usize).init(testing.allocator); | |
defer deque.deinit(); | |
try deque.appendSlice(&[_]usize{ 5, 4, 3, 2, 1, 0 }); | |
{ | |
var i: usize = 0; | |
var it = deque.iterator(); | |
while (it.nextBack()) |val| : (i += 1) { | |
try testing.expectEqual(i, val.*); | |
} | |
} | |
} | |
test "code sample in README" { | |
var deque = try Deque(usize).init(std.testing.allocator); | |
defer deque.deinit(); | |
try deque.pushBack(1); | |
try deque.pushBack(2); | |
try deque.pushFront(0); | |
std.debug.assert(deque.get(0).?.* == @as(usize, 0)); | |
std.debug.assert(deque.get(1).?.* == @as(usize, 1)); | |
std.debug.assert(deque.get(2).?.* == @as(usize, 2)); | |
std.debug.assert(deque.get(3) == null); | |
var it = deque.iterator(); | |
var sum: usize = 0; | |
while (it.next()) |val| { | |
sum += val.*; | |
} | |
std.debug.assert(sum == 3); | |
std.debug.assert(deque.popFront().? == @as(usize, 0)); | |
std.debug.assert(deque.popBack().? == @as(usize, 2)); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const flags = @import("flags"); | |
const xev = @import("xev"); | |
const mqtt = @import("mqtt"); | |
const deque = @import("deque.zig"); | |
const Instant = std.time.Instant; | |
const Utf8View = std.unicode.Utf8View; | |
const mem = std.mem; | |
const net = std.net; | |
const posix = std.posix; | |
const assert = std.debug.assert; | |
const Packet = mqtt.v3.Packet; | |
const Connect = mqtt.v3.Connect; | |
pub const std_options = .{ | |
// Set the log level to info | |
.log_level = .info, | |
}; | |
pub fn main() !void { | |
var general_purpose_allocator = std.heap.GeneralPurposeAllocator(.{}){}; | |
const gpa = general_purpose_allocator.allocator(); | |
var args = try std.process.argsWithAllocator(gpa); | |
defer args.deinit(); | |
const cli = flags.parse(&args, Cli, .{}); | |
const addrs = try std.net.getAddressList(gpa, cli.host, cli.port); | |
std.log.info("Server: {s}:{}, addrs: {any}", .{ cli.host, cli.port, addrs.addrs }); | |
if (addrs.addrs.len == 0) { | |
std.log.err("Can't resolve host: {s}", .{cli.host}); | |
return; | |
} | |
const addr = addrs.addrs[0]; | |
var loop = try xev.Loop.init(.{}); | |
defer loop.deinit(); | |
for (0..cli.connections) |idx| { | |
// Create a TCP client socket | |
const client_conn = try posix.socket( | |
addr.any.family, | |
posix.SOCK.NONBLOCK | posix.SOCK.STREAM | posix.SOCK.CLOEXEC, | |
0, | |
); | |
errdefer posix.close(client_conn); | |
std.log.info("[{}] {} connect to {any}", .{ idx, client_conn, addr }); | |
const conn = Connection.init(gpa, idx, client_conn, cli.keep_alive); | |
// Accept | |
conn.ctrl_comp = .{ | |
.op = .{ .connect = .{ .socket = client_conn, .addr = addr } }, | |
.userdata = conn, | |
.callback = connectCallback, | |
}; | |
loop.add(&conn.ctrl_comp); | |
for (0..50) |_| { | |
loop.run(.once) catch unreachable; | |
if (conn.connected) { | |
std.log.info("[{}] {} connected", .{ idx, client_conn }); | |
break; | |
} | |
} | |
} | |
// Run the loop until there are no more completions. | |
try loop.run(.until_done); | |
} | |
const Cli = struct { | |
pub const name = "mqtt-bench"; | |
pub const help = | |
\\A MQTT benchmark tool written in Zig. | |
\\Based on io_uring, it is blazing fast! | |
; | |
host: []const u8 = "localhost", | |
port: u16 = 1883, | |
keep_alive: u16 = 5, | |
connections: u32 = 1000, | |
pub const descriptions = .{ | |
.host = "Host address", | |
.port = "Host port", | |
.keep_alive = "Keep alive seconds", | |
.connections = "The number of connections", | |
}; | |
pub const switches = .{ | |
.host = 'H', | |
.keep_alive = 'k', | |
.connections = 'c', | |
}; | |
}; | |
const PacketQueue = deque.Deque(Packet); | |
const Connection = struct { | |
id: usize, | |
fd: posix.socket_t, | |
last_ping: i64 = 0, | |
closing: bool = false, | |
connected: bool = false, | |
keep_alive: u16, | |
client_id: [64]u8 = undefined, | |
read_buf: [256]u8 = undefined, | |
write_buf: [256]u8 = undefined, | |
write_len: usize = 0, | |
// TODO: sizeof(xev.Completion) == 256 | |
read_comp: xev.Completion = .{}, | |
write_comp: xev.Completion = .{}, | |
keep_alive_comp: xev.Completion = .{}, | |
ctrl_comp: xev.Completion = .{}, | |
pending_packets: PacketQueue, | |
allocator: mem.Allocator, | |
pub fn init( | |
allocator: mem.Allocator, | |
id: usize, | |
new_fd: posix.socket_t, | |
keep_alive: u16, | |
) *Connection { | |
const conn = allocator.create(Connection) catch unreachable; | |
conn.* = .{ | |
.id = id, | |
.fd = new_fd, | |
.last_ping = std.time.milliTimestamp(), | |
.keep_alive = keep_alive, | |
.pending_packets = PacketQueue.init(allocator) catch unreachable, | |
.allocator = allocator, | |
}; | |
return conn; | |
} | |
pub fn deinit(self: *Connection) void { | |
self.pending_packets.deinit(); | |
self.allocator.destroy(self); | |
} | |
pub fn read(self: *Connection, loop: *xev.Loop) void { | |
if (self.read_comp.flags.state == .dead) { | |
self.read_comp = .{ | |
.op = .{ | |
.recv = .{ | |
.fd = self.fd, | |
.buffer = .{ .slice = self.read_buf[0..] }, | |
}, | |
}, | |
.userdata = self, | |
.callback = recvCallback, | |
}; | |
loop.add(&self.read_comp); | |
} | |
} | |
pub fn write(self: *Connection, loop: *xev.Loop, pkt: Packet) void { | |
if (self.write_comp.flags.state != .dead or self.write_len > 0) { | |
std.log.info("add to pending: {any}, {}/{}", .{ pkt, self.write_comp.flags.state, self.write_len }); | |
self.pending_packets.pushBack(pkt) catch unreachable; | |
} else { | |
std.log.debug("write: {any}", .{pkt}); | |
var len: usize = 0; | |
pkt.encode(self.write_buf[0..], &len) catch unreachable; | |
self.write_comp = .{ | |
.op = .{ | |
.send = .{ | |
.fd = self.fd, | |
.buffer = .{ .slice = self.write_buf[0..len] }, | |
}, | |
}, | |
.userdata = self, | |
.callback = sendCallback, | |
}; | |
if (len == 0) { | |
@panic("zero len"); | |
} | |
self.write_len = len; | |
loop.add(&self.write_comp); | |
} | |
} | |
pub fn close(self: *Connection, loop: *xev.Loop) void { | |
self.closing = true; | |
self.ctrl_comp = .{ | |
.op = .{ .close = .{ .fd = self.fd } }, | |
.userdata = self, | |
.callback = closeCallback, | |
}; | |
loop.add(&self.ctrl_comp); | |
} | |
}; | |
fn connectCallback( | |
ud: ?*anyopaque, | |
loop: *xev.Loop, | |
_: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
std.log.info("[connect][{}] result: {any}", .{ conn.fd, result }); | |
const ts_ms = std.time.milliTimestamp(); | |
const client_id = std.fmt.bufPrint( | |
conn.client_id[0..], | |
"{}-{}", | |
.{ ts_ms, conn.fd }, | |
) catch unreachable; | |
const pkt = Packet{ .connect = Connect{ | |
.protocol = .V311, | |
.clean_session = true, | |
.keep_alive = conn.keep_alive, | |
.client_id = Utf8View.initUnchecked(client_id), | |
} }; | |
conn.read(loop); | |
conn.write(loop, pkt); | |
loop.timer(&conn.keep_alive_comp, conn.keep_alive * 1000, conn, keepAliveCallback); | |
return .disarm; | |
} | |
fn recvCallback( | |
ud: ?*anyopaque, | |
loop: *xev.Loop, | |
comp: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
std.log.debug("[ recv ] result: {any}", .{result}); | |
const recv = comp.op.recv; | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
if (conn.closing) { | |
std.log.debug("[send] connection already closed", .{}); | |
return .disarm; | |
} | |
const read_len = result.recv catch { | |
std.log.debug("close conn when recv", .{}); | |
conn.close(loop); | |
return .disarm; | |
}; | |
const pkt = Packet.decode(recv.buffer.slice[0..read_len]) catch unreachable; | |
std.log.debug( | |
"Recv from {} ({} bytes): {any}, {any}", | |
.{ recv.fd, read_len, recv.buffer.slice[0..read_len], pkt }, | |
); | |
if (!conn.connected) { | |
if (pkt.connack.code != .accepted) { | |
std.log.info("[{}] Invalid connack: {}", .{ conn.fd, pkt.connack.code }); | |
conn.close(loop); | |
return .disarm; | |
} | |
conn.connected = true; | |
} | |
return .rearm; | |
} | |
fn sendCallback( | |
ud: ?*anyopaque, | |
loop: *xev.Loop, | |
comp: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
std.log.debug("[ send ] result: {any}", .{result}); | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
if (conn.closing) { | |
std.log.debug("[send] connection already closed", .{}); | |
return .disarm; | |
} | |
const send = comp.op.send; | |
const send_len = result.send catch { | |
std.log.debug("close conn when send", .{}); | |
conn.close(loop); | |
return .disarm; | |
}; | |
if (comp.flags.state != .dead) { | |
std.log.err( | |
"[{}] Invalid state={}, send_len={}, write_len={}", | |
.{ conn.fd, comp.flags.state, send_len, conn.write_len }, | |
); | |
} | |
std.log.debug( | |
"Send to {} ({} bytes): {any}", | |
.{ send.fd, send_len, send.buffer.slice[0..send_len] }, | |
); | |
if (conn.write_len != send_len) { | |
std.log.err( | |
"[{}] Invalid send len: {}/{} (send_len/write_len)", | |
.{ conn.fd, send_len, conn.write_len }, | |
); | |
} | |
conn.write_len -= send_len; | |
if (conn.write_len == 0) { | |
// FIXME: this is a bug of libxev (backend = kqueue) | |
// comp.* = .{}; | |
if (conn.pending_packets.popFront()) |pkt| { | |
conn.write(loop, pkt); | |
} | |
return .disarm; | |
} | |
return .rearm; | |
} | |
fn keepAliveCallback( | |
ud: ?*anyopaque, | |
loop: *xev.Loop, | |
comp: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
std.log.debug("[ timer ] comp: {} result: {any}", .{ comp.flags.state, result }); | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
if (conn.closing) { | |
std.log.debug("[timer] connection already closed", .{}); | |
return .disarm; | |
} | |
conn.write(loop, .pingreq); | |
const now_ms = std.time.milliTimestamp(); | |
const next_ms = conn.keep_alive * 1000; | |
if (now_ms - conn.last_ping >= 1000 + next_ms) { | |
std.log.err( | |
"[{}] Ping delay more than 1 seconds ({}ms)", | |
.{ conn.fd, now_ms - conn.last_ping }, | |
); | |
} | |
conn.last_ping = now_ms; | |
loop.timer(&conn.keep_alive_comp, next_ms, conn, keepAliveCallback); | |
return .disarm; | |
} | |
fn closeCallback( | |
ud: ?*anyopaque, | |
loop: *xev.Loop, | |
_: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
const now_ms = std.time.milliTimestamp(); | |
std.log.info( | |
"[ close ][{}] result: {any}, last_ping: {}, now_ms: {} ({})", | |
.{ conn.fd, result, conn.last_ping, now_ms, now_ms - conn.last_ping }, | |
); | |
conn.ctrl_comp = .{ | |
.op = .{ .cancel = .{ .c = &conn.keep_alive_comp } }, | |
.userdata = conn, | |
.callback = cancelCallback, | |
}; | |
loop.add(&conn.ctrl_comp); | |
return .disarm; | |
} | |
fn cancelCallback( | |
ud: ?*anyopaque, | |
_: *xev.Loop, | |
_: *xev.Completion, | |
result: xev.Result, | |
) xev.CallbackAction { | |
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?))); | |
std.log.info("[ cancel][{}] result: {any}", .{ conn.fd, result }); | |
conn.deinit(); | |
return .disarm; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The libxev commit id:
1dd3c9015a542757b049f6d33beb8941f57bce1f