-
-
Save voluntas/4ad9f7a167c7916956a0ae1fbe80c600 to your computer and use it in GitHub Desktop.
An isolated thread pool implementation for Zig
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const system = std.os.system; | |
pub const Scheduler = struct { | |
/// Options accepted by `Scheduler.init`.
pub const InitConfig = struct {
    /// Requested number of threads. When left `null`, `init` falls back
    /// to `std.Thread.cpuCount()`.
    max_threads: ?u16 = null,
};
/// Resolve the thread limit for this scheduler.
///
/// The limit is 1 in single-threaded builds, otherwise `config.max_threads`
/// raised to at least 1, otherwise the detected CPU count (1 when detection
/// fails), clamped to what a `u16` can hold.
///
/// NOTE(review): the resolved value is not stored or used yet; the rest of
/// the initialisation is still to be written.
pub fn init(self: *Scheduler, config: InitConfig) !void {
    const max_threads: u16 = if (std.builtin.single_threaded)
        1
    else if (config.max_threads) |requested|
        std.math.max(1, requested)
    else blk: {
        // Clamp before narrowing: `@intCast` is illegal behaviour when the
        // CPU count does not fit in a u16.
        const detected: usize = std.Thread.cpuCount() catch 1;
        const limit: usize = std.math.maxInt(u16);
        const clamped: usize = std.math.max(1, std.math.min(detected, limit));
        break :blk @intCast(u16, clamped);
    };
}
/// Tear down the scheduler. Currently an empty stub.
pub fn deinit(self: *Scheduler) void {}
/// Options accepted by `Scheduler.schedule`. No fields are defined yet.
pub const ScheduleConfig = struct {};
/// Submit `batchable` to the scheduler using `config`.
/// Currently an empty stub: the body does nothing with its arguments.
pub fn schedule(self: *Scheduler, config: ScheduleConfig, batchable: anytype) void {}
/// Placeholder type; no fields or declarations are defined yet.
pub const Runnable = struct {};
/// Placeholder type; no fields or declarations are defined yet.
pub const Batch = struct {};
const ThreadPool = struct { | |
lock: Lock, | |
state: State, | |
spawned: ?*Node, | |
/// Lifecycle phase of the pool; the payload carries that phase's bookkeeping.
const State = union(enum) {
    /// Initial state set by `ThreadPool.init`.
    running: Running,
    /// Entered by `ThreadPool.shutdown`.
    terminating: Terminating,
    /// Carries no payload.
    /// NOTE(review): nothing visible in this file assigns this tag yet.
    shutdown,
};
/// Bookkeeping kept while the pool is in the `running` state.
const Running = struct {
    /// First idle waiter; further waiters are reached through `Waiter.next`.
    idle: ?*Waiter,
    /// Count of thread slots taken so far (bumped before each spawn attempt).
    spawned: usize,
    /// Upper bound for `spawned`.
    spawnable: usize,

    /// Unlink and return the first idle waiter, or `null` when none is idle.
    /// `ThreadPool.spawn` calls this while holding the pool lock.
    /// NOTE(review): assumes `idle` is the head of a null-terminated doubly
    /// linked list; confirm once the code that enqueues waiters exists.
    fn pop(self: *Running) ?*Waiter {
        const waiter = self.idle orelse return null;
        self.idle = waiter.next;
        if (waiter.next) |new_head|
            new_head.prev = null;
        waiter.prev = null;
        waiter.next = null;
        return waiter;
    }

    /// Intrusive list entry for something waiting to be woken.
    const Waiter = struct {
        prev: ?*Waiter,
        next: ?*Waiter,
        /// Callback that wakes this waiter.
        wakeFn: fn (*Waiter) void,

        /// Invoke the waiter's wake callback (used by `ThreadPool.spawn`).
        fn wake(self: *Waiter) void {
            (self.wakeFn)(self);
        }
    };
};
/// Bookkeeping kept while the pool is in the `terminating` state.
const Terminating = struct {
    /// Waited on by `ThreadPool.join` while `pending` is non-zero.
    cond: Condvar,
    /// Thread slots that have not finished yet.
    pending: usize,

    /// Mark one pending slot as finished and wake the joiner once none remain.
    /// `ThreadPool.spawn` calls this with the pool lock held.
    /// NOTE(review): semantics inferred from `join`, which waits on `cond`
    /// until `pending` reaches zero - confirm.
    fn finishOne(self: *Terminating) void {
        self.pending -= 1;
        if (self.pending == 0)
            self.cond.notifyAll();
    }
};
/// Create a pool in the `running` state that may spawn up to `num_workers`
/// threads. No thread is spawned here.
pub fn init(num_workers: usize) ThreadPool {
    const initial = Running{
        .idle = null,
        .spawned = 0,
        .spawnable = num_workers,
    };
    return ThreadPool{
        .lock = Lock.init(),
        .state = State{ .running = initial },
        .spawned = null,
    };
}
/// Shut the pool down, wait for its threads, then release the lock and
/// poison the pool's memory.
pub fn deinit(self: *ThreadPool) void {
    // Order matters: `join` reads the `terminating` state that `shutdown` installs.
    self.shutdown();
    self.join();

    self.lock.deinit();
    self.* = undefined;
}
/// Make one more worker available: wake an idle waiter if there is one,
/// otherwise try to start a new thread running `runFn`.
///
/// Thread creation is retried up to 5 times. Returns without doing anything
/// when the pool is terminating or shut down, or when the thread limit is
/// reached. Relies on `Running.pop`, `Waiter.wake` and
/// `Terminating.finishOne` being provided by those types.
pub fn spawn(self: *ThreadPool, comptime runFn: fn (*Node) void) void {
    // True while a failed spawn attempt still occupies a slot in the
    // `spawned` counter that must be handed back under the lock.
    var spawned_thread = false;
    var remaining_attempts: u8 = 5;

    while (true) {
        const held = self.lock.acquire();

        const running = switch (self.state) {
            .running => |*running| blk: {
                if (spawned_thread) {
                    // Undo the optimistic increment of the failed attempt.
                    running.spawned -= 1;
                    spawned_thread = false;
                }
                break :blk running;
            },
            .terminating => |*terminating| {
                // `shutdown` copied `spawned` (including our slot) into
                // `pending`, so the failed slot has to be finished here.
                if (spawned_thread)
                    terminating.finishOne();
                held.release();
                return;
            },
            .shutdown => {
                held.release();
                return;
            },
        };

        if (running.pop()) |waiter| {
            held.release();
            waiter.wake();
            return;
        }

        if (running.spawned < running.spawnable and remaining_attempts > 0) {
            // Reserve the slot before dropping the lock for the slow spawn.
            running.spawned += 1;
            held.release();

            if (self.spawnThread(runFn))
                return;

            spawned_thread = true;
            remaining_attempts -= 1;
            continue;
        }

        held.release();
        return;
    }
}
/// Forward iterator over a chain of `Node`s linked through `Node.next`.
const Iter = struct {
    /// Node that the following `next` call will yield.
    node: ?*Node,

    /// Yield the current node and advance, or `null` at the end of the chain.
    fn next(self: *Iter) ?*Node {
        if (self.node) |current| {
            self.node = current.next;
            return current;
        }
        return null;
    }
};
/// Start iterating from the current head of the `spawned` node list
/// (read with an acquire load).
pub fn iter(self: *ThreadPool) Iter {
    return Iter{
        .node = @atomicLoad(?*Node, &self.spawned, .Acquire),
    };
}
/// Start one OS thread that registers a `Node` with the pool and then runs
/// `runFn` on it. Returns false when `std.Thread.spawn` fails.
///
/// Parent and child hand over the thread handle through two events: the
/// child waits for `put_event` before reading the handle, and the parent
/// waits for `get_event` before its stack-allocated context goes away.
fn spawnThread(self: *ThreadPool, comptime runFn: fn (*Node) void) bool {
    const Spawner = struct {
        pool: *ThreadPool,
        thread: *std.Thread,
        put_event: Event,
        get_event: Event,

        fn entry(ctx: *@This()) void {
            // Block until the parent has stored the thread handle.
            ctx.put_event.wait();
            const owner = ctx.pool;
            const handle = ctx.thread;
            // Everything needed has been copied out; let the parent return.
            ctx.get_event.set();

            var node: Node = undefined;
            node.init(owner, handle);
            defer node.deinit();

            runFn(&node);
        }
    };

    var handshake = Spawner{
        .pool = self,
        .thread = undefined,
        .put_event = Event.init(),
        .get_event = Event.init(),
    };
    defer {
        handshake.put_event.deinit();
        handshake.get_event.deinit();
    }

    handshake.thread = std.Thread.spawn(&handshake, Spawner.entry) catch return false;
    handshake.put_event.set();
    handshake.get_event.wait();
    return true;
}
/// Per-thread record that links a spawned thread into `ThreadPool.spawned`.
const Node = struct {
    /// NOTE(review): purpose not visible in this file; starts at 0.
    data: usize,
    /// Next node in the pool's list of spawned threads.
    next: ?*Node,
    pool: *ThreadPool,
    thread: *std.Thread,
    /// Set by `ThreadPool.join`; `deinit` blocks until then.
    join_event: Event,

    /// Initialise the node and push it onto the front of `pool.spawned`.
    fn init(self: *Node, pool: *ThreadPool, thread: *std.Thread) void {
        self.* = .{
            .data = 0,
            .next = undefined,
            .pool = pool,
            .thread = thread,
            .join_event = Event.init(),
        };

        // Lock-free push: on failure the CAS returns the head it observed,
        // which becomes the `next` used by the retry.
        self.next = @atomicLoad(?*Node, &pool.spawned, .Monotonic);
        while (true) {
            self.next = @cmpxchgWeak(
                ?*Node,
                &pool.spawned,
                self.next,
                self,
                .Release,
                .Monotonic,
            ) orelse break;
        }
    }

    /// Wait until `join_event` is set, then destroy it.
    fn deinit(self: *Node) void {
        self.join_event.wait();
        self.join_event.deinit();
    }
};
/// Move the pool from `running` to `terminating` and wake every idle waiter.
/// Does nothing when the pool is already terminating or shut down.
pub fn shutdown(self: *ThreadPool) void {
    var waiters = blk: {
        const held = self.lock.acquire();
        defer held.release();

        switch (self.state) {
            .terminating, .shutdown => return,
            .running => |running| {
                // Copy what is needed before the union payload is replaced.
                const idle = running.idle;
                const pending = running.spawned;
                self.state = .{
                    .terminating = .{
                        .pending = pending,
                        .cond = Condvar.init(),
                    },
                };
                break :blk idle;
            },
        }
    };

    // Wake outside the lock. `next` is read first because the waiter may
    // no longer be valid once it has been woken.
    while (waiters) |waiter| {
        waiters = waiter.next;
        (waiter.wakeFn)(waiter);
    }
}
/// Wait until no slot is pending, then release and join every spawned thread.
/// Expects the pool to be in the `terminating` state.
fn join(self: *ThreadPool) void {
    {
        var held = self.lock.acquire();
        // Deferred calls run in reverse order: the condvar is destroyed
        // first, then the lock is released.
        defer held.release();
        defer self.state.terminating.cond.deinit();

        while (self.state.terminating.pending > 0) {
            held = self.state.terminating.cond.wait(held);
        }
    }

    while (self.spawned) |node| {
        // Read everything needed from the node before setting its event;
        // the thread waiting on that event proceeds to tear the node down.
        const thread = node.thread;
        self.spawned = node.next;
        node.join_event.set();
        thread.wait();
    }
}
}; | |
const Condvar = if (std.builtin.os.tag == .windows) | |
struct {
    /// Storage handed to the kernel32 condition-variable functions.
    cond: usize,

    /// Create a condition variable with zeroed storage.
    pub fn init() Condvar {
        return Condvar{ .cond = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Condvar) void {
        self.* = undefined;
    }

    /// Sleep on the condition variable with the SRW lock behind `held`,
    /// without a timeout. Returns the same `held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        const no_flags = @as(system.ULONG, 0);
        const ok = SleepConditionVariableSRW(
            &self.cond,
            &held.inner.srwlock,
            system.INFINITE,
            no_flags,
        );
        std.debug.assert(ok != system.FALSE);
        return held;
    }

    /// Wake one waiter.
    pub fn notifyOne(self: *Condvar) void {
        WakeConditionVariable(&self.cond);
    }

    /// Wake all waiters.
    pub fn notifyAll(self: *Condvar) void {
        WakeAllConditionVariable(&self.cond);
    }

    extern "kernel32" fn SleepConditionVariableSRW(
        cond: *usize,
        srwlock: *usize,
        timeout: system.DWORD,
        flags: system.ULONG,
    ) callconv(system.WINAPI) system.BOOL;

    extern "kernel32" fn WakeConditionVariable(
        cond: *usize,
    ) callconv(system.WINAPI) void;

    extern "kernel32" fn WakeAllConditionVariable(
        cond: *usize,
    ) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc) | |
struct {
    cond: std.c.pthread_cond_t,

    /// Create a statically initialised pthread condition variable.
    pub fn init() Condvar {
        return .{ .cond = std.c.PTHREAD_COND_INITIALIZER };
    }

    /// Destroy the condition variable and poison the memory.
    pub fn deinit(self: *Condvar) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const safe_ret = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const ret_cond = std.c.pthread_cond_destroy(&self.cond);
        std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
        self.* = undefined;
    }

    /// Wait on the condition variable with the mutex behind `held`.
    /// Returns the same `held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        std.debug.assert(std.c.pthread_cond_wait(&self.cond, &held.inner.mutex) == 0);
        return held;
    }

    /// Wake one waiter.
    pub fn notifyOne(self: *Condvar) void {
        std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
    }

    /// Wake all waiters.
    pub fn notifyAll(self: *Condvar) void {
        std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
    }
}
else | |
struct {
    /// Guards `waiters`.
    lock: Lock,
    /// One `Event` per blocked waiter; the list nodes live on the waiters' stacks.
    waiters: std.SinglyLinkedList(Event),

    /// Create a condition variable with no waiters.
    pub fn init() Condvar {
        return Condvar{
            .lock = Lock.init(),
            .waiters = .{},
        };
    }

    /// Destroy the internal lock and poison the memory.
    pub fn deinit(self: *Condvar) void {
        self.lock.deinit();
        self.* = undefined;
    }

    /// Register as a waiter, release `held`, block until notified, then
    /// re-acquire the lock and return the new `Held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        const WaitNode = @TypeOf(self.waiters).Node;
        var entry: WaitNode = undefined;
        entry.data = Event.init();
        defer entry.data.deinit();

        {
            const list_guard = self.lock.acquire();
            defer list_guard.release();
            self.waiters.prepend(&entry);
        }

        // Registered before the caller's lock is dropped.
        held.release();
        entry.data.wait();
        return held.inner.acquire();
    }

    /// Remove one waiter from the list (if any) and set its event.
    pub fn notifyOne(self: *Condvar) void {
        const popped = blk: {
            const list_guard = self.lock.acquire();
            defer list_guard.release();
            break :blk self.waiters.popFirst();
        };

        if (popped) |entry| {
            entry.data.set();
        }
    }

    /// Take the whole waiter list and set every waiter's event.
    pub fn notifyAll(self: *Condvar) void {
        var taken = blk: {
            const list_guard = self.lock.acquire();
            defer list_guard.release();

            const snapshot = self.waiters;
            self.waiters = .{};
            break :blk snapshot;
        };

        while (taken.popFirst()) |entry| {
            entry.data.set();
        }
    }
};
const Lock = if (std.builtin.os.tag == .windows) | |
struct {
    /// Storage handed to the kernel32 SRW lock functions.
    srwlock: usize,

    /// Create an unlocked lock with zeroed storage.
    pub fn init() Lock {
        return Lock{ .srwlock = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock exclusively and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        AcquireSRWLockExclusive(&self.srwlock);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const lock = self.inner;
            ReleaseSRWLockExclusive(&lock.srwlock);
        }
    };

    extern "kernel32" fn AcquireSRWLockExclusive(
        srwlock: *usize,
    ) callconv(system.WINAPI) void;

    extern "kernel32" fn ReleaseSRWLockExclusive(
        srwlock: *usize,
    ) callconv(system.WINAPI) void;
}
else if (comptime std.Target.current.isDarwin()) | |
struct {
    /// Storage handed to `os_unfair_lock_lock` / `os_unfair_lock_unlock`.
    unfair_lock: u32,

    /// Create an unlocked lock with zeroed storage.
    pub fn init() Lock {
        return Lock{ .unfair_lock = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        os_unfair_lock_lock(&self.unfair_lock);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const lock = self.inner;
            os_unfair_lock_unlock(&lock.unfair_lock);
        }
    };

    extern "c" fn os_unfair_lock_lock(
        unfair_lock: *u32,
    ) callconv(.C) void;

    extern "c" fn os_unfair_lock_unlock(
        unfair_lock: *u32,
    ) callconv(.C) void;
}
else if (std.builtin.link_libc) | |
struct {
    mutex: std.c.pthread_mutex_t,

    /// Create a statically initialised pthread mutex.
    pub fn init() Lock {
        return Lock{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
    }

    /// Destroy the mutex.
    pub fn deinit(self: *Lock) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const tolerated = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const rc = std.c.pthread_mutex_destroy(&self.mutex);
        std.debug.assert(rc == 0 or rc == tolerated);
    }

    /// Lock the mutex and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        const rc = std.c.pthread_mutex_lock(&self.mutex);
        std.debug.assert(rc == 0);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const rc = std.c.pthread_mutex_unlock(&self.inner.mutex);
            std.debug.assert(rc == 0);
        }
    };
}
else if (std.builtin.os.tag == .linux) | |
struct {
    /// Futex word. `waiting` means the lock is held and a thread may be
    /// asleep on it, so `release` has to issue a wake.
    state: enum(i32) {
        unlocked,
        locked,
        waiting,
    },

    /// Create an unlocked lock.
    pub fn init() Lock {
        return .{ .state = .unlocked };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock, blocking if necessary, and return a token used to
    /// release it.
    pub fn acquire(self: *Lock) Held {
        const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .locked, .Acquire);
        if (state != .unlocked)
            self.acquireSlow(state);
        return Held{ .inner = self };
    }

    /// Contended path: spin briefly, then mark the lock as `waiting` and
    /// sleep on the futex until it can be taken.
    ///
    /// `current_state` is what the fast-path exchange replaced. If it was
    /// `waiting`, the lock is re-acquired as `waiting` so that sleepers
    /// registered by other threads still get woken on release.
    fn acquireSlow(self: *Lock, current_state: @TypeOf(self.state)) void {
        @setCold(true);

        var new_state = current_state;
        var spin: std.math.Log2Int(usize) = 0;

        while (true) {
            while (spin < 10) : (spin += 1) {
                switch (@atomicLoad(@TypeOf(self.state), &self.state, .Monotonic)) {
                    .unlocked => _ = @cmpxchgWeak(
                        @TypeOf(self.state),
                        &self.state,
                        .unlocked,
                        new_state,
                        .Acquire,
                        .Monotonic,
                    ) orelse return,
                    .locked => {},
                    .waiting => break,
                }

                // Back off exponentially, capped at 64 iterations.
                const iters = std.math.min(64, @as(usize, 1) << spin);
                std.SpinLock.loopHint(iters);
            }

            new_state = .waiting;
            // If the previous value was `unlocked`, the exchange itself took
            // the lock. Any other previous value means another thread still
            // owns it and we must sleep.
            switch (@atomicRmw(@TypeOf(self.state), &self.state, .Xchg, new_state, .Acquire)) {
                .unlocked => return,
                else => {},
            }

            spin = 0;
            switch (system.getErrno(system.futex_wait(
                @ptrCast(*const i32, &self.state),
                system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
                @enumToInt(new_state),
                null,
            ))) {
                0 => {},
                system.EINTR => {},
                system.EAGAIN => {},
                else => unreachable,
            }
        }
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        /// Unlock, waking one sleeper when the state was `waiting`.
        pub fn release(self: Held) void {
            switch (@atomicRmw(
                @TypeOf(self.inner.state),
                &self.inner.state,
                .Xchg,
                .unlocked,
                .Release,
            )) {
                .unlocked => unreachable,
                .locked => {},
                .waiting => self.inner.releaseSlow(),
            }
        }
    };

    /// Wake one thread sleeping on the futex word.
    fn releaseSlow(self: *Lock) void {
        @setCold(true);

        switch (system.getErrno(system.futex_wake(
            @ptrCast(*const i32, &self.state),
            system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
            @as(i32, 1),
        ))) {
            0 => {},
            system.EFAULT => {},
            else => unreachable,
        }
    }
}
else | |
struct {
    is_locked: bool,

    /// Create an unlocked spin lock.
    pub fn init() Lock {
        return .{ .is_locked = false };
    }

    /// Poison the memory.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Spin until the flag can be flipped from false to true, then return a
    /// token used to release the lock.
    pub fn acquire(self: *Lock) Held {
        while (@atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire))
            std.SpinLock.loopHint(1);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            // The flag lives on the lock, not on this token.
            @atomicStore(bool, &self.inner.is_locked, false, .Release);
        }
    };
};
const Event = if (std.builtin.os.tag == .windows) | |
struct {
    /// Either `IS_SET`, or the number of threads that registered as waiters.
    state: usize,

    const UNSET = 0;
    const WAITING = 1;
    const IS_SET = std.math.maxInt(usize);

    /// Create an unset event.
    pub fn init() Event {
        return Event{ .state = UNSET };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Return once the event is set. Registers as a waiter by bumping the
    /// counter, then blocks on the keyed event identified by the address of
    /// `state`.
    pub fn wait(self: *Event) void {
        var observed = @atomicLoad(usize, &self.state, .Acquire);
        while (observed != IS_SET) {
            if (@cmpxchgWeak(
                usize,
                &self.state,
                observed,
                observed + WAITING,
                .Acquire,
                .Acquire,
            )) |actual| {
                // Lost the race; retry with the value that was found.
                observed = actual;
                continue;
            }

            const status = NtWaitForKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
            std.debug.assert(status == .SUCCESS);
            return;
        }
    }

    /// Mark the event as set and release every registered waiter.
    pub fn set(self: *Event) void {
        const previous = @atomicRmw(usize, &self.state, .Xchg, IS_SET, .Release);
        if (previous == IS_SET)
            return;

        // One release per waiter that was counted before the exchange.
        var pending = previous;
        while (pending >= WAITING) : (pending -= WAITING) {
            const status = NtReleaseKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
            std.debug.assert(status == .SUCCESS);
        }
    }

    extern "NtDll" fn NtWaitForKeyedEvent(
        handle: ?system.HANDLE,
        key: usize,
        alertable: system.BOOLEAN,
        timeout: ?*const system.LARGE_INTEGER,
    ) callconv(system.WINAPI) system.NTSTATUS;

    extern "NtDll" fn NtReleaseKeyedEvent(
        handle: ?system.HANDLE,
        key: usize,
        alertable: system.BOOLEAN,
        timeout: ?*const system.LARGE_INTEGER,
    ) callconv(system.WINAPI) system.NTSTATUS;
}
else if (std.builtin.link_libc) | |
struct {
    /// Protected by `mutex`.
    is_set: bool,
    cond: std.c.pthread_cond_t,
    mutex: std.c.pthread_mutex_t,

    /// Create an unset event.
    pub fn init() Event {
        return .{
            .is_set = false,
            .cond = std.c.PTHREAD_COND_INITIALIZER,
            .mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
        };
    }

    /// Destroy the condition variable and mutex, then poison the memory.
    pub fn deinit(self: *Event) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const safe_ret = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const ret_cond = std.c.pthread_cond_destroy(&self.cond);
        std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);

        const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
        std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);

        self.* = undefined;
    }

    /// Block until the event has been set.
    pub fn wait(self: *Event) void {
        std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
        defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);

        // Loop on the predicate: keep waiting while the event is NOT set,
        // which also absorbs spurious wake-ups.
        while (!self.is_set)
            std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
    }

    /// Set the event and wake every thread blocked in `wait`.
    pub fn set(self: *Event) void {
        std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
        defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);

        self.is_set = true;
        // Broadcast so that every waiter is released, as the other Event
        // implementations in this file do.
        std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
    }
}
else if (std.builtin.os.tag == .linux) | |
struct {
    /// Futex word holding whether the event has been set.
    state: enum(i32) {
        unset,
        set,
    },

    /// Create an unset event.
    pub fn init() Event {
        return Event{ .state = .unset };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Block until the event is set, sleeping on the futex while the word
    /// still reads `unset`.
    pub fn wait(self: *Event) void {
        const State = @TypeOf(self.state);

        while (@atomicLoad(State, &self.state, .Acquire) != .set) {
            const rc = system.futex_wait(
                @ptrCast(*const i32, &self.state),
                system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
                @enumToInt(State.unset),
                null,
            );
            // All accepted results simply lead to re-checking the state.
            switch (system.getErrno(rc)) {
                0, system.EINTR, system.EAGAIN => {},
                else => unreachable,
            }
        }
    }

    /// Store `set` and wake every thread sleeping on the futex word.
    pub fn set(self: *Event) void {
        @atomicStore(@TypeOf(self.state), &self.state, .set, .Release);

        const rc = system.futex_wake(
            @ptrCast(*const i32, &self.state),
            system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
            std.math.maxInt(i32),
        );
        switch (system.getErrno(rc)) {
            0, system.EFAULT => {},
            else => unreachable,
        }
    }
}
else | |
struct {
    is_set: bool,

    /// Create an unset event.
    pub fn init() Event {
        return .{ .is_set = false };
    }

    /// Poison the memory.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Spin until the flag has been set.
    pub fn wait(self: *Event) void {
        while (!@atomicLoad(bool, &self.is_set, .Acquire))
            std.SpinLock.loopHint(1);
    }

    /// Set the flag; spinning waiters observe it on their next load.
    pub fn set(self: *Event) void {
        @atomicStore(bool, &self.is_set, true, .Release);
    }
};
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const system = std.os.system; | |
pub const Scheduler = struct { | |
/// Options accepted by `Scheduler.init`.
pub const InitConfig = struct {
    /// Requested number of threads. When left `null`, `init` falls back
    /// to `std.Thread.cpuCount()`.
    max_threads: ?u16 = null,
};
/// Resolve the thread limit for this scheduler.
///
/// The limit is 1 in single-threaded builds, otherwise `config.max_threads`
/// raised to at least 1, otherwise the detected CPU count (1 when detection
/// fails), clamped to what a `u16` can hold.
///
/// NOTE(review): the resolved value is not stored or used yet; the rest of
/// the initialisation is still to be written.
pub fn init(self: *Scheduler, config: InitConfig) !void {
    const max_threads: u16 = if (std.builtin.single_threaded)
        1
    else if (config.max_threads) |requested|
        std.math.max(1, requested)
    else blk: {
        // Clamp before narrowing: `@intCast` is illegal behaviour when the
        // CPU count does not fit in a u16.
        const detected: usize = std.Thread.cpuCount() catch 1;
        const limit: usize = std.math.maxInt(u16);
        const clamped: usize = std.math.max(1, std.math.min(detected, limit));
        break :blk @intCast(u16, clamped);
    };
}
/// Tear down the scheduler. Currently an empty stub.
pub fn deinit(self: *Scheduler) void {}
/// Options accepted by `Scheduler.schedule`. No fields are defined yet.
pub const ScheduleConfig = struct {};
/// Submit `batchable` to the scheduler using `config`.
/// Currently an empty stub: the body does nothing with its arguments.
pub fn schedule(self: *Scheduler, config: ScheduleConfig, batchable: anytype) void {}
/// Placeholder type; no fields or declarations are defined yet.
pub const Runnable = struct {};
/// Placeholder type; no fields or declarations are defined yet.
pub const Batch = struct {};
const ThreadPool = struct { | |
lock: Lock, | |
state: State, | |
spawned: ?*Node, | |
/// Lifecycle phase of the pool; the payload carries that phase's bookkeeping.
const State = union(enum) {
    /// Initial state set by `ThreadPool.init`.
    running: Running,
    /// Entered by `ThreadPool.shutdown`.
    terminating: Terminating,
    /// Carries no payload.
    /// NOTE(review): nothing visible in this file assigns this tag yet.
    shutdown,
};
/// Bookkeeping kept while the pool is in the `running` state.
const Running = struct {
    /// First idle waiter; further waiters are reached through `Waiter.next`.
    idle: ?*Waiter,
    /// Count of thread slots taken so far (bumped before each spawn attempt).
    spawned: usize,
    /// Upper bound for `spawned`.
    spawnable: usize,

    /// Unlink and return the first idle waiter, or `null` when none is idle.
    /// `ThreadPool.spawn` calls this while holding the pool lock.
    /// NOTE(review): assumes `idle` is the head of a null-terminated doubly
    /// linked list; confirm once the code that enqueues waiters exists.
    fn pop(self: *Running) ?*Waiter {
        const waiter = self.idle orelse return null;
        self.idle = waiter.next;
        if (waiter.next) |new_head|
            new_head.prev = null;
        waiter.prev = null;
        waiter.next = null;
        return waiter;
    }

    /// Intrusive list entry for something waiting to be woken.
    const Waiter = struct {
        prev: ?*Waiter,
        next: ?*Waiter,
        /// Callback that wakes this waiter.
        wakeFn: fn (*Waiter) void,

        /// Invoke the waiter's wake callback (used by `ThreadPool.spawn`).
        fn wake(self: *Waiter) void {
            (self.wakeFn)(self);
        }
    };
};
/// Bookkeeping kept while the pool is in the `terminating` state.
const Terminating = struct {
    /// Waited on by `ThreadPool.join` while `pending` is non-zero.
    cond: Condvar,
    /// Thread slots that have not finished yet.
    pending: usize,

    /// Mark one pending slot as finished and wake the joiner once none remain.
    /// `ThreadPool.spawn` calls this with the pool lock held.
    /// NOTE(review): semantics inferred from `join`, which waits on `cond`
    /// until `pending` reaches zero - confirm.
    fn finishOne(self: *Terminating) void {
        self.pending -= 1;
        if (self.pending == 0)
            self.cond.notifyAll();
    }
};
/// Create a pool in the `running` state that may spawn up to `num_workers`
/// threads. No thread is spawned here.
pub fn init(num_workers: usize) ThreadPool {
    const initial = Running{
        .idle = null,
        .spawned = 0,
        .spawnable = num_workers,
    };
    return ThreadPool{
        .lock = Lock.init(),
        .state = State{ .running = initial },
        .spawned = null,
    };
}
/// Shut the pool down, wait for its threads, then release the lock and
/// poison the pool's memory.
pub fn deinit(self: *ThreadPool) void {
    // Order matters: `join` reads the `terminating` state that `shutdown` installs.
    self.shutdown();
    self.join();

    self.lock.deinit();
    self.* = undefined;
}
/// Make one more worker available: wake an idle waiter if there is one,
/// otherwise try to start a new thread running `runFn`.
///
/// Thread creation is retried up to 5 times. Returns without doing anything
/// when the pool is terminating or shut down, or when the thread limit is
/// reached. Relies on `Running.pop`, `Waiter.wake` and
/// `Terminating.finishOne` being provided by those types.
pub fn spawn(self: *ThreadPool, comptime runFn: fn (*Node) void) void {
    // True while a failed spawn attempt still occupies a slot in the
    // `spawned` counter that must be handed back under the lock.
    var spawned_thread = false;
    var remaining_attempts: u8 = 5;

    while (true) {
        const held = self.lock.acquire();

        const running = switch (self.state) {
            .running => |*running| blk: {
                if (spawned_thread) {
                    // Undo the optimistic increment of the failed attempt.
                    running.spawned -= 1;
                    spawned_thread = false;
                }
                break :blk running;
            },
            .terminating => |*terminating| {
                // `shutdown` copied `spawned` (including our slot) into
                // `pending`, so the failed slot has to be finished here.
                if (spawned_thread)
                    terminating.finishOne();
                held.release();
                return;
            },
            .shutdown => {
                held.release();
                return;
            },
        };

        if (running.pop()) |waiter| {
            held.release();
            waiter.wake();
            return;
        }

        if (running.spawned < running.spawnable and remaining_attempts > 0) {
            // Reserve the slot before dropping the lock for the slow spawn.
            running.spawned += 1;
            held.release();

            if (self.spawnThread(runFn))
                return;

            spawned_thread = true;
            remaining_attempts -= 1;
            continue;
        }

        held.release();
        return;
    }
}
/// Forward iterator over a chain of `Node`s linked through `Node.next`.
const Iter = struct {
    /// Node that the following `next` call will yield.
    node: ?*Node,

    /// Yield the current node and advance, or `null` at the end of the chain.
    fn next(self: *Iter) ?*Node {
        if (self.node) |current| {
            self.node = current.next;
            return current;
        }
        return null;
    }
};
/// Start iterating from the current head of the `spawned` node list
/// (read with an acquire load).
pub fn iter(self: *ThreadPool) Iter {
    return Iter{
        .node = @atomicLoad(?*Node, &self.spawned, .Acquire),
    };
}
/// Start one OS thread that registers a `Node` with the pool and then runs
/// `runFn` on it. Returns false when `std.Thread.spawn` fails.
///
/// Parent and child hand over the thread handle through two events: the
/// child waits for `put_event` before reading the handle, and the parent
/// waits for `get_event` before its stack-allocated context goes away.
fn spawnThread(self: *ThreadPool, comptime runFn: fn (*Node) void) bool {
    const Spawner = struct {
        pool: *ThreadPool,
        thread: *std.Thread,
        put_event: Event,
        get_event: Event,

        fn entry(ctx: *@This()) void {
            // Block until the parent has stored the thread handle.
            ctx.put_event.wait();
            const owner = ctx.pool;
            const handle = ctx.thread;
            // Everything needed has been copied out; let the parent return.
            ctx.get_event.set();

            var node: Node = undefined;
            node.init(owner, handle);
            defer node.deinit();

            runFn(&node);
        }
    };

    var handshake = Spawner{
        .pool = self,
        .thread = undefined,
        .put_event = Event.init(),
        .get_event = Event.init(),
    };
    defer {
        handshake.put_event.deinit();
        handshake.get_event.deinit();
    }

    handshake.thread = std.Thread.spawn(&handshake, Spawner.entry) catch return false;
    handshake.put_event.set();
    handshake.get_event.wait();
    return true;
}
/// Per-thread record that links a spawned thread into `ThreadPool.spawned`.
const Node = struct {
    /// NOTE(review): purpose not visible in this file; starts at 0.
    data: usize,
    /// Next node in the pool's list of spawned threads.
    next: ?*Node,
    pool: *ThreadPool,
    thread: *std.Thread,
    /// Set by `ThreadPool.join`; `deinit` blocks until then.
    join_event: Event,

    /// Initialise the node and push it onto the front of `pool.spawned`.
    fn init(self: *Node, pool: *ThreadPool, thread: *std.Thread) void {
        self.* = .{
            .data = 0,
            .next = undefined,
            .pool = pool,
            .thread = thread,
            .join_event = Event.init(),
        };

        // Lock-free push: on failure the CAS returns the head it observed,
        // which becomes the `next` used by the retry.
        self.next = @atomicLoad(?*Node, &pool.spawned, .Monotonic);
        while (true) {
            self.next = @cmpxchgWeak(
                ?*Node,
                &pool.spawned,
                self.next,
                self,
                .Release,
                .Monotonic,
            ) orelse break;
        }
    }

    /// Wait until `join_event` is set, then destroy it.
    fn deinit(self: *Node) void {
        self.join_event.wait();
        self.join_event.deinit();
    }
};
/// Move the pool from `running` to `terminating` and wake every idle waiter.
/// Does nothing when the pool is already terminating or shut down.
pub fn shutdown(self: *ThreadPool) void {
    var waiters = blk: {
        const held = self.lock.acquire();
        defer held.release();

        switch (self.state) {
            .terminating, .shutdown => return,
            .running => |running| {
                // Copy what is needed before the union payload is replaced.
                const idle = running.idle;
                const pending = running.spawned;
                self.state = .{
                    .terminating = .{
                        .pending = pending,
                        .cond = Condvar.init(),
                    },
                };
                break :blk idle;
            },
        }
    };

    // Wake outside the lock. `next` is read first because the waiter may
    // no longer be valid once it has been woken.
    while (waiters) |waiter| {
        waiters = waiter.next;
        (waiter.wakeFn)(waiter);
    }
}
/// Wait until no slot is pending, then release and join every spawned thread.
/// Expects the pool to be in the `terminating` state.
fn join(self: *ThreadPool) void {
    {
        var held = self.lock.acquire();
        // Deferred calls run in reverse order: the condvar is destroyed
        // first, then the lock is released.
        defer held.release();
        defer self.state.terminating.cond.deinit();

        while (self.state.terminating.pending > 0) {
            held = self.state.terminating.cond.wait(held);
        }
    }

    while (self.spawned) |node| {
        // Read everything needed from the node before setting its event;
        // the thread waiting on that event proceeds to tear the node down.
        const thread = node.thread;
        self.spawned = node.next;
        node.join_event.set();
        thread.wait();
    }
}
}; | |
const Condvar = if (std.builtin.os.tag == .windows) | |
struct {
    /// Storage handed to the kernel32 condition-variable functions.
    cond: usize,

    /// Create a condition variable with zeroed storage.
    pub fn init() Condvar {
        return Condvar{ .cond = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Condvar) void {
        self.* = undefined;
    }

    /// Sleep on the condition variable with the SRW lock behind `held`,
    /// without a timeout. Returns the same `held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        const no_flags = @as(system.ULONG, 0);
        const ok = SleepConditionVariableSRW(
            &self.cond,
            &held.inner.srwlock,
            system.INFINITE,
            no_flags,
        );
        std.debug.assert(ok != system.FALSE);
        return held;
    }

    /// Wake one waiter.
    pub fn notifyOne(self: *Condvar) void {
        WakeConditionVariable(&self.cond);
    }

    /// Wake all waiters.
    pub fn notifyAll(self: *Condvar) void {
        WakeAllConditionVariable(&self.cond);
    }

    extern "kernel32" fn SleepConditionVariableSRW(
        cond: *usize,
        srwlock: *usize,
        timeout: system.DWORD,
        flags: system.ULONG,
    ) callconv(system.WINAPI) system.BOOL;

    extern "kernel32" fn WakeConditionVariable(
        cond: *usize,
    ) callconv(system.WINAPI) void;

    extern "kernel32" fn WakeAllConditionVariable(
        cond: *usize,
    ) callconv(system.WINAPI) void;
}
else if (std.builtin.link_libc) | |
struct {
    cond: std.c.pthread_cond_t,

    /// Create a statically initialised pthread condition variable.
    pub fn init() Condvar {
        return .{ .cond = std.c.PTHREAD_COND_INITIALIZER };
    }

    /// Destroy the condition variable and poison the memory.
    pub fn deinit(self: *Condvar) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const safe_ret = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const ret_cond = std.c.pthread_cond_destroy(&self.cond);
        std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);
        self.* = undefined;
    }

    /// Wait on the condition variable with the mutex behind `held`.
    /// Returns the same `held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        std.debug.assert(std.c.pthread_cond_wait(&self.cond, &held.inner.mutex) == 0);
        return held;
    }

    /// Wake one waiter.
    pub fn notifyOne(self: *Condvar) void {
        std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
    }

    /// Wake all waiters.
    pub fn notifyAll(self: *Condvar) void {
        std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
    }
}
else | |
struct {
    /// Guards `waiters`.
    lock: Lock,
    /// One `Event` per blocked waiter; the list nodes live on the waiters' stacks.
    waiters: std.SinglyLinkedList(Event),

    /// Create a condition variable with no waiters.
    pub fn init() Condvar {
        return Condvar{
            .lock = Lock.init(),
            .waiters = .{},
        };
    }

    /// Destroy the internal lock and poison the memory.
    pub fn deinit(self: *Condvar) void {
        self.lock.deinit();
        self.* = undefined;
    }

    /// Register as a waiter, release `held`, block until notified, then
    /// re-acquire the lock and return the new `Held`.
    pub fn wait(self: *Condvar, held: Lock.Held) Lock.Held {
        const WaitNode = @TypeOf(self.waiters).Node;
        var entry: WaitNode = undefined;
        entry.data = Event.init();
        defer entry.data.deinit();

        {
            const list_guard = self.lock.acquire();
            defer list_guard.release();
            self.waiters.prepend(&entry);
        }

        // Registered before the caller's lock is dropped.
        held.release();
        entry.data.wait();
        return held.inner.acquire();
    }

    /// Remove one waiter from the list (if any) and set its event.
    pub fn notifyOne(self: *Condvar) void {
        const popped = blk: {
            const list_guard = self.lock.acquire();
            defer list_guard.release();
            break :blk self.waiters.popFirst();
        };

        if (popped) |entry| {
            entry.data.set();
        }
    }

    /// Take the whole waiter list and set every waiter's event.
    pub fn notifyAll(self: *Condvar) void {
        var taken = blk: {
            const list_guard = self.lock.acquire();
            defer list_guard.release();

            const snapshot = self.waiters;
            self.waiters = .{};
            break :blk snapshot;
        };

        while (taken.popFirst()) |entry| {
            entry.data.set();
        }
    }
};
const Lock = if (std.builtin.os.tag == .windows) | |
struct {
    /// Storage handed to the kernel32 SRW lock functions.
    srwlock: usize,

    /// Create an unlocked lock with zeroed storage.
    pub fn init() Lock {
        return Lock{ .srwlock = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock exclusively and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        AcquireSRWLockExclusive(&self.srwlock);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const lock = self.inner;
            ReleaseSRWLockExclusive(&lock.srwlock);
        }
    };

    extern "kernel32" fn AcquireSRWLockExclusive(
        srwlock: *usize,
    ) callconv(system.WINAPI) void;

    extern "kernel32" fn ReleaseSRWLockExclusive(
        srwlock: *usize,
    ) callconv(system.WINAPI) void;
}
else if (comptime std.Target.current.isDarwin()) | |
struct {
    /// Storage handed to `os_unfair_lock_lock` / `os_unfair_lock_unlock`.
    unfair_lock: u32,

    /// Create an unlocked lock with zeroed storage.
    pub fn init() Lock {
        return Lock{ .unfair_lock = 0 };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        os_unfair_lock_lock(&self.unfair_lock);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const lock = self.inner;
            os_unfair_lock_unlock(&lock.unfair_lock);
        }
    };

    extern "c" fn os_unfair_lock_lock(
        unfair_lock: *u32,
    ) callconv(.C) void;

    extern "c" fn os_unfair_lock_unlock(
        unfair_lock: *u32,
    ) callconv(.C) void;
}
else if (std.builtin.link_libc) | |
struct {
    mutex: std.c.pthread_mutex_t,

    /// Create a statically initialised pthread mutex.
    pub fn init() Lock {
        return Lock{ .mutex = std.c.PTHREAD_MUTEX_INITIALIZER };
    }

    /// Destroy the mutex.
    pub fn deinit(self: *Lock) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const tolerated = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const rc = std.c.pthread_mutex_destroy(&self.mutex);
        std.debug.assert(rc == 0 or rc == tolerated);
    }

    /// Lock the mutex and return a token used to release it.
    pub fn acquire(self: *Lock) Held {
        const rc = std.c.pthread_mutex_lock(&self.mutex);
        std.debug.assert(rc == 0);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            const rc = std.c.pthread_mutex_unlock(&self.inner.mutex);
            std.debug.assert(rc == 0);
        }
    };
}
else if (std.builtin.os.tag == .linux) | |
struct {
    /// Futex word. `waiting` means the lock is held and a thread may be
    /// asleep on it, so `release` has to issue a wake.
    state: enum(i32) {
        unlocked,
        locked,
        waiting,
    },

    /// Create an unlocked lock.
    pub fn init() Lock {
        return .{ .state = .unlocked };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Take the lock, blocking if necessary, and return a token used to
    /// release it.
    pub fn acquire(self: *Lock) Held {
        const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .locked, .Acquire);
        if (state != .unlocked)
            self.acquireSlow();
        return Held{ .inner = self };
    }

    /// Contended path: spin briefly, then mark the lock as `waiting` and
    /// sleep on the futex until it can be taken.
    ///
    /// The slow path always acquires as `waiting`. The fast-path exchange
    /// may have overwritten a `waiting` state with `locked`, so sleepers may
    /// exist that only a `waiting`-state release will wake. The cost is at
    /// most one unnecessary wake call.
    fn acquireSlow(self: *Lock) void {
        @setCold(true);

        while (true) {
            var spin: std.math.Log2Int(usize) = 0;
            while (spin < 10) : (spin += 1) {
                switch (@atomicLoad(@TypeOf(self.state), &self.state, .Monotonic)) {
                    .unlocked => _ = @cmpxchgWeak(
                        @TypeOf(self.state),
                        &self.state,
                        .unlocked,
                        .waiting,
                        .Acquire,
                        .Monotonic,
                    ) orelse return,
                    .locked => {},
                    .waiting => break,
                }

                // Back off exponentially, capped at 64 iterations.
                const iters = std.math.min(64, @as(usize, 1) << spin);
                std.SpinLock.loopHint(iters);
            }

            // If the previous value was `unlocked`, the exchange itself took
            // the lock; otherwise another thread owns it and we sleep.
            const state = @atomicRmw(@TypeOf(self.state), &self.state, .Xchg, .waiting, .Acquire);
            if (state == .unlocked)
                return;

            switch (system.getErrno(system.futex_wait(
                @ptrCast(*const i32, &self.state),
                system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
                @enumToInt(@TypeOf(self.state).waiting),
                null,
            ))) {
                0 => {},
                system.EINTR => {},
                system.EAGAIN => {},
                else => unreachable,
            }
        }
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        /// Unlock, waking one sleeper when the state was `waiting`.
        pub fn release(self: Held) void {
            switch (@atomicRmw(
                @TypeOf(self.inner.state),
                &self.inner.state,
                .Xchg,
                .unlocked,
                .Release,
            )) {
                .unlocked => unreachable,
                .locked => {},
                .waiting => self.inner.releaseSlow(),
            }
        }
    };

    /// Wake one thread sleeping on the futex word.
    fn releaseSlow(self: *Lock) void {
        @setCold(true);

        switch (system.getErrno(system.futex_wake(
            @ptrCast(*const i32, &self.state),
            system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
            @as(i32, 1),
        ))) {
            0 => {},
            system.EFAULT => {},
            else => unreachable,
        }
    }
}
else | |
struct {
    is_locked: bool,

    /// Create an unlocked spin lock.
    pub fn init() Lock {
        return .{ .is_locked = false };
    }

    /// Poison the memory.
    pub fn deinit(self: *Lock) void {
        self.* = undefined;
    }

    /// Spin until the flag can be flipped from false to true, then return a
    /// token used to release the lock.
    pub fn acquire(self: *Lock) Held {
        while (@atomicRmw(bool, &self.is_locked, .Xchg, true, .Acquire))
            std.SpinLock.loopHint(1);
        return Held{ .inner = self };
    }

    /// Token proving the lock is held; call `release` to unlock.
    pub const Held = struct {
        inner: *Lock,

        pub fn release(self: Held) void {
            // The flag lives on the lock, not on this token.
            @atomicStore(bool, &self.inner.is_locked, false, .Release);
        }
    };
};
const Event = if (std.builtin.os.tag == .windows) | |
struct {
    /// Either `IS_SET`, or the number of threads that registered as waiters.
    state: usize,

    const UNSET = 0;
    const WAITING = 1;
    const IS_SET = std.math.maxInt(usize);

    /// Create an unset event.
    pub fn init() Event {
        return Event{ .state = UNSET };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Return once the event is set. Registers as a waiter by bumping the
    /// counter, then blocks on the keyed event identified by the address of
    /// `state`.
    pub fn wait(self: *Event) void {
        var observed = @atomicLoad(usize, &self.state, .Acquire);
        while (observed != IS_SET) {
            if (@cmpxchgWeak(
                usize,
                &self.state,
                observed,
                observed + WAITING,
                .Acquire,
                .Acquire,
            )) |actual| {
                // Lost the race; retry with the value that was found.
                observed = actual;
                continue;
            }

            const status = NtWaitForKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
            std.debug.assert(status == .SUCCESS);
            return;
        }
    }

    /// Mark the event as set and release every registered waiter.
    pub fn set(self: *Event) void {
        const previous = @atomicRmw(usize, &self.state, .Xchg, IS_SET, .Release);
        if (previous == IS_SET)
            return;

        // One release per waiter that was counted before the exchange.
        var pending = previous;
        while (pending >= WAITING) : (pending -= WAITING) {
            const status = NtReleaseKeyedEvent(null, @ptrToInt(&self.state), system.FALSE, null);
            std.debug.assert(status == .SUCCESS);
        }
    }

    extern "NtDll" fn NtWaitForKeyedEvent(
        handle: ?system.HANDLE,
        key: usize,
        alertable: system.BOOLEAN,
        timeout: ?*const system.LARGE_INTEGER,
    ) callconv(system.WINAPI) system.NTSTATUS;

    extern "NtDll" fn NtReleaseKeyedEvent(
        handle: ?system.HANDLE,
        key: usize,
        alertable: system.BOOLEAN,
        timeout: ?*const system.LARGE_INTEGER,
    ) callconv(system.WINAPI) system.NTSTATUS;
}
else if (std.builtin.link_libc) | |
struct {
    /// Protected by `mutex`.
    is_set: bool,
    cond: std.c.pthread_cond_t,
    mutex: std.c.pthread_mutex_t,

    /// Create an unset event.
    pub fn init() Event {
        return .{
            .is_set = false,
            .cond = std.c.PTHREAD_COND_INITIALIZER,
            .mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
        };
    }

    /// Destroy the condition variable and mutex, then poison the memory.
    pub fn deinit(self: *Event) void {
        // On dragonfly and netbsd an EAGAIN result is tolerated as well.
        const safe_ret = switch (std.builtin.os.tag) {
            .dragonfly, .netbsd => std.os.EAGAIN,
            else => 0,
        };

        const ret_cond = std.c.pthread_cond_destroy(&self.cond);
        std.debug.assert(ret_cond == 0 or ret_cond == safe_ret);

        const ret_mutex = std.c.pthread_mutex_destroy(&self.mutex);
        std.debug.assert(ret_mutex == 0 or ret_mutex == safe_ret);

        self.* = undefined;
    }

    /// Block until the event has been set.
    pub fn wait(self: *Event) void {
        std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
        defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);

        // Loop on the predicate: keep waiting while the event is NOT set,
        // which also absorbs spurious wake-ups.
        while (!self.is_set)
            std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
    }

    /// Set the event and wake every thread blocked in `wait`.
    pub fn set(self: *Event) void {
        std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
        defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);

        self.is_set = true;
        // Broadcast so that every waiter is released, as the other Event
        // implementations in this file do.
        std.debug.assert(std.c.pthread_cond_broadcast(&self.cond) == 0);
    }
}
else if (std.builtin.os.tag == .linux) | |
struct {
    /// Futex word holding whether the event has been set.
    state: enum(i32) {
        unset,
        set,
    },

    /// Create an unset event.
    pub fn init() Event {
        return Event{ .state = .unset };
    }

    /// Poison the memory; no system call is made.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Block until the event is set, sleeping on the futex while the word
    /// still reads `unset`.
    pub fn wait(self: *Event) void {
        const State = @TypeOf(self.state);

        while (@atomicLoad(State, &self.state, .Acquire) != .set) {
            const rc = system.futex_wait(
                @ptrCast(*const i32, &self.state),
                system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAIT,
                @enumToInt(State.unset),
                null,
            );
            // All accepted results simply lead to re-checking the state.
            switch (system.getErrno(rc)) {
                0, system.EINTR, system.EAGAIN => {},
                else => unreachable,
            }
        }
    }

    /// Store `set` and wake every thread sleeping on the futex word.
    pub fn set(self: *Event) void {
        @atomicStore(@TypeOf(self.state), &self.state, .set, .Release);

        const rc = system.futex_wake(
            @ptrCast(*const i32, &self.state),
            system.FUTEX_PRIVATE_FLAG | system.FUTEX_WAKE,
            std.math.maxInt(i32),
        );
        switch (system.getErrno(rc)) {
            0, system.EFAULT => {},
            else => unreachable,
        }
    }
}
else | |
struct {
    is_set: bool,

    /// Create an unset event.
    pub fn init() Event {
        return .{ .is_set = false };
    }

    /// Poison the memory.
    pub fn deinit(self: *Event) void {
        self.* = undefined;
    }

    /// Spin until the flag has been set.
    pub fn wait(self: *Event) void {
        while (!@atomicLoad(bool, &self.is_set, .Acquire))
            std.SpinLock.loopHint(1);
    }

    /// Set the flag; spinning waiters observe it on their next load.
    pub fn set(self: *Event) void {
        @atomicStore(bool, &self.is_set, true, .Release);
    }
};
}; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment