Skip to content

Instantly share code, notes, and snippets.

@worldOneo
Created October 15, 2023 11:41
Show Gist options
  • Save worldOneo/52c8629b9014baf4ed3d735296133e0b to your computer and use it in GitHub Desktop.
Save worldOneo/52c8629b9014baf4ed3d735296133e0b to your computer and use it in GitHub Desktop.
Epoll based Event Loop chat application
const std = @import("std");
// primitive of the event loop.
// Ctx is typically a pointer to *This and data a pointer to the Users data.
// destroyData is called after the event has finished and must clean up the event to avoid leaks.
const Event = struct {
ctx: *anyopaque,
data: *anyopaque,
name: []const u8,
destroyData: *const fn (*anyopaque, *anyopaque) void,
};
// event handler receive events.
// handlerfn is called with handler, event, bus
const EventHandler = struct {
handlerfn: *const fn (*anyopaque, *anyopaque, *EventBus) void,
handler: *anyopaque,
event: []const u8,
};
// EventBus and EventLoop
// The event loop is a queue of all event and it just spins in a circle navigating all the events to handlers
const EventBus = struct {
handlerlist: std.StringHashMap(std.ArrayList(EventHandler)),
pending: std.TailQueue(Event),
allocator: std.mem.Allocator,
const This = @This();
pub fn init(alloc: std.mem.Allocator) !This {
return This{
.handlerlist = std.StringHashMap(std.ArrayList(EventHandler)).init(alloc),
.pending = std.TailQueue(Event){},
.allocator = alloc,
};
}
// addHandler registers a new event handler
pub fn addHandler(this: *This, handler: EventHandler) void {
var get_or_put = this.handlerlist.getOrPut(handler.event) catch unreachable;
if (!get_or_put.found_existing) {
get_or_put.value_ptr.* = std.ArrayList(EventHandler).init(this.allocator);
}
get_or_put.value_ptr.append(handler) catch unreachable;
}
// emit a new event into the event loop.
// the order of execution is equall to the order of emission.
pub fn emit(this: *This, event: Event) void {
var node = this.allocator.create(std.TailQueue(Event).Node) catch unreachable;
node.data = event;
this.pending.append(node);
}
pub fn run(this: *This) void {
while (this.pending.popFirst()) |event_node| {
const event = event_node.data;
const name = event.name;
if (this.handlerlist.get(name)) |list| {
for (list.items) |handler| {
handler.handlerfn(handler.handler, event.data, this);
}
}
this.allocator.destroy(event_node);
event.destroyData(event.ctx, event.data);
}
}
};
// mostly just epoll implementation here
const Epoll = struct {
epfd: i32,
sock: std.net.StreamServer,
allocator: std.mem.Allocator,
const linux = std.os.linux;
// The events this epoll implementation exposes.
// This implementation doesn't emit close.
const EpollAccept = struct { fd: i32, userctx: **allowzero anyopaque };
const EpollDataReady = struct { fd: i32, data: *std.ArrayList(u8), userctx: **allowzero anyopaque };
const EpollWrite = struct { fd: i32, data: std.ArrayList(u8) };
const This = @This();
pub fn init(alloc: std.mem.Allocator) !This {
var sock = std.net.StreamServer.init(.{});
sock.reuse_port = true;
try sock.listen(try std.net.Address.resolveIp("0.0.0.0", 5151));
const epfd = try std.os.epoll_create1(0);
const ctx = try Ctx.init(alloc);
ctx.client_fd = epfd;
var event = linux.epoll_event{
.events = linux.EPOLL.IN,
.data = .{ .ptr = @intFromPtr(ctx) },
};
try std.os.epoll_ctl(epfd, linux.EPOLL.CTL_ADD, sock.sockfd.?, &event);
return This{
.epfd = epfd,
.sock = sock,
.allocator = alloc,
};
}
const Ctx = struct {
readbuffer: std.ArrayList(u8),
client_fd: i32,
userctx: *allowzero anyopaque,
pub fn init(alloc: std.mem.Allocator) !*@This() {
var ctx = try alloc.create(@This());
ctx.readbuffer = std.ArrayList(u8).init(alloc);
return ctx;
}
pub fn deinit(this: *@This()) void {
this.readbuffer.deinit();
}
};
fn epoll_add(this: *This, fd: i32, events: u32) !void {
var ctx = try Ctx.init(this.allocator);
ctx.client_fd = fd;
var epoll_event: linux.epoll_event = undefined;
epoll_event.events = events;
epoll_event.data.ptr = @intFromPtr(ctx);
try std.os.epoll_ctl(this.epfd, linux.EPOLL.CTL_ADD, fd, &epoll_event);
}
fn destroyEvent(comptime E: type) *const fn (*anyopaque, *anyopaque) void {
return struct {
fn destroy(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
var this: *This = @ptrCast(@alignCast(opaque_this));
var event: *E = @ptrCast(@alignCast(opaque_event));
this.allocator.destroy(event);
}
}.destroy;
}
// create a event that if emitted will send data to the fd
pub fn writeEvent(this: *This, fd: i32, data: std.ArrayList(u8)) Event {
var e = this.allocator.create(EpollWrite) catch unreachable;
e.* = EpollWrite{ .fd = fd, .data = data };
return Event{
.ctx = this,
.destroyData = destroyEvent(EpollWrite),
.data = e,
.name = "Epoll.Write",
};
}
pub fn readEvent(this: *This) Event {
var fake: EpollAccept = undefined;
return Event{
.ctx = this,
.data = @alignCast(@ptrCast(&fake)),
.destroyData = struct {
fn f(_: *anyopaque, _: *anyopaque) void {}
}.f,
.name = "Epoll.Read",
};
}
// just the usuall epoll stuff here
fn read(ctx_this: *anyopaque, _: *anyopaque, bus: *EventBus) void {
var this: *This = @ptrCast(@alignCast(ctx_this));
const server_fd = this.sock.sockfd orelse unreachable;
var events: [16]linux.epoll_event = undefined;
const rc = linux.epoll_wait(this.epfd, &events, 16, 5);
switch (std.os.errno(rc)) {
.SUCCESS => {
for (events[0..rc]) |event| {
const ctx = @as(*Ctx, @ptrFromInt(event.data.ptr));
if (ctx.client_fd == this.epfd) {
var addr: linux.sockaddr = undefined;
var size: linux.socklen_t = @sizeOf(linux.sockaddr);
const err_client_fd = linux.accept(server_fd, &addr, &size);
const client_fd: i32 = @intCast(err_client_fd);
// set non blocking
_ = linux.fcntl(client_fd, linux.F.SETFD, linux.fcntl(client_fd, linux.F.GETFD, 0) | linux.O.NONBLOCK);
this.epoll_add(client_fd, linux.EPOLL.IN | linux.EPOLL.HUP | linux.EPOLL.RDHUP) catch |err| {
std.log.err("Failed to configure {} because {}", .{ client_fd, err });
continue;
};
var data = this.allocator.create(EpollAccept) catch unreachable;
data.* = EpollAccept{ .fd = client_fd, .userctx = &ctx.userctx };
bus.emit(Event{
.data = data,
.name = "Epoll.Accept",
.ctx = this,
.destroyData = destroyEvent(EpollAccept),
});
} else if (event.events & linux.EPOLL.IN != 0) {
var buff: [1024]u8 = undefined;
const bytes_read = linux.read(ctx.client_fd, buff[0..1024], 1024);
ctx.readbuffer.appendSlice(buff[0..bytes_read]) catch unreachable;
var data = this.allocator.create(EpollDataReady) catch unreachable;
data.* = EpollDataReady{
.fd = ctx.client_fd,
.userctx = &ctx.userctx,
.data = &ctx.readbuffer,
};
bus.emit(Event{
.data = data,
.name = "Epoll.Data",
.ctx = this,
.destroyData = destroyEvent(EpollDataReady),
});
} else if (event.events & (linux.EPOLL.HUP | linux.EPOLL.RDHUP) != 0) {
_ = linux.epoll_ctl(this.epfd, linux.EPOLL.CTL_DEL, ctx.client_fd, null);
_ = linux.close(ctx.client_fd);
ctx.deinit();
this.allocator.destroy(ctx);
}
}
},
.INVAL => {
std.log.err("errno = INVAL", .{});
std.os.exit(1);
},
else => unreachable,
}
// "recursive" tail call
bus.emit(this.readEvent());
}
fn write(_: *anyopaque, ev: *anyopaque, _: *EventBus) void {
var data: *EpollWrite = @ptrCast(@alignCast(ev));
_ = linux.write(data.fd, @ptrCast(data.data.items), data.data.items.len);
}
pub fn deinit(this: *This) void {
this.sock.deinit();
std.os.close(this.epfd);
}
};
const Server = struct {
allocator: std.mem.Allocator,
epoll: *Epoll,
const This = @This();
pub fn init(alloc: std.mem.Allocator, e: *Epoll) This {
return This{
.allocator = alloc,
.epoll = e,
};
}
pub const LineReceieved = struct {
fd: i32,
line: []const u8,
slice: []const u8,
line_allocator: std.mem.Allocator,
pub fn destroyData(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
var this: *This = @ptrCast(@alignCast(opaque_this));
var event: *@This() = @ptrCast(@alignCast(opaque_event));
event.line_allocator.free(event.slice);
this.allocator.destroy(event);
}
};
pub fn dataReady(opaque_this: *anyopaque, opaque_event: *anyopaque, bus: *EventBus) void {
var this: *This = @ptrCast(@alignCast(opaque_this));
var event: *Epoll.EpollDataReady = @ptrCast(@alignCast(opaque_event));
while (true) {
// parse the protocol. In this case it is a terminated protocol with \n at the end.
var i: usize = 0;
for (event.data.items, 0..) |x, n| {
if (x == '\n') {
i = n;
}
}
if (i == 0) {
return;
}
// slicing out the line we received. It could be possible to receive one and a half lines or multiple line so we work out the line.
var slice = event.data.toOwnedSlice() catch unreachable;
var line = slice[0..i];
event.data.appendSlice(slice[(i + 1)..slice.len]) catch unreachable;
var data = this.allocator.create(LineReceieved) catch unreachable;
data.* = LineReceieved{
.fd = event.fd,
.line = line,
.slice = slice,
.line_allocator = event.data.allocator,
};
// emitting the line to the event bus
bus.emit(Event{
.ctx = this,
.destroyData = LineReceieved.destroyData,
.data = data,
.name = "Server.Line",
});
}
}
};
const Application = struct {
user: std.AutoHashMap(i32, []const u8),
allocator: std.mem.Allocator,
epoll: *Epoll,
const This = @This();
pub fn init(allocator: std.mem.Allocator, epoll: *Epoll) This {
return This{
.user = std.AutoHashMap(i32, []const u8).init(allocator),
.allocator = allocator,
.epoll = epoll,
};
}
const MessageSentEvent = struct {
msg: std.ArrayList(u8),
pub fn destroyData(opaque_this: *anyopaque, opaque_event: *anyopaque) void {
var this: *This = @ptrCast(@alignCast(opaque_this));
var event: *@This() = @ptrCast(@alignCast(opaque_event));
event.msg.deinit();
this.allocator.destroy(event);
}
};
fn messageSent(this: *This, msg: std.ArrayList(u8)) Event {
var data = this.allocator.create(MessageSentEvent) catch unreachable;
data.* = MessageSentEvent{
.msg = msg,
};
return Event{
.ctx = this,
.data = data,
.destroyData = MessageSentEvent.destroyData,
.name = "Application.MessageSent",
};
}
pub fn lineReceieved(opaque_this: *anyopaque, opaque_event: *anyopaque, bus: *EventBus) void {
var this: *This = @ptrCast(@alignCast(opaque_this));
var event: *Server.LineReceieved = @ptrCast(@alignCast(opaque_event));
// check if user has been registered
if (this.user.get(event.fd)) |name| {
// format the message
var message = std.ArrayList(u8).init(this.allocator);
message.appendSlice(name) catch unreachable;
message.appendSlice(" -> ") catch unreachable;
message.appendSlice(event.line) catch unreachable;
message.appendSlice("\n") catch unreachable;
// broadcast the message to all users
var users = this.user.iterator();
while (users.next()) |user| {
bus.emit(this.epoll.writeEvent(user.key_ptr.*, message));
}
// this event could be used in other parts of the application
bus.emit(this.messageSent(message));
} else {
// register user
// for that we must clone the name, because event.line is owned by the event.
var name = this.allocator.alloc(u8, event.line.len) catch unreachable;
@memcpy(name, event.line);
this.user.put(event.fd, name) catch unreachable;
std.debug.print("User {s} created for fd {}\n", .{ name, event.fd });
return;
}
}
};
pub fn main() !void {
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
var allocator = gpa.allocator();
var bus = try EventBus.init(allocator);
var epoll = try Epoll.init(allocator);
var server = Server.init(allocator, &epoll);
var application = Application.init(allocator, &epoll);
// Register all listeners.
// might be useful to do in the init function to keep things neat.
bus.addHandler(EventHandler{
.event = "Epoll.Read",
.handler = &epoll,
.handlerfn = Epoll.read,
});
bus.addHandler(EventHandler{
.event = "Epoll.Write",
.handler = &epoll,
.handlerfn = Epoll.write,
});
bus.addHandler(EventHandler{
.event = "Epoll.Data",
.handler = &server,
.handlerfn = Server.dataReady,
});
bus.addHandler(EventHandler{
.event = "Server.Line",
.handler = &application,
.handlerfn = Application.lineReceieved,
});
// prepare the I/O loop
bus.emit(epoll.readEvent());
// blocks indefinitely running the event loop
bus.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment