@andrewrk
Created June 27, 2018 00:46
Channel implementation in Zig
// assumed 2018-era imports for the identifiers used below
const std = @import("std");
const builtin = @import("builtin");
const mem = std.mem;
const assert = std.debug.assert;
const AtomicRmwOp = builtin.AtomicRmwOp;
const AtomicOrder = builtin.AtomicOrder;

pub const Loop = struct {
    allocator: *mem.Allocator,
    epollfd: i32,
    keep_running: bool,
    next_tick_queue: std.atomic.Queue(promise),

    pub const NextTickNode = std.atomic.Queue(promise).Node;

    /// Many-producer, many-consumer, thread-safe, lock-free channel with a
    /// runtime-configurable buffer size.
    /// When the buffer is empty, consumers suspend and are resumed by producers.
    /// When the buffer is full, producers suspend and are resumed by consumers.
    /// A usage sketch follows the Channel definition below.
    pub fn Channel(comptime T: type) type {
        return struct {
            loop: *Loop,

            getters: std.atomic.Queue(GetNode),
            putters: std.atomic.Queue(PutNode),
            get_count: usize,
            put_count: usize,
            dispatch_lock: u8, // TODO make this a bool
            need_dispatch: u8, // TODO make this a bool

            // simple fixed-size ring buffer
            buffer_nodes: []T,
            buffer_index: usize,
            buffer_len: usize,

            const SelfChannel = this;

            const GetNode = struct {
                ptr: *T,
                // resumed by the dispatcher via Loop.onNextTick once ptr has been filled in
                tick_node: NextTickNode,
            };
            const PutNode = struct {
                data: T,
                // resumed by the dispatcher via Loop.onNextTick once data has been consumed
                tick_node: NextTickNode,
            };
            pub fn init(loop: *Loop, capacity: usize) !*SelfChannel {
                const buffer_nodes = try loop.allocator.alloc(T, capacity);
                errdefer loop.allocator.free(buffer_nodes);

                const self = try loop.allocator.create(SelfChannel{
                    .loop = loop,
                    .buffer_len = 0,
                    .buffer_nodes = buffer_nodes,
                    .buffer_index = 0,
                    .dispatch_lock = 0,
                    .need_dispatch = 0,
                    .getters = undefined,
                    .putters = undefined,
                    .get_count = 0,
                    .put_count = 0,
                });
                errdefer loop.allocator.destroy(self);

                std.atomic.Queue(GetNode).init(&self.getters);
                std.atomic.Queue(PutNode).init(&self.putters);

                return self;
            }
            /// Puts a data item in the channel. The promise completes when the value has been
            /// added to the buffer, or, in the case of a zero size buffer, when the item has
            /// been retrieved by a getter.
            pub async fn put(self: *SelfChannel, data: T) void {
                // TODO should be able to group memory allocation failure before first suspend point
                // so that the async invocation catches it
                const dispatch_handle = async self.dispatch() catch unreachable;

                suspend |handle| {
                    // queue ourselves as a pending putter; the dispatcher resumes us through tick_node
                    var queue_node = std.atomic.Queue(PutNode).Node{
                        .data = PutNode{
                            .data = data,
                            .tick_node = NextTickNode{
                                .next = undefined,
                                .data = handle,
                            },
                        },
                        .next = undefined,
                    };
                    self.putters.put(&queue_node);
                    _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                    // wake the dispatcher on the next event loop tick
                    var tick_node = NextTickNode{
                        .next = undefined,
                        .data = dispatch_handle,
                    };
                    self.loop.onNextTick(&tick_node);
                }
            }
            /// Await this function to get an item from the channel. If the buffer is empty,
            /// the promise will complete when the next item is put in the channel.
            pub async fn get(self: *SelfChannel) T {
                // TODO should be able to group memory allocation failure before first suspend point
                // so that the async invocation catches it
                const dispatch_handle = async self.dispatch() catch unreachable;

                // TODO integrate this function with named return values
                // so we can get rid of this extra result copy
                var result: T = undefined;

                suspend |handle| {
                    // queue ourselves as a pending getter; the dispatcher fills in *ptr and
                    // resumes us through tick_node
                    var queue_node = std.atomic.Queue(GetNode).Node{
                        .data = GetNode{
                            .ptr = &result,
                            .tick_node = NextTickNode{
                                .next = undefined,
                                .data = handle,
                            },
                        },
                        .next = undefined,
                    };
                    self.getters.put(&queue_node);
                    _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                    // wake the dispatcher on the next event loop tick
                    var tick_node = NextTickNode{
                        .next = undefined,
                        .data = dispatch_handle,
                    };
                    self.loop.onNextTick(&tick_node);
                }
                return result;
            }
            async fn dispatch(self: *SelfChannel) void {
                suspend; // resumed by onNextTick

                // set the "need dispatch" flag
                _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);

                lock: while (true) {
                    // set the lock flag
                    const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
                    if (prev_lock != 0) return;

                    // clear the need_dispatch flag since we're about to do it
                    _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);

                    while (true) {
                        one_dispatch: {
                            // later we correct these extra subtractions
                            var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);

                            // transfer self.buffer to self.getters
                            while (self.buffer_len != 0) {
                                if (get_count == 0) break :one_dispatch;

                                const get_node = &self.getters.get().?.data;
                                get_node.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
                                self.loop.onNextTick(&get_node.tick_node);
                                self.buffer_len -= 1;

                                get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            }

                            // direct transfer self.putters to self.getters
                            while (get_count != 0 and put_count != 0) {
                                const get_node = &self.getters.get().?.data;
                                const put_node = &self.putters.get().?.data;

                                get_node.ptr.* = put_node.data;
                                self.loop.onNextTick(&get_node.tick_node);
                                self.loop.onNextTick(&put_node.tick_node);

                                get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                                put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            }

                            // transfer self.putters to self.buffer
                            while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
                                const put_node = &self.putters.get().?.data;

                                self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
                                self.loop.onNextTick(&put_node.tick_node);
                                self.buffer_index +%= 1;
                                self.buffer_len += 1;

                                put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
                            }
                        }

                        // undo the extra subtractions
                        _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
                        _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);

                        // clear the need-dispatch flag; if it was set meanwhile, dispatch again
                        const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                        if (need_dispatch != 0) continue;

                        // release the lock
                        const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                        assert(my_lock != 0);

                        // we have to check again now that we unlocked
                        if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;

                        return;
                    }
                }
            }
        };
    }
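
    // A minimal usage sketch, assuming the 2018-era async/await syntax used throughout
    // this file and a Loop that is being driven by run(). `roundTrip` is a hypothetical
    // example coroutine, not part of the channel API: it puts one value into a channel
    // and reads it back out.
    async fn roundTrip(loop: *Loop) void {
        // capacity 1: put() completes as soon as the value has been buffered
        const channel = Channel(i32).init(loop, 1) catch unreachable;

        // start a putter and a getter; both suspend and wait for the dispatcher
        const putter = async channel.put(1234) catch unreachable;
        const getter = async channel.get() catch unreachable;

        // the dispatcher, running on the loop's next tick, pairs them up
        const value = await getter;
        assert(value == 1234);
        await putter;

        // the gist defines no deinit, so the channel allocation is simply leaked here
        // ask the loop to shut down (see the TODO in stop() about waking epoll_wait)
        loop.stop();
    }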
    fn init(allocator: *mem.Allocator) !Loop {
        const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC);
        return Loop{
            .keep_running = true,
            .allocator = allocator,
            .epollfd = epollfd,
            // TODO this queue still has to be initialized in place before run() is called,
            // the same way Channel.init initializes its queues
            .next_tick_queue = undefined,
        };
    }

    pub fn addFd(self: *Loop, fd: i32, prom: promise) !void {
        var ev = std.os.linux.epoll_event{
            .events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
            .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) },
        };
        try std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev);
    }

    pub fn removeFd(self: *Loop, fd: i32) void {
        std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
    }

    async fn waitFd(self: *Loop, fd: i32) !void {
        defer self.removeFd(fd);
        suspend |p| {
            try self.addFd(fd, p);
        }
    }

    pub fn stop(self: *Loop) void {
        // TODO make atomic
        self.keep_running = false;
        // TODO activate an fd in the epoll set which should cancel all the promises
    }

    /// Bring your own linked list node; this means it can't fail.
    pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
        self.next_tick_queue.put(node);
    }

    pub fn run(self: *Loop) void {
        while (self.keep_running) {
            // TODO multiplex the next tick queue and the epoll event results onto a thread pool
            while (self.next_tick_queue.get()) |node| {
                resume node.data;
            }

            var events: [16]std.os.linux.epoll_event = undefined;
            const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1);
            for (events[0..count]) |ev| {
                const p = @intToPtr(promise, ev.data.ptr);
                resume p;
            }
        }
    }
};
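
// A minimal end-to-end sketch, not a definitive harness: `main` is a hypothetical
// entry point, std.debug.global_allocator is assumed to be available as in 2018-era
// Zig, and it uses the example roundTrip coroutine defined inside Loop above.
pub fn main() void {
    var loop = Loop.init(std.debug.global_allocator) catch unreachable;
    // the next tick queue is left undefined by Loop.init (see the TODO there),
    // so initialize it in place before running the loop
    std.atomic.Queue(promise).init(&loop.next_tick_queue);

    // kick off the example coroutine; it suspends while awaiting the channel dispatcher
    _ = async Loop.roundTrip(&loop) catch unreachable;

    // drain next-tick promises and epoll events; per the TODO in stop(), run() can
    // still block in epoll_wait after keep_running is cleared
    loop.run();
}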