Skip to content

Instantly share code, notes, and snippets.

@rlapz
Last active November 24, 2022 17:52
Show Gist options
  • Save rlapz/b6a7e6e91f08eff25f8e67b68627e630 to your computer and use it in GitHub Desktop.
Save rlapz/b6a7e6e91f08eff25f8e67b68627e630 to your computer and use it in GitHub Desktop.
BufferQueue
const std = @import("std");
const Allocator = std.mem.Allocator;
const Thread = std.Thread;
const Mutex = Thread.Mutex;
const Condition = Thread.Condition;
pub fn BufferQueue(comptime T: type, comptime size: usize) type {
return struct {
allocator: Allocator,
is_alive: bool,
head: usize,
tail: usize,
count: usize,
buffer: []T,
condv: Condition,
mutex: Mutex,
const Self = @This();
pub fn init(allocator: Allocator) !Self {
return .{
.allocator = allocator,
.is_alive = true,
.head = 0,
.tail = 0,
.count = 0,
.buffer = try allocator.alloc(T, size),
.condv = .{},
.mutex = .{},
};
}
pub fn deinit(self: *Self) void {
self.kill();
self.allocator.free(self.buffer);
}
pub fn produce(self: *Self, data: T) !void {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count == size)
return error.NoSpaceLeft;
var head = self.head;
self.buffer[head] = data;
head += 1;
self.head = if (head == size) 0 else head;
self.count += 1;
self.condv.signal();
}
pub fn consume(self: *Self) !T {
const ret = try self.peek();
try self.seen();
return ret;
}
pub fn peek(self: *Self) !T {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count == 0)
return error.NoData;
return self.buffer[self.tail];
}
pub fn seen(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
if (self.count > 0) {
const tail = self.tail + 1;
self.tail = if (tail == size) 0 else tail;
self.count -= 1;
}
self.condv.signal();
}
// wait until space available (count < size)
pub fn produceWait(self: *Self, data: T, timeout_ns: u64) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.count == size and self.is_alive)
self.condv.timedWait(&self.mutex, timeout_ns) catch {};
if (!self.is_alive)
return;
var head = self.head;
self.buffer[head] = data;
head += 1;
self.head = if (head == size) 0 else head;
self.count += 1;
self.condv.signal();
}
// wait until item available (count > 0)
pub fn consumeWait(self: *Self, timeout_ns: u64) T {
const ret = self.peekWait(timeout_ns);
self.seen();
return ret;
}
pub fn peekWait(self: *Self, timeout_ns: u64) T {
self.mutex.lock();
defer self.mutex.unlock();
while (self.is_alive and self.count == 0)
self.condv.timedWait(&self.mutex, timeout_ns) catch {};
return self.buffer[self.tail];
}
pub fn isAlive(self: *Self) bool {
self.mutex.lock();
defer self.mutex.unlock();
return self.is_alive;
}
pub fn hasItem(self: *Self) bool {
self.mutex.lock();
defer self.mutex.unlock();
return self.count > 0;
}
pub fn kill(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
self.is_alive = false;
self.condv.broadcast();
}
};
}
//
// test
//
const time = std.time;
const Atomic = std.atomic.Atomic;
const dprint = std.debug.print;
const Buff = BufferQueue(u32, 10);
var num = Atomic(u32).init(0);
fn producer(buf: *Buff, id: u32) void {
while (buf.isAlive()) {
const __num = num.load(.Acquire);
if (__num == 9)
num.store(0, .Release)
else
num.store(__num + 1, .Release);
buf.produceWait(__num, time.ns_per_s);
dprint(" {} -> [{}]\n", .{ __num, id });
time.sleep(time.ms_per_s);
}
dprint("producer: {}: Exiting...\n", .{id});
}
fn consumer(buf: *Buff, id: u32) void {
var i: u32 = 0;
while (buf.hasItem()) {
const ret = buf.consumeWait(time.ns_per_s);
dprint("[{}] <- {}\n", .{ id, ret });
time.sleep(time.ns_per_s);
i += 1;
if (i == 9)
buf.kill();
}
dprint("consumer: {}: Exiting...\n", .{id});
}
test "4 producers, 2 consumers" {
dprint("\n", .{});
var buf = try Buff.init(std.testing.allocator);
defer buf.deinit();
var p1 = try Thread.spawn(.{}, producer, .{ &buf, 0 });
defer p1.join();
var p2 = try Thread.spawn(.{}, producer, .{ &buf, 1 });
defer p2.join();
var p3 = try Thread.spawn(.{}, producer, .{ &buf, 2 });
defer p3.join();
var p4 = try Thread.spawn(.{}, producer, .{ &buf, 3 });
defer p4.join();
var c1 = try Thread.spawn(.{}, consumer, .{ &buf, 0 });
defer c1.join();
var c2 = try Thread.spawn(.{}, consumer, .{ &buf, 1 });
defer c2.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment