Created
June 27, 2018 00:46
-
-
Save andrewrk/23cf64e0b151d2d64f644ea6ce553324 to your computer and use it in GitHub Desktop.
Channel implementation in zig
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub const Loop = struct { | |
allocator: *mem.Allocator, | |
epollfd: i32, | |
keep_running: bool, | |
next_tick_queue: std.atomic.Queue(promise), | |
pub const NextTickNode = std.atomic.Queue(promise).Node; | |
/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size | |
/// when buffer is empty, consumers suspend and are resumed by producers | |
/// when buffer is full, producers suspend and are resumed by consumers | |
pub fn Channel(comptime T: type) void { | |
return struct { | |
loop: *Loop, | |
getters: std.atomic.Queue(GetNode), | |
putters: std.atomic.Queue(PutNode), | |
get_count: usize, | |
put_count: usize, | |
dispatch_lock: u8, // TODO make this a bool | |
need_dispatch: u8, // TODO make this a bool | |
// simple fixed size ring buffer | |
buffer_nodes: []std.atomic.Queue(T).Node, | |
buffer_index: usize, | |
buffer_len: usize, | |
const SelfChannel = this; | |
const GetNode = struct { | |
ptr: *T, | |
handle: promise->T, | |
}; | |
const PutNode = struct { | |
data: T, | |
handle: promise->void, | |
}; | |
pub fn init(loop: *Loop, capacity: usize) !*SelfChannel { | |
const buffer_nodes = try loop.allocator.alloc(std.atomic.Queue(T).Node, capacity); | |
errdefer loop.allocator.free(buffer_nodes); | |
const self = try loop.allocator.create(SelfChannel{ | |
.loop = loop, | |
.buffer_len = 0, | |
.buffer_nodes = buffer_nodes, | |
.buffer_index = 0, | |
.dispatch_lock = 0, | |
.need_dispatch = 0, | |
.getters = undefined, | |
.putters = undefined, | |
.get_count = 0, | |
.put_count = 0, | |
}); | |
errdefer loop.allocator.destroy(self); | |
std.atomic.Queue(GetNode).init(&self.getters); | |
std.atomic.Queue(GetNode).init(&self.putters); | |
return self; | |
} | |
/// puts a data item in the channel. The promise completes when the value has been added to the | |
/// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter. | |
pub async fn put(self: *SelfChannel, data: T) void { | |
// TODO should be able to group memory allocation failure before first suspend point | |
// so that the async invocation catches it | |
const dispatch_handle = async self.dispatch() catch unreachable; | |
suspend |handle| { | |
var queue_node = std.atomic.Queue(PutNode).Node{ | |
.data = PutNode{ | |
.handle = handle, | |
.data = data, | |
}, | |
.next = undefined, | |
}; | |
self.putters.put(&queue_node); | |
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); | |
var tick_node = NextTickNode{ | |
.next = undefined, | |
.data = dispatch_handle, | |
}; | |
self.loop.onNextTick(&tick_node); | |
} | |
} | |
/// await this function to get an item from the channel. If the buffer is empty, the promise will | |
/// complete when the next item is put in the channel. | |
pub async fn get(self: *SelfChannel) T { | |
// TODO should be able to group memory allocation failure before first suspend point | |
// so that the async invocation catches it | |
const dispatch_handle = async self.dispatch() catch unreachable; | |
// TODO integrate this function with named return values | |
// so we can get rid of this extra result copy | |
var result: T = undefined; | |
suspend |handle| { | |
var queue_node = std.atomic.Queue(GetNode).Node{ | |
.data = GetNode{ | |
.ptr = &result, | |
.handle = handle, | |
}, | |
.next = undefined, | |
}; | |
self.getters.put(&queue_node); | |
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); | |
var tick_node = NextTickNode{ | |
.next = undefined, | |
.data = dispatch_handle, | |
}; | |
self.loop.onNextTick(&tick_node); | |
} | |
return result; | |
} | |
async fn dispatch(self: *SelfChannel) void { | |
suspend; // resumed by onNextTick | |
// set the "need dispatch" flag | |
_ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); | |
lock: while (true) { | |
// set the lock flag | |
const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst); | |
if (prev_lock != 0) return; | |
// clear the need_dispatch flag since we're about to do it | |
_ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); | |
while (true) { | |
one_dispatch: { | |
// later we correct these extra subtractions | |
var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); | |
var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); | |
// transfer self.buffer to self.getters | |
while (self.buffer_len != 0) { | |
if (get_count == 0) break :one_dispatch; | |
const get_node = self.getters.get().?; | |
get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len]; | |
self.loop.onNextTick(get_node.handle); | |
self.buffer_len -= 1; | |
get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); | |
} | |
// direct transfer self.putters to self.getters | |
while (get_count != 0 and put_count != 0) { | |
const get_node = self.getters.get().?; | |
const put_node = self.putters.get().?; | |
get_node.ptr.* = put_node.data; | |
self.loop.onNextTick(get_node.handle); | |
self.loop.onNextTick(put_node.handle); | |
get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); | |
put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst); | |
} | |
// transfer self.putters to self.buffer | |
while (self.buffer_len != self.buffer_nodes.len and put_count != 0) { | |
const put_node = self.putters.get().?; | |
self.buffer_nodes[self.buffer_index] = put_node.data; | |
self.loop.onNextTick(put_node.handle); | |
self.buffer_index +%= 1; | |
self.buffer_len += 1; | |
} | |
} | |
// undo the extra subtractions | |
_ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); | |
_ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst); | |
// clear need-dispatch flag | |
const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); | |
if (need_dispatch != 0) continue; | |
const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst); | |
assert(my_lock != 0); | |
// we have to check again now that we unlocked | |
if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock; | |
return; | |
} | |
} | |
} | |
}; | |
} | |
fn init(allocator: *mem.Allocator) !Loop { | |
const epollfd = try std.os.linuxEpollCreate(std.os.linux.EPOLL_CLOEXEC); | |
return Loop{ | |
.keep_running = true, | |
.allocator = allocator, | |
.epollfd = epollfd, | |
}; | |
} | |
pub fn addFd(self: *Loop, fd: i32, prom: promise) !void { | |
var ev = std.os.linux.epoll_event{ | |
.events = std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET, | |
.data = std.os.linux.epoll_data{ .ptr = @ptrToInt(prom) }, | |
}; | |
try std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_ADD, fd, &ev); | |
} | |
pub fn removeFd(self: *Loop, fd: i32) void { | |
std.os.linuxEpollCtl(self.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {}; | |
} | |
async fn waitFd(self: *Loop, fd: i32) !void { | |
defer self.removeFd(fd); | |
suspend |p| { | |
try self.addFd(fd, p); | |
} | |
} | |
pub fn stop(self: *Loop) void { | |
// TODO make atomic | |
self.keep_running = false; | |
// TODO activate an fd in the epoll set which should cancel all the promises | |
} | |
/// bring your own linked list node. this means it can't fail. | |
pub fn onNextTick(self: *Loop, node: *Node) void { | |
self.next_tick_queue.put(node); | |
} | |
pub fn run(self: *Loop) void { | |
while (self.keep_running) { | |
// TODO multiplex the next tick queue and the epoll event results onto a thread pool | |
while (self.next_tick_queue.get()) |node| { | |
resume node.data; | |
} | |
var events: [16]std.os.linux.epoll_event = undefined; | |
const count = std.os.linuxEpollWait(self.epollfd, events[0..], -1); | |
for (events[0..count]) |ev| { | |
const p = @intToPtr(promise, ev.data.ptr); | |
resume p; | |
} | |
} | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment