Skip to content

Instantly share code, notes, and snippets.

@voluntas
Forked from kprotty/ThreadPool.zig
Created July 16, 2022 14:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save voluntas/4ad9f7a167c7916956a0ae1fbe80c600 to your computer and use it in GitHub Desktop.
Save voluntas/4ad9f7a167c7916956a0ae1fbe80c600 to your computer and use it in GitHub Desktop.
An isolated thread pool implementation for Zig
const std = @import("std");
const system = std.os.system;
pub const Scheduler = struct {
pub const InitConfig = struct {
max_threads: ?u16 = null,
};
pub fn init(self: *Scheduler, config: InitConfig) !void {
const max_threads = if (std.builtin.single_threaded)
@as(u16, 1)
else if (config.max_threads) |max_threads|
std.math.max(1, max_threads)
else
std.math.max(1, @intCast(u16, std.Thread.cpuCount() catch 1));
}
pub fn deinit(self: *Scheduler) void {
}
pub const ScheduleConfig = struct {
};
pub fn schedule(self: *Scheduler, config: ScheduleConfig, batchable: anytype) void {
}
pub const Runnable = struct {
};
pub const Batch = struct {
};
const ThreadPool = struct {
lock: Lock,
state: State,
spawned: ?*Node,
const State = union(enum) {
running: Running,
terminating: Terminating,
shutdown,
};
const Running = struct {
idle: ?*Waiter,
spawned: usize,
spawnable: usize,
const Waiter = struct {
prev: ?*Waiter,
next: ?*Waiter,
wakeFn: fn(*Waiter) void,
};
};
const Terminating = struct {
cond: Condvar,
pending: usize,
};
pub fn init(num_workers: usize) ThreadPool {
return ThreadPool{
.lock = Lock.init(),
.spawned = null,
.state = .{
.running = .{
.idle = null,
.spawned = 0,
.spawnable = num_workers,
},
},
};
}
pub fn deinit(self: *ThreadPool) void {
self.shutdown();
self.join();
self.lock.deinit();
self.* = undefined;
}
pub fn spawn(self: *ThreadPool, comptime runFn: fn(*Node) void) void {
var spawned_thread = false;
var remaining_attempts: u8 = 5;
while (true) {
const held = self.lock.acquire();
const running = switch (self.state) {
.running => |*running| blk: {
if (spawned_thread)
running.spawned -= 1;
break :blk running;
},
.terminating => |*terminating| {
if (spawned_thread)
terminating.finishOne();
held.release();
return;
},
};
if (running.pop()) |waiter| {
held.release();
waiter.wake();
return;
}
if (running.spawned < running.spawnable and remaining_attempts > 0) {
running.spawned += 1;
held.release();
if (self.spawnThread(runFn))
return;
remaining_attempts -= 1;
continue;
}
held.release();
return;
}
}
const Iter = struct {
node: ?*Node,
fn next(self: *Iter) ?*Node {
const node = self.node orelse return null;
self.node = node.next;
return node;
}
};
pub fn iter(self: *ThreadPool) Iter {
const node = @atomicLoad(?*Node, &self.spawned, .Acquire);
return Iter{ .node = node };
}
fn spawnThread(self: *ThreadPool, comptime runFn: fn(*Node) void) bool {
const Spawner = struct {
pool: *ThreadPool,
thread: *std.Thread,
put_event: Event,
get_event: Event,
fn entry(this: *@This()) void {
this.put_event.wait();
const pool = this.pool;
const thread = this.thread;
this.get_event.set();
var node: Node = undefined;
node.init(pool, thread);
defer node.deinit();
runFn(&node);
}
};
var spawner = Spawner{
.pool = self,
.thread = undefined,
.put_event = Event.init(),
.get_event = Event.init(),
};
defer {
spawner.put_event.deinit();
spawner.get_event.deinit();
}
spawner.thread = std.Thread.spawn(&spawner, Spawner.entry) catch return false;
spawner.put_event.set();
spawner.get_event.wait();
return true;
}
const Node = struct {
data: usize,
next: ?*Node,
pool: *ThreadPool,
thread: *std.Thread,
join_event: Event,
fn init(self: *Node, pool; *ThreadPool, thread: *std.Thread) void {
self.* = .{
.next = undefined,
.pool = pool,
.thread = thread,
.join_event = Event.init(),
};
self.next = @atomicLoad(?*Node, &pool.spawned, .Monotonic);
while (true) {
self.next = @cmpxchgWeak(
?*Node,
&pool.spawned,
self.next,
self,
.Release,
.Monotonic,
) orelse break;
}
}
fn deinit(self: *Node) void {
self.join_event.wait();
self.join_event.deinit();
}
};
pub fn shutdown(self: *ThreadPool) void {
var waiters = blk: {
const held = self.lock.acquire();
defer held.release();
switch (self.state) {
.terminating => return,
.running => |running| {
self.state = .{
.terminating => {
.pending = running.spawned,
.cond = Condvar.init(),
},
};
break :blk running.idle;
},
}
};
while (waiters) |waiter| {
waiters = waiter.next;
(waiter.wakeFn)(waiter);
}
}
fn join(self: *ThreadPool) void {
{
var held = self.lock.acquire();
defer held.release();
defer self.state.terminating.cond.deinit();
while (self.state.terminating.pending > 0)
held = self.state.terminating.cond.wait(held);
}
while (self.spawned) |node| {
self.spawned = node.next;
const thread = node.thread;
node.join_event.set();
thread.wait();
}
}
};
const Condvar = if (std.builtin.os.tag == .windows)
struct {
cond: usize,
pub fn init() Condvar {
return .{ .cond = 0 };
}
pub fn deinit(self: *Condvar) void {
self.* = undefined;
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
const status = SleepConditionVariableSRW(
&self.cond,
&held.inner.srwlock,
system.INFINITE,
@as(system.ULONG, 0),
);
std.debug.assert(status != system.FALSE);
return held;
}
pub fn notifyOne(self: *Condvar) void {
WakeConditionVariable(&self.cond);
}
pub fn notifyAll(self: *Condvar) void {
WakeAllConditionVariable(&self.cond);
}
extern "kernel32" fn SleepConditionVariableSRW(
cond: *usize,
srwlock: *usize,
timeout: system.DWORD,
flags: system.ULONG,
) callconv(system.WINAPI) system.BOOL;
extern "kernel32" fn WakeConditionVariable(
cond: *usize,
) callconv(system.WINAPI) void;
extern "kernel32" fn WakeAllConditionVariable(
cond: *usize,
) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc)
struct {
cond: std.c.pthread_cond_t,
pub fn init() Condvar {
return .{ .cond = std.c.PTHREAD_COND_INTIALIZER };
}
pub fn deinit(self: *Condvar) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_cond = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &held.inner.mutex) == 0);
return held;
}
pub fn notifyOne(self: *Condvar) void {
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
pub fn notifyAll(self: *Condvar) void {
std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
}
}
else
struct {
lock: Lock,
waiters: std.SinglyLinkedList(Event),
pub fn init() Condvar {
return .{
.lock = Lock.init(),
.waiters = .{},
};
}
pub fn deinit(self: *Condvar) void {
self.lock.deinit();
self.* = undefined;
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
var waiter: @TypeOf(self.waiters).Node = undefined;
waiter.data = Event.init();
defer waiter.data.deinit();
{
const waiters_held = self.lock.acquire();
defer waiters_held.release();
self.waiters.prepend(&waiter);
}
held.release();
waiter.data.wait();
return held.inner.acquire();
}
pub fn notifyOne(self: *Condvar) void {
var maybe_waiter = blk: {
const held = self.lock.acquire();
defer held.release();
break :blk self.waiters.popFirst();
};
if (maybe_waiter) |waiter|
waiter.data.set();
}
pub fn notifyAll(self: *Condvar) void {
var waiters = blk: {
const held = self.lock.acquire();
defer held.release();
const waiters = self.waiters;
self.waiters = .{};
break :blk waiters;
};
while (waiters.popFirst()) |waiter|
waiter.data.set();
}
};
const Lock = if (std.builtin.os.tag == .windows)
struct {
srwlock: usize,
pub fn init() Lock {
return .{ .srwlock = 0 };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
AcquireSRWLockExclusive(&self.srwlock);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
ReleaseSRWLockExclusive(&self.inner.srwlock);
}
};
extern "kernel32" fn AcquireSRWLockExclusive(
srwlock: *usize,
) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(
srwlock: *usize,
) callconv(system.WINAPI) void;
}
else if (comptime std.Target.current.isDarwin())
struct {
unfair_lock: u32,
pub fn init() Lock {
return .{ .unfair_lock = 0 };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
os_unfair_lock_lock(&self.unfair_lock);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
os_unfair_lock_unlock(&self.inner.unfair_lock);
}
};
extern "c" fn os_unfair_lock_lock(
unfair_lock: *u32,
) callconv(.C) void;
extern "c" fn os_unfair_lock_unlock(
unfair_lock: *u32,
) callconv(.C) void;
}
else if (std.builtin.link_libc)
struct {
mutex: std.c.pthread_mutex_t,
pub fn init() Lock {
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
}
pub fn deinit(self: *Lock) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);
}
pub fn acquire(self: *Lock) Held {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
std.debug.assert(std.c.pthread_mutex_unlock(&self.inner.mutex) == 0);
}
};
}
else if (std.builtin.os.tag == .linux)
struct {
state: enum(i32) {
unlocked,
locked,
waiting,
},
pub fn init() Lock {
return .{ .state = .unlocked };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .locked, .Acquire);
if (state != .unlocked)
self.acquireSlow(state);
return Held{ .inner = self };
}
fn acquireSlow(self: *Lock, current_state: @TypeOf(self.state)) void {
@setCold(true);
var new_state = current_state;
var spin: std.math.Log2Int(usize) = 0;
while (true) {
while (spin < 10) : (spin += 1) {
switch (@atomicLoad(@TypeOf(self.state), &self.state, .Monotonic)) {
.unlocked => _ = @cmpxchgWeak(
@TypeOf(self.state),
&self.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return,
.locked => {},
.waiting => break,
}
const iters = std.math.min(64, @as(usize, 1) << spin);
std.SpinLock.loopHint(iters);
}
new_state = .waiting;
switch (@atomicRmw(@TypeOf(self.state, &self.state, .Xchg, new_state, .Acquire))) {
.locked => return,
else => {},
}
spin = 0;
switch (system.getErrno(system.futex_wait(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
@enumToInt(new_state),
null,
))) {
0 => {},
system.EINTR => {},
system.EAGAIN => {},
else => unreachable,
}
}
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
switch (@atomicRmw(
@TypeOf(self.inner.state),
&self.inner.state,
.Xchg,
.unlocked,
.Release,
)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.inner.releaseSlow(),
}
}
};
fn releaseSlow(self: *Lock) void {
@setCold(true);
switch (system.getErrno(system.futex_wake(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
@as(i32, 1),
))) {
0 => {},
system.EFAULT => {},
else => unreachable,
}
}
}
else
struct {
is_locked: bool,
pub fn init() Lock {
return .{ .is_locked = false };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
while (@atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire))
std.SpinLock.loopHint();
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
@atomicStore(bool, &self.is_locked, false, .Release);
}
};
};
const Event = if (std.builtin.os.tag == .windows)
struct {
state: usize,
const UNSET = 0;
const WAITING = 1;
const IS_SET = std.math.maxInt(usize);
pub fn init() Event {
return .{ .state = UNSET };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
var state = @atomicLoad(usize, &self.state, .Acquire);
while (true) {
if (state == IS_SET)
return;
state = @cmpxchgWeak(
usize,
&self.state,
state,
state + WAITING,
.Acquire,
.Acquire,
) orelse {
const status = NtWaitForKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
std.debug.assert(status == .SUCCESS);
return;
};
}
}
pub fn set(self: *Event) void {
var state = @atomicRmw(usize, &self.state, .Xchg, IS_SET, .Release);
if (state == IS_SET)
return;
while (state >= WAITING) : (state -= WAITING) {
const status = NtReleaseKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
std.debug.assert(status == .SUCCESS);
}
}
extern "NtDll" fn NtWaitForKeyedEvent(
handle: ?system.HANDLE,
key: usize,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
extern "NtDll" fn NtReleaseKeyedEvent(
handle: ?system.HANDLE,
key: usize,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
}
else if (std.builtin.link_libc)
struct {
is_set: bool,
cond: std.c.pthread_cond_t,
mutex: std.c.pthread_mutex_t,
pub fn init() Event {
return .{
.is_set = false,
.cond = std.c.PTHREAD_COND_INTIALIZER,
.mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
};
}
pub fn deinit(self: *Event) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_cond = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);
self.* = undefined;
}
pub fn wait(self: *Event) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
while (self.is_set)
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
}
pub fn set(self: *Event) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
self.is_set = true;
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
}
else if (std.builtin.os.tag == .linux)
struct {
state: enum(i32) {
unset,
set,
},
pub fn init() Event {
return .{ .state = .unset };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
while (true) {
if (@atomicLoad(@TypeOf(self.state), &self.state, .Acquire) == .set)
return;
switch (system.getErrno(system.futex_wait(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
@enumToInt(@TypeOf(self.state).unset),
null,
))) {
0 => {},
system.EINTR => {},
system.EAGAIN => {},
else => unreachable,
}
}
}
pub fn set(self: *Event) void {
@atomicStore(@TypeOf(self.state), &self.state, .set, .Release);
switch (system.getErrno(system.futex_wake(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
std.math.maxInt(i32),
))) {
0 => {},
system.EFAULT => {},
else => unreachable,
}
}
}
else
struct {
is_set: bool,
pub fn init() Event {
return .{ .is_set = false };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
while (!@atomicLoad(bool, &self.is_set, .Acquire))
std.SpinLock.loopHint();
}
pub fn set(self: *Event) void {
@atomicStore(bool, &self.is_set, true, .Release);
}
};
};
const std = @import("std");
const system = std.os.system;
pub const Scheduler = struct {
pub const InitConfig = struct {
max_threads: ?u16 = null,
};
pub fn init(self: *Scheduler, config: InitConfig) !void {
const max_threads = if (std.builtin.single_threaded)
@as(u16, 1)
else if (config.max_threads) |max_threads|
std.math.max(1, max_threads)
else
std.math.max(1, @intCast(u16, std.Thread.cpuCount() catch 1));
}
pub fn deinit(self: *Scheduler) void {
}
pub const ScheduleConfig = struct {
};
pub fn schedule(self: *Scheduler, config: ScheduleConfig, batchable: anytype) void {
}
pub const Runnable = struct {
};
pub const Batch = struct {
};
const ThreadPool = struct {
lock: Lock,
state: State,
spawned: ?*Node,
const State = union(enum) {
running: Running,
terminating: Terminating,
shutdown,
};
const Running = struct {
idle: ?*Waiter,
spawned: usize,
spawnable: usize,
const Waiter = struct {
prev: ?*Waiter,
next: ?*Waiter,
wakeFn: fn(*Waiter) void,
};
};
const Terminating = struct {
cond: Condvar,
pending: usize,
};
pub fn init(num_workers: usize) ThreadPool {
return ThreadPool{
.lock = Lock.init(),
.spawned = null,
.state = .{
.running = .{
.idle = null,
.spawned = 0,
.spawnable = num_workers,
},
},
};
}
pub fn deinit(self: *ThreadPool) void {
self.shutdown();
self.join();
self.lock.deinit();
self.* = undefined;
}
pub fn spawn(self: *ThreadPool, comptime runFn: fn(*Node) void) void {
var spawned_thread = false;
var remaining_attempts: u8 = 5;
while (true) {
const held = self.lock.acquire();
const running = switch (self.state) {
.running => |*running| blk: {
if (spawned_thread)
running.spawned -= 1;
break :blk running;
},
.terminating => |*terminating| {
if (spawned_thread)
terminating.finishOne();
held.release();
return;
},
};
if (running.pop()) |waiter| {
held.release();
waiter.wake();
return;
}
if (running.spawned < running.spawnable and remaining_attempts > 0) {
running.spawned += 1;
held.release();
if (self.spawnThread(runFn))
return;
remaining_attempts -= 1;
continue;
}
held.release();
return;
}
}
const Iter = struct {
node: ?*Node,
fn next(self: *Iter) ?*Node {
const node = self.node orelse return null;
self.node = node.next;
return node;
}
};
pub fn iter(self: *ThreadPool) Iter {
const node = @atomicLoad(?*Node, &self.spawned, .Acquire);
return Iter{ .node = node };
}
fn spawnThread(self: *ThreadPool, comptime runFn: fn(*Node) void) bool {
const Spawner = struct {
pool: *ThreadPool,
thread: *std.Thread,
put_event: Event,
get_event: Event,
fn entry(this: *@This()) void {
this.put_event.wait();
const pool = this.pool;
const thread = this.thread;
this.get_event.set();
var node: Node = undefined;
node.init(pool, thread);
defer node.deinit();
runFn(&node);
}
};
var spawner = Spawner{
.pool = self,
.thread = undefined,
.put_event = Event.init(),
.get_event = Event.init(),
};
defer {
spawner.put_event.deinit();
spawner.get_event.deinit();
}
spawner.thread = std.Thread.spawn(&spawner, Spawner.entry) catch return false;
spawner.put_event.set();
spawner.get_event.wait();
return true;
}
const Node = struct {
data: usize,
next: ?*Node,
pool: *ThreadPool,
thread: *std.Thread,
join_event: Event,
fn init(self: *Node, pool; *ThreadPool, thread: *std.Thread) void {
self.* = .{
.next = undefined,
.pool = pool,
.thread = thread,
.join_event = Event.init(),
};
self.next = @atomicLoad(?*Node, &pool.spawned, .Monotonic);
while (true) {
self.next = @cmpxchgWeak(
?*Node,
&pool.spawned,
self.next,
self,
.Release,
.Monotonic,
) orelse break;
}
}
fn deinit(self: *Node) void {
self.join_event.wait();
self.join_event.deinit();
}
};
pub fn shutdown(self: *ThreadPool) void {
var waiters = blk: {
const held = self.lock.acquire();
defer held.release();
switch (self.state) {
.terminating => return,
.running => |running| {
self.state = .{
.terminating => {
.pending = running.spawned,
.cond = Condvar.init(),
},
};
break :blk running.idle;
},
}
};
while (waiters) |waiter| {
waiters = waiter.next;
(waiter.wakeFn)(waiter);
}
}
fn join(self: *ThreadPool) void {
{
var held = self.lock.acquire();
defer held.release();
defer self.state.terminating.cond.deinit();
while (self.state.terminating.pending > 0)
held = self.state.terminating.cond.wait(held);
}
while (self.spawned) |node| {
self.spawned = node.next;
const thread = node.thread;
node.join_event.set();
thread.wait();
}
}
};
const Condvar = if (std.builtin.os.tag == .windows)
struct {
cond: usize,
pub fn init() Condvar {
return .{ .cond = 0 };
}
pub fn deinit(self: *Condvar) void {
self.* = undefined;
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
const status = SleepConditionVariableSRW(
&self.cond,
&held.inner.srwlock,
system.INFINITE,
@as(system.ULONG, 0),
);
std.debug.assert(status != system.FALSE);
return held;
}
pub fn notifyOne(self: *Condvar) void {
WakeConditionVariable(&self.cond);
}
pub fn notifyAll(self: *Condvar) void {
WakeAllConditionVariable(&self.cond);
}
extern "kernel32" fn SleepConditionVariableSRW(
cond: *usize,
srwlock: *usize,
timeout: system.DWORD,
flags: system.ULONG,
) callconv(system.WINAPI) system.BOOL;
extern "kernel32" fn WakeConditionVariable(
cond: *usize,
) callconv(system.WINAPI) void;
extern "kernel32" fn WakeAllConditionVariable(
cond: *usize,
) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc)
struct {
cond: std.c.pthread_cond_t,
pub fn init() Condvar {
return .{ .cond = std.c.PTHREAD_COND_INTIALIZER };
}
pub fn deinit(self: *Condvar) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_cond = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &held.inner.mutex) == 0);
return held;
}
pub fn notifyOne(self: *Condvar) void {
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
pub fn notifyAll(self: *Condvar) void {
std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
}
}
else
struct {
lock: Lock,
waiters: std.SinglyLinkedList(Event),
pub fn init() Condvar {
return .{
.lock = Lock.init(),
.waiters = .{},
};
}
pub fn deinit(self: *Condvar) void {
self.lock.deinit();
self.* = undefined;
}
pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
var waiter: @TypeOf(self.waiters).Node = undefined;
waiter.data = Event.init();
defer waiter.data.deinit();
{
const waiters_held = self.lock.acquire();
defer waiters_held.release();
self.waiters.prepend(&waiter);
}
held.release();
waiter.data.wait();
return held.inner.acquire();
}
pub fn notifyOne(self: *Condvar) void {
var maybe_waiter = blk: {
const held = self.lock.acquire();
defer held.release();
break :blk self.waiters.popFirst();
};
if (maybe_waiter) |waiter|
waiter.data.set();
}
pub fn notifyAll(self: *Condvar) void {
var waiters = blk: {
const held = self.lock.acquire();
defer held.release();
const waiters = self.waiters;
self.waiters = .{};
break :blk waiters;
};
while (waiters.popFirst()) |waiter|
waiter.data.set();
}
};
const Lock = if (std.builtin.os.tag == .windows)
struct {
srwlock: usize,
pub fn init() Lock {
return .{ .srwlock = 0 };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
AcquireSRWLockExclusive(&self.srwlock);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
ReleaseSRWLockExclusive(&self.inner.srwlock);
}
};
extern "kernel32" fn AcquireSRWLockExclusive(
srwlock: *usize,
) callconv(system.WINAPI) void;
extern "kernel32" fn ReleaseSRWLockExclusive(
srwlock: *usize,
) callconv(system.WINAPI) void;
}
else if (comptime std.Target.current.isDarwin())
struct {
unfair_lock: u32,
pub fn init() Lock {
return .{ .unfair_lock = 0 };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
os_unfair_lock_lock(&self.unfair_lock);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
os_unfair_lock_unlock(&self.inner.unfair_lock);
}
};
extern "c" fn os_unfair_lock_lock(
unfair_lock: *u32,
) callconv(.C) void;
extern "c" fn os_unfair_lock_unlock(
unfair_lock: *u32,
) callconv(.C) void;
}
else if (std.builtin.link_libc)
struct {
mutex: std.c.pthread_mutex_t,
pub fn init() Lock {
return .{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
}
pub fn deinit(self: *Lock) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);
}
pub fn acquire(self: *Lock) Held {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
std.debug.assert(std.c.pthread_mutex_unlock(&self.inner.mutex) == 0);
}
};
}
else if (std.builtin.os.tag == .linux)
struct {
state: enum(i32) {
unlocked,
locked,
waiting,
},
pub fn init() Lock {
return .{ .state = .unlocked };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .locked, .Acquire);
if (state != .unlocked)
self.acquireSlow();
}
fn acquireSlow(self: *Lock) void {
@setCold(true);
while (true) {
var spin: std.math.Log2Int(usize) = 0;
while (spin < 10) : (spin += 1) {
switch (@atomicLoad(@TypeOf(self.state), &self.state, .Monotonic)) {
.unlocked => _ = @cmpxchgWeak(
@TypeOf(self.state),
&self.state,
.unlocked,
new_state,
.Acquire,
.Monotonic,
) orelse return,
.locked => {},
.waiting => break,
}
const iters = std.math.min(64, @as(usize, 1) << spin);
std.SpinLock.loopHint(iters);
}
const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .waiting, .Acquire);
if (state == .unlocked)
return;
switch (system.getErrno(system.futex_wait(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
@enumToInt(@TypeOf(self.state).waiting),
null,
))) {
0 => {},
system.EINTR => {},
system.EAGAIN => {},
else => unreachable,
}
}
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
switch (@atomicRmw(
@TypeOf(self.inner.state),
&self.inner.state,
.Xchg,
.unlocked,
.Release,
)) {
.unlocked => unreachable,
.locked => {},
.waiting => self.inner.releaseSlow(),
}
}
};
fn releaseSlow(self: *Lock) void {
@setCold(true);
switch (system.getErrno(system.futex_wake(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
@as(i32, 1),
))) {
0 => {},
system.EFAULT => {},
else => unreachable,
}
}
}
else
struct {
is_locked: bool,
pub fn init() Lock {
return .{ .is_locked = false };
}
pub fn deinit(self: *Lock) void {
self.* = undefined;
}
pub fn acquire(self: *Lock) Held {
while (@atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire))
std.SpinLock.loopHint();
return Held{ .inner = self };
}
pub const Held = struct {
inner: *Lock,
pub fn release(self: Held) void {
@atomicStore(bool, &self.is_locked, false, .Release);
}
};
};
const Event = if (std.builtin.os.tag == .windows)
struct {
state: usize,
const UNSET = 0;
const WAITING = 1;
const IS_SET = std.math.maxInt(usize);
pub fn init() Event {
return .{ .state = UNSET };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
var state = @atomicLoad(usize, &self.state, .Acquire);
while (true) {
if (state == IS_SET)
return;
state = @cmpxchgWeak(
usize,
&self.state,
state,
state + WAITING,
.Acquire,
.Acquire,
) orelse {
const status = NtWaitForKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
std.debug.assert(status == .SUCCESS);
return;
};
}
}
pub fn set(self: *Event) void {
var state = @atomicRmw(usize, &self.state, .Xchg, IS_SET, .Release);
if (state == IS_SET)
return;
while (state >= WAITING) : (state -= WAITING) {
const status = NtReleaseKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
std.debug.assert(status == .SUCCESS);
}
}
extern "NtDll" fn NtWaitForKeyedEvent(
handle: ?system.HANDLE,
key: usize,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
extern "NtDll" fn NtReleaseKeyedEvent(
handle: ?system.HANDLE,
key: usize,
alertable: system.BOOLEAN,
timeout: ?*const system.LARGE_INTEGER,
) callconv(system.WINAPI) system.NTSTATUS;
}
else if (std.builtin.link_libc)
struct {
is_set: bool,
cond: std.c.pthread_cond_t,
mutex: std.c.pthread_mutex_t,
pub fn init() Event {
return .{
.is_set = false,
.cond = std.c.PTHREAD_COND_INTIALIZER,
.mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
};
}
pub fn deinit(self: *Event) void {
const safe_ret = switch (std.builtin.os.tag) {
.dragonfly, .netbsd => std.os.EAGAIN,
else => 0,
};
const ret_cond = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);
self.* = undefined;
}
pub fn wait(self: *Event) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
while (self.is_set)
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
}
pub fn set(self: *Event) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
self.is_set = true;
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
}
else if (std.builtin.os.tag == .linux)
struct {
state: enum(i32) {
unset,
set,
},
pub fn init() Event {
return .{ .state = .unset };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
while (true) {
if (@atomicLoad(@TypeOf(self.state), &self.state, .Acquire) == .set)
return;
switch (system.getErrno(system.futex_wait(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
@enumToInt(@TypeOf(self.state).unset),
null,
))) {
0 => {},
system.EINTR => {},
system.EAGAIN => {},
else => unreachable,
}
}
}
pub fn set(self: *Event) void {
@atomicStore(@TypeOf(self.state), &self.state, .set, .Release);
switch (system.getErrno(system.futex_wake(
@ptrCast(*const i32, &self.state),
system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
std.math.maxInt(i32),
))) {
0 => {},
system.EFAULT => {},
else => unreachable,
}
}
}
else
struct {
is_set: bool,
pub fn init() Event {
return .{ .is_set = false };
}
pub fn deinit(self: *Event) void {
self.* = undefined;
}
pub fn wait(self: *Event) void {
while (!@atomicLoad(bool, &self.is_set, .Acquire))
std.SpinLock.loopHint();
}
pub fn set(self: *Event) void {
@atomicStore(bool, &self.is_set, true, .Release);
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment