Skip to content

Instantly share code, notes, and snippets.

@TheWaWaR
Last active July 25, 2024 09:43
Show Gist options
  • Save TheWaWaR/3624c5e8f92f4207613fcae7b111bf2e to your computer and use it in GitHub Desktop.
Save TheWaWaR/3624c5e8f92f4207613fcae7b111bf2e to your computer and use it in GitHub Desktop.
"Ghost" timer callback in macos
const std = @import("std");
const mem = std.mem;
const math = std.math;
const Allocator = mem.Allocator;
const assert = std.debug.assert;
/// Double-ended queue ported from Rust's standard library, which is provided under MIT License.
/// It can be found at https://github.com/rust-lang/rust/blob/master/LICENSE-MIT
pub fn Deque(comptime T: type) type {
return struct {
/// tail and head are pointers into the buffer. Tail always points
/// to the first element that could be read, Head always points
/// to where data should be written.
/// If tail == head the buffer is empty. The length of the ringbuffer
/// is defined as the distance between the two.
tail: usize,
head: usize,
/// Users should **NOT** use this field directly.
/// In order to access an item with an index, use `get` method.
/// If you want to iterate over the items, call `iterator` method to get an iterator.
buf: []T,
allocator: Allocator,
const Self = @This();
const INITIAL_CAPACITY = 7; // 2^3 - 1
const MINIMUM_CAPACITY = 1; // 2 - 1
/// Creates an empty deque.
/// Deinitialize with `deinit`.
pub fn init(allocator: Allocator) Allocator.Error!Self {
return initCapacity(allocator, INITIAL_CAPACITY);
}
/// Creates an empty deque with space for at least `capacity` elements.
///
/// Note that there is no guarantee that the created Deque has the specified capacity.
/// If it is too large, this method gives up meeting the capacity requirement.
/// In that case, it will instead create a Deque with the default capacity anyway.
///
/// Deinitialize with `deinit`.
pub fn initCapacity(allocator: Allocator, capacity: usize) Allocator.Error!Self {
const effective_cap =
math.ceilPowerOfTwo(usize, @max(capacity +| 1, MINIMUM_CAPACITY + 1)) catch
math.ceilPowerOfTwoAssert(usize, INITIAL_CAPACITY + 1);
const buf = try allocator.alloc(T, effective_cap);
return Self{
.tail = 0,
.head = 0,
.buf = buf,
.allocator = allocator,
};
}
/// Release all allocated memory.
pub fn deinit(self: Self) void {
self.allocator.free(self.buf);
}
/// Returns the length of the already-allocated buffer.
pub fn cap(self: Self) usize {
return self.buf.len;
}
/// Returns the number of elements in the deque.
pub fn len(self: Self) usize {
return count(self.tail, self.head, self.cap());
}
/// Gets the pointer to the element with the given index, if any.
/// Otherwise it returns `null`.
pub fn get(self: Self, index: usize) ?*T {
if (index >= self.len()) return null;
const idx = self.wrapAdd(self.tail, index);
return &self.buf[idx];
}
/// Gets the pointer to the first element, if any.
pub fn front(self: Self) ?*T {
return self.get(0);
}
/// Gets the pointer to the last element, if any.
pub fn back(self: Self) ?*T {
const last_idx = math.sub(usize, self.len(), 1) catch return null;
return self.get(last_idx);
}
/// Adds the given element to the back of the deque.
pub fn pushBack(self: *Self, item: T) Allocator.Error!void {
if (self.isFull()) {
try self.grow();
}
const head = self.head;
self.head = self.wrapAdd(self.head, 1);
self.buf[head] = item;
}
/// Adds the given element to the front of the deque.
pub fn pushFront(self: *Self, item: T) Allocator.Error!void {
if (self.isFull()) {
try self.grow();
}
self.tail = self.wrapSub(self.tail, 1);
const tail = self.tail;
self.buf[tail] = item;
}
/// Pops and returns the last element of the deque.
pub fn popBack(self: *Self) ?T {
if (self.len() == 0) return null;
self.head = self.wrapSub(self.head, 1);
const head = self.head;
const item = self.buf[head];
self.buf[head] = undefined;
return item;
}
/// Pops and returns the first element of the deque.
pub fn popFront(self: *Self) ?T {
if (self.len() == 0) return null;
const tail = self.tail;
self.tail = self.wrapAdd(self.tail, 1);
const item = self.buf[tail];
self.buf[tail] = undefined;
return item;
}
/// Adds all the elements in the given slice to the back of the deque.
pub fn appendSlice(self: *Self, items: []const T) Allocator.Error!void {
for (items) |item| {
try self.pushBack(item);
}
}
/// Adds all the elements in the given slice to the front of the deque.
pub fn prependSlice(self: *Self, items: []const T) Allocator.Error!void {
if (items.len == 0) return;
var i: usize = items.len - 1;
while (true) : (i -= 1) {
const item = items[i];
try self.pushFront(item);
if (i == 0) break;
}
}
/// Returns an iterator over the deque.
/// Modifying the deque may invalidate this iterator.
pub fn iterator(self: Self) Iterator {
return .{
.head = self.head,
.tail = self.tail,
.ring = self.buf,
};
}
pub fn format(self: *const Self, comptime _: []const u8, _: std.fmt.FormatOptions, writer: anytype) !void {
try writer.writeAll("Deque(");
try std.fmt.format(writer, "{}", .{T});
try writer.writeAll(") { .buf = [");
var it = self.iterator();
if (it.next()) |val| try writer.print("{any}", .{val});
while (it.next()) |val| try writer.print(", {any}", .{val});
try writer.writeAll("], .head = ");
try std.fmt.format(writer, "{}", .{self.head});
try writer.writeAll(", .tail = ");
try std.fmt.format(writer, "{}", .{self.tail});
try writer.writeAll(", .len = ");
try std.fmt.format(writer, "{}", .{self.len()});
try writer.writeAll(" }");
}
pub const Iterator = struct {
head: usize,
tail: usize,
ring: []T,
pub fn next(it: *Iterator) ?*T {
if (it.head == it.tail) return null;
const tail = it.tail;
it.tail = wrapIndex(it.tail +% 1, it.ring.len);
return &it.ring[tail];
}
pub fn nextBack(it: *Iterator) ?*T {
if (it.head == it.tail) return null;
it.head = wrapIndex(it.head -% 1, it.ring.len);
return &it.ring[it.head];
}
};
/// Returns `true` if the buffer is at full capacity.
fn isFull(self: Self) bool {
return self.cap() - self.len() == 1;
}
fn grow(self: *Self) Allocator.Error!void {
assert(self.isFull());
const old_cap = self.cap();
// Reserve additional space to accomodate more items
self.buf = try self.allocator.realloc(self.buf, old_cap * 2);
// Update `tail` and `head` pointers accordingly
self.handleCapacityIncrease(old_cap);
assert(self.cap() >= old_cap * 2);
assert(!self.isFull());
}
/// Updates `tail` and `head` values to handle the fact that we just reallocated the internal buffer.
fn handleCapacityIncrease(self: *Self, old_capacity: usize) void {
const new_capacity = self.cap();
// Move the shortest contiguous section of the ring buffer.
// There are three cases to consider:
//
// (A) No need to update
// T H
// before: [o o o o o o o . ]
//
// after : [o o o o o o o . . . . . . . . . ]
// T H
//
//
// (B) [..H] needs to be moved
// H T
// before: [o o . o o o o o ]
// ---
// |_______________.
// |
// v
// ---
// after : [. . . o o o o o o o . . . . . . ]
// T H
//
//
// (C) [T..old_capacity] needs to be moved
// H T
// before: [o o o o o . o o ]
// ---
// |_______________.
// |
// v
// ---
// after : [o o o o o . . . . . . . . . o o ]
// H T
if (self.tail <= self.head) {
// (A), Nop
} else if (self.head < old_capacity - self.tail) {
// (B)
self.copyNonOverlapping(old_capacity, 0, self.head);
self.head += old_capacity;
assert(self.head > self.tail);
} else {
// (C)
const new_tail = new_capacity - (old_capacity - self.tail);
self.copyNonOverlapping(new_tail, self.tail, old_capacity - self.tail);
self.tail = new_tail;
assert(self.head < self.tail);
}
assert(self.head < self.cap());
assert(self.tail < self.cap());
}
fn copyNonOverlapping(self: *Self, dest: usize, src: usize, length: usize) void {
assert(dest + length <= self.cap());
assert(src + length <= self.cap());
@memcpy(self.buf[dest .. dest + length], self.buf[src .. src + length]);
}
fn wrapAdd(self: Self, idx: usize, addend: usize) usize {
return wrapIndex(idx +% addend, self.cap());
}
fn wrapSub(self: Self, idx: usize, subtrahend: usize) usize {
return wrapIndex(idx -% subtrahend, self.cap());
}
};
}
fn count(tail: usize, head: usize, size: usize) usize {
assert(math.isPowerOfTwo(size));
return (head -% tail) & (size - 1);
}
fn wrapIndex(index: usize, size: usize) usize {
assert(math.isPowerOfTwo(size));
return index & (size - 1);
}
test "Deque works" {
const testing = std.testing;
var deque = try Deque(usize).init(testing.allocator);
defer deque.deinit();
// empty deque
try testing.expectEqual(@as(usize, 0), deque.len());
try testing.expect(deque.get(0) == null);
try testing.expect(deque.front() == null);
try testing.expect(deque.back() == null);
try testing.expect(deque.popBack() == null);
try testing.expect(deque.popFront() == null);
// pushBack
try deque.pushBack(101);
try testing.expectEqual(@as(usize, 1), deque.len());
try testing.expectEqual(@as(usize, 101), deque.get(0).?.*);
try testing.expectEqual(@as(usize, 101), deque.front().?.*);
try testing.expectEqual(@as(usize, 101), deque.back().?.*);
// pushFront
try deque.pushFront(100);
try testing.expectEqual(@as(usize, 2), deque.len());
try testing.expectEqual(@as(usize, 100), deque.get(0).?.*);
try testing.expectEqual(@as(usize, 100), deque.front().?.*);
try testing.expectEqual(@as(usize, 101), deque.get(1).?.*);
try testing.expectEqual(@as(usize, 101), deque.back().?.*);
// more items
{
var i: usize = 99;
while (true) : (i -= 1) {
try deque.pushFront(i);
if (i == 0) break;
}
}
{
var i: usize = 102;
while (i < 200) : (i += 1) {
try deque.pushBack(i);
}
}
try testing.expectEqual(@as(usize, 200), deque.len());
{
var i: usize = 0;
while (i < deque.len()) : (i += 1) {
try testing.expectEqual(i, deque.get(i).?.*);
}
}
{
var i: usize = 0;
var it = deque.iterator();
while (it.next()) |val| : (i += 1) {
try testing.expectEqual(i, val.*);
}
try testing.expectEqual(@as(usize, 200), i);
}
}
test "initCapacity with too large capacity" {
const testing = std.testing;
var deque = try Deque(i32).initCapacity(testing.allocator, math.maxInt(usize));
defer deque.deinit();
// The specified capacity `math.maxInt(usize)` was too large.
// Internally this is just ignored, and the default capacity is used instead.
try testing.expectEqual(@as(usize, 8), deque.buf.len);
}
test "appendSlice and prependSlice" {
const testing = std.testing;
var deque = try Deque(usize).init(testing.allocator);
defer deque.deinit();
try deque.prependSlice(&[_]usize{ 1, 2, 3, 4, 5, 6 });
try deque.appendSlice(&[_]usize{ 7, 8, 9 });
try deque.prependSlice(&[_]usize{0});
try deque.appendSlice(&[_]usize{ 10, 11, 12, 13, 14 });
{
var i: usize = 0;
while (i <= 14) : (i += 1) {
try testing.expectEqual(i, deque.get(i).?.*);
}
}
}
test "format" {
const testing = std.testing;
var deque = try Deque(usize).init(testing.allocator);
defer deque.deinit();
try deque.pushBack(69);
try deque.pushBack(420);
std.debug.print("{}\n", .{deque});
}
test "nextBack" {
const testing = std.testing;
var deque = try Deque(usize).init(testing.allocator);
defer deque.deinit();
try deque.appendSlice(&[_]usize{ 5, 4, 3, 2, 1, 0 });
{
var i: usize = 0;
var it = deque.iterator();
while (it.nextBack()) |val| : (i += 1) {
try testing.expectEqual(i, val.*);
}
}
}
test "code sample in README" {
var deque = try Deque(usize).init(std.testing.allocator);
defer deque.deinit();
try deque.pushBack(1);
try deque.pushBack(2);
try deque.pushFront(0);
std.debug.assert(deque.get(0).?.* == @as(usize, 0));
std.debug.assert(deque.get(1).?.* == @as(usize, 1));
std.debug.assert(deque.get(2).?.* == @as(usize, 2));
std.debug.assert(deque.get(3) == null);
var it = deque.iterator();
var sum: usize = 0;
while (it.next()) |val| {
sum += val.*;
}
std.debug.assert(sum == 3);
std.debug.assert(deque.popFront().? == @as(usize, 0));
std.debug.assert(deque.popBack().? == @as(usize, 2));
}
const std = @import("std");
const flags = @import("flags");
const xev = @import("xev");
const mqtt = @import("mqtt");
const deque = @import("deque.zig");
const Instant = std.time.Instant;
const Utf8View = std.unicode.Utf8View;
const mem = std.mem;
const net = std.net;
const posix = std.posix;
const assert = std.debug.assert;
const Packet = mqtt.v3.Packet;
const Connect = mqtt.v3.Connect;
pub const std_options = .{
// Set the log level to info
.log_level = .info,
};
pub fn main() !void {
var general_purpose_allocator = std.heap.GeneralPurposeAllocator(.{}){};
const gpa = general_purpose_allocator.allocator();
var args = try std.process.argsWithAllocator(gpa);
defer args.deinit();
const cli = flags.parse(&args, Cli, .{});
const addrs = try std.net.getAddressList(gpa, cli.host, cli.port);
std.log.info("Server: {s}:{}, addrs: {any}", .{ cli.host, cli.port, addrs.addrs });
if (addrs.addrs.len == 0) {
std.log.err("Can't resolve host: {s}", .{cli.host});
return;
}
const addr = addrs.addrs[0];
var loop = try xev.Loop.init(.{});
defer loop.deinit();
for (0..cli.connections) |idx| {
// Create a TCP client socket
const client_conn = try posix.socket(
addr.any.family,
posix.SOCK.NONBLOCK | posix.SOCK.STREAM | posix.SOCK.CLOEXEC,
0,
);
errdefer posix.close(client_conn);
std.log.info("[{}] {} connect to {any}", .{ idx, client_conn, addr });
const conn = Connection.init(gpa, idx, client_conn, cli.keep_alive);
// Accept
conn.ctrl_comp = .{
.op = .{ .connect = .{ .socket = client_conn, .addr = addr } },
.userdata = conn,
.callback = connectCallback,
};
loop.add(&conn.ctrl_comp);
for (0..50) |_| {
loop.run(.once) catch unreachable;
if (conn.connected) {
std.log.info("[{}] {} connected", .{ idx, client_conn });
break;
}
}
}
// Run the loop until there are no more completions.
try loop.run(.until_done);
}
const Cli = struct {
pub const name = "mqtt-bench";
pub const help =
\\A MQTT benchmark tool written in Zig.
\\Based on io_uring, it is blazing fast!
;
host: []const u8 = "localhost",
port: u16 = 1883,
keep_alive: u16 = 5,
connections: u32 = 1000,
pub const descriptions = .{
.host = "Host address",
.port = "Host port",
.keep_alive = "Keep alive seconds",
.connections = "The number of connections",
};
pub const switches = .{
.host = 'H',
.keep_alive = 'k',
.connections = 'c',
};
};
const PacketQueue = deque.Deque(Packet);
const Connection = struct {
id: usize,
fd: posix.socket_t,
last_ping: i64 = 0,
closing: bool = false,
connected: bool = false,
keep_alive: u16,
client_id: [64]u8 = undefined,
read_buf: [256]u8 = undefined,
write_buf: [256]u8 = undefined,
write_len: usize = 0,
// TODO: sizeof(xev.Completion) == 256
read_comp: xev.Completion = .{},
write_comp: xev.Completion = .{},
keep_alive_comp: xev.Completion = .{},
ctrl_comp: xev.Completion = .{},
pending_packets: PacketQueue,
allocator: mem.Allocator,
pub fn init(
allocator: mem.Allocator,
id: usize,
new_fd: posix.socket_t,
keep_alive: u16,
) *Connection {
const conn = allocator.create(Connection) catch unreachable;
conn.* = .{
.id = id,
.fd = new_fd,
.last_ping = std.time.milliTimestamp(),
.keep_alive = keep_alive,
.pending_packets = PacketQueue.init(allocator) catch unreachable,
.allocator = allocator,
};
return conn;
}
pub fn deinit(self: *Connection) void {
self.pending_packets.deinit();
self.allocator.destroy(self);
}
pub fn read(self: *Connection, loop: *xev.Loop) void {
if (self.read_comp.flags.state == .dead) {
self.read_comp = .{
.op = .{
.recv = .{
.fd = self.fd,
.buffer = .{ .slice = self.read_buf[0..] },
},
},
.userdata = self,
.callback = recvCallback,
};
loop.add(&self.read_comp);
}
}
pub fn write(self: *Connection, loop: *xev.Loop, pkt: Packet) void {
if (self.write_comp.flags.state != .dead or self.write_len > 0) {
std.log.info("add to pending: {any}, {}/{}", .{ pkt, self.write_comp.flags.state, self.write_len });
self.pending_packets.pushBack(pkt) catch unreachable;
} else {
std.log.debug("write: {any}", .{pkt});
var len: usize = 0;
pkt.encode(self.write_buf[0..], &len) catch unreachable;
self.write_comp = .{
.op = .{
.send = .{
.fd = self.fd,
.buffer = .{ .slice = self.write_buf[0..len] },
},
},
.userdata = self,
.callback = sendCallback,
};
if (len == 0) {
@panic("zero len");
}
self.write_len = len;
loop.add(&self.write_comp);
}
}
pub fn close(self: *Connection, loop: *xev.Loop) void {
self.closing = true;
self.ctrl_comp = .{
.op = .{ .close = .{ .fd = self.fd } },
.userdata = self,
.callback = closeCallback,
};
loop.add(&self.ctrl_comp);
}
};
fn connectCallback(
ud: ?*anyopaque,
loop: *xev.Loop,
_: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
std.log.info("[connect][{}] result: {any}", .{ conn.fd, result });
const ts_ms = std.time.milliTimestamp();
const client_id = std.fmt.bufPrint(
conn.client_id[0..],
"{}-{}",
.{ ts_ms, conn.fd },
) catch unreachable;
const pkt = Packet{ .connect = Connect{
.protocol = .V311,
.clean_session = true,
.keep_alive = conn.keep_alive,
.client_id = Utf8View.initUnchecked(client_id),
} };
conn.read(loop);
conn.write(loop, pkt);
loop.timer(&conn.keep_alive_comp, conn.keep_alive * 1000, conn, keepAliveCallback);
return .disarm;
}
fn recvCallback(
ud: ?*anyopaque,
loop: *xev.Loop,
comp: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
std.log.debug("[ recv ] result: {any}", .{result});
const recv = comp.op.recv;
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
if (conn.closing) {
std.log.debug("[send] connection already closed", .{});
return .disarm;
}
const read_len = result.recv catch {
std.log.debug("close conn when recv", .{});
conn.close(loop);
return .disarm;
};
const pkt = Packet.decode(recv.buffer.slice[0..read_len]) catch unreachable;
std.log.debug(
"Recv from {} ({} bytes): {any}, {any}",
.{ recv.fd, read_len, recv.buffer.slice[0..read_len], pkt },
);
if (!conn.connected) {
if (pkt.connack.code != .accepted) {
std.log.info("[{}] Invalid connack: {}", .{ conn.fd, pkt.connack.code });
conn.close(loop);
return .disarm;
}
conn.connected = true;
}
return .rearm;
}
fn sendCallback(
ud: ?*anyopaque,
loop: *xev.Loop,
comp: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
std.log.debug("[ send ] result: {any}", .{result});
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
if (conn.closing) {
std.log.debug("[send] connection already closed", .{});
return .disarm;
}
const send = comp.op.send;
const send_len = result.send catch {
std.log.debug("close conn when send", .{});
conn.close(loop);
return .disarm;
};
if (comp.flags.state != .dead) {
std.log.err(
"[{}] Invalid state={}, send_len={}, write_len={}",
.{ conn.fd, comp.flags.state, send_len, conn.write_len },
);
}
std.log.debug(
"Send to {} ({} bytes): {any}",
.{ send.fd, send_len, send.buffer.slice[0..send_len] },
);
if (conn.write_len != send_len) {
std.log.err(
"[{}] Invalid send len: {}/{} (send_len/write_len)",
.{ conn.fd, send_len, conn.write_len },
);
}
conn.write_len -= send_len;
if (conn.write_len == 0) {
// FIXME: this is a bug of libxev (backend = kqueue)
// comp.* = .{};
if (conn.pending_packets.popFront()) |pkt| {
conn.write(loop, pkt);
}
return .disarm;
}
return .rearm;
}
fn keepAliveCallback(
ud: ?*anyopaque,
loop: *xev.Loop,
comp: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
std.log.debug("[ timer ] comp: {} result: {any}", .{ comp.flags.state, result });
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
if (conn.closing) {
std.log.debug("[timer] connection already closed", .{});
return .disarm;
}
conn.write(loop, .pingreq);
const now_ms = std.time.milliTimestamp();
const next_ms = conn.keep_alive * 1000;
if (now_ms - conn.last_ping >= 1000 + next_ms) {
std.log.err(
"[{}] Ping delay more than 1 seconds ({}ms)",
.{ conn.fd, now_ms - conn.last_ping },
);
}
conn.last_ping = now_ms;
loop.timer(&conn.keep_alive_comp, next_ms, conn, keepAliveCallback);
return .disarm;
}
fn closeCallback(
ud: ?*anyopaque,
loop: *xev.Loop,
_: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
const now_ms = std.time.milliTimestamp();
std.log.info(
"[ close ][{}] result: {any}, last_ping: {}, now_ms: {} ({})",
.{ conn.fd, result, conn.last_ping, now_ms, now_ms - conn.last_ping },
);
conn.ctrl_comp = .{
.op = .{ .cancel = .{ .c = &conn.keep_alive_comp } },
.userdata = conn,
.callback = cancelCallback,
};
loop.add(&conn.ctrl_comp);
return .disarm;
}
fn cancelCallback(
ud: ?*anyopaque,
_: *xev.Loop,
_: *xev.Completion,
result: xev.Result,
) xev.CallbackAction {
const conn = @as(*Connection, @ptrCast(@alignCast(ud.?)));
std.log.info("[ cancel][{}] result: {any}", .{ conn.fd, result });
conn.deinit();
return .disarm;
}
@TheWaWaR
Copy link
Author

The libxev commit id: 1dd3c9015a542757b049f6d33beb8941f57bce1f

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment