Small & Fast synchronization primitives for Zig
const std = @import("std");
const system = std.os.system;
// https://vorbrodt.blog/2019/02/27/advanced-thread-pool/
pub fn main() !void {
return benchPool(DistributedPool);
}
const REPS = 1;
const SPREAD = 100;
const COUNT = 10_000_000;
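// The benchmark builds a fan-out tree of runnables: a single Root task
// creates SPREAD Spawner tasks, and each Spawner schedules COUNT / SPREAD
// Task runnables that each do a small amount of PRNG busy-work.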
fn benchPool(comptime Pool: type) !void {
const Task = struct {
index: usize,
runnable: Pool.Runnable = Pool.Runnable.init(run),
fn run(runnable: *Pool.Runnable) void {
const self = @fieldParentPtr(@This(), "runnable", runnable);
var prng = std.rand.DefaultPrng.init(self.index);
const rng = &prng.random;
var x: usize = undefined;
var reps: usize = REPS + (REPS * rng.uintLessThan(usize, 5));
while (reps > 0) : (reps -= 1) {
x = self.index + rng.int(usize);
}
var keep: usize = undefined;
@atomicStore(usize, &keep, x, .SeqCst);
}
};
const Spawner = struct {
index: usize,
tasks: []Task,
runnable: Pool.Runnable = Pool.Runnable.init(run),
fn run(runnable: *Pool.Runnable) void {
const self = @fieldParentPtr(@This(), "runnable", runnable);
for (self.tasks) |*task, offset| {
task.* = Task{ .index = self.index + offset };
Pool.schedule(&task.runnable);
}
}
};
const Root = struct {
tasks: []Task,
spawners: []Spawner,
runnable: Pool.Runnable = Pool.Runnable.init(run),
fn run(runnable: *Pool.Runnable) void {
const self = @fieldParentPtr(@This(), "runnable", runnable);
var offset: usize = 0;
const chunk = self.tasks.len / self.spawners.len;
for (self.spawners) |*spawner, index| {
spawner.* = Spawner{
.index = index,
.tasks = self.tasks[offset..][0..chunk],
};
offset += chunk;
Pool.schedule(&spawner.runnable);
}
}
};
const allocator = std.heap.page_allocator;
const tasks = try allocator.alloc(Task, COUNT);
defer allocator.free(tasks);
const spawners = try allocator.alloc(Spawner, SPREAD);
defer allocator.free(spawners);
var root = Root{
.tasks = tasks,
.spawners = spawners,
};
try Pool.run(&root.runnable);
}
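// BasicPool: a single-threaded scheduler. Runnables are pushed onto a
// thread-local LIFO list and executed inline until the list drains.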
const BasicPool = struct {
run_queue: ?*Runnable = null,
const Runnable = struct {
next: ?*Runnable = null,
callback: fn(*Runnable) void,
fn init(callback: fn(*Runnable) void) Runnable {
return .{ .callback = callback };
}
};
var tls_pool: ?*BasicPool = null;
fn run(runnable: *Runnable) !void {
var pool = BasicPool{};
pool.push(runnable);
const old = tls_pool;
tls_pool = &pool;
defer tls_pool = old;
while (pool.pop()) |next|
(next.callback)(next);
}
fn schedule(runnable: *Runnable) void {
tls_pool.?.push(runnable);
}
fn push(self: *BasicPool, runnable: *Runnable) void {
runnable.next = self.run_queue;
self.run_queue = runnable;
}
fn pop(self: *BasicPool) ?*Runnable {
const runnable = self.run_queue orelse return null;
self.run_queue = runnable.next;
return runnable;
}
};
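// SharedPool: one global run queue protected by a Mutex + Condvar, with one
// worker thread per CPU. Workers sleep on the condvar when the queue is empty
// and the pool shuts down once every worker has gone idle.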
const SharedPool = struct {
mutex: Mutex,
cond: Condvar,
run_queue: ?*Runnable,
running: usize,
const Runnable = struct {
next: ?*Runnable = null,
callback: fn(*Runnable) void,
fn init(callback: fn(*Runnable) void) Runnable {
return .{ .callback = callback };
}
};
var tls_pool: ?*SharedPool = null;
fn run(runnable: *Runnable) !void {
var self = SharedPool{
.mutex = Mutex.init(),
.cond = Condvar.init(),
.run_queue = runnable,
.running = std.math.max(1, std.Thread.cpuCount() catch 1),
};
const old_pool = tls_pool;
tls_pool = &self;
defer tls_pool = old_pool;
defer {
self.mutex.deinit();
self.cond.deinit();
}
const allocator = std.heap.page_allocator;
const threads = try allocator.alloc(*std.Thread, self.running - 1);
defer allocator.free(threads);
for (threads) |*thread|
thread.* = try std.Thread.spawn(&self, runWorker);
defer for (threads) |thread|
thread.wait();
self.runWorker();
}
fn schedule(runnable: *Runnable) void {
const self = tls_pool.?;
self.mutex.lock();
defer self.mutex.unlock();
runnable.next = self.run_queue;
self.run_queue = runnable;
self.cond.signal();
}
fn runWorker(self: *SharedPool) void {
self.mutex.lock();
defer self.mutex.unlock();
while (true) {
if (self.run_queue) |runnable| {
self.run_queue = runnable.next;
self.mutex.unlock();
(runnable.callback)(runnable);
self.mutex.lock();
continue;
}
self.running -= 1;
if (self.running == 0) {
self.cond.broadcast();
return;
}
self.cond.wait(&self.mutex);
if (self.running == 0) {
break;
} else {
self.running += 1;
}
}
}
};
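// DistributedPool: a work-stealing scheduler. Each worker owns a bounded
// ring-buffer run queue plus an unbounded overflow queue; idle workers are
// tracked in the packed `idle` word and put to sleep on a Semaphore.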
const DistributedPool = struct {
idle: usize = 0,
workers: []Worker,
run_queue: UnboundedQueue,
semaphore: Semaphore = Semaphore.init(0),
const Runnable = struct {
next: ?*Runnable = null,
callback: fn(*Runnable) void,
fn init(callback: fn(*Runnable) void) Runnable {
return .{ .callback = callback };
}
};
fn run(runnable: *Runnable) !void {
const threads = std.math.max(1, std.Thread.cpuCount() catch 1);
const allocator = std.heap.page_allocator;
var self = DistributedPool{
.workers = try allocator.alloc(Worker, threads),
.run_queue = UnboundedQueue.init(),
};
defer {
self.semaphore.deinit();
self.run_queue.deinit();
allocator.free(self.workers);
}
for (self.workers) |*worker|
worker.* = Worker.init(&self);
defer for (self.workers) |*worker|
worker.deinit();
self.run_queue.pushFront(Batch.from(runnable));
for (self.workers[1..]) |*worker|
worker.thread = try std.Thread.spawn(worker, Worker.run);
defer for (self.workers[1..]) |*worker|
worker.thread.wait();
self.workers[0].run();
}
fn schedule(runnable: *Runnable) void {
const worker = Worker.current.?;
if (worker.run_queue.push(runnable)) |overflowed|
worker.run_queue_overflow.pushFront(overflowed);
worker.pool.notify(false);
}
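// Idle packs the wait/notify protocol into a single usize: the low 2 bits
// hold the State and the remaining bits count the waiting workers, so both
// can be updated together with one compare-and-swap.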
const Idle = struct {
state: State,
waiting: usize,
const State = enum(u2){
pending = 0,
notified,
waking,
signalled,
};
fn pack(self: Idle) usize {
return @enumToInt(self.state) | (self.waiting << 2);
}
fn unpack(value: usize) Idle {
return Idle{
.state = @intToEnum(State, @truncate(u2, value)),
.waiting = value >> 2,
};
}
};
const Wait = enum {
retry,
waking,
shutdown,
};
fn wait(self: *DistributedPool, is_waking: bool) Wait {
var idle = Idle.unpack(@atomicLoad(usize, &self.idle, .SeqCst));
while (true) {
if (idle.waiting == self.workers.len - 1) {
self.semaphore.post();
return Wait.shutdown;
}
const notified = switch (idle.state) {
.notified => true,
.signalled => is_waking,
else => false,
};
var new_idle = idle;
if (notified) {
new_idle.state = if (is_waking) .waking else .pending;
} else {
new_idle.waiting += 1;
new_idle.state = if (is_waking) .notified else idle.state;
}
if (@cmpxchgWeak(
usize,
&self.idle,
idle.pack(),
new_idle.pack(),
.SeqCst,
.SeqCst,
)) |updated| {
idle = Idle.unpack(updated);
continue;
}
if (notified and is_waking)
return Wait.waking;
if (notified)
return Wait.retry;
self.semaphore.wait();
return Wait.waking;
}
}
fn notify(self: *DistributedPool, is_waking: bool) void {
var idle = Idle.unpack(@atomicLoad(usize, &self.idle, .SeqCst));
while (true) {
if (!is_waking and (idle.state == .notified or idle.state == .signalled))
return;
var new_idle = idle;
if (idle.waiting > 0 and (is_waking or idle.state == .pending)) {
new_idle.waiting -= 1;
new_idle.state = .waking;
} else if (!is_waking and idle.state == .waking) {
new_idle.state = .signalled;
} else {
new_idle.state = .notified;
}
if (@cmpxchgWeak(
usize,
&self.idle,
idle.pack(),
new_idle.pack(),
.SeqCst,
.SeqCst,
)) |updated| {
idle = Idle.unpack(updated);
continue;
}
if (idle.waiting > new_idle.waiting)
self.semaphore.post();
return;
}
}
const Worker = struct {
run_queue: BoundedQueue,
run_queue_overflow: UnboundedQueue,
thread: *std.Thread = undefined,
pool: *DistributedPool,
threadlocal var current: ?*Worker = null;
fn init(pool: *DistributedPool) Worker {
return Worker{
.pool = pool,
.run_queue = BoundedQueue.init(),
.run_queue_overflow = UnboundedQueue.init(),
};
}
fn deinit(self: *Worker) void {
self.run_queue.deinit();
self.run_queue_overflow.deinit();
}
fn run(self: *Worker) void {
const old = current;
current = self;
defer current = old;
var waking = false;
var tick = @ptrToInt(self);
var prng = @truncate(u32, tick >> @sizeOf(usize));
while (true) {
if (self.poll(.{
.tick = tick,
.rand = &prng,
})) |runnable| {
if (waking)
self.pool.notify(waking);
tick +%= 1;
waking = false;
(runnable.callback)(runnable);
continue;
}
waking = switch (self.pool.wait(waking)) {
.retry => false,
.waking => true,
.shutdown => break,
};
}
}
fn poll(self: *Worker, args: anytype) ?*Runnable {
if (args.tick % 256 == 0) {
if (self.steal(args)) |runnable|
return runnable;
}
if (args.tick % 128 == 0) {
if (self.run_queue.stealUnbounded(&self.pool.run_queue)) |runnable|
return runnable;
}
if (args.tick % 64 == 0) {
if (self.run_queue.stealUnbounded(&self.run_queue_overflow)) |runnable|
return runnable;
}
if (self.run_queue.pop()) |runnable|
return runnable;
if (self.run_queue.stealUnbounded(&self.run_queue_overflow)) |runnable|
return runnable;
var attempts: u8 = 32;
while (attempts > 0) : (attempts -= 1) {
if (self.steal(args)) |runnable|
return runnable;
if (self.run_queue.stealUnbounded(&self.pool.run_queue)) |runnable|
return runnable;
std.os.sched_yield() catch spinLoopHint();
}
return null;
}
fn steal(self: *Worker, args: anytype) ?*Runnable {
var workers = self.pool.workers;
var index = blk: {
var x = args.rand.*;
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
args.rand.* = x;
break :blk x % workers.len;
};
var iter = workers.len;
while (iter > 0) : (iter -= 1) {
const worker = &workers[index];
index += 1;
if (index == workers.len)
index = 0;
if (worker == self)
continue;
if (self.run_queue.stealBounded(&worker.run_queue)) |runnable|
return runnable;
if (self.run_queue.stealUnbounded(&worker.run_queue_overflow)) |runnable|
return runnable;
}
return null;
}
};
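// Batch: an intrusive, singly-linked list of Runnables with O(1) push to
// either end, used to move groups of tasks between queues.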
const Batch = struct {
size: usize = 0,
head: *Runnable = undefined,
tail: *Runnable = undefined,
fn isEmpty(self: Batch) bool {
return self.size == 0;
}
fn from(runnable: *Runnable) Batch {
runnable.next = null;
return Batch{
.size = 1,
.head = runnable,
.tail = runnable,
};
}
fn pushBack(self: *Batch, batch: Batch) void {
if (batch.isEmpty())
return;
if (self.isEmpty()) {
self.* = batch;
} else {
self.tail.next = batch.head;
self.tail = batch.tail;
self.size += batch.size;
}
}
fn pushFront(self: *Batch, batch: Batch) void {
if (batch.isEmpty())
return;
if (self.isEmpty()) {
self.* = batch;
} else {
batch.tail.next = self.head;
self.head = batch.head;
self.size += batch.size;
}
}
fn popFront(self: *Batch) ?*Runnable {
if (self.isEmpty())
return null;
const runnable = self.head;
self.head = runnable.next orelse undefined;
self.size -= 1;
return runnable;
}
};
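// UnboundedQueue: a Mutex-protected Batch with an atomically published size
// so consumers can skip taking the lock when the queue is empty.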
const UnboundedQueue = struct {
mutex: Mutex,
batch: Batch = .{},
size: usize = 0,
fn init() UnboundedQueue {
return UnboundedQueue{ .mutex = Mutex.init() };
}
fn deinit(self: *UnboundedQueue) void {
self.mutex.deinit();
self.* = undefined;
}
fn pushBack(self: *UnboundedQueue, batch: Batch) void {
self.push(batch, .back);
}
fn pushFront(self: *UnboundedQueue, batch: Batch) void {
self.push(batch, .front);
}
fn push(self: *UnboundedQueue, batch: Batch, side: enum{ front, back }) void {
if (batch.isEmpty())
return;
self.mutex.lock();
defer self.mutex.unlock();
switch (side) {
.front => self.batch.pushFront(batch),
.back => self.batch.pushBack(batch),
}
@atomicStore(usize, &self.size, self.batch.size, .Release);
}
fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer {
if (@atomicLoad(usize, &self.size, .Acquire) == 0)
return null;
self.mutex.lock();
if (self.size == 0) {
self.mutex.unlock();
return null;
}
return Consumer{
.queue = self,
};
}
const Consumer = struct {
queue: *UnboundedQueue,
fn release(self: Consumer) void {
@atomicStore(usize, &self.queue.size, self.queue.batch.size, .Release);
self.queue.mutex.unlock();
}
fn pop(self: *Consumer) ?*Runnable {
return self.queue.batch.popFront();
}
};
};
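// BoundedQueue: a fixed-size (256 slot) ring buffer. Only the owning worker
// pushes; when it fills up, half of the entries are moved out as a Batch to
// the overflow queue, while other workers steal from the head with CAS.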
const BoundedQueue = struct {
head: usize = 0,
tail: usize = 0,
buffer: [256]*Runnable = undefined,
fn init() BoundedQueue {
return .{};
}
fn deinit(self: *BoundedQueue) void {
self.* = undefined;
}
fn push(self: *BoundedQueue, runnable: *Runnable) ?Batch {
var tail = self.tail;
var head = @atomicLoad(usize, &self.head, .Monotonic);
while (true) {
if (tail -% head < self.buffer.len) {
@atomicStore(*Runnable, &self.buffer[tail % self.buffer.len], runnable, .Unordered);
@atomicStore(usize, &self.tail, tail +% 1, .Release);
return null;
}
const new_head = head +% (self.buffer.len / 2);
if (@cmpxchgWeak(
usize,
&self.head,
head,
new_head,
.Acquire,
.Monotonic,
)) |updated| {
head = updated;
continue;
}
var batch = Batch{};
while (head != new_head) : (head +%= 1)
batch.pushBack(Batch.from(self.buffer[head % self.buffer.len]));
batch.pushBack(Batch.from(runnable));
return batch;
}
}
fn pop(self: *BoundedQueue) ?*Runnable {
var tail = self.tail;
var head = @atomicLoad(usize, &self.head, .Monotonic);
while (tail != head) {
head = @cmpxchgWeak(
usize,
&self.head,
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return self.buffer[head % self.buffer.len];
}
return null;
}
fn stealUnbounded(self: *BoundedQueue, target: *UnboundedQueue) ?*Runnable {
var consumer = target.tryAcquireConsumer() orelse return null;
defer consumer.release();
const first_runnable = consumer.pop();
const head = @atomicLoad(usize, &self.head, .Monotonic);
const tail = self.tail;
var new_tail = tail;
while (new_tail -% head < self.buffer.len) {
const runnable = consumer.pop() orelse break;
@atomicStore(*Runnable, &self.buffer[new_tail % self.buffer.len], runnable, .Unordered);
new_tail +%= 1;
}
if (new_tail != tail)
@atomicStore(usize, &self.tail, new_tail, .Release);
return first_runnable;
}
fn stealBounded(self: *BoundedQueue, target: *BoundedQueue) ?*Runnable {
if (self == target)
return self.pop();
const head = @atomicLoad(usize, &self.head, .Monotonic);
const tail = self.tail;
if (tail != head)
return self.pop();
var target_head = @atomicLoad(usize, &target.head, .Monotonic);
while (true) {
const target_tail = @atomicLoad(usize, &target.tail, .Acquire);
const target_size = target_tail -% target_head;
if (target_size == 0)
return null;
var steal = target_size - (target_size / 2);
if (steal > target.buffer.len / 2) {
spinLoopHint();
target_head = @atomicLoad(usize, &target.head, .Monotonic);
continue;
}
const first_runnable = @atomicLoad(*Runnable, &target.buffer[target_head % target.buffer.len], .Unordered);
var new_target_head = target_head +% 1;
var new_tail = tail;
steal -= 1;
while (steal > 0) : (steal -= 1) {
const runnable = @atomicLoad(*Runnable, &target.buffer[new_target_head % target.buffer.len], .Unordered);
new_target_head +%= 1;
@atomicStore(*Runnable, &self.buffer[new_tail % self.buffer.len], runnable, .Unordered);
new_tail +%= 1;
}
if (@cmpxchgWeak(
usize,
&target.head,
target_head,
new_target_head,
.AcqRel,
.Monotonic,
)) |updated| {
target_head = updated;
continue;
}
if (new_tail != tail)
@atomicStore(usize, &self.tail, new_tail, .Release);
return first_runnable;
}
}
};
};
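// A counting semaphore built from the Mutex + Condvar defined below.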
pub const Semaphore = struct {
mutex: Mutex,
cond: Condvar,
permits: usize,
pub fn init(permits: usize) Semaphore {
return .{
.mutex = Mutex.init(),
.cond = Condvar.init(),
.permits = permits,
};
}
pub fn deinit(self: *Semaphore) void {
self.mutex.deinit();
self.cond.deinit();
self.* = undefined;
}
pub fn wait(self: *Semaphore) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.permits == 0)
self.cond.wait(&self.mutex);
self.permits -= 1;
if (self.permits > 0)
self.cond.signal();
}
pub fn post(self: *Semaphore) void {
self.mutex.lock();
defer self.mutex.unlock();
self.permits += 1;
self.cond.signal();
}
};
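// Mutex picks an implementation at comptime: SRWLOCK on Windows, pthread
// mutexes when libc is linked, a futex-based lock on Linux, and a spinlock
// as the portable fallback.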
pub const Mutex = if (std.builtin.os.tag == .windows)
struct {
srwlock: SRWLOCK,
pub fn init() Mutex {
return .{ .srwlock = SRWLOCK_INIT };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE;
}
pub fn lock(self: *Mutex) void {
AcquireSRWLockExclusive(&self.srwlock);
}
pub fn unlock(self: *Mutex) void {
ReleaseSRWLockExclusive(&self.srwlock);
}
const SRWLOCK = usize;
const SRWLOCK_INIT: SRWLOCK = 0;
extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL;
extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc)
struct {
mutex: if (std.builtin.link_libc) std.c.pthread_mutex_t else void,
pub fn init() Mutex {
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
}
pub fn deinit(self: *Mutex) void {
const safe_rc = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(rc == 0 or rc == safe_rc);
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return pthread_mutex_trylock(&self.mutex) == 0;
}
pub fn lock(self: *Mutex) void {
const rc = std.c.pthread_mutex_lock(&self.mutex);
if (rc != 0)
std.debug.panic("pthread_mutex_lock() = {}\n", .{rc});
}
pub fn unlock(self: *Mutex) void {
const rc = std.c.pthread_mutex_unlock(&self.mutex);
if (rc != 0)
std.debug.panic("pthread_mutex_unlock() = {}\n", .{rc});
}
extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int;
}
else if (std.builtin.os.tag == .linux)
struct {
state: State,
const State = enum(i32) {
unlocked,
locked,
waiting,
};
pub fn init() Mutex {
return .{ .state = .unlocked };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return @cmpxchgStrong(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
pub fn lock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| self.lockSlow(s),
}
}
fn lockSlow(self: *Mutex, current_state: State) void {
@setCold(true);
var new_state = current_state;
var spin: u8 = 0;
while (spin < 100) : (spin += 1) {
const state = @cmpxchgWeak(
State,
&self.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return;
switch (state) {
.unlocked => {},
.locked => {},
.waiting => break,
}
var iter = std.math.min(32, spin + 1);
while (iter > 0) : (iter -= 1)
spinLoopHint();
}
new_state = .waiting;
while (true) {
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) {
.unlocked => return,
else => {},
}
Futex.wait(
@ptrCast(*const i32, &self.state),
@enumToInt(new_state),
);
}
}
pub fn unlock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.unlockSlow(),
}
}
fn unlockSlow(self: *Mutex) void {
@setCold(true);
Futex.wake(@ptrCast(*const i32, &self.state));
}
}
else
struct {
is_locked: bool,
pub fn init() Mutex {
return .{ .is_locked = false };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return @atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire) == false;
}
pub fn lock(self: *Mutex) void {
while (!self.tryLock())
spinLoopHint();
}
pub fn unlock(self: *Mutex) void {
@atomicStore(bool, &self.is_locked, false, .Release);
}
};
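// Condvar mirrors the Mutex selection: Windows condition variables, pthread
// condition variables with libc, or a Mutex-protected list of per-waiter
// futex/spin events as the fallback.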
pub const Condvar = if (std.builtin.os.tag == .windows)
struct {
cond: CONDITION_VARIABLE,
pub fn init() Condvar {
return .{ .cond = CONDITION_VARIABLE_INIT };
}
pub fn deinit(self: *Condvar) void {
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
const rc = SleepConditionVariableSRW(
&self.cond,
&mutex.srwlock,
system.INFINITE,
@as(system.ULONG, 0),
);
std.debug.assert(rc != system.FALSE);
}
pub fn signal(self: *Condvar) void {
WakeConditionVariable(&self.cond);
}
pub fn broadcast(self: *Condvar) void {
WakeAllConditionVariable(&self.cond);
}
const SRWLOCK = usize;
const CONDITION_VARIABLE = usize;
const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0;
extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
extern "kernel32" fn SleepConditionVariableSRW(
c: *CONDITION_VARIABLE,
s: *SRWLOCK,
t: system.DWORD,
f: system.ULONG,
) callconv(system.WINAPI) system.BOOL;
}
else if (std.builtin.link_libc)
struct {
cond: if (std.builtin.link_libc) std.c.pthread_cond_t else void,
pub fn init() Condvar {
return .{ .cond = std.c.PTHREAD_COND_INITIALIZER };
}
pub fn deinit(self: *Condvar) void {
const safe_rc = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(rc == 0 or rc == safe_rc);
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex);
std.debug.assert(rc == 0);
}
pub fn signal(self: *Condvar) void {
const rc = std.c.pthread_cond_signal(&self.cond);
std.debug.assert(rc == 0);
}
pub fn broadcast(self: *Condvar) void {
const rc = std.c.pthread_cond_broadcast(&self.cond);
std.debug.assert(rc == 0);
}
}
else
struct {
pending: bool,
queue_mutex: Mutex,
queue_list: std.SinglyLinkedList(struct {
futex: i32 = 0,
fn wait(self: *@This()) void {
while (@atomicLoad(i32, &self.futex, .Acquire) == 0) {
if (@hasDecl(Futex, "wait")) {
Futex.wait(&self.futex, 0);
} else {
spinLoopHint();
}
}
}
fn notify(self: *@This()) void {
@atomicStore(i32, &self.futex, 1, .Release);
if (@hasDecl(Futex, "wake"))
Futex.wake(&self.futex);
}
}),
pub fn init() Condvar {
return .{
.pending = false,
.queue_mutex = Mutex.init(),
.queue_list = .{},
};
}
pub fn deinit(self: *Condvar) void {
self.queue_mutex.deinit();
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
var waiter = @TypeOf(self.queue_list).Node{ .data = .{} };
{
self.queue_mutex.lock();
defer self.queue_mutex.unlock();
self.queue_list.prepend(&waiter);
@atomicStore(bool, &self.pending, true, .SeqCst);
}
mutex.unlock();
waiter.data.wait();
mutex.lock();
}
pub fn signal(self: *Condvar) void {
if (@atomicLoad(bool, &self.pending, .SeqCst) == false)
return;
const maybe_waiter = blk: {
self.queue_mutex.lock();
defer self.queue_mutex.unlock();
const maybe_waiter = self.queue_list.popFirst();
@atomicStore(bool, &self.pending, self.queue_list.first != null, .SeqCst);
break :blk maybe_waiter;
};
if (maybe_waiter) |waiter|
waiter.data.notify();
}
pub fn broadcast(self: *Condvar) void {
if (@atomicLoad(bool, &self.pending, .SeqCst) == false)
return;
@atomicStore(bool, &self.pending, false, .SeqCst);
var waiters = blk: {
self.queue_mutex.lock();
defer self.queue_mutex.unlock();
const waiters = self.queue_list;
self.queue_list = .{};
break :blk waiters;
};
while (waiters.popFirst()) |waiter|
waiter.data.notify();
}
};
const Futex = switch (std.builtin.os.tag) {
.linux => struct {
fn wait(ptr: *const i32, cmp: i32) void {
switch (system.getErrno(system.futex_wait(
ptr,
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
cmp,
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
else => unreachable,
}
}
fn wake(ptr: *const i32) void {
switch (system.getErrno(system.futex_wake(
ptr,
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
@as(i32, 1),
))) {
0 => {},
std.os.EFAULT => {},
else => unreachable,
}
}
},
else => void,
};
fn spinLoopHint() void {
switch (std.builtin.arch) {
.i386, .x86_64 => asm volatile("pause" ::: "memory"),
.arm, .aarch64 => asm volatile("yield" ::: "memory"),
else => {},
}
}
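// ParkingLot: a generic, address-keyed wait queue in the spirit of WebKit's
// WTF::ParkingLot. This version is an unfinished sketch: Queue.find() is
// still a @compileError TODO and the treap rotation helpers are missing.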
pub fn ParkingLot(comptime Config: type) type {
return struct {
pub const Lock: type = Config.Lock;
pub const Event: type = Config.Event;
pub const nanotime: fn() u64 = switch (@hasDecl(Config, "nanotime")) {
true => Config.nanotime,
else => struct {
fn stub() u64 {
return 0;
}
}.stub,
};
const bucket_count: usize = switch (@hasDecl(Config, "bucket_count")) {
true => Config.bucket_count,
else => std.meta.bitCount(usize) << 2,
};
const tick_frequency_range: usize = switch (@hasDecl(Config, "tick_frequency_range")) {
true => Config.tick_frequency_range,
else => 1 * std.time.ns_per_ms,
};
const Bucket = struct {
lock: Lock = .{},
treap: usize = 0,
waiters: usize = 0,
next_tick: u64 = 0,
var table = [_]Bucket{Bucket{}} ** std.math.max(1, bucket_count);
fn from(address: usize) *Bucket {
return &table[address % table.len];
}
};
const Queue = struct {
head: ?*Waiter,
parent: ?*Waiter,
address: usize,
bucket: *Bucket,
fn find(bucket: *Bucket, address: usize) Queue {
@compileError("TODO: binary-search the bucket for the address");
}
fn link(self: *Queue, head: *Waiter) void {
self.head = head;
head.address = self.address;
head.ticket = self.rand(usize) | 1;
head.left = null;
head.right = null;
head.parent = self.parent;
if (self.parent) |parent| {
if (parent.address < head.address) {
parent.right = head;
} else {
parent.left = head;
}
} else {
if (self.bucket.treap & 1 != 0) {
head.prng = @truncate(u16, self.bucket.treap >> 16);
}
self.bucket.treap = @ptrToInt(head);
}
while (head.parent) |parent| {
if (parent.ticket > head.ticket) {
if (parent.left == head) {
self.rotate(parent, .right);
} else if (parent.right == head) {
self.rotate(parent, .left);
} else {
unreachable;
}
} else {
break;
}
}
}
fn relink(self: *Queue, noalias head: *Waiter, noalias new_head: *Waiter) void {
new_head.prng = head.prng;
new_head.ticket = head.ticket;
new_head.left = head.left;
new_head.right = head.right;
new_head.parent = head.parent;
if (head.left) |left| {
left.parent = new_head;
}
if (head.right) |right| {
right.parent = new_head;
}
if (head.parent) |parent| {
if (parent.address < head.address) {
parent.right = new_head;
} else {
parent.left = new_head;
}
} else {
self.bucket.treap = @ptrToInt(new_head);
}
}
fn unlink(self: *Queue, head: *Waiter) void {
assert(self.head == head);
self.head = null;
while ((head.right orelse head.left) != null) {
var direction = Direction.left;
defer self.rotate(head, direction);
if (head.right) |right| {
if (head.left) |left| {
if (left.ticket < right.ticket) {
direction = .right;
}
}
} else {
direction = .right;
}
}
if (head.parent) |parent| {
if (parent.left == head) {
parent.left = null;
} else {
parent.right = null;
}
} else {
assert(self.bucket.treap & 1 == 0);
assert(self.bucket.treap != 0);
self.bucket.treap = (@as(usize, head.prng) << 16) | 1;
}
}
fn isEmpty(self: Queue) bool {
return self.head == null;
}
fn isWaiting(waiter: *Waiter) bool {
return waiter.ticket != 0;
}
fn push(self: *Queue, waiter: *Waiter) void {
waiter.prev = null;
waiter.next = null;
waiter.ticket = 1;
const head = self.head orelse {
waiter.len = 1;
waiter.tail = null;
self.link(waiter);
return;
};
if (head.tail) |tail| {
waiter.prev = tail;
tail.next = waiter;
}
head.tail = waiter;
head.len += 1;
}
fn pop(self: *Queue) ?*Waiter {
const waiter = self.head orelse return null;
self.remove(waiter);
return waiter;
}
fn remove(self: *Queue, waiter: *Waiter) void {
assert(isWaiting(waiter));
assert(!self.isEmpty());
if (waiter.next) |next| {
next.prev = waiter.prev;
}
if (waiter.prev) |prev| {
prev.next = waiter.next;
}
const head = self.head orelse unreachable;
head.len -= 1;
if (waiter == head) {
if (waiter.next) |new_head| {
new_head.len = head.len;
new_head.tail = head.tail;
self.relink(head, new_head);
} else {
self.unlink(head);
}
} else if (waiter == head.tail) {
head.tail = waiter.prev;
}
waiter.prev = null;
waiter.next = null;
waiter.tail = null;
waiter.ticket = 0;
}
fn steal(noalias self: *Queue, noalias target: *Queue) void {
assert(!target.isEmpty());
const target_head = target.head orelse unreachable;
const target_len = target_head.len;
target.unlink(target_head);
if (self.head) |head| {
if (head.tail) |tail| {
tail.next = target_head;
target_head.prev = tail;
}
head.tail = target_head.tail;
head.len += target_len;
} else {
self.link(target_head);
}
if (target.bucket != self.bucket) {
_ = atomic.fetchSub(&target.bucket.waiters, target_len, .Relaxed);
_ = atomic.fetchAdd(&self.bucket.waiters, target_len, .Relaxed);
}
}
fn iter(self: Queue) Iter {
return Iter{ .waiter = self.head };
}
const Iter = struct {
waiter: ?*Waiter,
fn isEmpty(self: Iter) bool {
return self.waiter == null;
}
fn next(self: *Iter) ?*Waiter {
const waiter = self.waiter orelse return null;
self.waiter = waiter.next;
return waiter;
}
};
fn pollTick(self: *Queue) bool {
const expires_ptr = &self.bucket.next_tick;
const expires = expires_ptr.*;
const timestamp: u64 = nanotime();
if (expires > 0 and (timestamp < expires)) {
return false;
}
const min_delay = tick_frequency_range - (tick_frequency_range / 2);
const timeout = min_delay + (self.rand(u64) % min_delay);
expires_ptr.* = timestamp + timeout;
return true;
}
fn rand(self: *Queue, comptime Int: type) Int {
const prng = blk: {
const bucket = self.bucket;
assert(bucket.treap != 0);
if (bucket.treap & 1 == 0) {
break :blk &@intToPtr(*Waiter, bucket.treap).prng;
} else {
break :blk &@ptrCast([*]u16, &bucket.treap)[1];
}
};
if (prng.* == 0) {
var seed: u16 = 0;
prng.* = @truncate(u16, @ptrToInt(&seed) >> @sizeOf(u16)) | 1;
}
var value: Int = 0;
for (@ptrCast([*]u16, &value)[0..(@sizeOf(Int) / @sizeOf(u16))]) |*chunk| {
var xorshift = prng.*;
xorshift ^= xorshift << 7;
xorshift ^= xorshift >> 9;
xorshift ^= xorshift << 8;
prng.* = xorshift;
chunk.* = xorshift;
}
return value;
}
};
const Waiter = struct {
prev: ?*Waiter,
next: ?*Waiter,
tail: ?*Waiter,
left: ?*Waiter,
right: ?*Waiter,
parent: ?*Waiter,
address: usize,
ticket: usize,
prng: u16,
len: usize,
token: usize,
event: Event,
};
pub fn park(
address: usize,
cancellation: ?Event.Cancellation,
context: anytype,
) error{Invalidated, Cancelled}!usize {
var waiter: Waiter = undefined;
var held: Lock.Held = undefined;
const bucket = Bucket.from(address);
{
bucket.lock.acquire(&held);
defer bucket.lock.release(&held);
_ = atomic.fetchAdd(&bucket.waiters, 1, .Relaxed);
waiter.token = context.onValidate() orelse {
_ = atomic.fetchSub(&bucket.waiters, 1, .Relaxed);
return error.Invalidated;
};
var queue = Queue.find(bucket, address);
queue.push(&waiter);
waiter.event.init();
}
_ = context.onBeforeWait();
var wait_result: bool = waiter.event.wait(cancellation);
var is_cancelled = wait_result == false;
if (is_cancelled) {
{
bucket.lock.acquire(&held);
defer bucket.lock.release(&held);
is_cancelled = Queue.isWaiting(&waiter);
if (is_cancelled) {
var queue = Queue.find(bucket, address);
queue.remove(&waiter);
context.onTimeout(!queue.isEmpty());
_ = atomic.fetchSub(&bucket.waiters, 1, .Relaxed);
}
}
if (!is_cancelled) {
wait_result = waiter.event.wait(null);
assert(wait_result);
}
}
waiter.event.deinit();
if (is_cancelled) return error.Cancelled;
return waiter.token;
}
pub fn unpark(
address: usize,
requeue: ?usize,
context: anytype,
) void {
const bucket = Bucket.from(address);
if (atomic.load(&bucket.waiters, .Relaxed) == 0) {
return;
}
var unparked = blk: {
var held: Lock.Held = undefined;
bucket.lock.acquire(&held);
defer bucket.lock.release(&held);
var unparked = UnparkList{};
if (atomic.load(&bucket.waiters, .Relaxed) > 0) {
var queue = Queue.find(bucket, address);
if (requeue) |requeue_address| {
RequeueOp.apply(&queue, &unparked, requeue_address, context);
} else {
FilterOp.apply(&queue, &unparked, context);
}
}
if (unparked.len > 0) {
_ = atomic.fetchSub(&bucket.waiters, unparked.len, .Relaxed);
}
break :blk unparked;
};
while (unparked.pop()) |waiter| {
waiter.event.set();
}
}
const UnparkList = struct {
head: *Waiter = undefined,
tail: *Waiter = undefined,
len: usize = 0,
fn push(self: *UnparkList, waiter: *Waiter) void {
switch (self.len) {
0 => self.head = waiter,
else => self.tail.next = waiter,
}
waiter.next = null;
self.tail = waiter;
self.len += 1;
}
fn pop(self: *UnparkList) ?*Waiter {
if (self.len == 0) return null;
const waiter = self.head;
self.head = waiter.next orelse undefined;
self.len -= 1;
return waiter;
}
};
pub const RequeueOp = enum{
Abort,
UnparkOneLeaveRest,
UnparkOneRequeueRest,
RequeueOneLeaveRest,
RequeueAll,
fn apply(
queue: *Queue,
unparked: *UnparkList,
requeue_address: usize,
context: anytype,
) void {
const requeue_bucket = Bucket.from(requeue_address);
var requeue_held: Lock.Held = undefined;
requeue_bucket.lock.acquire(&requeue_held);
defer requeue_bucket.lock.release(&requeue_held);
const op: RequeueOp = context.onRequeue();
defer _ = context.onBeforeWake();
switch (op) {
.Abort => {},
.UnparkOneLeaveRest => {
if (queue.pop()) |waiter| {
unparked.push(waiter);
}
},
.UnparkOneRequeueRest => {
if (queue.pop()) |waiter| {
unparked.push(waiter);
}
if (!queue.isEmpty()) {
var target_queue = Queue.find(requeue_bucket, requeue_address);
target_queue.steal(queue);
}
},
.RequeueOneLeaveRest => {
if (queue.pop()) |waiter| {
var target_queue = Queue.find(requeue_bucket, requeue_address);
target_queue.push(waiter);
}
},
.RequeueAll => {
if (!queue.isEmpty()) {
var target_queue = Queue.find(requeue_bucket, requeue_address);
target_queue.steal(queue);
}
},
}
}
};
pub const FilterOp = union(enum) {
Stop,
Skip,
Unpark: usize,
pub const Context = struct {
token: usize,
has_more: bool,
queue: *Queue,
pub fn hasMore(self: Context) bool {
return self.has_more;
}
pub fn getToken(self: Context) usize {
return self.token;
}
pub fn didTick(self: Context) bool {
return self.queue.pollTick();
}
};
fn apply(
queue: *Queue,
unparked: *UnparkList,
context: anytype,
) void {
defer _ = context.onBeforeWake();
var iter = queue.iter();
while (iter.next()) |waiter| {
const op: FilterOp = context.onFilter(FilterOp.Context{
.token = waiter.token,
.has_more = !iter.isEmpty(),
.queue = queue,
});
switch (op) {
.Stop => break,
.Skip => continue,
.Unpark => |unpark_token| {
waiter.token = unpark_token;
queue.remove(waiter);
unparked.push(waiter);
},
}
}
}
};
};
}
// SPDX-License-Identifier: MIT
// Copyright (c) 2015-2020 Zig Contributors
// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
// The MIT license requires this copyright notice to be included in all copies
// and substantial portions of the software.
const std = @import("std");
const builtin = std.builtin;
const system = std.os.system;
const assert = std.debug.assert;
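// Thin wrappers over Zig's atomic builtins using C11/Rust-style ordering
// names (relaxed, acquire, release, ...).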
pub const atomic = struct {
pub fn spinLoopHint() void {
switch (builtin.arch) {
.i386, .x86_64 => asm volatile("pause"),
.arm, .aarch64 => asm volatile("yield"),
else => {},
}
}
pub const Ordering = enum {
unordered,
relaxed,
consume,
acquire,
release,
acq_rel,
seq_cst,
fn toBuiltin(comptime self: Ordering) builtin.AtomicOrder {
return switch (self) {
.unordered => .Unordered,
.relaxed => .Monotonic,
.consume => .Acquire,
.acquire => .Acquire,
.release => .Release,
.acq_rel => .AcqRel,
.seq_cst => .SeqCst,
};
}
};
pub fn fence(comptime ordering: Ordering) void {
@fence(comptime ordering.toBuiltin());
}
pub fn compilerFence(comptime ordering: Ordering) void {
switch (ordering) {
.unordered => @compileError("Unordered memory ordering can only be on atomic variables"),
.relaxed => @compileError("Relaxed memory ordering can only be on atomic variables"),
.consume => @compileError("Consume memory ordering can only be on atomic variables"),
else => asm volatile("" ::: "memory"),
}
}
pub fn load(ptr: anytype, comptime ordering: Ordering) @TypeOf(ptr.*) {
return @atomicLoad(@TypeOf(ptr.*), ptr, comptime ordering.toBuiltin());
}
pub fn store(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) void {
return @atomicStore(@TypeOf(ptr.*), ptr, value, comptime ordering.toBuiltin());
}
pub fn swap(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) {
return atomicRmw(@TypeOf(ptr.*), ptr, .Xchg, value, ordering);
}
pub fn fetchAdd(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) {
return atomicRmw(@TypeOf(ptr.*), ptr, .Add, value, ordering);
}
pub fn fetchSub(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) {
return atomicRmw(@TypeOf(ptr.*), ptr, .Sub, value, ordering);
}
pub fn fetchAnd(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) {
return atomicRmw(@TypeOf(ptr.*), ptr, .And, value, ordering);
}
pub fn fetchOr(ptr: anytype, value: @TypeOf(ptr.*), comptime ordering: Ordering) @TypeOf(ptr.*) {
return atomicRmw(@TypeOf(ptr.*), ptr, .Or, value, ordering);
}
inline fn atomicRmw(comptime T: type, ptr: *T, comptime op: builtin.AtomicRmwOp, value: T, comptime ordering: Ordering) T {
return @atomicRmw(T, ptr, op, value, comptime ordering.toBuiltin());
}
pub fn compareAndSwap(ptr: anytype, cmp: @TypeOf(ptr.*), xchg: @TypeOf(ptr.*), comptime success: Ordering, comptime failure: Ordering) ?@TypeOf(ptr.*) {
return @cmpxchgStrong(@TypeOf(ptr.*), ptr, cmp, xchg, comptime success.toBuiltin(), comptime failure.toBuiltin());
}
pub fn tryCompareAndSwap(ptr: anytype, cmp: @TypeOf(ptr.*), xchg: @TypeOf(ptr.*), comptime success: Ordering, comptime failure: Ordering) ?@TypeOf(ptr.*) {
return @cmpxchgStrong(@TypeOf(ptr.*), ptr, cmp, xchg, comptime success.toBuiltin(), comptime failure.toBuiltin());
}
};
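// nanotime: a monotonic clock in nanoseconds. Windows reads the interrupt
// time from the KUSER_SHARED_DATA page, Darwin scales mach_absolute_time()
// by the timebase, and everything else uses clock_gettime(CLOCK_MONOTONIC).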
pub fn nanotime() u64 {
if (builtin.os.tag == .windows) {
while (true) {
const now = @intToPtr(*volatile u64, 0x7FFE0000 + 0x8).*;
const high = @intToPtr(*volatile u32, 0x7FFE0000 + 0x8 + 8).*;
if (high == @truncate(u32, now >> 32))
return now * 100;
}
}
if (comptime std.Target.current.isDarwin()) {
var freq: system.mach_timebase_info_data = undefined;
system.mach_timebase_info(&freq);
var now = system.mach_absolute_time();
if (freq.numer != 1)
now *= freq.numer;
if (freq.denom != 1)
now /= freq.denom;
return now;
}
var ts: system.timespec = undefined;
std.os.clock_gettime(system.CLOCK_MONOTONIC, &ts) catch unreachable;
return @intCast(u64, ts.tv_sec) * @as(u64, std.time.ns_per_s) + @intCast(u64, ts.tv_nsec);
}
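// Event: a one-shot wait/notify primitive for a single waiter, selected per
// platform: NT keyed events on Windows, a futex where the OS provides one,
// a pthread mutex + condvar with libc, or a plain spin loop as a last resort.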
pub const Event =
if (comptime builtin.os.tag == .windows)
struct {
key: u8 align(4),
pub fn init(self: *Event) void {}
pub fn deinit(self: *Event) void {}
pub fn reset(self: *Event) void {}
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void {
var timeout: system.LARGE_INTEGER = undefined;
var timeout_ptr: ?*const @TypeOf(timeout) = null;
if (deadline) |deadline_ns| {
const now = nanotime();
if (now > deadline_ns)
return error.TimedOut;
timeout_ptr = &timeout;
timeout = -@intCast(system.LARGE_INTEGER, @divFloor(deadline_ns - now, 100));
}
const key = @ptrCast(*align(4) const c_void, &self.key);
return switch (NtWaitForKeyedEvent(null, key, system.FALSE, timeout_ptr)) {
.SUCCESS => {},
.TIMEOUT => error.TimedOut,
else => unreachable,
};
}
pub fn notify(self: *Event) void {
const key = @ptrCast(*align(4) const c_void, &self.key);
const status = NtReleaseKeyedEvent(null, key, system.FALSE, null);
assert(status == .SUCCESS);
}
pub fn yield(iteration: ?usize) bool {
const iter = iteration orelse {
system.kernel32.Sleep(0);
return false;
};
const max_iter = 4000;
if (iter > max_iter)
return false;
if (iter < 2000) {
atomic.spinLoopHint();
} else if (iter < 3000) {
_ = system.kernel32.SwitchToThread();
} else {
system.kernel32.Sleep(0);
}
return true;
}
extern "NtDll" fn NtWaitForKeyedEvent(
handle: ?system.HANDLE,
key: ?*align(4) const c_void,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
extern "NtDll" fn NtReleaseKeyedEvent(
handle: ?system.HANDLE,
key: ?*align(4) const c_void,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
}
else if (Futex.uses_os)
struct {
state: enum(u32) {
empty = 0,
waiting,
notified,
},
pub fn init(self: *Event) void {
self.reset();
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn reset(self: *Event) void {
self.state = .empty;
}
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void {
switch (atomic.swap(&self.state, .waiting, .acquire)) {
.empty => {},
.waiting => unreachable,
.notified => return,
}
while (true) {
var timeout: ?u64 = null;
if (deadline) |deadline_ns| {
const now = nanotime();
if (now > deadline_ns) {
return switch (atomic.swap(&self.state, .waiting, .acquire)) {
.empty => unreachable,
.waiting => error.TimedOut,
.notified => {},
};
} else {
timeout = deadline_ns - now;
}
}
Futex.wait(
@ptrCast(*const u32, &self.state),
@enumToInt(@TypeOf(self.state).waiting),
timeout,
) catch {};
switch (atomic.load(&self.state, .acquire)) {
.empty => unreachable,
.waiting => {},
.notified => return,
}
}
}
pub fn notify(self: *Event) void {
switch (atomic.swap(&self.state, .notified, .release)) {
.empty => return,
.waiting => {},
.notified => unreachable,
}
const notify_all = true;
Futex.wake(
@ptrCast(*const u32, &self.state),
notify_all,
);
}
pub fn yield(iteration: ?usize) bool {
return Futex.yield(iteration);
}
}
else if (builtin.link_libc)
struct {
state: enum{ empty, waiting, notified },
cond: if (!builtin.link_libc) void else system.pthread_cond_t,
mutex: if (!builtin.link_libc) void else system.pthread_mutex_t,
pub fn init(self: *Event) void {
self.* = .{
.state = .empty,
.cond = system.PTHREAD_COND_INITIALIZER,
.mutex = system.PTHREAD_MUTEX_INITIALIZER,
};
}
pub fn deinit(self: *Event) void {
const ok_err = switch (builtin.os.tag) {
.dragonfly, .openbsd => system.EINVAL,
else => 0,
};
const ret_cond = system.pthread_cond_destroy(&self.cond);
assert(ret_cond == 0 or ret_cond == ok_err);
const ret_mutex = system.pthread_mutex_destroy(&self.mutex);
assert(ret_mutex == 0 or ret_mutex == ok_err);
}
pub fn reset(self: *Event) void {
self.state = .empty;
}
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void {
assert(system.pthread_mutex_lock(&self.mutex) == 0);
defer assert(system.pthread_mutex_unlock(&self.mutex) == 0);
switch (self.state) {
.empty => self.state = .waiting,
.waiting => unreachable,
.notified => return,
}
while (true) {
switch (self.state) {
.empty => unreachable,
.waiting => {},
.notified => return,
}
const deadline_ns = deadline orelse {
assert(system.pthread_cond_wait(&self.cond, &self.mutex) == 0);
continue;
};
const now = nanotime();
if (now > deadline_ns)
return error.TimedOut;
var timeout = deadline_ns - now;
if (comptime std.Target.current.isDarwin()) {
var tv: system.timeval = undefined;
assert(system.gettimeofday(&tv, null) == 0);
timeout += @intCast(u64, tv.tv_sec) * std.time.ns_per_s;
timeout += @intCast(u64, tv.tv_usec) * std.time.ns_per_us;
} else {
var ts: system.timespec = undefined;
std.os.clock_gettime(system.CLOCK_REALTIME, &ts) catch unreachable;
timeout += @intCast(u64, ts.tv_sec) * std.time.ns_per_s;
timeout += @intCast(u64, ts.tv_nsec);
}
var ts: system.timespec = undefined;
ts.tv_sec = @intCast(@TypeOf(ts.tv_sec), @divFloor(timeout, std.time.ns_per_s));
ts.tv_nsec = @intCast(@TypeOf(ts.tv_nsec), @mod(timeout, std.time.ns_per_s));
switch (system.pthread_cond_timedwait(&self.cond, &self.mutex, &ts)) {
0 => {},
system.ETIMEDOUT => {},
system.EPERM => unreachable,
system.EINVAL => unreachable,
else => unreachable,
}
}
}
pub fn notify(self: *Event) void {
assert(system.pthread_mutex_lock(&self.mutex) == 0);
defer assert(system.pthread_mutex_unlock(&self.mutex) == 0);
switch (self.state) {
.empty => self.state = .notified,
.waiting => {
self.state = .notified;
assert(system.pthread_cond_signal(&self.cond) == 0);
},
.notified => unreachable,
}
}
pub fn yield(iteration: ?usize) bool {
const iter = iteration orelse {
_ = system.sched_yield();
return false;
};
if (iter < 100) {
atomic.spinLoopHint();
return true;
}
return false;
}
}
else
struct {
notified: bool,
pub fn init(self: *Event) void {
self.reset();
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn reset(self: *Event) void {
self.notified = false;
}
pub fn wait(self: *Event, deadline: ?u64) error{TimedOut}!void {
while (!atomic.load(&self.notified, .acquire))
atomic.spinLoopHint();
}
pub fn notify(self: *Event) void {
atomic.store(&self.notified, true, .release);
}
pub fn yield(iteration: usize) bool {
atomic.spinLoopHint();
return false;
}
};
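// Lock: a small mutex used to guard parking_lot buckets. SRWLOCK on Windows,
// os_unfair_lock on Darwin, a futex-based lock where available, and a
// word-sized queue lock with an intrusive waiter list otherwise.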
pub const Lock =
if (std.builtin.os.tag == .windows)
struct {
srwlock: if (std.builtin.os.tag == .windows) SRWLOCK else void = SRWLOCK_INIT,
pub fn acquire(self: *Lock) void {
AcquireSRWLockExclusive(&self.srwlock);
}
pub fn release(self: *Lock) void {
ReleaseSRWLockExclusive(&self.srwlock);
}
const SRWLOCK = ?system.PVOID;
const SRWLOCK_INIT: SRWLOCK = null;
extern "kernel32" fn AcquireSRWLockExclusive(
srwlock: *SRWLOCK,
) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(
srwlock: *SRWLOCK,
) callconv(system.WINAPI) void;
}
else if (comptime std.Target.current.isDarwin())
struct {
lock: os_unfair_lock = OS_UNFAIR_LOCK_INIT,
pub fn acquire(self: *Lock) void {
os_unfair_lock_lock(&self.lock);
}
pub fn release(self: *Lock) void {
os_unfair_lock_unlock(&self.lock);
}
const os_unfair_lock = u32;
const os_unfair_lock_t = *os_unfair_lock;
const OS_UNFAIR_LOCK_INIT: os_unfair_lock = 0;
extern "c" fn os_unfair_lock_lock(
lock: os_unfair_lock_t,
) callconv(.C) void;
extern "c" fn os_unfair_lock_unlock(
lock: os_unfair_lock_t,
) callconv(.C) void;
}
else if (Futex.uses_os)
struct {
state: enum(u32){
unlocked = 0,
locked,
waiting,
} = .unlocked,
pub fn acquire(self: *Lock) void {
const state = atomic.swap(&self.state, .locked, .acquire);
if (state != .unlocked)
self.acquireSlow(state);
}
fn acquireSlow(self: *Lock, current_state: @TypeOf(@as(Lock, undefined).state)) void {
@setCold(true);
var spin_iter: usize = 0;
var lock_state = current_state;
while (true) {
while (true) {
switch (atomic.load(&self.state, .relaxed)) {
.unlocked => _ = atomic.tryCompareAndSwap(
&self.state,
.unlocked,
lock_state,
.acquire,
.relaxed,
) orelse return,
.locked => {},
.waiting => break,
}
if (Futex.yield(spin_iter)) {
spin_iter +%= 1;
} else {
break;
}
}
const state = atomic.swap(&self.state, .waiting, .acquire);
if (state == .unlocked)
return;
spin_iter = 0;
lock_state = .waiting;
Futex.wait(
@ptrCast(*const u32, &self.state),
@enumToInt(lock_state),
null,
) catch {};
}
}
pub fn release(self: *Lock) void {
switch (atomic.swap(&self.state, .unlocked, .release)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.releaseSlow(),
}
}
fn releaseSlow(self: *Lock) void {
@setCold(true);
const notify_all = false;
Futex.wake(
@ptrCast(*const u32, &self.state),
notify_all,
);
}
}
else
struct {
state: usize = UNLOCKED,
const UNLOCKED = 0;
const LOCKED = 1;
const WAKING = 1 << 8;
const WAITING = ~@as(usize, (1 << 9) - 1);
const Waiter = struct {
prev: ?*Waiter align(std.math.max(@alignOf(usize), ~WAITING + 1)),
next: ?*Waiter,
tail: ?*Waiter,
event: Event,
};
inline fn tryAcquire(self: *Lock) bool {
return switch (builtin.arch) {
.i386, .x86_64 => asm volatile(
"lock btsw $0, %[ptr]"
: [ret] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.state)
: "cc", "memory"
) == 0,
else => atomic.swap(
@ptrCast(*u8, &self.state),
LOCKED,
.acquire,
) == UNLOCKED,
};
}
pub fn acquire(self: *Lock) void {
if (!self.tryAcquire())
self.acquireSlow();
}
pub fn release(self: *Lock) void {
atomic.store(@ptrCast(*u8, &self.state), UNLOCKED, .release);
const state = atomic.load(&self.state, .relaxed);
if (state & WAITING != 0)
self.releaseSlow();
}
fn acquireSlow(self: *Lock) void {
@setCold(true);
var waiter: Waiter = undefined;
var has_event = false;
defer if (has_event)
waiter.event.deinit();
var spin_iter: usize = 0;
var state = atomic.load(&self.state, .relaxed);
while (true) {
if (state & LOCKED == 0) {
if (self.tryAcquire())
return;
std.os.sched_yield() catch unreachable;
state = atomic.load(&self.state, .relaxed);
continue;
}
const head = @intToPtr(?*Waiter, state & WAITING);
if (head == null and Event.yield(spin_iter)) {
spin_iter +%= 1;
state = atomic.load(&self.state, .relaxed);
continue;
}
waiter.prev = null;
waiter.next = head;
waiter.tail = if (head == null) &waiter else null;
if (!has_event) {
has_event = true;
waiter.event.init();
}
state = atomic.tryCompareAndSwap(
&self.state,
state,
(state & ~WAITING) | @ptrToInt(&waiter),
.release,
.relaxed,
) orelse blk: {
waiter.event.wait(null) catch unreachable;
waiter.event.reset();
spin_iter = 0;
break :blk atomic.load(&self.state, .relaxed);
};
}
}
fn releaseSlow(self: *Lock) void {
@setCold(true);
var state = atomic.load(&self.state, .relaxed);
while (true) {
if ((state & WAITING == 0) or (state & (LOCKED | WAKING) != 0))
return;
state = atomic.tryCompareAndSwap(
&self.state,
state,
state | WAKING,
.acquire,
.relaxed,
) orelse break;
}
state |= WAKING;
while (true) {
const head = @intToPtr(*Waiter, state & WAITING);
const tail = head.tail orelse blk: {
var current = head;
while (true) {
const next = current.next orelse unreachable;
next.prev = current;
current = next;
if (current.tail) |tail| {
head.tail = tail;
break :blk tail;
}
}
};
if (state & LOCKED != 0) {
state = atomic.tryCompareAndSwap(
&self.state,
state,
state & ~@as(usize, WAKING),
.release,
.acquire,
) orelse return;
continue;
}
if (tail.prev) |new_tail| {
head.tail = new_tail;
_ = atomic.fetchAnd(&self.state, ~@as(usize, WAKING), .release);
} else if (atomic.tryCompareAndSwap(
&self.state,
state,
UNLOCKED,
.release,
.acquire,
)) |updated| {
state = updated;
continue;
}
tail.event.notify();
return;
}
}
};
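// parking_lot: hashes addresses into a fixed array of buckets, each holding
// a Lock, a list of per-address waiter queues, and a Fairness timer used to
// decide when a wake-up should hand off directly to a waiter.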
pub const parking_lot = struct {
const Bucket = struct {
lock: Lock = .{},
tree: Tree = .{},
fairness: Fairness = .{},
var array = [_]Bucket{Bucket{}} ** 256;
pub fn get(address: usize) *Bucket {
const seed = @truncate(usize, 0x9E3779B97F4A7C15);
const max = @popCount(usize, ~@as(usize, 0));
const bits = @ctz(usize, array.len);
const index = (address *% seed) >> (max - bits);
return &array[index];
}
};
const Fairness = struct {
xorshift: u32 = 0,
times_out: u64 = 0,
const interval = 1 * std.time.ns_per_ms;
pub fn expired(self: *Fairness) bool {
if (self.xorshift == 0)
self.xorshift = @truncate(u32, @ptrToInt(self) >> @sizeOf(u32));
const now = nanotime();
if (now < self.times_out)
return false;
self.xorshift ^= self.xorshift << 13;
self.xorshift ^= self.xorshift >> 17;
self.xorshift ^= self.xorshift << 5;
self.times_out = now + (self.xorshift % interval);
return true;
}
};
const Tree = struct {
tree_head: ?*Waiter = null,
pub fn insert(self: *Tree, address: usize, waiter: *Waiter) void {
waiter.address = address;
waiter.tree_next = null;
waiter.next = null;
waiter.tail = waiter;
if (self.lookup(address, &waiter.tree_prev)) |head| {
const tail = head.tail orelse unreachable;
tail.next = waiter;
waiter.prev = tail;
head.tail = waiter;
return;
}
waiter.prev = null;
if (waiter.tree_prev) |prev| {
prev.tree_next = waiter;
} else {
self.tree_head = waiter;
}
}
pub fn iter(self: *Tree, address: usize) Iter {
const head = self.lookup(address, null);
return .{
.head = head,
.iter = head,
.tree = self,
};
}
fn lookup(self: *Tree, address: usize, parent: ?*?*Waiter) ?*Waiter {
var waiter = self.tree_head;
if (parent) |p|
p.* = waiter;
while (true) {
const head = waiter orelse return null;
if (head.address == address)
return head;
waiter = head.tree_next;
if (parent) |p|
p.* = head;
}
}
fn replace(self: *Tree, waiter: *Waiter, new_waiter: *Waiter) void {
new_waiter.tree_next = waiter.tree_next;
new_waiter.tree_prev = waiter.tree_prev;
if (new_waiter.tree_prev) |prev|
prev.tree_next = new_waiter;
if (new_waiter.tree_next) |next|
next.tree_prev = new_waiter;
if (self.tree_head == waiter)
self.tree_head = new_waiter;
}
fn remove(self: *Tree, waiter: *Waiter) void {
if (waiter.tree_next) |next|
next.tree_prev = waiter.tree_prev;
if (waiter.tree_prev) |prev|
prev.tree_next = waiter.tree_next;
if (self.tree_head == waiter)
self.tree_head = waiter.tree_next;
}
};
const Iter = struct {
head: ?*Waiter,
iter: ?*Waiter,
tree: *Tree,
pub fn isEmpty(self: Iter) bool {
return self.iter == null;
}
pub fn next(self: *Iter) ?*Waiter {
const waiter = self.iter orelse return null;
self.iter = waiter.next;
return waiter;
}
pub fn isQueueEmpty(self: Iter) bool {
return self.head == null;
}
pub fn tryQueueRemove(self: *Iter, waiter: *Waiter) bool {
const head = self.head orelse return false;
if (waiter.tail == null)
return false;
if (self.iter == waiter)
self.iter = waiter.next;
if (waiter.prev) |p|
p.next = waiter.next;
if (waiter.next) |n|
n.prev = waiter.prev;
if (waiter == head) {
self.head = waiter.next;
} else if (waiter == head.tail) {
head.tail = waiter.prev orelse unreachable;
}
if (waiter == head) {
if (self.head) |new_head| {
new_head.tail = waiter.tail;
self.tree.replace(waiter, new_head);
} else {
self.tree.remove(waiter);
}
}
waiter.tail = null;
return true;
}
};
const Waiter = struct {
tree_prev: ?*Waiter,
tree_next: ?*Waiter,
prev: ?*Waiter,
next: ?*Waiter,
tail: ?*Waiter,
address: usize,
token: usize,
event: Event,
};
pub const UnparkContext = struct {
token: *usize,
iter: *Iter,
fairness: *Fairness,
pub fn getToken(self: UnparkContext) usize {
return self.token.*;
}
pub fn hasMore(self: UnparkContext) bool {
return !self.iter.isEmpty();
}
pub fn beFair(self: UnparkContext) bool {
return self.fairness.expired();
}
};
pub fn parkConditionally(address: usize, deadline: ?u64, context: anytype) ?usize {
var bucket = Bucket.get(address);
bucket.lock.acquire();
const token: usize = context.onValidate() orelse {
bucket.lock.release();
return null;
};
var waiter: Waiter = undefined;
waiter.token = token;
bucket.tree.insert(address, &waiter);
waiter.event.init();
defer waiter.event.deinit();
bucket.lock.release();
var timed_out = false;
context.onBeforeWait();
waiter.event.wait(deadline) catch {
timed_out = true;
};
if (!timed_out)
return waiter.token;
bucket = Bucket.get(address);
bucket.lock.acquire();
var iter = bucket.tree.iter(address);
if (iter.tryQueueRemove(&waiter)) {
context.onTimeout(!iter.isEmpty());
bucket.lock.release();
return null;
}
bucket.lock.release();
waiter.event.wait(null) catch unreachable;
return waiter.token;
}
pub const UnparkFilter = union(enum) {
stop: void,
skip: void,
unpark: usize,
};
pub fn unparkFilter(address: usize, context: anytype) void {
const bucket = Bucket.get(address);
bucket.lock.acquire();
var wake_list: ?*Waiter = null;
var iter = bucket.tree.iter(address);
while (iter.next()) |waiter| {
switch (@as(UnparkFilter, context.onFilter(UnparkContext{
.token = &waiter.token,
.iter = &iter,
.fairness = &bucket.fairness,
}))) {
.stop => break,
.skip => continue,
.unpark => |new_token| {
assert(iter.tryQueueRemove(waiter));
waiter.token = new_token;
waiter.next = wake_list;
wake_list = waiter;
},
}
}
context.onBeforeWake();
bucket.lock.release();
while (true) {
const waiter = wake_list orelse break;
wake_list = waiter.next;
waiter.event.notify();
}
}
pub const UnparkResult = struct {
token: ?usize = null,
has_more: bool = false,
be_fair: bool = false,
};
pub fn unparkOne(address: usize, context: anytype) void {
const Context = @TypeOf(context);
const Filter = struct {
ctx: Context,
called_unpark: bool = false,
pub fn onFilter(this: *@This(), unpark_context: UnparkContext) UnparkFilter {
if (this.called_unpark)
return .stop;
const unpark_token: usize = this.ctx.onUnpark(UnparkResult{
.token = unpark_context.getToken(),
.has_more = unpark_context.hasMore(),
.be_fair = unpark_context.beFair(),
});
this.called_unpark = true;
return .{ .unpark = unpark_token };
}
pub fn onBeforeWake(this: @This()) void {
if (!this.called_unpark) {
_ = this.ctx.onUnpark(UnparkResult{});
}
}
};
var filter = Filter{ .ctx = context };
return unparkFilter(address, &filter);
}
pub fn unparkAll(address: usize) void {
const Filter = struct {
pub fn onBeforeWake(this: @This()) void {}
pub fn onFilter(this: @This(), _: UnparkContext) UnparkFilter {
return .{ .unpark = 0 };
}
};
var filter = Filter{};
return unparkFilter(address, filter);
}
};
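// Futex: the OS wait/wake primitive. Linux uses the futex syscall, Darwin
// uses the private __ulock_wait/__ulock_wake calls, and other targets
// emulate it on top of parking_lot.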
pub const Futex =
if (std.builtin.os.tag == .linux)
struct {
pub const uses_os = true;
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void {
var ts: system.timespec = undefined;
var ts_ptr: ?*system.timespec = null;
if (timeout) |timeout_ns| {
ts_ptr = &ts;
ts.tv_sec = @intCast(isize, timeout_ns / std.time.ns_per_s);
ts.tv_nsec = @intCast(isize, timeout_ns % std.time.ns_per_s);
}
return switch (system.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, ptr),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
@bitCast(i32, cmp),
ts_ptr,
))) {
0 => {},
system.EINTR => {},
system.EAGAIN => {},
system.EFAULT => {},
system.ETIMEDOUT => error.TimedOut,
else => unreachable,
};
}
pub fn wake(ptr: *const u32, notify_all: bool) void {
return switch (system.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, ptr),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
if (notify_all) std.math.maxInt(i32) else 1,
))) {
0 => {},
system.EINVAL => {},
else => unreachable,
};
}
pub fn yield(iteration: ?usize) bool {
const cpu_yield = 4;
const thread_yield = 1;
const iter = iteration orelse cpu_yield;
if (iter < cpu_yield) {
var spin: u8 = 30;
while (spin > 0) : (spin -= 1)
atomic.spinLoopHint();
return true;
}
if (iter < cpu_yield + thread_yield) {
_ = system.sched_yield();
return true;
}
return false;
}
}
else if (comptime std.Target.current.isDarwin())
struct {
pub const uses_os = true;
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void {
var timeout_us: u32 = std.math.maxInt(u32);
if (timeout) |timeout_ns|
timeout_us = @intCast(u32, @divFloor(timeout_ns, std.time.ns_per_us));
const ret = __ulock_wait(
UL_COMPARE_AND_WAIT | ULF_NO_ERRNO,
@ptrCast(*c_void, ptr),
@as(u64, cmp),
timeout_us,
);
if (ret < 0) {
switch (-ret) {
system.EINTR => {},
system.EFAULT => {},
system.ETIMEDOUT => return error.TimedOut,
else => unreachable,
}
}
}
pub fn wake(ptr: *const u32, notify_all: bool) void {
var operation: u32 = UL_COMPARE_AND_WAIT | ULF_NO_ERRNO;
if (notify_all)
operation |= ULF_WAKE_ALL;
while (true) {
const ret = __ulock_wake(
operation,
@ptrCast(*c_void, ptr),
@as(u64, 0),
);
if (ret < 0) {
switch (-ret) {
system.ENOENT => {},
system.EINTR => continue,
else => unreachable,
}
}
return;
}
}
pub fn yield(iteration: ?usize) bool {
@compileError("TODO");
}
const UL_COMPARE_AND_WAIT = 1;
const ULF_WAKE_ALL = 0x100;
const ULF_NO_ERRNO = 0x1000000;
extern "c" fn __ulock_wait(
operation: u32,
address: ?*c_void,
value: u64,
timeout_us: u32,
) callconv(.C) c_int;
extern "c" fn __ulock_wake(
operation: u32,
address: ?*c_void,
wake_value: u64,
) callconv(.C) c_int;
}
else
struct {
pub const uses_os = false;
pub fn wait(ptr: *const u32, cmp: u32, timeout: ?u64) error{TimedOut}!void {
const Parker = struct {
pointer: *const u32,
compare: u32,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
if (atomic.load(this.pointer, .seq_cst) != this.compare)
return null;
return 0;
}
pub fn onBeforeWait(this: @This()) void {}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
}
};
var parker = Parker{
.pointer = ptr,
.compare = cmp,
};
const deadline = if (timeout) |timeout_ns| nanotime() + timeout_ns else null;
_ = parking_lot.parkConditionally(@ptrToInt(ptr), deadline, &parker);
if (parker.timed_out)
return error.TimedOut;
}
pub fn wake(ptr: *const u32, notify_all: bool) void {
if (notify_all)
return parking_lot.unparkAll(@ptrToInt(ptr));
const Unparker = struct {
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize {
return 0;
}
};
parking_lot.unparkOne(@ptrToInt(ptr), Unparker{});
}
pub fn yield(iteration: ?usize) bool {
return Event.yield(iteration);
}
};
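// A minimal usage sketch of the Futex wrapper (an illustration, not from the original
// gist; `word` is a placeholder for the caller's atomic u32): wait() only blocks while
// the word still equals `cmp`, so callers re-check in a loop and the waker publishes
// the new value before calling wake():
//
//   while (atomic.load(&word, .acquire) == 0)
//       Futex.wait(&word, 0, null) catch unreachable;
//
//   // on the waking thread:
//   atomic.store(&word, 1, .release);
//   Futex.wake(&word, false);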
pub const ResetEvent = struct {
is_set: bool = false,
pub fn isSet(self: *const ResetEvent) bool {
return atomic.load(&self.is_set, .seq_cst);
}
pub fn reset(self: *ResetEvent) void {
return atomic.store(&self.is_set, false, .seq_cst);
}
pub fn wait(self: *ResetEvent) void {
self.waitInner(null) catch unreachable;
}
pub fn tryWaitFor(self: *ResetEvent, duration: u64) error{TimedOut}!void {
return self.tryWaitUntil(nanotime() + duration);
}
pub fn tryWaitUntil(self: *ResetEvent, deadline: u64) error{TimedOut}!void {
return self.waitInner(deadline);
}
fn waitInner(self: *ResetEvent, deadline: ?u64) error{TimedOut}!void {
const Parker = struct {
event: *ResetEvent,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
if (this.event.isSet())
return null;
return 0;
}
pub fn onBeforeWait(this: @This()) void {}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
}
};
var parker = Parker{ .event = self };
while (true) {
if (self.isSet())
return;
if (parker.timed_out)
return error.TimedOut;
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker);
}
}
pub fn set(self: *ResetEvent) void {
atomic.store(&self.is_set, true, .seq_cst);
parking_lot.unparkAll(@ptrToInt(self));
}
};
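// A minimal single-threaded usage sketch of ResetEvent (added for illustration, not
// part of the original gist): wait() returns immediately once set() has been called,
// and reset() rearms the event for reuse.
test "ResetEvent usage sketch" {
var event = ResetEvent{};
std.debug.assert(!event.isSet());
event.set();
event.wait(); // already set, so this does not block
event.reset();
std.debug.assert(!event.isSet());
}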
pub const Mutex = struct {
state: u8 = UNLOCKED,
const UNLOCKED = 0;
const LOCKED = 1;
const PARKED = 2;
const RETRY = 0;
const HANDOFF = 1;
const TIMEOUT = 2;
const is_x86 = switch (builtin.arch) {
.i386, .x86_64 => true,
else => false,
};
/// Try to acquire the mutex by setting the lock bit using x86-specific optimizations.
inline fn tryAcquirex86(self: *Mutex) bool {
// "lock btsl" has a smaller instruction-cache footprint than the "lock cmpxchg" loop below.
// It's safe to use a dword instruction to operate on byte memory since, on x86,
// the instruction succeeds as long as any part of the address is valid, AFAIK.
return asm volatile(
"lock btsl $0, %[ptr]"
: [ret] "={@ccc}" (-> u8),
: [ptr] "*m" (&self.state)
: "cc", "memory"
) == 0;
}
pub fn tryAcquire(self: *Mutex) bool {
if (is_x86)
return self.tryAcquirex86();
var state = atomic.load(&self.state, .relaxed);
while (true) {
if (state & LOCKED != 0)
return false;
state = atomic.tryCompareAndSwap(
&self.state,
state,
state | LOCKED,
.acquire,
.relaxed,
) orelse return true;
}
}
pub fn acquire(self: *Mutex) void {
self.acquireInner(null) catch unreachable;
}
pub fn tryAcquireFor(self: *Mutex, duration: u64) error{TimedOut}!void {
return self.tryAcquireUntil(nanotime() + duration);
}
pub fn tryAcquireUntil(self: *Mutex, deadline: u64) error{TimedOut}!void {
return self.acquireInner(deadline);
}
pub inline fn release(self: *Mutex) void {
self.releaseInner(false);
}
pub inline fn releaseFair(self: *Mutex) void {
self.releaseInner(true);
}
inline fn acquireInner(self: *Mutex, deadline: ?u64) error{TimedOut}!void {
if (is_x86) {
if (self.tryAcquirex86())
return;
} else if (atomic.tryCompareAndSwap(
&self.state,
UNLOCKED,
LOCKED,
.acquire,
.relaxed,
) == null) {
return;
}
return self.acquireSlow(deadline);
}
inline fn releaseInner(self: *Mutex, be_fair: bool) void {
if (atomic.tryCompareAndSwap(
&self.state,
LOCKED,
UNLOCKED,
.release,
.relaxed,
)) |_| {
self.releaseSlow(be_fair);
}
}
fn acquireSlow(self: *Mutex, deadline: ?u64) error{TimedOut}!void {
@setCold(true);
var spin_iter: usize = 0;
var state = atomic.load(&self.state, .relaxed);
while (true) {
if (state & LOCKED == 0) {
if (is_x86) {
if (self.tryAcquirex86())
return;
} else if (atomic.tryCompareAndSwap(
&self.state,
state,
state | LOCKED,
.acquire,
.relaxed,
) == null) {
return;
}
_ = Event.yield(null);
state = atomic.load(&self.state, .relaxed);
continue;
}
if (state & PARKED == 0) {
if (Event.yield(spin_iter)) {
spin_iter +%= 1;
state = atomic.load(&self.state, .relaxed);
continue;
}
if (atomic.tryCompareAndSwap(
&self.state,
state,
state | PARKED,
.relaxed,
.relaxed,
)) |updated| {
state = updated;
continue;
}
}
const Parker = struct {
mutex: *Mutex,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
if (atomic.load(&this.mutex.state, .relaxed) != (LOCKED | PARKED))
return null;
return 0;
}
pub fn onBeforeWait(this: @This()) void {}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
if (has_more) {
_ = atomic.fetchAnd(&this.mutex.state, ~@as(u8, PARKED), .relaxed);
}
}
};
var parker = Parker{ .mutex = self };
var token = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker);
if (parker.timed_out)
token = TIMEOUT;
switch (token orelse RETRY) {
RETRY => {},
TIMEOUT => return error.TimedOut,
HANDOFF => return,
else => unreachable,
}
spin_iter = 0;
state = atomic.load(&self.state, .relaxed);
}
}
fn releaseSlow(self: *Mutex, force_fair: bool) void {
@setCold(true);
var state = atomic.load(&self.state, .relaxed);
while (state == LOCKED) {
state = atomic.tryCompareAndSwap(
&self.state,
LOCKED,
UNLOCKED,
.release,
.relaxed,
) orelse return;
}
const Unparker = struct {
mutex: *Mutex,
force_fair: bool,
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize {
if (result.token != null and (this.force_fair or result.be_fair)) {
if (!result.has_more)
atomic.store(&this.mutex.state, LOCKED, .relaxed);
return HANDOFF;
}
const new_state = if (result.token == null) @as(u8, UNLOCKED) else PARKED;
atomic.store(&this.mutex.state, new_state, .relaxed);
return RETRY;
}
};
parking_lot.unparkOne(@ptrToInt(self), Unparker{
.mutex = self,
.force_fair = force_fair,
});
}
};
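// A minimal single-threaded usage sketch of the parking-lot Mutex (added for
// illustration, not part of the original gist): tryAcquire() is the non-blocking
// path, acquire()/release() the common case, and releaseFair() forces a direct
// handoff to a parked waiter instead of letting it race for the lock.
test "Mutex usage sketch" {
var mutex = Mutex{};
mutex.acquire();
std.debug.assert(!mutex.tryAcquire()); // already held
mutex.release();
std.debug.assert(mutex.tryAcquire());
mutex.release();
}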
pub const Condvar = struct {
has_waiters: bool = false,
pub fn wait(self: *Condvar, mutex: anytype) void {
return self.waitInner(mutex, null) catch unreachable;
}
pub fn tryWaitFor(self: *Condvar, mutex: anytype, duration: u64) error{TimedOut}!void {
return self.tryWaitUntil(mutex, nanotime() + duration);
}
pub fn tryWaitUntil(self: *Condvar, mutex: anytype, deadline: u64) error{TimedOut}!void {
return self.waitInner(mutex, deadline);
}
fn waitInner(self: *Condvar, mutex: anytype, deadline: ?u64) error{TimedOut}!void {
const CondMutex = @TypeOf(mutex);
const Parker = struct {
cond: *Condvar,
cond_mutex: CondMutex,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
atomic.store(&this.cond.has_waiters, true, .seq_cst);
return 0;
}
pub fn onBeforeWait(this: @This()) void {
this.cond_mutex.release();
}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
}
};
var parker = Parker{
.cond = self,
.cond_mutex = mutex,
};
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker);
mutex.acquire();
if (parker.timed_out)
return error.TimedOut;
}
pub fn notifyOne(self: *Condvar) void {
if (!atomic.load(&self.has_waiters, .seq_cst))
return;
const Unparker = struct {
cond: *Condvar,
pub fn onUnpark(this: @This(), result: parking_lot.UnparkResult) usize {
atomic.store(&this.cond.has_waiters, result.has_more, .seq_cst);
return 0;
}
};
parking_lot.unparkOne(@ptrToInt(self), Unparker{ .cond = self });
}
pub fn notifyAll(self: *Condvar) void {
if (!atomic.load(&self.has_waiters, .seq_cst))
return;
atomic.store(&self.has_waiters, false, .seq_cst);
parking_lot.unparkAll(@ptrToInt(self));
}
};
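// A usage sketch of the parking-lot Condvar (an illustration, not from the original
// gist; `cond`, `mutex`, and `predicate` are placeholders for the caller's state).
// wait() releases the given mutex while parked (via onBeforeWait) and reacquires it
// before returning, so the condition is always re-checked in a loop:
//
//   mutex.acquire();
//   defer mutex.release();
//   while (!predicate)
//       cond.wait(&mutex);
//   // the predicate holds here, with the mutex held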
pub const WaitGroup = struct {
counter: usize = 0,
pub fn init(amount: usize) WaitGroup {
return .{ .counter = amount };
}
pub fn tryBegin(self: *WaitGroup, amount: usize) bool {
return self.apply(true, amount);
}
pub fn begin(self: *WaitGroup, amount: usize) void {
assert(self.tryBegin(amount));
}
pub fn tryEnd(self: *WaitGroup, amount: usize) bool {
return self.apply(false, amount);
}
pub fn end(self: *WaitGroup, amount: usize) void {
assert(self.tryEnd(amount));
}
pub fn tryUpdate(self: *WaitGroup, amount: isize) bool {
const is_add = amount > 0;
const value = @intCast(usize, if (is_add) amount else -amount);
return self.apply(is_add, value);
}
pub fn update(self: *WaitGroup, amount: isize) void {
assert(self.tryUpdate(amount));
}
fn apply(self: *WaitGroup, is_add: bool, amount: usize) bool {
const max = std.math.maxInt(usize);
if (amount == 0)
return true;
var counter = atomic.load(&self.counter, .seq_cst);
while (true) {
var new_counter: usize = undefined;
if (is_add) {
if (counter > max - amount)
return false;
new_counter = counter + amount;
} else {
if (amount > counter)
return false;
new_counter = counter - amount;
}
counter = atomic.tryCompareAndSwap(
&self.counter,
counter,
new_counter,
.seq_cst,
.seq_cst,
) orelse {
if (new_counter == 0)
parking_lot.unparkAll(@ptrToInt(self));
return true;
};
}
}
pub fn tryWait(self: *WaitGroup) bool {
return atomic.load(&self.counter, .relaxed) == 0;
}
pub fn wait(self: *WaitGroup) void {
return self.waitInner(null) catch unreachable;
}
pub fn tryWaitFor(self: *WaitGroup, duration: u64) error{TimedOut}!void {
return self.tryWaitUntil(nanotime() + duration);
}
pub fn tryWaitUntil(self: *WaitGroup, deadline: u64) error{TimedOut}!void {
return self.waitInner(deadline);
}
fn waitInner(self: *WaitGroup, deadline: ?u64) error{TimedOut}!void {
const Parker = struct {
wg: *WaitGroup,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
if (atomic.load(&this.wg.counter, .seq_cst) == 0)
return null;
return 0;
}
pub fn onBeforeWait(this: @This()) void {}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
}
};
while (true) {
if (atomic.load(&self.counter, .seq_cst) == 0)
return;
var parker = Parker{ .wg = self };
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker);
if (parker.timed_out)
return error.TimedOut;
}
}
};
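// A minimal single-threaded usage sketch of the parking-lot WaitGroup (added for
// illustration, not part of the original gist): begin() adds to the counter, end()
// subtracts from it, and wait() returns once the counter reaches zero.
test "WaitGroup usage sketch" {
var wg = WaitGroup.init(0);
wg.begin(2);
std.debug.assert(!wg.tryWait());
wg.end(1);
wg.end(1);
wg.wait(); // counter is zero, so this does not block
std.debug.assert(wg.tryWait());
}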
pub const Semaphore = struct {
permits: usize = 0,
pub fn init(permits: usize) Semaphore {
return .{ .permits = permits };
}
pub fn tryAcquire(self: *Semaphore, permits: usize) bool {
var perms = atomic.load(&self.permits, .seq_cst);
while (true) {
if (perms < permits)
return false;
perms = atomic.tryCompareAndSwap(
&self.permits,
perms,
perms - permits,
.seq_cst,
.seq_cst,
) orelse return true;
}
}
pub fn acquire(self: *Semaphore, permits: usize) void {
self.acquireInner(permits, null) catch unreachable;
}
pub fn tryAcquireFor(self: *Semaphore, permits: usize, duration: u64) error{TimedOut}!void {
return self.tryAcquireUntil(permits, nanotime() + duration);
}
pub fn tryAcquireUntil(self: *Semaphore, permits: usize, deadline: u64) error{TimedOut}!void {
return self.acquireInner(permits, deadline);
}
fn acquireInner(self: *Semaphore, permits: usize, deadline: ?u64) error{TimedOut}!void {
const Parker = struct {
sema: *Semaphore,
perms: usize,
timed_out: bool = false,
pub fn onValidate(this: @This()) ?usize {
if (atomic.load(&this.sema.permits, .seq_cst) >= this.perms)
return null;
return this.perms;
}
pub fn onBeforeWait(this: @This()) void {}
pub fn onTimeout(this: *@This(), has_more: bool) void {
this.timed_out = true;
}
};
var parker = Parker{
.sema = self,
.perms = permits,
};
while (true) {
if (self.tryAcquire(permits))
return;
if (parker.timed_out)
return error.TimedOut;
_ = parking_lot.parkConditionally(@ptrToInt(self), deadline, &parker);
}
}
pub fn tryRelease(self: *Semaphore, permits: usize) bool {
var perms = atomic.load(&self.permits, .seq_cst);
while (true) {
if (perms > std.math.maxInt(usize) - permits)
return false;
perms = atomic.tryCompareAndSwap(
&self.permits,
perms,
perms + permits,
.seq_cst,
.seq_cst,
) orelse break;
}
const Filter = struct {
sema: *Semaphore,
consumed: usize = 0,
pub fn onBeforeWake(this: @This()) void {}
pub fn onFilter(this: *@This(), unpark_context: parking_lot.UnparkContext) parking_lot.UnparkFilter {
const waiter_perms = unpark_context.getToken();
const perms = atomic.load(&this.sema.permits, .seq_cst);
if (
(perms < this.consumed) or
((perms - this.consumed) < waiter_perms) or
(this.consumed > (std.math.maxInt(usize) - waiter_perms))
) {
return .stop;
}
this.consumed += waiter_perms;
return .{ .unpark = 0 };
}
};
var filter = Filter{ .sema = self };
parking_lot.unparkFilter(@ptrToInt(self), &filter);
return true;
}
pub fn release(self: *Semaphore, permits: usize) void {
assert(self.tryRelease(permits));
}
};
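// A minimal single-threaded usage sketch of the parking-lot Semaphore (added for
// illustration, not part of the original gist): acquire() takes permits, blocking
// until enough are available, and release() returns them, waking parked waiters.
test "Semaphore usage sketch" {
var sem = Semaphore.init(1);
std.debug.assert(sem.tryAcquire(1));
std.debug.assert(!sem.tryAcquire(1)); // no permits left
sem.release(1);
sem.acquire(1); // a permit is available again, so this does not block
sem.release(1);
}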
pub const RwLock = struct {
state: usize = 0,
// reader:
// - fast path: 0 -> ONE_READER
// - set has_parked & wait for !(has_parked | has_writer)
// - always handoff, with the reader bit already added by the unparker
//
// unlock:
// - dec ONE_READER, if readers != 0 or !has_parked, ret
// - unlock_common
//
// writer:
// - fast path: 0 -> HAS_WRITER
// - set has_parked & wait for 0
// - always handoff to HAS_WRITER | ?(HAS_PARKED)
//
// unlock:
// - dec HAS_WRITER, if 0, ret
// - unlock_common
//
//
// unlock_common:
// - if
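//
// A hypothetical elaboration of the reader slow path described above (an assumption,
// not part of the original gist), mirroring Mutex.acquireSlow:
//
//   fast path: CAS state -> state + ONE_READER while (HAS_WRITER | HAS_PARKED) == 0
//   slow path: CAS HAS_PARKED in, then park on @ptrToInt(self) with a Parker whose
//              onValidate re-checks that a writer or parked bit is still set
//   handoff:   the unparking writer adds ONE_READER on behalf of each woken reader,
//              so a handoff token means the read lock is already held on wake-up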
const HAS_WRITER = 1 << 0;
const HAS_PARKED = 1 << 1;
const ONE_READER = 1 << 2;
const READ_SHIFT = @ctz(usize, ONE_READER);
pub fn tryAcquireReader(self: *RwLock) bool {
var state = atomic.load(&self.state, .relaxed);
while (true) {
if (state & (HAS_PARKED | HAS_WRITER) != 0)
return false;
if ((state >> READ_SHIFT) == (std.math.maxInt(usize) >> READ_SHIFT))
return false;
state = atomic.tryCompareAndSwap(
&self.state,
state,
state + ONE_READER,
.acquire,
.relaxed,
) orelse return true;
}
}
pub fn acquireReader(self: *RwLock) void {
self.acquireReaderInner(null) catch unreachable;
}
pub fn tryAcquireReaderFor(self: *RwLock, duration: u64) error{TimedOut}!void {
return self.tryAcquireReaderUntil(nanotime() + duration);
}
pub fn tryAcquireReaderUntil(self: *RwLock, deadline: u64) error{TimedOut}!void {
return self.acquireReaderInner(deadline);
}
fn acquireReaderInner(self: *RwLock, deadline: ?u64) error{TimedOut}!void {
@compileError("TODO");
}
pub fn releaseReader(self: *RwLock) void {
@compileError("TODO");
}
pub fn tryAcquireWriter(self: *RwLock) bool {
@compileError("TODO");
}
pub fn acquireWriter(self: *RwLock) void {
@compileError("TODO");
}
pub fn tryAcquireWriterFor(self: *RwLock, duration: u64) error{TimedOut}!void {
@compileError("TODO");
}
pub fn tryAcquireWriterUntil(self: *RwLock, deadline: u64) error{TimedOut}!void {
@compileError("TODO");
}
pub fn releaseWriter(self: *RwLock) void {
@compileError("TODO");
}
};
pub const ThreadPool = struct {
lock: Mutex = .{},
cond: Condvar = .{},
allocator: *std.mem.Allocator,
is_running: bool = true,
spawned: usize = 0,
threads: []*std.Thread = &[_]*std.Thread{},
run_queue: std.SinglyLinkedList(fn(usize) void) = .{},
pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
self.* = .{ .allocator = allocator };
errdefer self.deinit();
const num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
self.threads = try allocator.alloc(*std.Thread, num_threads);
for (self.threads) |*thread| {
thread.* = try std.Thread.spawn(self, runWorker);
self.spawned += 1;
}
}
pub fn deinit(self: *ThreadPool) void {
{
self.lock.acquire();
defer self.lock.release();
self.is_running = false;
self.cond.notifyAll();
}
defer self.allocator.free(self.threads);
for (self.threads[0..self.spawned]) |thread|
thread.wait();
while (self.run_queue.popFirst()) |run_node|
(run_node.data)(@ptrToInt(run_node));
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
const Args = @TypeOf(args);
const RunNode = @TypeOf(self.run_queue).Node;
const Closure = struct {
arguments: Args,
pool: *ThreadPool,
run_node: RunNode = .{ .data = run },
fn run(ptr: usize) void {
const run_node = @intToPtr(*RunNode, ptr);
const closure = @fieldParentPtr(@This(), "run_node", run_node);
const result = @call(.{}, func, closure.arguments);
closure.pool.lock.acquire();
defer closure.pool.lock.release();
closure.pool.allocator.destroy(closure);
}
};
self.lock.acquire();
defer self.lock.release();
const closure = try self.allocator.create(Closure);
closure.* = Closure{
.arguments = args,
.pool = self,
};
self.run_queue.prepend(&closure.run_node);
self.cond.notifyOne();
}
fn runWorker(self: *ThreadPool) void {
self.lock.acquire();
defer self.lock.release();
while (self.is_running) {
const run_node = self.run_queue.popFirst() orelse {
self.cond.wait(&self.lock);
continue;
};
self.lock.release();
(run_node.data)(@ptrToInt(run_node));
self.lock.acquire();
}
}
};
pub const ThreadPool = struct {
state: usize = 0,
spawned: usize = 0,
run_queue: Queue,
idle_semaphore: Semaphore,
allocator: *std.mem.Allocator,
workers: []Worker = &[_]Worker{},
pub const InitConfig = struct {
allocator: ?*std.mem.Allocator = null,
max_threads: ?usize = null,
var default_gpa = std.heap.GeneralPurposeAllocator(.{}){};
var default_allocator = &default_gpa.allocator;
};
pub fn init(self: *ThreadPool, config: InitConfig) !void {
self.* = ThreadPool{
.run_queue = Queue.init(),
.idle_semaphore = Semaphore.init(0),
.allocator = config.allocator orelse InitConfig.default_allocator,
};
errdefer self.deinit();
const num_workers = std.math.max(1, config.max_threads orelse std.Thread.cpuCount() catch 1);
self.workers = try self.allocator.alloc(Worker, num_workers);
for (self.workers) |*worker| {
try worker.init(self);
@atomicStore(usize, &self.spawned, self.spawned + 1, .SeqCst);
}
}
pub fn deinit(self: *ThreadPool) void {
self.shutdown();
for (self.workers[0..self.spawned]) |*worker|
worker.deinit();
while (self.run_queue.pop()) |run_node|
(run_node.data.runFn)(&run_node.data);
self.allocator.free(self.workers);
self.idle_semaphore.deinit();
self.run_queue.deinit();
self.* = undefined;
}
pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
const Args = @TypeOf(args);
const Closure = struct {
func_args: Args,
allocator: *std.mem.Allocator,
run_node: RunNode = .{ .data = .{ .runFn = runFn } },
fn runFn(runnable: *Runnable) void {
const run_node = @fieldParentPtr(RunNode, "data", runnable);
const closure = @fieldParentPtr(@This(), "run_node", run_node);
const result = @call(.{}, func, closure.func_args);
closure.allocator.destroy(closure);
}
};
const allocator = self.allocator;
const closure = try allocator.create(Closure);
errdefer allocator.destroy(closure);
closure.* = Closure{
.func_args = args,
.allocator = allocator,
};
const run_node = &closure.run_node;
if (Worker.current) |worker| {
worker.run_queue.push(run_node);
} else {
self.run_queue.push(run_node);
}
self.notify();
}
const State = struct {
is_shutdown: bool = false,
is_notified: bool = false,
idle_workers: usize = 0,
fn pack(self: State) usize {
return (
(@as(usize, @boolToInt(self.is_shutdown)) << 0) |
(@as(usize, @boolToInt(self.is_notified)) << 1) |
(self.idle_workers << 2)
);
}
fn unpack(value: usize) State {
return State{
.is_shutdown = value & (1 << 0) != 0,
.is_notified = value & (1 << 1) != 0,
.idle_workers = value >> 2,
};
}
};
fn wait(self: *ThreadPool) error{Shutdown}!void {
var state = State.unpack(@atomicLoad(usize, &self.state, .SeqCst));
while (true) {
if (state.is_shutdown)
return error.Shutdown;
var new_state = state;
if (state.is_notified) {
new_state.is_notified = false;
} else {
new_state.idle_workers += 1;
}
if (@cmpxchgWeak(
usize,
&self.state,
state.pack(),
new_state.pack(),
.SeqCst,
.SeqCst,
)) |updated| {
state = State.unpack(updated);
continue;
}
if (!state.is_notified)
self.idle_semaphore.wait();
return;
}
}
fn notify(self: *ThreadPool) void {
var state = State.unpack(@atomicLoad(usize, &self.state, .SeqCst));
while (true) {
if (state.is_shutdown)
return;
var new_state = state;
if (state.is_notified) {
return;
} else if (state.idle_workers == 0) {
new_state.is_notified = true;
} else {
new_state.idle_workers -= 1;
}
if (@cmpxchgWeak(
usize,
&self.state,
state.pack(),
new_state.pack(),
.SeqCst,
.SeqCst,
)) |updated| {
state = State.unpack(updated);
continue;
}
if (!new_state.is_notified)
self.idle_semaphore.post();
return;
}
}
fn shutdown(self: *ThreadPool) void {
var state = State.unpack(@atomicRmw(
usize,
&self.state,
.Xchg,
(State{ .is_shutdown = true }).pack(),
.SeqCst,
));
while (state.idle_workers > 0) : (state.idle_workers -= 1)
self.idle_semaphore.post();
}
const Worker = struct {
thread: *std.Thread,
run_queue: Queue,
fn init(self: *Worker, pool: *ThreadPool) !void {
self.* = Worker{
.thread = undefined,
.run_queue = Queue.init(),
};
self.thread = std.Thread.spawn(RunConfig{
.worker = self,
.pool = pool,
}, Worker.run) catch |err| {
self.run_queue.deinit();
return err;
};
}
fn deinit(self: *Worker) void {
self.thread.wait();
self.run_queue.deinit();
self.* = undefined;
}
threadlocal var current: ?*Worker = null;
const RunConfig = struct {
worker: *Worker,
pool: *ThreadPool,
};
fn run(config: RunConfig) void {
const self = config.worker;
const pool = config.pool;
const old_current = current;
current = self;
defer current = old_current;
var tick = @ptrToInt(self);
var prng = std.rand.DefaultPrng.init(tick);
while (true) {
const run_node = self.poll(tick, pool, &prng.random) orelse {
pool.wait() catch break;
continue;
};
tick +%= 1;
(run_node.data.runFn)(&run_node.data);
}
}
fn poll(self: *Worker, tick: usize, pool: *ThreadPool, rand: *std.rand.Random) ?*RunNode {
if (tick % 128 == 0) {
if (self.steal(pool, rand, .fair)) |run_node|
return run_node;
}
if (tick % 64 == 0) {
if (self.run_queue.steal(&pool.run_queue, .fair)) |run_node|
return run_node;
}
if (self.run_queue.pop()) |run_node|
return run_node;
var attempts: usize = 8;
while (attempts > 0) : (attempts -= 1) {
if (self.steal(pool, rand, .unfair)) |run_node| {
return run_node;
} else {
std.os.sched_yield() catch spinLoopHint();
}
}
if (self.run_queue.steal(&pool.run_queue, .unfair)) |run_node|
return run_node;
return null;
}
fn steal(self: *Worker, pool: *ThreadPool, rand: *std.rand.Random, mode: anytype) ?*RunNode {
const spawned = @atomicLoad(usize, &pool.spawned, .SeqCst);
if (spawned < 2)
return null;
var index = rand.uintLessThan(usize, spawned);
var iter = spawned;
while (iter > 0) : (iter -= 1) {
const target = &pool.workers[index];
index += 1;
if (index == spawned)
index = 0;
if (target == self)
continue;
if (self.run_queue.steal(&target.run_queue, mode)) |run_node|
return run_node;
}
return null;
}
};
const Queue = struct {
mutex: Mutex,
size: usize,
list: List,
fn init() Queue {
return Queue{
.mutex = Mutex.init(),
.size = 0,
.list = .{},
};
}
fn deinit(self: *Queue) void {
self.mutex.deinit();
self.* = undefined;
}
fn push(self: *Queue, node: *List.Node) void {
self.mutex.lock();
defer self.mutex.unlock();
self.list.prepend(node);
@atomicStore(usize, &self.size, self.size + 1, .SeqCst);
}
fn pop(self: *Queue) ?*List.Node {
return self.popFrom(.head);
}
fn steal(self: *Queue, target: *Queue, mode: enum { fair, unfair }) ?*RunNode {
return target.popFrom(switch (mode) {
.fair => .tail,
.unfair => .head,
});
}
fn popFrom(self: *Queue, side: enum { head, tail }) ?*RunNode {
if (@atomicLoad(usize, &self.size, .SeqCst) == 0)
return null;
self.mutex.lock();
defer self.mutex.unlock();
// potential deadlock when all pops are fair..
const run_node = switch (side) {
.head => self.list.popFirst(),
.tail => self.list.pop(),
};
if (run_node != null)
@atomicStore(usize, &self.size, self.size - 1, .SeqCst);
return run_node;
}
};
const List = std.TailQueue(Runnable);
const RunNode = List.Node;
const Runnable = struct {
runFn: fn(*Runnable) void,
};
};
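// A minimal usage sketch of this ThreadPool (added for illustration, not part of the
// original gist; `someTask` and `arg` are placeholders for the caller's function and
// arguments):
//
//   var pool: ThreadPool = undefined;
//   try pool.init(.{});
//   defer pool.deinit();
//   try pool.spawn(someTask, .{ arg });
//
// spawn() heap-allocates a closure, pushes it onto the current worker's queue when
// called from a worker thread (or onto the shared queue otherwise), and notify()
// wakes an idle worker to run or steal it.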
pub fn Channel(
comptime T: type,
comptime buffer_type: std.fifo.LinearFifoBufferType,
) type {
return struct {
mutex: Mutex,
putters: Condvar,
getters: Condvar,
buffer: Buffer,
is_closed: bool,
const Self = @This();
const Buffer = std.fifo.LinearFifo(T, buffer_type);
pub usingnamespace switch (buffer_type) {
.Static => struct {
pub fn init() Self {
return Self.withBuffer(Buffer.init());
}
},
.Slice => struct {
pub fn init(buf: []T) Self {
return Self.withBuffer(Buffer.init(buf));
}
},
.Dynamic => struct {
pub fn init(allocator: *std.mem.Allocator) Self {
return Self.withBuffer(Buffer.init(allocator));
}
},
};
fn withBuffer(buffer: Buffer) Self {
return Self{
.mutex = Mutex.init(),
.putters = Condvar.init(),
.getters = Condvar.init(),
.buffer = buffer,
.is_closed = false,
};
}
pub fn deinit(self: *Self) void {
self.mutex.deinit();
self.putters.deinit();
self.getters.deinit();
self.buffer.deinit();
self.* = undefined;
}
pub fn close(self: *Self) void {
self.mutex.lock();
defer self.mutex.unlock();
if (self.is_closed)
return;
self.is_closed = true;
self.putters.broadcast();
self.getters.broadcast();
}
pub fn tryWriteItem(self: *Self, item: T) !bool {
const wrote = try self.write(&[1]T{item});
return wrote == 1;
}
pub fn writeItem(self: *Self, item: T) !void {
return self.writeAll(&[1]T{item});
}
pub fn write(self: *Self, items: []const T) !usize {
return self.writeItems(items, false);
}
pub fn tryReadItem(self: *Self) !?T {
var items: [1]T = undefined;
if ((try self.read(&items)) != 1)
return null;
return items[0];
}
pub fn readItem(self: *Self) !T {
var items: [1]T = undefined;
try self.readAll(&items);
return items[0];
}
pub fn read(self: *Self, items: []T) !usize {
return self.readItems(items, false);
}
pub fn writeAll(self: *Self, items: []const T) !void {
std.debug.assert((try self.writeItems(items, true)) == items.len);
}
pub fn readAll(self: *Self, items: []T) !void {
std.debug.assert((try self.readItems(items, true)) == items.len);
}
fn writeItems(self: *Self, items: []const T, should_block: bool) !usize {
self.mutex.lock();
defer self.mutex.unlock();
var pushed: usize = 0;
while (pushed < items.len) {
const did_push = blk: {
if (self.is_closed)
return error.Closed;
self.buffer.writeItem(items[pushed]) catch |err| {
if (buffer_type == .Dynamic)
return err;
break :blk false;
};
self.getters.signal();
break :blk true;
};
if (did_push) {
pushed += 1;
} else if (should_block) {
self.putters.wait(&self.mutex);
} else {
break;
}
}
return pushed;
}
fn readItems(self: *Self, items: []T, should_block: bool) !usize {
self.mutex.lock();
defer self.mutex.unlock();
var popped: usize = 0;
while (popped < items.len) {
const new_item = blk: {
if (self.buffer.readItem()) |item| {
self.putters.signal();
break :blk item;
}
if (self.is_closed)
return error.Closed;
break :blk null;
};
if (new_item) |item| {
items[popped] = item;
popped += 1;
} else if (should_block) {
self.getters.wait(&self.mutex);
} else {
break;
}
}
return popped;
}
};
}
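// A minimal single-threaded usage sketch of Channel (added for illustration, not part
// of the original gist): a dynamically buffered channel does not block writers
// (allocation failure is returned as an error instead), while readers block until an
// item arrives or close() is called.
test "Channel usage sketch" {
var chan = Channel(u32, .Dynamic).init(std.testing.allocator);
defer chan.deinit();
try chan.writeItem(42);
std.debug.assert((try chan.readItem()) == 42);
chan.close();
}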
pub const RwLock = if (std.builtin.os.tag != .windows and std.builtin.link_libc)
struct {
rwlock: if (std.builtin.os.tag != .windows) pthread_rwlock_t else void,
pub fn init() RwLock {
return .{ .rwlock = PTHREAD_RWLOCK_INITIALIZER };
}
pub fn deinit(self: *RwLock) void {
const safe_rc = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = pthread_rwlock_destroy(&self.rwlock);
std.debug.assert(rc == 0 or rc == safe_rc);
self.* = undefined;
}
pub fn tryLock(self: *RwLock) bool {
return pthread_rwlock_trywrlock(&self.rwlock) == 0;
}
pub fn lock(self: *RwLock) void {
const rc = pthread_rwlock_wrlock(&self.rwlock);
std.debug.assert(rc == 0);
}
pub fn unlock(self: *RwLock) void {
const rc = pthread_rwlock_unlock(&self.rwlock);
std.debug.assert(rc == 0);
}
pub fn tryLockShared(self: *RwLock) bool {
return pthread_rwlock_tryrdlock(&self.rwlock) == 0;
}
pub fn lockShared(self: *RwLock) void {
const rc = pthread_rwlock_rdlock(&self.rwlock);
std.debug.assert(rc == 0);
}
pub fn unlockShared(self: *RwLock) void {
const rc = pthread_rwlock_unlock(&self.rwlock);
std.debug.assert(rc == 0);
}
const PTHREAD_RWLOCK_INITIALIZER = pthread_rwlock_t{};
const pthread_rwlock_t = switch (std.builtin.os.tag) {
.macos, .ios, .watchos, .tvos => extern struct {
__sig: c_long = 0x2DA8B3B4,
__opaque: [192]u8 = [_]u8{0} ** 192,
},
.linux => switch (std.builtin.abi) {
.android => switch (@sizeOf(usize)) {
4 => extern struct {
lock: std.c.pthread_mutex_t = std.c.PTHREAD_MUTEX_INITIALIZER,
cond: std.c.pthread_cond_t = std.c.PTHREAD_COND_INITIALIZER,
numLocks: c_int = 0,
writerThreadId: c_int = 0,
pendingReaders: c_int = 0,
pendingWriters: c_int = 0,
attr: i32 = 0,
__reserved: [12]u8 = [_]u8{0} ** 12,
},
8 => extern struct {
numLocks: c_int = 0,
writerThreadId: c_int = 0,
pendingReaders: c_int = 0,
pendingWriters: c_int = 0,
attr: i32 = 0,
__reserved: [36]u8 = [_]u8{0} ** 36,
},
else => unreachable,
},
else => extern struct {
size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56,
},
},
.fuchsia => extern struct {
size: [56]u8 align(@alignOf(usize)) = [_]u8{0} ** 56,
},
.emscripten => extern struct {
size: [32]u8 align(4) = [_]u8{0} ** 32,
},
.netbsd => extern struct {
ptr_magic: c_uint = 0x99990009,
ptr_interlock: switch (std.builtin.arch) {
.aarch64, .sparc, .x86_64, .i386 => u8,
.arm, .powerpc => c_int,
else => unreachable,
} = 0,
ptr_rblocked_first: ?*u8 = null,
ptr_rblocked_last: ?*u8 = null,
ptr_wblocked_first: ?*u8 = null,
ptr_wblocked_last: ?*u8 = null,
ptr_nreaders: c_uint = 0,
ptr_owner: std.c.pthread_t = null,
ptr_private: ?*c_void = null,
},
.haiku => extern struct {
flags: u32 = 0,
owner: i32 = -1,
lock_sem: i32 = 0,
lock_count: i32 = 0,
reader_count: i32 = 0,
writer_count: i32 = 0,
waiters: [2]?*c_void = [_]?*c_void{null, null},
},
.kfreebsd, .freebsd, .openbsd => extern struct {
ptr: ?*c_void = null,
},
.hermit => extern struct {
ptr: usize = std.math.maxInt(usize),
},
else => @compileError("pthread_rwlock_t not implemented for this platform"),
};
extern "c" fn pthread_rwlock_destroy(p: *pthread_rwlock_t) callconv(.C) c_int;
extern "c" fn pthread_rwlock_rdlock(p: *pthread_rwlock_t) callconv(.C) c_int;
extern "c" fn pthread_rwlock_wrlock(p: *pthread_rwlock_t) callconv(.C) c_int;
extern "c" fn pthread_rwlock_tryrdlock(p: *pthread_rwlock_t) callconv(.C) c_int;
extern "c" fn pthread_rwlock_trywrlock(p: *pthread_rwlock_t) callconv(.C) c_int;
extern "c" fn pthread_rwlock_unlock(p: *pthread_rwlock_t) callconv(.C) c_int;
}
else
struct {
/// https://github.com/bloomberg/rwl-bench/blob/master/bench11.cpp
state: usize,
mutex: Mutex,
semaphore: Semaphore,
const IS_WRITING: usize = 1;
const WRITER: usize = 1 << 1;
const READER: usize = 1 << (1 + std.meta.bitCount(Count));
const WRITER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, WRITER);
const READER_MASK: usize = std.math.maxInt(Count) << @ctz(usize, READER);
const Count = std.meta.Int(.unsigned, @divFloor(std.meta.bitCount(usize) - 1, 2));
pub fn init() RwLock {
return .{
.state = 0,
.mutex = Mutex.init(),
.semaphore = Semaphore.init(0),
};
}
pub fn deinit(self: *RwLock) void {
self.semaphore.deinit();
self.mutex.deinit();
self.* = undefined;
}
pub fn tryLock(self: *RwLock) bool {
if (self.mutex.tryLock()) {
const state = @atomicLoad(usize, &self.state, .SeqCst);
if (state & READER_MASK == 0) {
_ = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .SeqCst);
return true;
}
self.mutex.unlock();
}
return false;
}
pub fn lock(self: *RwLock) void {
_ = @atomicRmw(usize, &self.state, .Add, WRITER, .SeqCst);
self.mutex.lock();
const state = @atomicRmw(usize, &self.state, .Or, IS_WRITING, .SeqCst);
if (state & READER_MASK != 0)
self.semaphore.wait();
}
pub fn unlock(self: *RwLock) void {
_ = @atomicRmw(usize, &self.state, .And, ~IS_WRITING, .SeqCst);
self.mutex.unlock();
}
pub fn tryLockShared(self: *RwLock) bool {
const state = @atomicLoad(usize, &self.state, .SeqCst);
if (state & (IS_WRITING | WRITER_MASK) == 0) {
_ = @cmpxchgStrong(
usize,
&self.state,
state,
state + READER,
.SeqCst,
.SeqCst,
) orelse return true;
}
if (self.mutex.tryLock()) {
_ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst);
self.mutex.unlock();
return true;
}
return false;
}
pub fn lockShared(self: *RwLock) void {
var state = @atomicLoad(usize, &self.state, .SeqCst);
while (state & (IS_WRITING | WRITER_MASK) == 0) {
state = @cmpxchgWeak(
usize,
&self.state,
state,
state + READER,
.SeqCst,
.SeqCst,
) orelse return;
}
self.mutex.lock();
_ = @atomicRmw(usize, &self.state, .Add, READER, .SeqCst);
self.mutex.unlock();
}
pub fn unlockShared(self: *RwLock) void {
const state = @atomicRmw(usize, &self.state, .Sub, READER, .SeqCst);
if ((state & READER_MASK == READER) and (state & IS_WRITING != 0))
self.semaphore.post();
}
};
pub const WaitGroup = struct {
mutex: Mutex,
cond: Condvar,
active: usize,
pub fn init() WaitGroup {
return .{
.mutex = Mutex.init(),
.cond = Condvar.init(),
.active = 0,
};
}
pub fn deinit(self: *WaitGroup) void {
self.mutex.deinit();
self.cond.deinit();
self.* = undefined;
}
pub fn add(self: *WaitGroup) void {
self.mutex.lock();
defer self.mutex.unlock();
self.active += 1;
}
pub fn done(self: *WaitGroup) void {
self.mutex.lock();
defer self.mutex.unlock();
self.active -= 1;
if (self.active == 0)
self.cond.signal();
}
pub fn wait(self: *WaitGroup) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.active != 0)
self.cond.wait(&self.mutex);
}
};
pub const Semaphore = struct {
mutex: Mutex,
cond: Condvar,
permits: usize,
pub fn init(permits: usize) Semaphore {
return .{
.mutex = Mutex.init(),
.cond = Condvar.init(),
.permits = permits,
};
}
pub fn deinit(self: *Semaphore) void {
self.mutex.deinit();
self.cond.deinit();
self.* = undefined;
}
pub fn wait(self: *Semaphore) void {
self.mutex.lock();
defer self.mutex.unlock();
while (self.permits == 0)
self.cond.wait(&self.mutex);
self.permits -= 1;
if (self.permits > 0)
self.cond.signal();
}
pub fn post(self: *Semaphore) void {
self.mutex.lock();
defer self.mutex.unlock();
self.permits += 1;
self.cond.signal();
}
};
pub const Mutex = if (std.builtin.os.tag == .windows)
struct {
srwlock: SRWLOCK,
pub fn init() Mutex {
return .{ .srwlock = SRWLOCK_INIT };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return TryAcquireSRWLockExclusive(&self.srwlock) != system.FALSE;
}
pub fn lock(self: *Mutex) void {
AcquireSRWLockExclusive(&self.srwlock);
}
pub fn unlock(self: *Mutex) void {
ReleaseSRWLockExclusive(&self.srwlock);
}
const SRWLOCK = usize;
const SRWLOCK_INIT: SRWLOCK = 0;
extern "kernel32" fn TryAcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) system.BOOL;
extern "kernel32" fn AcquireSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(s: *SRWLOCK) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc)
struct {
mutex: if (std.builtin.link_libc) std.c.pthread_mutex_t else void,
pub fn init() Mutex {
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
}
pub fn deinit(self: *Mutex) void {
const safe_rc = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(rc == 0 or rc == safe_rc);
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return pthread_mutex_trylock(&self.mutex) == 0;
}
pub fn lock(self: *Mutex) void {
const rc = std.c.pthread_mutex_lock(&self.mutex);
std.debug.assert(rc == 0);
}
pub fn unlock(self: *Mutex) void {
const rc = std.c.pthread_mutex_unlock(&self.mutex);
std.debug.assert(rc == 0);
}
extern "c" fn pthread_mutex_trylock(m: *std.c.pthread_mutex_t) callconv(.C) c_int;
}
else if (std.builtin.os.tag == .linux)
struct {
state: State,
const State = enum(i32) {
unlocked,
locked,
waiting,
};
pub fn init() Mutex {
return .{ .state = .unlocked };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return @cmpxchgStrong(
State,
&self.state,
.unlocked,
.locked,
.Acquire,
.Monotonic,
) == null;
}
pub fn lock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .locked, .Acquire)) {
.unlocked => {},
else => |s| self.lockSlow(s),
}
}
fn lockSlow(self: *Mutex, current_state: State) void {
@setCold(true);
var new_state = current_state;
while (true) {
var spin: u8 = 0;
while (spin < 100) : (spin += 1) {
const state = @cmpxchgWeak(
State,
&self.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return;
switch (state) {
.unlocked => {},
.locked => {},
.waiting => break,
}
var iter = spin + 1;
while (iter > 0) : (iter -= 1)
spinLoopHint();
}
new_state = .waiting;
switch (@atomicRmw(State, &self.state, .Xchg, new_state, .Acquire)) {
.unlocked => return,
else => {},
}
Futex.wait(
@ptrCast(*const i32, &self.state),
@enumToInt(new_state),
);
}
}
pub fn unlock(self: *Mutex) void {
switch (@atomicRmw(State, &self.state, .Xchg, .unlocked, .Release)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.unlockSlow(),
}
}
fn unlockSlow(self: *Mutex) void {
@setCold(true);
Futex.wake(@ptrCast(*const i32, &self.state));
}
}
else
struct {
is_locked: bool,
pub fn init() Mutex {
return .{ .is_locked = false };
}
pub fn deinit(self: *Mutex) void {
self.* = undefined;
}
pub fn tryLock(self: *Mutex) bool {
return @atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire) == false;
}
pub fn lock(self: *Mutex) void {
while (!self.tryLock())
spinLoopHint();
}
pub fn unlock(self: *Mutex) void {
@atomicStore(bool, &self.is_locked, false, .Release);
}
};
pub const Condvar = if (std.builtin.os.tag == .windows)
struct {
cond: CONDITION_VARIABLE,
pub fn init() Condvar {
return .{ .cond = CONDITION_VARIABLE_INIT };
}
pub fn deinit(self: *Condvar) void {
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
const rc = SleepConditionVariableSRW(
&self.cond,
&mutex.srwlock,
system.INFINITE,
@as(system.ULONG, 0),
);
std.debug.assert(rc != system.FALSE);
}
pub fn signal(self: *Condvar) void {
WakeConditionVariable(&self.cond);
}
pub fn broadcast(self: *Condvar) void {
WakeAllConditionVariable(&self.cond);
}
const SRWLOCK = usize;
const CONDITION_VARIABLE = usize;
const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = 0;
extern "kernel32" fn WakeAllConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
extern "kernel32" fn WakeConditionVariable(c: *CONDITION_VARIABLE) callconv(system.WINAPI) void;
extern "kernel32" fn SleepConditionVariableSRW(
c: *CONDITION_VARIABLE,
s: *SRWLOCK,
t: system.DWORD,
f: system.ULONG,
) callconv(system.WINAPI) system.BOOL;
}
else if (std.builtin.link_libc)
struct {
cond: if (std.builtin.link_libc) std.c.pthread_cond_t else void,
pub fn init() Condvar {
return .{ .cond = std.c.PTHREAD_COND_INITIALIZER };
}
pub fn deinit(self: *Condvar) void {
const safe_rc = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const rc = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(rc == 0 or rc == safe_rc);
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
const rc = std.c.pthread_cond_wait(&self.cond, &mutex.mutex);
std.debug.assert(rc == 0);
}
pub fn signal(self: *Condvar) void {
const rc = std.c.pthread_cond_signal(&self.cond);
std.debug.assert(rc == 0);
}
pub fn broadcast(self: *Condvar) void {
const rc = std.c.pthread_cond_broadcast(&self.cond);
std.debug.assert(rc == 0);
}
}
else
struct {
mutex: Mutex,
notified: bool,
waiters: std.SinglyLinkedList(Event),
pub fn init() Condvar {
return .{
.mutex = Mutex.init(),
.notified = false,
.waiters = .{},
};
}
pub fn deinit(self: *Condvar) void {
self.mutex.deinit();
self.* = undefined;
}
pub fn wait(self: *Condvar, mutex: *Mutex) void {
self.mutex.lock();
if (self.notified) {
self.notified = false;
self.mutex.unlock();
return;
}
var wait_node = @TypeOf(self.waiters).Node{ .data = .{} };
self.waiters.prepend(&wait_node);
self.mutex.unlock();
mutex.unlock();
wait_node.data.wait();
mutex.lock();
}
pub fn signal(self: *Condvar) void {
self.mutex.lock();
const maybe_wait_node = self.waiters.popFirst();
if (maybe_wait_node == null)
self.notified = true;
self.mutex.unlock();
if (maybe_wait_node) |wait_node|
wait_node.data.set();
}
pub fn broadcast(self: *Condvar) void {
self.mutex.lock();
var waiters = self.waiters;
self.waiters = .{};
self.notified = true;
self.mutex.unlock();
while (waiters.popFirst()) |wait_node|
wait_node.data.set();
}
const Event = struct {
futex: i32 = 0,
fn wait(self: *Event) void {
while (@atomicLoad(i32, &self.futex, .Acquire) == 0) {
if (@hasDecl(Futex, "wait")) {
Futex.wait(&self.futex, 0);
} else {
spinLoopHint();
}
}
}
fn set(self: *Event) void {
@atomicStore(i32, &self.futex, 1, .Release);
if (@hasDecl(Futex, "wake"))
Futex.wake(&self.futex);
}
};
};
const Futex = switch (std.builtin.os.tag) {
.linux => struct {
fn wait(ptr: *const i32, cmp: i32) void {
switch (system.getErrno(system.futex_wait(
ptr,
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
cmp,
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
else => unreachable,
}
}
fn wake(ptr: *const i32) void {
switch (system.getErrno(system.futex_wake(
ptr,
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
@as(i32, 1),
))) {
0 => {},
std.os.EFAULT => {},
else => unreachable,
}
}
},
else => struct {},
};
fn spinLoopHint() void {
switch (std.builtin.arch) {
.i386, .x86_64 => asm volatile("pause" ::: "memory"),
.arm, .aarch64 => asm volatile("yield" ::: "memory"),
else => {},
}
}