const std = @import("std"); | |
const Futex = std.Thread.Futex; | |
const assert = std.debug.assert; | |
const Atomic = std.atomic.Atomic; | |
const Task = @This(); | |
next: ?*Task = null, | |
runFn: fn(*Task) void, | |
pub const Group = struct { | |
head: ?*Task = null, | |
tail: ?*Task = null, | |
pub fn from(task: *Task) Group { | |
return .{ .head = task, .tail = task }; | |
} | |
pub fn push(self: *Group, group: Group) void { | |
const tail = group.tail orelse return; | |
tail.next = self.head; | |
self.head = group.head; | |
self.tail = self.tail orelse tail; | |
} | |
pub fn pop(self: *Group) ?*Task { | |
const task = self.head orelse return null; | |
self.head = task.next; | |
if (self.head == null) self.tail = null; | |
return task; | |
} | |
}; | |
pub const Scheduler = struct { | |
max_threads: u14, | |
stack_size: usize, | |
runnable: List = .{}, | |
workers: Atomic(?*Worker) = Atomic(?*Worker).init(null), | |
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})), | |
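// The whole scheduler state is packed into the single u32 `sync` so that the
// idle/spawned counts, the notified flag, and the wake state can all be updated
// with one compare-and-swap: 14 + 14 + 1 + 1 + 2 = 32 bits.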
const Sync = packed struct { | |
idle: u14 = 0, | |
spawned: u14 = 0, | |
reserved: bool = false, | |
notified: bool = false, | |
state: enum(u2) { | |
pending = 0, | |
waking, | |
signaled, | |
shutdown, | |
}, | |
}; | |
pub fn schedule(self: *Scheduler, group: Group) error{Shutdown}!void { | |
assert(group.head != null); | |
assert(group.tail != null); | |
var maybe_worker = Worker.current; | |
if (maybe_worker) |worker| { | |
worker.push(group); | |
} else { | |
self.runnable.push(group, .multi_producer); | |
} | |
return self.notifyWith(maybe_worker); | |
} | |
fn notifyWith( | |
noalias self: *Scheduler, | |
noalias worker: ?*Worker, | |
) error{Shutdown}!void { | |
var max_spawned = self.max_threads; | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
var is_waking = if (worker) |w| w.state == .waking else false; | |
while (true) { | |
if (sync.state == .shutdown) return error.Shutdown; | |
if (is_waking) assert(sync.state == .waking); | |
const can_wake = is_waking or sync.state == .pending; | |
var new_sync = sync; | |
new_sync.notified = true; | |
if (can_wake and sync.idle > 0) { | |
new_sync.state = .signaled; | |
} else if (can_wake and sync.spawned < max_spawned) { | |
new_sync.state = .signaled; | |
new_sync.spawned += 1; | |
} else if (is_waking) { | |
new_sync.state = .pending; | |
} else if (sync.notified) { | |
break; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (can_wake and sync.idle > 0) { | |
Futex.wake(&self.sync, 1); | |
} else if (can_wake and sync.spawned < max_spawned) { | |
Worker.spawn(self) catch self.complete(); | |
} | |
return; | |
} | |
} | |
fn waitWith( | |
noalias self: *Scheduler, | |
noalias worker: *Worker, | |
) error{Shutdown}!void { | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
wait_loop: while (true) { | |
if (sync.state == .shutdown) { | |
worker.state = .shutdown; | |
self.complete(); | |
return error.Shutdown; | |
} | |
const can_wake = sync.notified or sync.state == .signaled; | |
if (can_wake or worker.state != .idle) { | |
var new_sync = sync; | |
new_sync.notified = false; | |
if (can_wake) { | |
if (sync.state == .signaled) new_sync.state = .waking; | |
if (worker.state == .idle) new_sync.idle -= 1; | |
} else { | |
if (worker.state == .waking) new_sync.state = .pending; | |
new_sync.idle += 1; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (can_wake) { | |
if (worker.state == .idle) worker.state = .running; | |
if (sync.state == .signaled) worker.state = .waking; | |
return; | |
} | |
sync = new_sync; | |
worker.state = .idle; | |
} | |
var spin: u8 = 10; | |
while (spin > 0) : (spin -= 1) { | |
std.atomic.spinLoopHint(); | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
if (sync.notified or sync.state == .shutdown) { | |
continue :wait_loop; | |
} | |
} | |
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable; | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
} | |
} | |
pub fn shutdown(self: *Scheduler) void { | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (sync.state != .shutdown) { | |
var new_sync = sync; | |
new_sync.idle = 0; | |
new_sync.notified = false; | |
new_sync.state = .shutdown; | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (sync.idle > 0) Futex.wake(&self.sync, 1); | |
return; | |
} | |
} | |
pub fn join(self: *Scheduler) void { | |
completed: { | |
var spin: u8 = 10; | |
var is_waiting = false; | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
assert(sync.state == .shutdown); | |
if (sync.spawned == 0) { | |
break :completed; | |
} | |
if (spin > 0) { | |
spin -= 1; | |
std.atomic.spinLoopHint(); | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
continue; | |
} | |
if (!is_waiting) { | |
var new_sync = sync; | |
new_sync.idle += 1; | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Monotonic, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
sync = new_sync; | |
is_waiting = true; | |
} | |
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable; | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
} | |
} | |
var worker = self.workers.load(.Monotonic) orelse return; | |
worker = self.workers.swap(null, .Acquire) orelse return; | |
worker.complete(); | |
} | |
fn complete(self: *Scheduler) void { | |
var sync = Sync{ .spawned = 1 }; | |
sync = @bitCast(Sync, self.sync.fetchSub(@bitCast(u32, sync), .AcqRel)); | |
assert(sync.state == .shutdown); | |
assert(sync.spawned >= 1); | |
if (sync.spawned == 1 and sync.idle > 0) { | |
Futex.wake(&self.sync, sync.idle); | |
} | |
} | |
}; | |
const Worker = struct { | |
next: ?*Worker = null, | |
buffer: Buffer = .{}, | |
runnable: List = .{}, | |
target: ?*Worker = null, | |
joined: Atomic(u32) = Atomic(u32).init(0), | |
state: enum { | |
running, | |
waking, | |
idle, | |
shutdown, | |
} = .running, | |
threadlocal var current: ?*Worker = null; | |
fn spawn(noalias scheduler: *Scheduler) !void { | |
const thread = try std.Thread.spawn( | |
.{ .stack_size = scheduler.stack_size }, | |
Worker.run, | |
.{ scheduler }, | |
); | |
thread.detach(); | |
} | |
fn run(noalias scheduler: *Scheduler) void { | |
var self = Worker{}; | |
current = &self; | |
var workers = scheduler.workers.load(.Monotonic); | |
while (true) { | |
self.next = workers; | |
workers = scheduler.workers.tryCompareAndSwap( | |
workers, | |
&self, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
while (true) { | |
const result = self.pop(scheduler) orelse { | |
scheduler.waitWith(&self) catch return self.join(); | |
continue; | |
}; | |
if (result.pushed or self.state == .waking) { | |
scheduler.notifyWith(&self) catch {}; | |
} | |
self.state = .running; | |
(result.task.runFn)(result.task); | |
} | |
} | |
fn join(self: *Worker) void { | |
while (self.joined.load(.Acquire) == 0) { | |
Futex.wait(&self.joined, 0, null) catch unreachable; | |
} | |
const next_worker = self.next orelse return; | |
next_worker.complete(); | |
} | |
fn complete(self: *Worker) void { | |
self.joined.store(1, .Release); | |
Futex.wake(&self.joined, 1); | |
} | |
fn push(self: *Worker, group: Group) void { | |
if (self.buffer.push(group)) |overflowed| { | |
self.runnable.push(overflowed, .single_producer); | |
} | |
} | |
fn pop(noalias self: *Worker, noalias scheduler: *Scheduler) ?Buffer.Pop { | |
var check_runnable = true; | |
if (self.buffer.pop()) |result| { | |
return result; | |
} | |
var attempts: u8 = 4; | |
while (attempts > 0) : (attempts -= 1) { | |
if (check_runnable) blk: { | |
return self.buffer.steal(&self.runnable) catch |err| { | |
check_runnable = err != error.Empty; | |
break :blk; | |
}; | |
} | |
var was_contended = false; | |
defer if (was_contended) { | |
attempts += 1; | |
}; | |
var num_workers = @bitCast(Sync, scheduler.sync.load(.Monotonic)).spawned; | |
while (num_workers > 0) : (num_workers -= 1) { | |
const target = self.target orelse blk: { | |
self.target = scheduler.workers.load(.Acquire); | |
break :blk self.target.?; | |
}; | |
if (target != self) blk: { | |
return self.buffer.steal(&target.runnable) catch |err| { | |
if (err == error.Contended) was_contended = true; | |
return self.buffer.steal(&target.buffer) catch |run_err| { | |
if (run_err == error.Contended) was_contended = true; | |
break :blk; | |
}; | |
}; | |
} | |
self.target = target.next; | |
std.atomic.spinLoopHint(); | |
} | |
return self.buffer.steal(&scheduler.runnable) catch |err| { | |
if (err == error.Contended) was_contended = true; | |
continue; | |
}; | |
} | |
return null; | |
} | |
}; | |
const List = struct { | |
input: Atomic(?*Task) = Atomic(?*Task).init(null), | |
output: Atomic(usize) = Atomic(usize).init(0), | |
const IS_OUTPUT_EMPTY: usize = 0; | |
const IS_OUTPUT_CONSUMING: usize = 1; | |
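// List is an unbounded MPSC intrusive queue: any thread may push by CAS-ing onto
// `input`, while a single consumer claims `output` (tagging it IS_OUTPUT_CONSUMING)
// and drains both the cached output chain and the input stack.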
fn push( | |
self: *List, | |
group: Group, | |
comptime producer_type: enum{ single_producer, multi_producer }, | |
) void { | |
const head = group.head orelse unreachable; | |
const tail = group.tail orelse unreachable; | |
var input = self.input.load(.Monotonic); | |
while (true) { | |
tail.next = input; | |
if (producer_type == .single_producer and input == null) { | |
self.input.store(head, .Release); | |
break; | |
} | |
input = self.input.tryCompareAndSwap( | |
input, | |
head, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
fn consume(self: *List) error{Empty, Contended}!Consumer { | |
const output = self.output.load(.Monotonic); | |
if (output == IS_OUTPUT_CONSUMING) { | |
return error.Contended; | |
} | |
if (output == IS_OUTPUT_EMPTY) { | |
if (self.input.load(.Monotonic) == null) { | |
return error.Empty; | |
} | |
} | |
if (self.output.compareAndSwap( | |
output, | |
IS_OUTPUT_CONSUMING, | |
.Acquire, | |
.Monotonic, | |
)) |_| { | |
return error.Contended; | |
} | |
return Consumer{ | |
.output = @intToPtr(?*Task, output), | |
.list = self, | |
}; | |
} | |
const Consumer = struct { | |
output: ?*Task, | |
list: *List, | |
fn pop(self: *Consumer) ?*Task { | |
if (self.output) |task| { | |
self.output = task.next; | |
return task; | |
} | |
var input = self.list.input.load(.Monotonic) orelse return null; | |
input = self.list.input.swap(null, .Acquire) orelse unreachable; | |
self.output = input.next; | |
return input; | |
} | |
fn release(self: Consumer) void { | |
assert(self.list.output.loadUnchecked() == IS_OUTPUT_CONSUMING); | |
const output = @ptrToInt(self.output); | |
self.list.output.store(output, .Release); | |
} | |
}; | |
}; | |
const Buffer = struct { | |
head: Atomic(i32) = Atomic(i32).init(0), | |
tail: Atomic(i32) = Atomic(i32).init(0), | |
array: [256]Atomic(*Task) = undefined, | |
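// Buffer is a fixed 256-slot ring used as a work-stealing deque: only the owning
// worker pushes and pops at `tail`, while other workers steal from `head`.
// Indices use wrapping arithmetic, so `tail -% head` is always the current size.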
fn push(self: *Buffer, group: Group) ?Group { | |
const task = group.head orelse unreachable; | |
if (group.tail != task) { | |
return group; | |
} | |
var tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
while (true) { | |
if (tail -% head < self.array.len) { | |
self.array[tail % self.array.len].store(task, .Unordered); | |
self.tail.store(tail +% 1, .Release); | |
return null; | |
} | |
var migrate = @intCast(i32, self.array.len / 2); | |
if (self.head.tryCompareAndSwap( | |
head, | |
head +% migrate, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
head = updated; | |
continue; | |
} | |
head +%= migrate - 1; | |
var overflowed = Group.from(task); | |
while (migrate > 0) : (migrate -= 1) { | |
const index = head % self.array.len; | |
const migrated = self.array[index].loadUnchecked(); | |
overflowed.push(Group.from(migrated)); | |
head -%= 1; | |
} | |
return overflowed; | |
} | |
} | |
const Pop = struct { | |
task: *Task, | |
pushed: bool = false, | |
}; | |
fn pop(self: *Buffer) ?Pop { | |
var tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
if (head == tail) { | |
return null; | |
} | |
const new_tail = tail -% 1; | |
self.tail.store(new_tail, .SeqCst); | |
head = self.head.load(.SeqCst); | |
var task: ?*Task = null; | |
if (head != tail) { | |
task = self.array[new_tail % self.array.len].loadUnchecked(); | |
if (head == new_tail) { | |
return Pop{ .task = task.? }; | |
} | |
_ = self.head.compareAndSwap( | |
head, | |
tail, | |
.SeqCst, | |
.Monotonic, | |
) orelse return Pop{ .task = task.? }; | |
task = null; | |
} | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
fn steal( | |
noalias self: *Buffer, | |
noalias target: anytype, | |
) error{Empty, Contended}!Pop { | |
const is_buffer = switch (@TypeOf(target)) { | |
*List => false, | |
*Buffer => true, | |
else => |t| @compileError("Can't steal from " ++ @typeName(t)), | |
}; | |
if (is_buffer) { | |
const head = target.head.load(.Acquire); | |
const tail = target.tail.load(.Acquire); | |
if (tail -% head <= 0) { | |
return error.Empty; | |
} | |
const index = head % target.array.len; | |
const task = target.array[index].load(.Unordered); | |
_ = target.head.compareAndSwap( | |
head, | |
head +% 1, | |
.SeqCst, | |
.Monotonic, | |
) orelse return Pop{ .task = task }; | |
return error.Contended; | |
} | |
var consumer = try target.consume(); | |
defer consumer.release(); | |
const tail = self.tail.loadUnchecked(); | |
var new_tail = tail; | |
if (std.debug.runtime_safety) { | |
const head = self.head.load(.Monotonic); | |
assert(tail == head); | |
} | |
var free_slots = self.array.len; | |
while (free_slots > 0) : (free_slots -= 1) { | |
const task = consumer.pop() orelse break; | |
self.array[new_tail % self.array.len].store(task, .Unordered); | |
new_tail +%= 1; | |
} | |
const task = consumer.pop() orelse blk: { | |
if (new_tail == tail) break :blk null; | |
new_tail -%= 1; | |
break :blk self.array[new_tail % self.array.len].loadUnchecked(); | |
}; | |
const pushed = new_tail != tail; | |
if (pushed) { | |
self.tail.store(new_tail, .Release); | |
} | |
return Pop{ | |
.task = task, | |
.pushed = pushed, | |
}; | |
} | |
}; |
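// A minimal usage sketch, assuming the declarations above live in a file named
// Task.zig; the wrapper type and the printed message are illustrative only:
//
//     const Task = @import("Task.zig");
//
//     const Say = struct {
//         task: Task = .{ .runFn = run },
//         fn run(task: *Task) void {
//             _ = @fieldParentPtr(Say, "task", task);
//             std.debug.print("hello from a worker thread\n", .{});
//         }
//     };
//
//     var scheduler = Task.Scheduler{ .max_threads = 4, .stack_size = 1 << 16 };
//     var say = Say{};
//     try scheduler.schedule(Task.Group.from(&say.task));
//     scheduler.shutdown();
//     scheduler.join();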
const std = @import("std"); | |
const Futex = std.Thread.Futex; | |
const assert = std.debug.assert; | |
const Atomic = std.atomic.Atomic; | |
const arch = std.Target.current.cpu.arch; | |
const ThreadPool = @This(); | |
config: Config, | |
runnable: List = .{}, | |
workers: Atomic(?*Worker) = Atomic(?*Worker).init(null), | |
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})), | |
pub const Config = struct { | |
max_threads: u14, | |
stack_size: usize = (std.Thread.SpawnConfig{}).stack_size, | |
}; | |
pub fn init(config: Config) ThreadPool { | |
return .{ .config = config }; | |
} | |
pub fn deinit(self: *ThreadPool) void { | |
return self.join(); | |
} | |
pub const Runnable = struct { | |
next: ?*Runnable = undefined, | |
runFn: fn(*Runnable, *ThreadPool) void, | |
}; | |
pub const Batch = struct { | |
head: ?*Runnable = null, | |
tail: *Runnable = undefined, | |
pub fn from(runnable: *Runnable) Batch { | |
runnable.next = null; | |
return Batch{ | |
.head = runnable, | |
.tail = runnable, | |
}; | |
} | |
pub fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
pub fn push(noalias self: *Batch, batch: Batch) void { | |
if (batch.isEmpty()) return; | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else { | |
batch.tail.next = self.head; | |
self.head = batch.head; | |
} | |
} | |
}; | |
pub fn getScheduler(self: *ThreadPool) Scheduler { | |
return Scheduler { | |
.thread_pool = self, | |
.producer = blk: { | |
if (Worker.tls_current) |worker| { | |
break :blk .{ .local = worker.getProducer() }; | |
} | |
const is_single_producer = false; | |
break :blk .{ .remote = self.runnable.getProducer(is_single_producer) }; | |
}, | |
}; | |
} | |
pub const Scheduler = struct { | |
thread_pool: *ThreadPool, | |
producer: union(enum) { | |
local: Worker.Producer, | |
remote: List.Producer, | |
}, | |
pub fn submit( | |
noalias self: *Scheduler, | |
noalias runnable: *Runnable, | |
) void { | |
return self.submitBatch(Batch.from(runnable)); | |
} | |
pub fn submitBatch(noalias self: *Scheduler, batch: Batch) void { | |
if (batch.isEmpty()) return; | |
switch (self.producer) { | |
.local => |*producer| producer.push(batch), | |
.remote => |*producer| producer.push(batch), | |
} | |
} | |
pub fn schedule(self: Scheduler) void { | |
if (switch (self.producer) { | |
.local => |producer| producer.commit(), | |
.remote => |producer| producer.commit(), | |
}) { | |
const is_waking = false; | |
self.thread_pool.notify(is_waking); | |
} | |
} | |
}; | |
const Spin = struct { | |
count: u8 = if (arch.isX86()) 100 else 10, | |
fn yield(self: *Spin) bool { | |
if (self.count == 0) return false; | |
std.atomic.spinLoopHint(); | |
self.count -= 1; | |
return true; | |
} | |
}; | |
const Sync = packed struct { | |
idle: u14 = 0, | |
spawned: u14 = 0, | |
_reserved: u1 = 0, | |
notified: bool = false, | |
state: enum(u2) { | |
pending = 0, | |
waking, | |
signaled, | |
shutdown, | |
}, | |
}; | |
fn register(noalias self: *ThreadPool, noalias worker: *Worker) bool { | |
var workers = self.workers.load(.Monotonic); | |
while (true) { | |
worker.next = workers; | |
workers = self.workers.tryCompareAndSwap( | |
workers, | |
worker, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
if (sync.state == .shutdown) { | |
return false; | |
} | |
var new_sync = sync; | |
new_sync.notified = false; | |
if (sync.state == .signaled) { | |
new_sync.state = .waking; | |
} else if (sync.notified) { | |
return false; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
const is_waking = sync.state == .signaled; | |
return is_waking; | |
} | |
} | |
fn unregister(noalias self: *ThreadPool, noalias maybe_worker: ?*Worker) void { | |
const remove = @bitCast(u32, Sync{ .spawned = 1 }); | |
const updated = self.sync.fetchSub(remove, .AcqRel); | |
const sync = @bitCast(Sync, updated); | |
assert(sync.state == .shutdown); | |
assert(sync.spawned >= 1); | |
if (sync.spawned == 1 and sync.idle != 0) { | |
const notify_all = std.math.maxInt(u32); | |
Futex.wake(&self.sync, notify_all); | |
} | |
const worker = maybe_worker orelse return; | |
worker.join(); | |
const next_worker = worker.next orelse return; | |
next_worker.shutdown(); | |
} | |
pub fn isShutdown(self: *const ThreadPool) bool { | |
const sync = @bitCast(Sync, self.sync.load(.Acquire)); | |
return sync.state == .shutdown; | |
} | |
pub fn shutdown(self: *ThreadPool) void { | |
@setCold(true); | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (sync.state != .shutdown) { | |
var new_sync = sync; | |
new_sync.idle = 0; | |
new_sync.notified = true; | |
new_sync.state = .shutdown; | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (sync.idle > 0) { | |
const notify_all = std.math.maxInt(u32); | |
Futex.wake(&self.sync, notify_all); | |
} | |
return; | |
} | |
} | |
fn join(noalias self: *ThreadPool) void { | |
@setCold(true); | |
var spin = Spin{}; | |
var is_waiting = false; | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
assert(sync.state == .shutdown); | |
if (sync.spawned == 0) { | |
break; | |
} | |
if (spin.yield()) { | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
continue; | |
} | |
if (!is_waiting) { | |
var old_sync = sync; | |
sync.idle += 1; | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, old_sync), | |
@bitCast(u32, sync), | |
.Monotonic, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
} | |
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable; | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
is_waiting = true; | |
} | |
const worker = self.workers.load(.Acquire) orelse return; | |
worker.shutdown(); | |
} | |
fn notify(noalias self: *ThreadPool, is_waking: bool) callconv(.Inline) void { | |
const sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
if (!sync.notified) { | |
self.notifySlow(is_waking); | |
} | |
} | |
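// notifySlow implements the wake policy: a caller that saw work become available
// either resumes one idle worker (Futex.wake) or spawns a new thread, but only if
// it holds the "waking" token or the state is still .pending; otherwise it just
// leaves the notified bit set so the current waker re-checks before sleeping.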
fn notifySlow(noalias self: *ThreadPool, is_waking: bool) void { | |
@setCold(true); | |
const max_spawn = self.config.max_threads; | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
if (sync.state == .shutdown) return; | |
if (is_waking) assert(sync.state == .waking); | |
const can_wake = is_waking or (sync.state == .pending); | |
var new_sync = sync; | |
new_sync.notified = true; | |
if (sync.idle > 0 and can_wake) { | |
new_sync.state = .signaled; | |
} else if (sync.spawned < max_spawn and can_wake) { | |
new_sync.state = .signaled; | |
new_sync.spawned += 1; | |
} else if (is_waking) { | |
new_sync.state = .pending; | |
} else if (sync.notified) { | |
return; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (sync.idle > 0 and can_wake) { | |
Futex.wake(&self.sync, 1); | |
} else if (sync.spawned < max_spawn and can_wake) { | |
Worker.spawn(self) catch self.unregister(null); | |
} | |
return; | |
} | |
} | |
fn wait(noalias self: *ThreadPool, is_waking: bool) error{Shutdown}!bool { | |
@setCold(true); | |
var spin = Spin{}; | |
var is_waiting = false; | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
if (sync.state == .shutdown) { | |
return error.Shutdown; | |
} | |
if (sync.notified or !is_waiting) { | |
var new_sync = sync; | |
new_sync.notified = false; | |
if (sync.notified) { | |
if (sync.state == .signaled) new_sync.state = .waking; | |
if (is_waiting) new_sync.idle -= 1; | |
} else { | |
if (is_waking) new_sync.state = .pending; | |
new_sync.idle += 1; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(u32, sync), | |
@bitCast(u32, new_sync), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
const was_waiting = is_waiting; | |
is_waiting = !sync.notified; | |
if (!is_waiting) { | |
if (sync.state == .signaled) return true; | |
if (was_waiting) return false; | |
return is_waking; | |
} | |
} | |
if (spin.yield()) { | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
continue; | |
} | |
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable; | |
sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
} | |
} | |
const Worker = struct { | |
runnable: List = .{}, | |
buffer: Buffer = .{}, | |
next: ?*Worker = undefined, | |
state: Atomic(State) = Atomic(State).init(.running), | |
const State = enum(u32) { | |
running, | |
joining, | |
shutdown, | |
}; | |
threadlocal var tls_current: ?*Worker = null; | |
fn spawn(noalias thread_pool: *ThreadPool) !void { | |
const thread = try std.Thread.spawn( | |
.{ .stack_size = thread_pool.config.stack_size }, | |
Worker.run, | |
.{ thread_pool }, | |
); | |
thread.detach(); | |
} | |
fn run(noalias thread_pool: *ThreadPool) void { | |
var self = Worker{}; | |
tls_current = &self; | |
var is_waking = thread_pool.register(&self); | |
defer thread_pool.unregister(&self); | |
var steal_target: ?*Worker = null; | |
while (true) { | |
const popped = self.pop(thread_pool, &steal_target) orelse { | |
is_waking = thread_pool.wait(is_waking) catch break; | |
continue; | |
}; | |
if (popped.pushed or is_waking) { | |
thread_pool.notify(is_waking); | |
} | |
is_waking = false; | |
(popped.runnable.runFn)(popped.runnable, thread_pool); | |
} | |
} | |
fn getProducer(noalias self: *Worker) Producer { | |
const is_single_producer = true; | |
return Producer{ | |
.buffer_producer = self.buffer.getProducer(), | |
.list_producer = self.runnable.getProducer(is_single_producer), | |
}; | |
} | |
const Producer = struct { | |
buffer_producer: Buffer.Producer, | |
list_producer: List.Producer, | |
fn push(noalias self: *Producer, batch: Batch) void { | |
const head = batch.head orelse unreachable; | |
if (batch.tail == head) { | |
self.pushRunnable(head); | |
} else { | |
self.list_producer.push(batch); | |
} | |
} | |
fn pushRunnable(noalias self: *Producer, noalias runnable: *Runnable) void { | |
const overflowed = self.buffer_producer.push(runnable) orelse return; | |
self.list_producer.push(overflowed); | |
} | |
fn commit(self: Producer) bool { | |
const buffer_committed = self.buffer_producer.commit(); | |
const list_committed = self.list_producer.commit(); | |
return buffer_committed or list_committed; | |
} | |
}; | |
fn pop( | |
self: *Worker, | |
noalias thread_pool: *ThreadPool, | |
noalias steal_target_ptr: *?*Worker, | |
) callconv(.Inline) ?Buffer.Popped { | |
if (self.buffer.pop()) |runnable| { | |
return Buffer.Popped{ .runnable = runnable }; | |
} | |
return self.steal(thread_pool, steal_target_ptr); | |
} | |
fn steal( | |
self: *Worker, | |
noalias thread_pool: *ThreadPool, | |
noalias steal_target_ptr: *?*Worker, | |
) ?Buffer.Popped { | |
@setCold(true); | |
var attempts: u8 = if (arch.isX86()) 32 else 8; | |
while (attempts > 0) : (attempts -= 1) { | |
if (self.buffer.consume(&self.runnable)) |popped| { | |
return popped; | |
} | |
var num_workers: u16 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned; | |
while (num_workers > 0) : (num_workers -= 1) { | |
const target_worker = steal_target_ptr.* orelse blk: { | |
const new_target = thread_pool.workers.load(.Acquire) orelse unreachable; | |
steal_target_ptr.* = new_target.next; | |
break :blk new_target; | |
}; | |
if (target_worker != self) { | |
if (self.buffer.consume(&target_worker.runnable)) |popped| { | |
return popped; | |
} else if (target_worker.buffer.steal()) |runnable| { | |
return Buffer.Popped{ .runnable = runnable }; | |
} | |
} | |
steal_target_ptr.* = target_worker.next; | |
} | |
if (self.buffer.consume(&thread_pool.runnable)) |popped| { | |
return popped; | |
} | |
std.atomic.spinLoopHint(); | |
} | |
return null; | |
} | |
fn shutdown(noalias self: *Worker) void { | |
const state = self.state.swap(.shutdown, .Release); | |
const ptr = @ptrCast(*const Atomic(u32), &self.state); | |
switch (state) { | |
.running => {}, | |
.joining => Futex.wake(ptr, 1), | |
.shutdown => unreachable, | |
} | |
} | |
fn join(noalias self: *Worker) void { | |
var spin = Spin{}; | |
while (true) { | |
const state = self.state.load(.Acquire); | |
if (state == .shutdown) { | |
return; | |
} | |
if (state == .running) { | |
if (spin.yield()) { | |
continue; | |
} | |
if (self.state.compareAndSwap( | |
.running, | |
.joining, | |
.Acquire, | |
.Acquire, | |
)) |updated| { | |
assert(updated == .shutdown); | |
return; | |
} | |
} | |
Futex.wait( | |
@ptrCast(*const Atomic(u32), &self.state), | |
@enumToInt(State.joining), | |
null, | |
) catch unreachable; | |
} | |
} | |
}; | |
const List = struct { | |
input: Atomic(?*Runnable) = Atomic(?*Runnable).init(null), | |
output: Atomic(usize) = Atomic(usize).init(IS_EMPTY), | |
const IS_EMPTY: usize = 0b0; | |
const IS_CONSUMING: usize = 0b1; | |
comptime { | |
assert(@alignOf(Runnable) >= (IS_CONSUMING << 1)); | |
} | |
fn getProducer(noalias self: *List, is_single_producer: bool) Producer { | |
return Producer{ | |
.list = self, | |
.is_single_producer = is_single_producer, | |
}; | |
} | |
const Producer = struct { | |
list: *List, | |
batch: Batch = .{}, | |
is_single_producer: bool, | |
fn push(noalias self: *Producer, batch: Batch) void { | |
self.batch.push(batch); | |
} | |
fn commit(self: Producer) bool { | |
const head = self.batch.head orelse return false; | |
const tail = self.batch.tail; | |
var input = self.list.input.load(.Monotonic); | |
while (true) { | |
tail.next = input; | |
if (input == null and self.is_single_producer) { | |
self.list.input.store(head, .Release); | |
return true; | |
} | |
input = self.list.input.tryCompareAndSwap( | |
input, | |
head, | |
.Release, | |
.Monotonic, | |
) orelse return true; | |
} | |
} | |
}; | |
fn getConsumer(noalias self: *List) ?Consumer { | |
var output = self.output.load(.Monotonic); | |
if (output == IS_CONSUMING) return null; | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return null; | |
} | |
acquired: { | |
if (comptime arch.isX86()) { | |
output = self.output.swap(IS_CONSUMING, .Acquire); | |
if (output == IS_CONSUMING) return null; | |
break :acquired; | |
} | |
while (true) { | |
output = self.output.tryCompareAndSwap( | |
output, | |
IS_CONSUMING, | |
.Acquire, | |
.Monotonic, | |
) orelse break :acquired; | |
if (output == IS_CONSUMING) return null; | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return null; | |
} | |
} | |
} | |
return Consumer{ | |
.list = self, | |
.output = @intToPtr(?*Runnable, output), | |
}; | |
} | |
const Consumer = struct { | |
list: *List, | |
output: ?*Runnable, | |
fn pop(self: *Consumer) ?*Runnable { | |
if (self.output) |runnable| { | |
self.output = runnable.next; | |
return runnable; | |
} | |
var input = self.list.input.load(.Monotonic) orelse return null; | |
input = self.list.input.swap(null, .Acquire) orelse unreachable; | |
self.output = input.next; | |
return input; | |
} | |
fn release(self: Consumer) void { | |
const output = @ptrToInt(self.output); | |
self.list.output.store(output, .Release); | |
} | |
}; | |
}; | |
const Buffer = struct { | |
head: Atomic(Index) align(std.atomic.cache_line) = Atomic(Index).init(0), | |
tail: Atomic(Index) align(std.atomic.cache_line) = Atomic(Index).init(0), | |
array: [capacity]Atomic(*Runnable) = undefined, | |
const Index = i32; | |
const capacity = 256; | |
comptime { | |
assert(@typeInfo(Index).Int.signedness == .signed); | |
assert(std.math.maxInt(Index) >= capacity); | |
} | |
fn getProducer(self: *Buffer) Producer { | |
const tail = self.tail.loadUnchecked(); | |
const head = self.head.load(.Monotonic); | |
const size = tail -% head; | |
assert(size >= 0 and size <= capacity); | |
return Producer { | |
.buffer = self, | |
.tail = tail, | |
.pushed = 0, | |
.remaining = capacity - size, | |
}; | |
} | |
const Producer = struct { | |
buffer: *Buffer, | |
tail: Index, | |
pushed: Index, | |
remaining: Index, | |
fn push(noalias self: *Producer, noalias runnable: *Runnable) ?Batch { | |
return switch (self.remaining) { | |
0 => self.pushOverflow(runnable), | |
else => self.pushRunnable(runnable), | |
}; | |
} | |
fn pushRunnable(noalias self: *Producer, noalias runnable: *Runnable) ?Batch { | |
const index = self.tail +% self.pushed; | |
self.pushed += 1; | |
runnable.next = null; | |
if (self.pushed > 1) { | |
runnable.next = self.buffer.array[(index -% 1) % capacity].loadUnchecked(); | |
} | |
self.buffer.array[index % capacity].store(runnable, .Unordered); | |
self.remaining -= 1; | |
return null; | |
} | |
fn pushOverflow(noalias self: *Producer, noalias runnable: *Runnable) ?Batch { | |
@setCold(true); | |
const head = self.buffer.head.load(.Monotonic); | |
const size = (self.tail +% self.pushed) -% head; | |
assert(size >= 0 and size <= capacity); | |
self.remaining = capacity - size; | |
if (self.remaining != 0) { | |
return self.pushRunnable(runnable); | |
} | |
const migrate = capacity / 2; | |
if (self.buffer.head.compareAndSwap( | |
head, | |
head +% migrate, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
self.remaining = capacity - ((self.tail +% self.pushed) -% updated); | |
return self.pushRunnable(runnable); | |
} | |
const first = runnable; | |
first.next = self.buffer.array[(head +% (migrate - 1)) % capacity].loadUnchecked(); | |
const last = self.buffer.array[head % capacity].loadUnchecked(); | |
last.next = null; | |
return Batch{ | |
.head = first, | |
.tail = last, | |
}; | |
} | |
fn commit(self: Producer) bool { | |
if (self.pushed == 0) { | |
return false; | |
} | |
const tail = self.tail +% self.pushed; | |
self.buffer.tail.store(tail, .Release); | |
return true; | |
} | |
}; | |
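// pop() takes from the tail while stealers take from the head, so the tail
// decrement and the head reload must be totally ordered. x86 gets that from the
// locked fetchSub, ARM from an explicit SeqCst fence, and other targets from
// SeqCst store/load pairs.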
fn pop(noalias self: *Buffer) ?*Runnable { | |
const tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
if (head == tail) { | |
return null; | |
} | |
const new_tail = tail -% 1; | |
head = switch (arch) { | |
.i386, .x86_64 => blk: { | |
_ = self.tail.fetchSub(1, .SeqCst); | |
break :blk self.head.load(.SeqCst); | |
}, | |
.arm, .armeb, .thumb, .thumbeb => blk: { | |
self.tail.store(new_tail, .Monotonic); | |
std.atomic.fence(.SeqCst); | |
break :blk self.head.load(.Monotonic); | |
}, | |
else => blk: { | |
self.tail.store(new_tail, .SeqCst); | |
break :blk self.head.load(.SeqCst); | |
}, | |
}; | |
const size = tail -% head; | |
assert(size <= capacity); | |
if (size >= 1) { | |
const runnable = self.array[new_tail % capacity].loadUnchecked(); | |
if (size > 1) { | |
return runnable; | |
} | |
_ = self.head.compareAndSwap( | |
head, | |
tail, | |
.Acquire, | |
.Monotonic, | |
) orelse return runnable; | |
} | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
fn steal(noalias self: *Buffer) ?*Runnable { | |
const use_fence = switch (arch) { | |
.arm, .armeb, .thumb, .thumbeb => true, | |
else => false, | |
}; | |
var head = self.head.load(if (use_fence) .Monotonic else .Acquire); | |
while (true) { | |
if (use_fence) std.atomic.fence(.SeqCst); | |
const tail = self.tail.load(if (use_fence) .Monotonic else .Acquire); | |
const size = tail -% head; | |
if (size <= 0) { | |
return null; | |
} | |
const runnable = self.array[head % capacity].load(.Unordered); | |
head = self.head.tryCompareAndSwap( | |
head, | |
head +% 1, | |
.SeqCst, | |
.Monotonic, | |
) orelse return runnable; | |
std.atomic.spinLoopHint(); | |
} | |
} | |
const Popped = struct { | |
runnable: *Runnable, | |
pushed: bool = false, | |
}; | |
fn consume(noalias self: *Buffer, noalias list: *List) ?Popped { | |
const tail = self.tail.loadUnchecked(); | |
if (std.debug.runtime_safety) { | |
assert(tail == self.head.load(.Monotonic)); | |
} | |
var consumer = list.getConsumer() orelse return null; | |
defer consumer.release(); | |
var pushed: i32 = 0; | |
while (pushed < capacity) : (pushed += 1) { | |
const runnable = consumer.pop() orelse break; | |
self.array[(tail +% pushed) % capacity].store(runnable, .Unordered); | |
runnable.next = null; | |
if (pushed != 0) { | |
runnable.next = self.array[(tail +% (pushed - 1)) % capacity].loadUnchecked(); | |
} | |
} | |
const popped = consumer.pop() orelse blk: { | |
if (pushed == 0) break :blk null; | |
pushed -= 1; | |
break :blk self.array[(tail +% pushed) % capacity].loadUnchecked(); | |
}; | |
if (pushed > 0) self.tail.store(tail +% pushed, .Release); | |
return Popped { | |
.runnable = popped orelse return null, | |
.pushed = pushed > 0, | |
}; | |
} | |
}; |
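// A minimal usage sketch, assuming the declarations above live in ThreadPool.zig;
// the `onRun` callback name is illustrative only:
//
//     const ThreadPool = @import("ThreadPool.zig");
//
//     var pool = ThreadPool.init(.{ .max_threads = 4 });
//     defer pool.deinit();
//
//     var runnable = ThreadPool.Runnable{ .runFn = onRun };
//     var scheduler = pool.getScheduler();
//     scheduler.submit(&runnable);
//     scheduler.schedule();
//
//     // later, once all work is queued:
//     pool.shutdown();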
const std = @import("std"); | |
const assert = std.debug.assert; | |
const arch = std.Target.current.cpu.arch; | |
const Thread = std.Thread; | |
const Atomic = std.atomic.Atomic; | |
const Semaphore = std.Thread.Semaphore; | |
pub const ThreadPool = struct { | |
workers: []Atomic(usize), | |
idle_sema: Semaphore = .{}, | |
run_queue: UnboundedQueue = .{}, | |
waker: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Waker{})), | |
pub fn init(threads: []usize) ThreadPool { | |
const workers = @ptrCast([*]Atomic(usize), threads.ptr)[0..threads.len]; | |
for (workers) |*ptr| ptr.* = 0; | |
return .{ .workers = workers }; | |
} | |
pub fn deinit(self: *ThreadPool) void { | |
self.shutdown(); | |
self.join(); | |
self.* = undefined; | |
} | |
pub const Runnable = struct { | |
runFn: fn(*Runnable) void, | |
next: ?*Runnable = undefined, | |
pub const Group = struct { | |
len: usize = 0, | |
head: *Runnable = undefined, | |
tail: *Runnable = undefined, | |
pub fn from(runnable: *Runnable) Group { | |
runnable.next = null; | |
return .{ | |
.len = 1, | |
.head = runnable, | |
.tail = runnable, | |
}; | |
} | |
pub fn push(self: *Group, group: Group) void { | |
if (group.len == 0) return; | |
if (self.len == 0) { | |
self.* = group; | |
} else { | |
self.tail.next = group.head; | |
self.tail = group.tail; | |
self.len += group.len; | |
} | |
} | |
pub fn pop(self: *Group) ?*Runnable { | |
if (self.len == 0) return null; | |
const runnable = self.head; | |
self.head = runnable.next orelse undefined; | |
self.len -= 1; | |
return runnable; | |
} | |
}; | |
}; | |
pub fn schedule(self: *ThreadPool, group: Runnable.Group) void { | |
if (group.len == 0) { | |
return; | |
} | |
if (Worker.getCurrent()) |worker| { | |
if (worker.buffer.push(group)) |overflowed| { | |
worker.queue.push(overflowed, true); | |
} | |
} else { | |
self.run_queue.push(group, false); | |
} | |
self.notify(false); | |
} | |
pub fn shutdown(self: *ThreadPool) void { | |
var waker = @bitCast(Waker, self.waker.load(.Monotonic)); | |
while (waker.state != .shutdown) { | |
var new_waker = waker; | |
new_waker.state = .shutdown; | |
new_waker.waiting = false; | |
if (self.waker.tryCompareAndSwap( | |
@bitCast(u32, waker), | |
@bitCast(u32, new_waker), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
waker = @bitCast(Waker, updated); | |
continue; | |
} | |
while (waker.waiting and waker.active > 0) { | |
self.idle_sema.post(); | |
waker.active -= 1; | |
} | |
return; | |
} | |
} | |
const State = enum(u2) { | |
pending = 0, | |
signaled, | |
waking, | |
shutdown, | |
}; | |
const Waker = packed struct { | |
state: State = .pending, | |
notified: bool = false, | |
waiting: bool = false, | |
active: u28 = 0, | |
}; | |
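// This variant parks idle workers on a Thread.Semaphore instead of a Futex.
// Waker plays the same role as Sync in the futex version and packs into 32 bits:
// 2 (state) + 1 (notified) + 1 (waiting) + 28 (active worker count).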
fn notify(self: *ThreadPool, is_waking: bool) void { | |
var waker = @bitCast(Waker, self.waker.load(.Monotonic)); | |
while (true) { | |
if (waker.state == .shutdown) return; | |
if (is_waking) assert(waker.state == .waking); | |
const can_wake = is_waking or waker.state == .pending; | |
var new_waker = waker; | |
new_waker.notified = true; | |
if (waker.waiting and can_wake) { | |
new_waker.waiting = false; | |
new_waker.state = .signaled; | |
} else if (waker.active < self.workers.len and can_wake) { | |
new_waker.active += 1; | |
new_waker.state = .signaled; | |
} else if (is_waking) { | |
new_waker.state = .pending; | |
} else if (waker.notified) { | |
return; | |
} | |
if (self.waker.tryCompareAndSwap( | |
@bitCast(u32, waker), | |
@bitCast(u32, new_waker), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
waker = @bitCast(Waker, updated); | |
continue; | |
} | |
if (waker.waiting and can_wake) { | |
self.idle_sema.post(); | |
return; | |
} | |
if (waker.active < self.workers.len and can_wake) { | |
if (Worker.spawn(self, waker.active)) return; | |
self.unregister(null, undefined); | |
} | |
return; | |
} | |
} | |
fn spawned(self: *ThreadPool) error{Shutdown}!bool { | |
var waker = @bitCast(Waker, self.waker.load(.Monotonic)); | |
while (true) { | |
if (waker.state == .shutdown) return error.Shutdown; | |
if (!waker.notified) return false; | |
var new_waker = waker; | |
new_waker.notified = false; | |
if (waker.state == .signaled) { | |
new_waker.state = .waking; | |
} | |
if (self.waker.tryCompareAndSwap( | |
@bitCast(u32, waker), | |
@bitCast(u32, new_waker), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
waker = @bitCast(Waker, updated); | |
continue; | |
} | |
return waker.state == .waking; | |
} | |
} | |
fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool { | |
var is_waiting = false; | |
var is_waking = _is_waking; | |
var waker = @bitCast(Waker, self.waker.load(.Monotonic)); | |
while (true) { | |
if (waker.state == .shutdown) return error.Shutdown; | |
if (is_waking) assert(waker.state == .waking); | |
const is_notified = waker.notified or waker.state == .signaled; | |
if (is_notified or !is_waiting) { | |
var new_waker = waker; | |
new_waker.notified = false; | |
if (is_notified) { | |
if (is_waking or waker.state == .signaled) { | |
new_waker.state = .waking; | |
} | |
if (is_waiting) { | |
new_waker.waiting = true; | |
} | |
} else { | |
new_waker.waiting = true; | |
if (is_waking) { | |
new_waker.state = .pending; | |
} | |
} | |
if (@bitCast(u32, waker) != @bitCast(u32, new_waker)) { | |
if (self.waker.tryCompareAndSwap( | |
@bitCast(u32, waker), | |
@bitCast(u32, new_waker), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
waker = @bitCast(Waker, updated); | |
continue; | |
} | |
if (is_notified) { | |
return is_waking or waker.state == .signaled; | |
} | |
} | |
} | |
is_waking = false; | |
is_waiting = true; | |
self.idle_sema.wait(); | |
waker = @bitCast(Waker, self.waker.load(.Monotonic)); | |
} | |
} | |
fn register( | |
noalias self: *ThreadPool, | |
noalias worker: *Worker, | |
worker_index: u32, | |
) void { | |
const ptr = &self.workers[worker_index]; | |
assert(ptr.loadUnchecked() == 0); | |
comptime assert(@alignOf(Worker) >= 0b10); | |
const worker_ptr = @ptrToInt(worker) | 1; | |
ptr.store(worker_ptr, .Release); | |
} | |
fn unregister( | |
noalias self: *ThreadPool, | |
noalias maybe_worker: ?*Worker, | |
worker_index: u32, | |
) void { | |
if (maybe_worker) |worker| { | |
const ptr = &self.workers[worker_index]; | |
assert(ptr.loadUnchecked() == @ptrToInt(worker) | 1); | |
const thread_ptr = @ptrToInt(worker.thread); | |
ptr.store(thread_ptr, .Release); | |
} | |
var inactive = @bitCast(u32, Waker{ .active = 1 }); | |
var waker = @bitCast(Waker, self.waker.fetchSub(inactive, .Release)); | |
assert(waker.state == .shutdown); | |
assert(waker.active >= 1); | |
if (waker.active == 1 and waker.waiting) { | |
self.idle_sema.post(); | |
} | |
} | |
fn join(self: *ThreadPool) void { | |
var spin: usize = 10; | |
var is_waiting = false; | |
var waker = @bitCast(Waker, self.waker.load(.Acquire)); | |
while (waker.active > 0) { | |
assert(waker.state == .shutdown); | |
if (!waker.waiting) { | |
if (spin > 0) { | |
spin -= 1; | |
std.atomic.spinLoopHint(); | |
waker = @bitCast(Waker, self.waker.load(.Acquire)); | |
continue; | |
} | |
var new_waker = waker; | |
new_waker.waiting = true; | |
if (self.waker.tryCompareAndSwap( | |
@bitCast(u32, waker), | |
@bitCast(u32, new_waker), | |
.Acquire, | |
.Acquire, | |
)) |updated| { | |
waker = @bitCast(Waker, updated); | |
continue; | |
} | |
} | |
self.idle_sema.wait(); | |
waker = @bitCast(Waker, self.waker.load(.Acquire)); | |
} | |
for (self.workers) |*ptr| { | |
const thread_ptr = ptr.load(.Acquire); | |
if (thread_ptr == 0) { | |
continue; | |
} | |
assert(thread_ptr & 1 == 0); | |
const thread = @intToPtr(*Thread, thread_ptr); | |
thread.wait(); | |
} | |
} | |
const Worker = struct { | |
thread: *Thread, | |
buffer: BoundedQueue = .{}, | |
queue: UnboundedQueue = .{}, | |
threadlocal var tls_current: ?*Worker = null; | |
fn getCurrent() ?*Worker { | |
return tls_current; | |
} | |
fn spawn(pool: *ThreadPool, index: u32) bool { | |
const Spawner = struct { | |
_pool: *ThreadPool, | |
_index: u32, | |
_thread: *Thread = undefined, | |
_put_sema: Semaphore = .{}, | |
_got_sema: Semaphore = .{}, | |
fn entry(self: *@This()) void { | |
self._put_sema.wait(); | |
const copy = self; | |
self._got_sema.post(); | |
var worker: Worker = undefined; | |
return worker.run( | |
copy._pool, | |
copy._thread, | |
copy._index, | |
); | |
} | |
}; | |
var spawner = Spawner{ | |
._pool = pool, | |
._index = index, | |
}; | |
spawner._thread = Thread.spawn(Spawner.entry, &spawner) catch return false; | |
spawner._put_sema.post(); | |
spawner._got_sema.wait(); | |
return true; | |
} | |
fn run( | |
noalias self: *Worker, | |
noalias pool: *ThreadPool, | |
noalias thread: *Thread, | |
index: u32, | |
) void { | |
tls_current = self; | |
self.* = .{ .thread = thread }; | |
pool.register(self, index); | |
defer pool.unregister(self, index); | |
var is_waking = pool.spawned() catch return; | |
var prng = @truncate(u32, @ptrToInt(self) *% 31) | 1; | |
while (true) { | |
var did_push = false; | |
const runnable = self.pop(pool, &prng, &did_push) orelse { | |
is_waking = pool.wait(is_waking) catch return; | |
continue; | |
}; | |
if (is_waking or did_push) { | |
pool.notify(is_waking); | |
} | |
is_waking = false; | |
(runnable.runFn)(runnable); | |
} | |
} | |
fn pop( | |
noalias self: *Worker, | |
noalias pool: *ThreadPool, | |
noalias prng: *u32, | |
noalias did_push: *bool, | |
) ?*Runnable { | |
if (self.buffer.pop()) |runnable| { | |
return runnable; | |
} | |
var attempts: usize = switch (arch) { | |
.i386, .x86_64 => 8, | |
else => 4, | |
}; | |
while (attempts > 0) : (attempts -= 1) { | |
var was_contended = false; | |
if (self.buffer.consume(&self.queue, did_push) catch blk: { | |
was_contended = true; | |
break :blk null; | |
}) |runnable| { | |
return runnable; | |
} | |
const num_workers = @intCast(u32, pool.workers.len); | |
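// xorshift32: a cheap per-worker PRNG used to randomize which victim we try
// to steal from first, spreading contention across workers.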
var target_index = blk: { | |
var rng = prng.*; | |
rng ^= rng << 13; | |
rng ^= rng >> 17; | |
rng ^= rng << 7; | |
prng.* = rng; | |
break :blk rng % num_workers; | |
}; | |
var worker_iter = num_workers; | |
while (worker_iter > 0) : (worker_iter -= 1) { | |
const target_ptr = &pool.workers[target_index]; | |
target_index = (target_index + 1) % num_workers; | |
const target = blk: { | |
const ptr = target_ptr.load(.Acquire); | |
if (ptr & 1 == 0) { | |
continue; | |
} | |
const worker = @intToPtr(*Worker, ptr & ~@as(usize, 1)); | |
if (worker == self) { | |
continue; | |
} | |
break :blk worker; | |
}; | |
if (self.buffer.consume(&target.queue, did_push) catch blk: { | |
was_contended = true; | |
break :blk null; | |
}) |runnable| { | |
return runnable; | |
} | |
if (target.buffer.steal() catch blk: { | |
was_contended = true; | |
break :blk null; | |
}) |runnable| { | |
return runnable; | |
} | |
} | |
if (self.buffer.consume(&pool.run_queue, did_push) catch blk: { | |
was_contended = true; | |
break :blk null; | |
}) |runnable| { | |
return runnable; | |
} | |
if (was_contended) { | |
attempts += 1; | |
} else { | |
std.os.sched_yield() catch {}; | |
} | |
} | |
return null; | |
} | |
}; | |
const UnboundedQueue = struct { | |
input: Atomic(?*Runnable) = Atomic(?*Runnable).init(null), | |
output: Atomic(usize) = Atomic(usize).init(0), | |
fn push( | |
self: *UnboundedQueue, | |
group: Runnable.Group, | |
comptime single_producer: bool, | |
) void { | |
assert(group.len > 0); | |
var input = self.input.load(.Monotonic); | |
while (true) { | |
group.tail.next = input; | |
if (input == null and single_producer) { | |
self.input.store(group.head, .Release); | |
return; | |
} | |
input = self.input.tryCompareAndSwap( | |
input, | |
group.head, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
const HAS_CONSUMER: usize = 0b1; | |
comptime { | |
const alignment = ~HAS_CONSUMER + 1; | |
assert(@alignOf(Runnable) >= alignment); | |
} | |
fn acquire(self: *UnboundedQueue) error{Contended}!?Consumer { | |
var output = self.output.load(.Monotonic); | |
if (output == HAS_CONSUMER) { | |
return error.Contended; | |
} | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return null; | |
} | |
acquired: { | |
if (arch.isX86() or arch.isRISCV()) { | |
output = self.output.swap(HAS_CONSUMER, .Acquire); | |
if (output == HAS_CONSUMER) return error.Contended; | |
break :acquired; | |
} | |
const is_aarch64 = arch == .aarch64 or arch == .aarch64_be or arch == .aarch64_32; | |
if (arch.isARM() or arch.isThumb() or arch.isPPC() or arch.isPPC64() or arch.isMIPS() or is_aarch64) { | |
while (true) { | |
output = self.output.tryCompareAndSwap( | |
output, | |
HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse break :acquired; | |
if (output == HAS_CONSUMER) { | |
return error.Contended; | |
} | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return null; | |
} | |
} | |
} | |
_ = self.output.compareAndSwap( | |
output, | |
HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse break :acquired; | |
return error.Contended; | |
} | |
assert(output & HAS_CONSUMER == 0); | |
return Consumer{ | |
.queue = self, | |
.output = @intToPtr(?*Runnable, output), | |
}; | |
} | |
const Consumer = struct { | |
queue: *UnboundedQueue, | |
output: ?*Runnable, | |
fn pop(self: *Consumer) ?*Runnable { | |
if (self.output) |runnable| { | |
self.output = runnable.next; | |
return runnable; | |
} | |
var input = self.queue.input.load(.Monotonic); | |
if (input == null) { | |
return null; | |
} | |
input = self.queue.input.swap(null, .Acquire); | |
const runnable = input orelse unreachable; | |
self.output = runnable.next; | |
return runnable; | |
} | |
fn release(self: Consumer) void { | |
const output = @ptrToInt(self.output); | |
self.queue.output.store(output, .Release); | |
} | |
}; | |
}; | |
const BoundedQueue = struct { | |
head: Atomic(isize) = Atomic(isize).init(0), | |
tail: Atomic(isize) = Atomic(isize).init(0), | |
array: [capacity]Atomic(*Runnable) = undefined, | |
const capacity = 256; | |
fn push(self: *BoundedQueue, group: Runnable.Group) ?Runnable.Group { | |
assert(group.len > 0); | |
if (group.len > 1) { | |
return group; | |
} | |
var mutable_group = group; | |
const runnable = mutable_group.pop(); | |
assert(mutable_group.len == 0); | |
var tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
while (true) { | |
const size = tail -% head; | |
assert(size >= 0); | |
assert(size <= capacity); | |
if (size < capacity) { | |
self.array[tail % capacity].store(runnable, .Unordered); | |
self.tail.store(tail +% 1, .Release); | |
return null; | |
} | |
const new_head = head +% (capacity / 2); | |
if (self.head.tryCompareAndSwap(head, new_head, .Acquire, .Monotonic)) |updated| { | |
head = updated; | |
continue; | |
} | |
var overflowed = Runnable.Group.from(runnable); | |
while (head != new_head) : (head +%= 1) { | |
const migrated = self.array[head % capacity].loadUnchecked(); | |
overflowed.push(Runnable.Group.from(migrated)); | |
} | |
return overflowed; | |
} | |
} | |
fn pop(self: *BoundedQueue) ?*Runnable { | |
const tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Unordered); | |
if (head == tail) { | |
return null; | |
} | |
const new_tail = tail -% 1; | |
self.tail.store(new_tail, .SeqCst); | |
head = self.head.load(.SeqCst); | |
const size = tail -% head; | |
assert(size >= 0); | |
assert(size <= capacity); | |
if (head == tail) { | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
const runnable = self.array[new_tail % capacity].loadUnchecked(); | |
if (head != new_tail) { | |
return runnable; | |
} | |
if (self.head.compareAndSwap(head, tail, .SeqCst, .Monotonic)) |_| { | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
return runnable; | |
} | |
fn steal(self: *BoundedQueue) error{Contended}!?*Runnable { | |
const head = self.head.load(.Acquire); | |
const tail = self.tail.load(.Acquire); | |
const size = tail -% head; | |
if (size <= 0) { | |
return null; | |
} | |
const runnable = self.array[head % capacity].load(.Unordered); | |
if (self.head.compareAndSwap(head, head +% 1, .SeqCst, .Monotonic)) |_| { | |
return error.Contended; | |
} | |
return runnable; | |
} | |
fn consume( | |
noalias self: *BoundedQueue, | |
noalias unbounded: *UnboundedQueue, | |
noalias did_push: *bool, | |
) error{Contended}!?*Runnable { | |
var consumer = (try unbounded.acquire()) orelse return null; | |
defer consumer.release(); | |
var pushed = false; | |
var runnable = consumer.pop(); | |
var tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
while (true) { | |
const size = tail -% head; | |
assert(size >= 0); | |
assert(size <= capacity); | |
if (size == capacity) { | |
if (pushed) { | |
head = self.head.load(.Monotonic); | |
if (head != tail) { | |
pushed = false; | |
continue; | |
} | |
} | |
break; | |
} | |
const consumed = consumer.pop() orelse break; | |
self.array[tail % capacity].store(consumed, .Unordered); | |
pushed = true; | |
tail +%= 1; | |
} | |
if (runnable == null) runnable = consumer.pop(); | |
if (runnable == null and tail != head) { | |
tail -%= 1; | |
runnable = self.array[tail % capacity].loadUnchecked(); | |
} | |
if (tail != head) { | |
did_push.* = true; | |
self.tail.store(tail, .Release); | |
} | |
return runnable; | |
} | |
}; | |
}; |
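// A minimal usage sketch, assuming the pool above is in scope; `onRun` and the
// slot count are illustrative only:
//
//     var slots: [4]usize = undefined;
//     var pool = ThreadPool.init(&slots);
//     defer pool.deinit();
//
//     var runnable = ThreadPool.Runnable{ .runFn = onRun };
//     pool.schedule(ThreadPool.Runnable.Group.from(&runnable));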
const Scheduler = struct { | |
registry: Registry, | |
injected: Queue.List = .{}, | |
pub fn init() void { | |
} | |
pub const Resumed = enum { | |
Signaled, | |
Spawned, | |
}; | |
pub const Notified = struct { | |
resumed: Resumed, | |
scheduler: *Scheduler, | |
pub fn complete(self: Notified, succeeded: bool) void { | |
if (self.resumed == .Spawned and !succeeded) { | |
self.scheduler.registry.unregister(null); | |
} | |
} | |
}; | |
pub fn schedule(self: *Scheduler) void { | |
self.injected.push(batch); | |
self.registry.notify(); | |
} | |
pub fn shutdown(self: *Scheduler) void { | |
self.registry.shutdown(); | |
} | |
}; | |
const Worker = struct { | |
node: Registry.Node = undefined, | |
scheduler: *Scheduler, | |
queue: Queue = .{}, | |
pub fn register( | |
noalias self: *Worker, | |
noalias scheduler: *Scheduler, | |
) error{Shutdown}!void { | |
} | |
pub fn shutdown(self: *Worker) error{Ready}!void { | |
self.scheduler.no | |
} | |
pub const Poll = struct { | |
node: *Queue.Node, | |
notified: ?Scheduler.Notified, | |
}; | |
pub fn poll(self: *Worker) error{Shutdown}!?Poll { | |
switch (self.node.getStatus()) { | |
.uninit => unreachable, | |
.waiting => unreachable, | |
.waking, .running => {}, | |
} | |
const result = self.pop() orelse return null; | |
const notify = try self.scheduler.registry.processed( | |
&self.node, | |
result.pushed, | |
); | |
var notified: ?Scheduler.Notified = null; | |
if (notify) |notification| { | |
notified = .{ | |
.scheduler = self.scheduler, | |
.resumed = switch (notification) { | |
.Signal => .Signaled, | |
.Register => .Spawned, | |
}, | |
}; | |
} | |
return Poll{ | |
.node = result.node, | |
.notified = notified, | |
}; | |
} | |
pub fn wait(self: *Worker, externally_notified: bool) error{Ready, Shutdown}!void { | |
return self.scheduler.registry.wait( | |
&self.node, | |
externally_notified, | |
); | |
} | |
fn pop(self: *Worker) ?Queue.PopResult { | |
var local_queue_empty = false; | |
var attempts: usize = switch (arch) { | |
.i386, .x86_64 => 8, | |
else => 4, | |
}; | |
const scheduler = self.scheduler; | |
while (true) { | |
var was_contended = contended: { | |
if (local_queue_empty) { | |
break :contended false; | |
} | |
return self.queue.pop() catch |err| { | |
local_queue_empty = err == error.Empty; | |
break :contended (err == error.Contended); | |
}; | |
}; | |
var num_workers = scheduler.registry.count(); | |
while (num_workers > 0) : (num_workers -= 1) { | |
const target_node = self.nodes.next() orelse blk: { | |
self.nodes = scheduler.registry.iter(); | |
break :blk self.nodes.next() orelse unreachable; | |
}; | |
const target_worker = @fieldParentPtr(Worker, "node", target_node); | |
if (target_worker == self) { | |
continue; | |
} | |
return self.queue.steal(&target_worker.queue) catch |err| { | |
was_contended = was_contended or (err == error.Contended); | |
continue; | |
}; | |
} | |
was_contended = contended: { | |
return self.queue.consume(&scheduler.injected) catch |err| { | |
break :contended (was_contended or (err == error.Contended)); | |
}; | |
}; | |
if (was_contended) { | |
continue; | |
} | |
attempts -= 1; | |
if (attempts == 0) { | |
return null; | |
} | |
} | |
} | |
}; | |
const Registry = struct { | |
pub const Node = struct { | |
data: Atomic(usize) align(std.math.max(@alignOf(usize), 4)), | |
pub const Status = enum(u2) { | |
uninit, | |
waiting, | |
waking, | |
running, | |
}; | |
fn getData(self: *const Node) usize { | |
return self.data.load(.Unordered); | |
} | |
fn setData(self: *Node, data: usize) void { | |
self.data.store(data, .Unordered); | |
} | |
pub fn getStatus(self: Node) Status { | |
return @intToEnum(Status, @truncate(u2, self.getData())); | |
} | |
fn setStatus(self: *Node, status: Status) void { | |
self.setData((self.getData() & ~@as(usize, 0b11)) | @enumToInt(status)); | |
} | |
fn getNext(self: Node) ?*Node { | |
return @intToPtr(?*Node, self.getData() & ~@as(usize, 0b11)); | |
} | |
fn setNext(self: *Node, next: ?*Node) void { | |
self.setData((self.getData() & 0b11) | @ptrToInt(next)); | |
} | |
}; | |
const State = enum(u2) { | |
pending, | |
signaled, | |
waking, | |
shutdown, | |
}; | |
const Sync = packed struct { | |
state: State = .pending, | |
waiting: bool = false, | |
notified: bool = false, | |
active: NodeCount = 0, | |
}; | |
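// Layout note (Zig packed structs fill from the least-significant bit): the state enum occupies | |
// bits 0-1, `waiting` bit 2, `notified` bit 3, and `active` the remaining bits of the usize. | |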
pub const NodeCount = std.meta.Int( | |
.unsigned, | |
std.meta.bitCount(usize) - 2 - 1 - 1, | |
); | |
max_nodes: NodeCount, | |
active: Atomic(?*Node) = Atomic(?*Node).init(null), | |
sync: Atomic(usize) = Atomic(usize).init(@bitCast(usize, Sync{})), | |
pub const Notify = union(enum) { | |
Signal, | |
Register, | |
}; | |
pub fn notify(self: *Registry) error{Pending, Shutdown}!Notify { | |
const sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
if (sync.notified) { | |
return error.Pending; | |
} | |
return self.@"resume"(false); | |
} | |
fn @"resume"( | |
self: *Registry, | |
is_waking: bool, | |
) error{Pending, Shutdown}!Notify { | |
@setCold(true); | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
const can_wake = switch (sync.state) { | |
.pending => if (is_waking) unreachable else true, | |
.waking => is_waking, | |
.signaled => if (is_waking) unreachable else false, | |
.shutdown => return error.Shutdown, | |
}; | |
var new_sync = sync; | |
new_sync.notified = true; | |
if (sync.waiting and can_wake) { | |
new_sync.waiting = false; | |
new_sync.state = .signaled; | |
} else if (sync.active < self.max_nodes and can_wake) { | |
new_sync.active += 1; | |
new_sync.state = .signaled; | |
} else if (is_waking) { | |
new_sync.state = .pending; | |
} else if (sync.notified) { | |
return error.Pending; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(usize, sync), | |
@bitCast(usize, new_sync), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (sync.waiting and can_wake) { | |
return .Signal; | |
} else if (sync.active < self.max_nodes and can_wake) { | |
return .Register; | |
} else { | |
return error.Pending; | |
} | |
} | |
} | |
pub fn register( | |
noalias self: *Registry, | |
noalias node: *Node, | |
) error{Shutdown}!void { | |
var active = self.active.load(.Monotonic); | |
while (true) { | |
node.setNext(active); | |
node.setStatus(.running); | |
active = self.active.tryCompareAndSwap( | |
active, | |
node, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
var new_sync = sync; | |
new_sync.notified = false; | |
switch (sync.state) { | |
.pending, .waking => if (!sync.notified) return, | |
.signaled => new_sync.state = .waking, | |
.shutdown => return error.Shutdown, | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(usize, sync), | |
@bitCast(usize, new_sync), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
return node.setStatus(switch (sync.state) { | |
.signaled => .waking, | |
else => .running, | |
}); | |
} | |
} | |
pub fn unregister( | |
noalias self: *Registry, | |
noalias registered_node: ?*Node, | |
) error{Pending}!void { | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
assert(sync.state == .shutdown); | |
assert(sync.active > 0); | |
sync = @bitCast(Sync, blk: { | |
const inactive = @bitCast(usize, Sync{ .active = 1 }); | |
break :blk self.sync.fetchSub(inactive, .AcqRel); | |
}); | |
assert(sync.state == .shutdown); | |
assert(sync.active > 0); | |
if (sync.active > 1) { | |
return error.Pending; | |
} | |
} | |
pub fn processed( | |
noalias self: *Registry, | |
noalias node: *Node, | |
should_notify: bool, | |
) error{Shutdown}!?Notify { | |
const is_waking = switch (node.getStatus()) { | |
.uninit => unreachable, | |
.waiting => unreachable, | |
.waking => true, | |
.running => false, | |
}; | |
var notified: ?Notify = null; | |
if (should_notify or is_waking) { | |
notified = try self.@"resume"(is_waking); | |
} | |
node.setStatus(.running); | |
return notified; | |
} | |
pub fn count(self: *const Registry) NodeCount { | |
const sync_value = self.sync.load(.Monotonic); | |
const sync = @bitCast(Sync, sync_value); | |
return sync.active; | |
} | |
pub fn iter(self: *const Registry) Iter { | |
const top = self.active.load(.Acquire); | |
return Iter{ .node = top }; | |
} | |
pub const Iter = struct { | |
node: ?*Node, | |
pub fn next(self: *Iter) ?*Node { | |
const node = self.node orelse return null; | |
self.node = node.getNext(); | |
return node; | |
} | |
}; | |
pub fn wait( | |
noalias self: *Registry, | |
noalias node: *Node, | |
is_notified: bool, | |
) error{Ready, Shutdown}!void { | |
@setCold(true); | |
const status = node.getStatus(); | |
assert(status != .uninit); | |
const is_waking = status == .waking; | |
const is_waiting = status == .waiting; | |
self.@"suspend"(is_waking, is_waiting, is_notified); | |
} | |
fn @"suspend"( | |
noalias self: *Registry, | |
noalias node: *Node, | |
is_waking: bool, | |
is_waiting: bool, | |
is_notified: bool, | |
) error{Ready, Shutdown}!void { | |
@setCold(true); | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
const signaled = switch (sync.state) { | |
.pending => if (is_waking) unreachable else false, | |
.waking => false, | |
.signaled => if (is_waking) unreachable else true, | |
.shutdown => return error.Shutdown, | |
}; | |
var new_sync = sync; | |
new_sync.notified = false; | |
const was_notified = signaled or sync.notified or is_notified; | |
if (was_notified) { | |
if (signaled or is_waking) new_sync.state = .waking; | |
if (is_waiting) new_sync.waiting = true; | |
} else if (!sync.waiting or is_waking) { | |
if (is_waking) new_sync.state = .pending; | |
new_sync.waiting = true; | |
} else { | |
node.setStatus(.waiting); | |
return; | |
} | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(usize, sync), | |
@bitCast(usize, new_sync), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
if (was_notified) { | |
const new_status: Node.Status = if (signaled or is_waking) .waking else .running; | |
node.setStatus(new_status); | |
return error.Ready; | |
} | |
node.setStatus(.waiting); | |
return; | |
} | |
} | |
pub fn shutdown(self: *Registry) bool { | |
var sync = @bitCast(Sync, self.sync.load(.Monotonic)); | |
while (true) { | |
assert(sync.state != .shutdown); | |
var new_sync = sync; | |
new_sync.state = .shutdown; | |
new_sync.waiting = false; | |
if (self.sync.tryCompareAndSwap( | |
@bitCast(usize, sync), | |
@bitCast(usize, new_sync), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
sync = @bitCast(Sync, updated); | |
continue; | |
} | |
return sync.waiting; | |
} | |
} | |
}; | |
const Queue = struct { | |
pub const Node = struct { | |
next: ?*Node, | |
}; | |
/// An ordered set of *Nodes forming a linked list. | |
pub const Batch = struct { | |
head: ?*Node = null, | |
tail: *Node = undefined, | |
pub fn from(node: *Node) Batch { | |
node.next = null; | |
return .{ | |
.head = node, | |
.tail = node, | |
}; | |
} | |
pub fn push(self: *Batch, other: Batch) void { | |
if (self.head == null) { | |
self.* = other; | |
} else if (other.head != null) { | |
self.tail.next = other.head; | |
self.tail = other.tail; | |
} | |
} | |
pub fn pop(self: *Batch) ?*Node { | |
const node = self.head orelse return null; | |
self.head = node.next; | |
return node; | |
} | |
}; | |
/// An unbounded MPSC of Nodes that are only available to this Queue | |
local: List = .{}, | |
/// An unbounded, non-blocking MPMC of Nodes that can be stolen from by other Queues | |
backlog: List = .{}, | |
/// A bounded, SPMC, fast-access ring-buffer of *Nodes for `backlog` that can be stolen from by other Queues | |
buffer: Buffer = .{}, | |
/// The context in which Queue.push() is being called under | |
pub const PushContext = enum { | |
/// Push to the queue's local list. This can be used by any thread. | |
local, | |
/// Push to the queue's backlog list. This can be used by any thread, generally not the Queue's thread. | |
remote, | |
/// Push to either the queue's buffer or backlog. Can be called only by the thread which owns the Queue. | |
buffered, | |
}; | |
pub fn push( | |
self: *Queue, | |
batch: Batch, | |
comptime context: PushContext, | |
) void { | |
switch (context) { | |
.local => self.local.push(batch), | |
.remote => self.backlog.push(batch), | |
.buffered => { | |
const overflowed = self.buffer.push(batch) orelse return; | |
self.backlog.pushUnchecked(overflowed); | |
}, | |
} | |
} | |
pub const PopResult = struct { | |
/// The *Node which was dequeued from the Queue | |
node: *Node, | |
/// True if dequeuing this *Node resulted in pushing extra *Nodes | |
pushed: bool = false, | |
}; | |
pub fn pop(self: *Queue) error{Empty, Contended}!PopResult { | |
// Nodes local to this Queue take priority over everything else. | |
if (self.local.popUnchecked()) |node| { | |
return PopResult{ .node = node }; | |
} | |
// Check the buffer first before the backlog as it's much faster to access. | |
if (self.buffer.pop()) |node| { | |
return PopResult{ .node = node }; | |
} | |
// The buffer is empty, try to pop *Nodes from the backlog and refill the buffer in the process. | |
var consumer = try self.backlog.tryAcquireConsumer(); | |
defer consumer.release(); | |
var pushed: bool = false; | |
const node = self.buffer.consume(&consumer, &pushed) orelse return error.Empty; | |
return PopResult{ | |
.node = node, | |
.pushed = pushed, | |
}; | |
} | |
pub fn consume( | |
noalias self: *Queue, | |
noalias list: *List, | |
) error{Empty, Contended}!PopResult { | |
var consumer = try list.tryAcquireConsumer(); | |
defer consumer.release(); | |
var pushed: bool = false; | |
const node = self.buffer.consume(&consumer, &pushed) orelse return error.Empty; | |
return PopResult{ | |
.node = node, | |
.pushed = pushed, | |
}; | |
} | |
pub fn steal( | |
noalias self: *Queue, | |
noalias target: *Queue, | |
) error{Empty, Contended}!PopResult { | |
// Check the target Queue's backlog before its buffer. | |
// This minimizes contention with the Queue's pop() thread. | |
var was_contended = contended: { | |
return self.consume(&target.backlog) catch |err| { | |
break :contended (err == error.Contended); | |
}; | |
}; | |
// Try to steal from the target Queue's buffer if the backlog is unavailable. | |
// Never steal from its local List as that is private to the target Queue. | |
was_contended = blk: { | |
const node = target.buffer.steal() catch |err| switch (err) { | |
error.Empty => break :blk was_contended, | |
error.Contended => break :blk true, | |
}; | |
return PopResult{ .node = node }; | |
}; | |
if (was_contended) return error.Contended; | |
return error.Empty; | |
} | |
/// Unbounded, Multi-Producer-(Single | non-blocking-Multi)-Consumer queue of *Nodes | |
const List = struct { | |
input: Atomic(?*Node) = Atomic(?*Node).init(null), | |
output: Atomic(usize) = Atomic(usize).init(0), | |
// Use the low bit of a node pointer to indicate that the output has an active consumer. | |
const HAS_CONSUMER: usize = 0b1; | |
comptime { | |
const consumer_alignment = HAS_CONSUMER + 1; | |
assert(@alignOf(Node) >= consumer_alignment); | |
} | |
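// Encoding note: `input` is the MPSC push stack, while `output` holds the consumer's leftover | |
// stack as a plain pointer value, or just HAS_CONSUMER for as long as a consumer is active. | |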
pub fn push(self: *List, batch: Batch) void { | |
return self.pushToStack(false, batch); | |
} | |
pub fn pushUnchecked(self: *List, batch: Batch) void { | |
return self.pushToStack(true, batch); | |
} | |
fn pushToStack(self: *List, comptime is_only_producer: bool, batch: Batch) void { | |
const head = batch.head orelse return; | |
const tail = batch.tail; | |
var stack = self.input.load(.Monotonic); | |
while (true) { | |
// Prepend our batch on top of the existing stack if any | |
tail.next = stack; | |
// Optimization to avoid the RMW operation below if there is only one producer | |
if (stack == null and is_only_producer) { | |
self.input.store(head, .Release); | |
break; | |
} | |
// Update the stack with our prepended batch. | |
// Release barrier so that Acquire swap in Consumer.pop() sees well-formed .next links. | |
stack = self.input.tryCompareAndSwap( | |
stack, | |
head, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
pub fn popUnchecked(self: *List) ?*Node { | |
// Effectively the following, but without consumer-side synchronization: | |
// | |
// var consumer = self.tryAcquireConsumer() catch unreachable; | |
// defer consumer.release(); | |
// return consumer.pop(); | |
// | |
var output = self.output.loadUnchecked(); | |
assert(output & HAS_CONSUMER == 0); | |
self.output.storeUnchecked(HAS_CONSUMER); | |
var consumer = Consumer{ | |
.stack = @intToPtr(?*Node, output), | |
.list = self, | |
}; | |
const node = consumer.pop(); | |
output = @ptrToInt(consumer.stack); // & ~HAS_CONSUMER | |
self.output.storeUnchecked(output); | |
return node; | |
} | |
pub fn tryAcquireConsumer(self: *List) error{Empty, Contended}!Consumer { | |
// Check if the List can be consumed without invalidating its cache-lines with an RMW op | |
var output = self.output.load(.Monotonic); | |
if (output == HAS_CONSUMER) { | |
return error.Contended; | |
} | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return error.Empty; | |
} | |
// Select the best strategy to use for acquiring the Consumer based on the architecture. | |
const Strategy = enum { xchg, cmpxchg, ll_sc }; | |
comptime var strategy = Strategy.cmpxchg; | |
inline for (.{ | |
.{Strategy.xchg, .{"x86", "riscv"}}, | |
.{Strategy.ll_sc, .{"arm", "aarch64", "mips", "powerpc"}}, | |
}) |cpu_strategy| { | |
inline for (cpu_strategy[1]) |cpu_name| { | |
if (comptime std.mem.eql(u8, comptime arch.genericName(), cpu_name)) { | |
strategy = cpu_strategy[0]; | |
} | |
} | |
} | |
switch (strategy) { | |
// Optimized for platforms where swap is fast or a single instruction | |
.xchg => { | |
output = self.output.swap(HAS_CONSUMER, .Acquire); | |
if (output == HAS_CONSUMER) { | |
return error.Contended; | |
} | |
}, | |
// The default strategy | |
.cmpxchg => acquired: { | |
_ = self.output.compareAndSwap( | |
output, | |
HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse break :acquired {}; | |
return error.Contended; | |
}, | |
// Optimized for platforms where the cmpxchg default strategy is a loop | |
// in which case the cmpxchg loop can be implemented more efficiently. | |
.ll_sc => while (true) { | |
output = self.output.tryCompareAndSwap( | |
output, | |
HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse break; | |
if (output == HAS_CONSUMER) { | |
return error.Contended; | |
} | |
if (output == 0 and self.input.load(.Monotonic) == null) { | |
return error.Empty; | |
} | |
}, | |
} | |
return Consumer{ | |
.stack = @intToPtr(?*Node, output & ~HAS_CONSUMER), | |
.list = self, | |
}; | |
} | |
pub const Consumer = struct { | |
stack: ?*Node, | |
list: *List, | |
pub fn pop(self: *Consumer) ?*Node { | |
// Check the local stack of Nodes first. | |
if (self.stack) |node| { | |
self.stack = node.next; | |
return node; | |
} | |
// Check if there's any Nodes pushed without doing the RMW operation below. | |
var input = self.list.input.load(.Monotonic); | |
if (input == null) { | |
return null; | |
} | |
// Steal any pushed Nodes and use the remaining for the local stack | |
const node = self.list.input.swap(null, .Acquire) orelse unreachable; | |
self.stack = node.next; | |
return node; | |
} | |
pub fn release(self: Consumer) void { | |
// Update the List's output with the remaining Nodes | |
// while also unsetting the HAS_CONSUMER bit for other threads to start dequeueing. | |
const new_output = @ptrToInt(self.stack); // & ~HAS_CONSUMER | |
self.list.output.store(new_output, .Release); | |
} | |
}; | |
}; | |
// Bounded, Single-Producer-Multi-Consumer, Ring-Buffer of *Nodes | |
const Buffer = struct { | |
head: Atomic(Index) = Atomic(Index).init(0), | |
tail: Atomic(Index) = Atomic(Index).init(0), | |
array: [capacity]Atomic(*Node) = undefined, | |
// Found to be the point of diminishing returns in terms of scheduler throughput. | |
// Appears to also be the queue size used in Go's scheduler: | |
// https://github.com/golang/go/blob/master/src/runtime/runtime2.go#L626 | |
const capacity = 256; | |
// Ensure that the head and tail indices use an unsigned, wrapping integer type | |
// (signed indices can't be used with `%` in Zig; emptiness during the speculative | |
// pop() below is instead detected with the tail == head -% 1 check in steal()). | |
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
comptime { | |
assert(std.math.maxInt(Index) >= capacity); | |
assert(@typeInfo(Index).Int.signedness == .unsigned); | |
} | |
pub fn push(self: *Buffer, batch: Batch) ?Batch { | |
// Make sure there is a node to push | |
const node = batch.head orelse return null; | |
// Don't push more than one node to the queue. | |
// Batches of multiple nodes should be pushed to unbounded queues instead. | |
if (node.next != null) { | |
return batch; | |
} | |
// Since we're the producer, the tail can't change so it doesn't need atomics. | |
// Other threads also aren't publishing memory to us so Monotonic is fine for the head. | |
const tail = self.tail.loadUnchecked(); | |
var head = self.head.load(.Monotonic); | |
while (true) { | |
const size = tail -% head; | |
assert(size <= capacity); | |
// Push to the array if it's not full. | |
// The array slot must be written to atomically since there could be a concurrent reader on wrap around. | |
// The tail must be updated with a Release barrier so that stealing threads see the node written into the array on tail Acquire. | |
if (size < capacity) { | |
self.array[tail % capacity].store(node, .Unordered); | |
self.tail.store(tail +% 1, .Release); | |
return null; | |
} | |
// The array is full, try to move some node out to make space for future push() operations. | |
// We migrate only half of the array's nodes so that subsequent pop() operations don't fail either. | |
// Acquire barrier to ensure that the Node modifications done below only happen on a successful steal and not reordered before it. | |
var migrate = size / 2; | |
if (self.head.tryCompareAndSwap( | |
head, | |
head +% migrate, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
head = updated; | |
continue; | |
} | |
// Create a batch of the migrated nodes to report as overflowing. | |
// Iterate backwards in the array as the most recent nodes are more likely to be in cache. | |
// NOTE: Creating this batch already thrashes the cache by dereferencing the intrusive nodes... | |
var overflowed = Batch.from(node); | |
head +%= (migrate - 1); | |
while (migrate > 0) : (migrate -= 1) { | |
const migrated = self.array[head % capacity].loadUnchecked(); | |
overflowed.push(Batch.from(migrated)); | |
head -%= 1; | |
} | |
return overflowed; | |
} | |
} | |
/// A SeqCst fence is slower than SeqCst loads/stores on most architectures that are either | |
/// strongly-ordered (e.g. x86 mfence) or have ordered load/store instructions (e.g. aarch64 ldar/stlr). | |
/// | |
/// On ARMv7 and lower it doesn't generate as many barriers, so it's more efficient while remaining correct: | |
/// https://fzn.fr/readings/ppopp13.pdf | |
const use_seq_cst_fence = arch.isARM() or arch.isThumb(); | |
pub fn pop(self: *Buffer) ?*Node { | |
// Read the tail without synchronization as we're the only thread that can update it. | |
const tail = self.tail.loadUnchecked(); | |
// Bail if the array is empty, without invalidating the cache-line via the tail update below. | |
var head = self.head.load(.Monotonic); | |
if (tail == head) { | |
return null; | |
} | |
// Speculatively update the tail with the assumption that we dequeued from the array. | |
// The tail must be updated before observing the head which can't be ensured without SeqCst: | |
// | |
// reads / writes ----------- | |
// ^ | | |
// | store(tail, Release) -| | | |
// x | | x | |
// | |- load(head, Acquire) | | |
// | v | |
// |----- reads / writes | |
// | |
// SeqCst *on both* the store() and the load() ensure a total-ordering between | |
// both atomics which prevents them from being re-ordered around each other. | |
if (use_seq_cst_fence) { | |
self.tail.store(tail -% 1, .Monotonic); | |
std.atomic.fence(.SeqCst); | |
head = self.head.load(.Monotonic); | |
} else { | |
self.tail.store(tail -% 1, .SeqCst); | |
head = self.head.load(.SeqCst); | |
} | |
// If the array becomes empty (head was updated after store() but before load()) | |
// then we need to restore the tail to its original value so that head == tail. | |
if (head == tail) { | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
// Updating the tail speculatively means that the node is already claimed. | |
// However, the last node in the array must be handled specially below. | |
const node = self.array[(tail -% 1) % capacity].loadUnchecked(); | |
if (tail -% head > 1) { | |
return node; | |
} | |
// A steal() thread may see a valid head/tail and get preempted before it updates the head. | |
// If we don't update the head on the last node, the preempted stealer could update it and assume it owns the node. | |
// | |
// Update the head with SeqCst which synchronizes with the ordering of the store/load above: | |
// - it also implies Acquire to prevent node updates from being reordered before the "local steal" | |
// Restore the tail on failure to do so. | |
if (self.head.compareAndSwap( | |
head, | |
tail, | |
.SeqCst, | |
.Monotonic, | |
)) |updated| { | |
self.tail.store(tail, .Monotonic); | |
return null; | |
} | |
return node; | |
} | |
pub fn steal(self: *Buffer) error{Empty, Contended}!*Node { | |
// Acquire barrier on head to prevent the tail load below from being reordered before it. | |
const head = self.head.load(.Acquire); | |
if (use_seq_cst_fence) { | |
std.atomic.fence(.SeqCst); | |
} | |
// Acquire barrier on tail load to ensure we see the Released node in the array. | |
const tail = self.tail.load(.Acquire); | |
// The tail can be the predecessor of the head when the pop() thread | |
// speculatively updates the tail while the queue is empty (head == tail). | |
if (tail == head or tail == head -% 1) { | |
return error.Empty; | |
} | |
// Read the node from the array before actually performing the steal. | |
// If it were read after the CAS, the slot could have been overwritten by a push() and yield a node we never actually claimed. | |
// Also read from the array atomically given a push() could write over it with a valid node on wrap. | |
const node = self.array[head % capacity].load(.Unordered); | |
// Try to advance the head, thereby claiming the node read above. | |
// SeqCst on success to synchronize with the ordering of store() and CAS() in push(): | |
// - it also implies Release to prevent the node read from being reordered after the steal. | |
// - it also implies Acquire to prevent node updates from being reordered before the steal. | |
if (self.head.compareAndSwap( | |
head, | |
head +% 1, | |
.SeqCst, | |
.Monotonic, | |
)) |updated| { | |
return error.Contended; | |
} | |
return node orelse unreachable; | |
} | |
pub fn consume( | |
noalias self: *Buffer, | |
noalias consumer: *List.Consumer, | |
noalias pushed_to_buffer: *bool, | |
) ?*Node { | |
var pushed: Index = 0; | |
var popped = consumer.pop(); | |
const tail = self.tail.loadUnchecked(); | |
while (true) { | |
// Reload the head in case any stealer threads updated it. | |
// Relaxed ordering as it's not being used to synchronize any memory between threads. | |
const head = self.head.load(.Monotonic); | |
// Bail if the buffer is as full as it can get. | |
const size = (tail +% pushed) -% head; | |
assert(size <= capacity); | |
if (size == capacity) { | |
break; | |
} | |
// Try to dequeue a node from the Consumer and push it to the array. | |
// Writing into the array must be done atomically due to concurrent stealing threads reading from it. | |
const node = consumer.pop() orelse break; | |
assert(pushed < capacity); | |
self.array[(tail +% pushed) % capacity].store(node, .Unordered); | |
pushed += 1; | |
} | |
// Use one of the array's nodes as the return value if the first consumer.pop() failed. | |
if (popped == null and pushed > 0) { | |
pushed -= 1; | |
popped = self.array[(tail +% pushed) % capacity].loadUnchecked(); | |
} | |
// Update the tail if there were nodes pushed into the array. | |
// Release ordering on tail update so stealers see the pushed nodes in the array with an Acquire tail load. | |
if (pushed > 0) { | |
pushed_to_buffer.* = true; | |
self.tail.store(tail +% pushed, .Release); | |
} | |
return popped; | |
} | |
}; | |
}; |
const std = @import("std"); | |
const Task = @This(); | |
next: ?*Task align(std.math.max(4, @alignOf(usize))) = undefined, | |
onExecute: fn(*Task) void, | |
pub fn execute(self: *Task) void { | |
return (self.onExecute)(self); | |
} | |
pub const Platform = struct { | |
callFn: fn(*Platform, Command) void, | |
pub fn call(self: *Platform, command: Command) void { | |
return (self.callFn)(self, command); | |
} | |
pub const Command = union(enum) { | |
WorkerWait: struct { | |
worker: *Worker, | |
}, | |
WorkerNotify: struct { | |
worker: *Worker, | |
}, | |
WorkerSpawn: struct { | |
scheduler: *Scheduler, | |
spawned: *bool, | |
}, | |
WorkerSignal: struct { | |
scheduler: *Scheduler, | |
max_notify: usize, | |
}, | |
WorkerPollFirst: struct { | |
worker: *Worker, | |
be_fair: *bool, | |
attempts: *usize, | |
result: *?*Task, | |
}, | |
WorkerPollYield: struct { | |
worker: *Worker, | |
be_fair: *bool, | |
attempt: *usize, | |
}, | |
WorkerPollLast: struct { | |
worker: *Worker, | |
result: *?*Task, | |
}, | |
WorkerExecute: struct { | |
worker: *Worker, | |
task: *Task, | |
}, | |
WorkerJoinWait: struct { | |
worker: *Worker, | |
scheduler: *Scheduler, | |
}, | |
WorkerJoinNotify: struct { | |
worker: *Worker, | |
scheduler: *Scheduler, | |
}, | |
ShutdownWait: struct { | |
scheduler: *Scheduler, | |
}, | |
ShutdownSignal: struct { | |
scheduler: *Scheduler, | |
waiting: usize, | |
}, | |
}; | |
}; | |
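// The embedder supplies a Platform whose callFn handles each Command above (waiting, waking, | |
// spawning, and the poll hooks); a partial onPlatformCommand example appears near the end of this gist. | |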
pub const Group = struct { | |
len: usize = 0, | |
head: *Task = undefined, | |
tail: *Task = undefined, | |
pub fn from(task: *Task) Group { | |
task.next = null; | |
return .{ | |
.len = 1, | |
.head = task, | |
.tail = task, | |
}; | |
} | |
pub fn push(self: *Group, group: Group) void { | |
if (self.len == 0) { | |
self.* = group; | |
} else if (group.len > 0) { | |
self.tail.next = group.head; | |
self.tail = group.tail; | |
self.len += group.len; | |
} | |
} | |
pub fn pop(self: *Group) ?*Task { | |
if (self.len == 0) { | |
return null; | |
} else { | |
const task = self.head; | |
self.head = task.next orelse undefined; | |
self.len -= 1; | |
return task; | |
} | |
} | |
}; | |
pub const SchedHints = struct { | |
affinity: ?*Worker = null, | |
}; | |
pub const Worker = struct { | |
state: usize, | |
next_target: ?*Worker = null, | |
next_spawned: ?*Worker = null, | |
run_queue: struct { | |
local: UnboundedQueue, | |
overflow: UnboundedQueue, | |
buffer: BoundedQueue, | |
}, | |
const State = struct { | |
scheduler: ?*Scheduler = null, | |
is_waking: bool = false, | |
did_schedule: bool = false, | |
is_monitoring: bool = false, | |
comptime { | |
const alignment = 0b111 + 1; | |
std.debug.assert(@alignOf(Scheduler) >= alignment); | |
} | |
fn pack(self: State) usize { | |
return @ptrToInt(self.scheduler) | |
| (@as(usize, @boolToInt(self.is_waking)) << 0) | |
| (@as(usize, @boolToInt(self.did_schedule)) << 1) | |
| (@as(usize, @boolToInt(self.is_monitoring)) << 2); | |
} | |
fn unpack(state: usize) State { | |
return .{ | |
.scheduler = @intToPtr(?*Scheduler, state & ~@as(usize, 0b111)), | |
.is_waking = state & (1 << 0) != 0, | |
.did_schedule = state & (1 << 1) != 0, | |
.is_monitoring = state & (1 << 2) != 0, | |
}; | |
} | |
}; | |
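// Illustrative round-trip check for the packed worker state (not part of the original file; | |
// it relies only on the decls above and a stack Scheduler whose address supplies the pointer bits). | |
test "Worker.State pack/unpack" { | |
var scheduler: Scheduler = undefined; | |
const packed_state = (State{ .scheduler = &scheduler, .is_waking = true }).pack(); | |
const unpacked = State.unpack(packed_state); | |
std.debug.assert(unpacked.scheduler == &scheduler); | |
std.debug.assert(unpacked.is_waking and !unpacked.did_schedule and !unpacked.is_monitoring); | |
} | |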
pub const Iter = struct { | |
worker: ?*Worker, | |
pub fn next(self: *Iter) ?*Worker { | |
const worker = self.worker orelse return null; | |
self.worker = worker.next_spawned; | |
return worker; | |
} | |
}; | |
pub const Config = struct { | |
be_fair: bool, | |
scheduler: *Scheduler, | |
}; | |
pub fn run(self: *Worker, config: Config) void { | |
const scheduler = config.scheduler; | |
self.* = .{ | |
.state = (State{ | |
.scheduler = scheduler, | |
.is_waking = true, | |
}).pack(), | |
.run_queue = .{ | |
.local = UnboundedQueue.init(config.be_fair), | |
.overflow = UnboundedQueue.init(config.be_fair), | |
.buffer = .{}, | |
}, | |
}; | |
self.next_spawned = @atomicLoad(?*Worker, &scheduler.spawned, .Monotonic); | |
while (true) { | |
self.next_spawned = @cmpxchgWeak( | |
?*Worker, | |
&scheduler.spawned, | |
self.next_spawned, | |
self, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
self.process(); | |
scheduler.markWorkerShutdown(); | |
scheduler.platform.call(.{ | |
.WorkerJoinWait = .{ | |
.worker = self, | |
.scheduler = scheduler, | |
}, | |
}); | |
} | |
pub fn getScheduler(self: Worker) *Scheduler { | |
const state = State.unpack(self.state); | |
return state.scheduler orelse unreachable; | |
} | |
pub fn schedule(self: *Worker, hints: SchedHints, group: Group) void { | |
if (group.len == 0) { | |
return; | |
} | |
if (hints.affinity) |worker| { | |
worker.run_queue.local.push(group); | |
return self.getScheduler().platform.call(.{ | |
.WorkerNotify = .{ | |
.worker = worker, | |
}, | |
}); | |
} | |
if (self.run_queue.buffer.push(group)) |overflowed_group| { | |
self.run_queue.overflow.push(overflowed_group); | |
} | |
var state = State.unpack(self.state); | |
if (state.is_monitoring) { | |
state.did_schedule = true; | |
self.state = state.pack(); | |
return; | |
} | |
const scheduler = state.scheduler orelse unreachable; | |
scheduler.notify(false); | |
} | |
fn process(self: *Worker) void { | |
while (blk: { | |
const state = State.unpack(self.state); | |
break :blk state.scheduler; | |
}) |scheduler| { | |
const task = self.poll(scheduler) orelse { | |
scheduler.wait(self); | |
continue; | |
}; | |
scheduler.platform.call(.{ | |
.WorkerExecute = .{ | |
.worker = self, | |
.task = task, | |
}, | |
}); | |
} | |
} | |
fn poll(self: *Worker, scheduler: *Scheduler) ?*Task { | |
var state = State.unpack(self.state); | |
std.debug.assert(!state.did_schedule); | |
std.debug.assert(!state.is_monitoring); | |
state.is_monitoring = true; | |
self.state = state.pack(); | |
const polled_task = self.pollTask(scheduler, &state.did_schedule); | |
const poll_scheduled = state.did_schedule; | |
state = State.unpack(self.state); | |
std.debug.assert(state.is_monitoring); | |
if (polled_task) |task| { | |
if (state.is_waking or state.did_schedule or poll_scheduled) { | |
scheduler.notify(state.is_waking); | |
} | |
} | |
state = State{ .scheduler = scheduler }; | |
self.state = state.pack(); | |
return polled_task; | |
} | |
fn pollTask(self: *Worker, scheduler: *Scheduler, did_schedule: *bool) ?*Task { | |
var be_fair = false; | |
var poll_attempts: usize = 0; | |
var poll_result: ?*Task = null; | |
scheduler.platform.call(.{ | |
.WorkerPollFirst = .{ | |
.worker = self, | |
.be_fair = &be_fair, | |
.result = &poll_result, | |
.attempts = &poll_attempts, | |
}, | |
}); | |
if (poll_result) |task| return task; | |
if (be_fair) { | |
if (scheduler.run_queue.tryAcquireConsumer()) |acquired_consumer| { | |
var consumer = acquired_consumer; | |
defer consumer.release(); | |
if (consumer.pop()) |task| { | |
return task; | |
} | |
} | |
} | |
var attempt: usize = 0; | |
while (attempt < std.math.max(1, poll_attempts)) : (attempt += 1) { | |
const QueueTarget = enum{ local, buffer, overflow }; | |
var queue_targets = [_]QueueTarget{ .local, .buffer, .overflow, }; | |
if (be_fair) { | |
queue_targets = [_]QueueTarget{ .buffer, .overflow, .local }; | |
} | |
for (queue_targets) |queue_target| { | |
if (switch (queue_target) { | |
.local => self.run_queue.local.popUnchecked(), | |
.buffer => self.run_queue.buffer.pop(be_fair), | |
.overflow => blk: { | |
const stole = self.run_queue.buffer.steal(&self.run_queue.overflow) orelse break :blk null; | |
did_schedule.* = stole.pushed; | |
break :blk stole.task; | |
}, | |
}) |task| { | |
return task; | |
} | |
} | |
if (self.run_queue.buffer.steal(&scheduler.run_queue)) |stole| { | |
did_schedule.* = stole.pushed; | |
return stole.task; | |
} | |
var worker_iter = scheduler.getWorkerCount(); | |
while (worker_iter > 0) : (worker_iter -= 1) { | |
const target = self.next_target orelse self; | |
if (target != self) { | |
var steal_targets = [_]QueueTarget{ .overflow, .buffer }; | |
if (be_fair and target.run_queue.overflow.be_fair) { | |
steal_targets = [_]QueueTarget{ .buffer, .overflow }; | |
} | |
for (steal_targets) |steal_target| { | |
if (switch (steal_target) { | |
.buffer => self.run_queue.buffer.steal(&target.run_queue.buffer), | |
.overflow => self.run_queue.buffer.steal(&target.run_queue.overflow), | |
.local => unreachable, // local queues are private to their owning worker and never stolen from | |
}) |stole| { | |
did_schedule.* = stole.pushed; | |
return stole.task; | |
} | |
} | |
} | |
self.next_target = target.next_spawned | |
orelse scheduler.getWorkerIter().worker | |
orelse unreachable; | |
} | |
scheduler.platform.call(.{ | |
.WorkerPollYield = .{ | |
.worker = self, | |
.be_fair = &be_fair, | |
.attempt = &attempt, | |
}, | |
}); | |
} | |
scheduler.platform.call(.{ | |
.WorkerPollLast = .{ | |
.worker = self, | |
.result = &poll_result, | |
}, | |
}); | |
return poll_result; | |
} | |
}; | |
pub const Scheduler = struct { | |
max_workers: usize, | |
platform: *Platform, | |
spawned: ?*Worker = null, | |
idle_queue: usize = 0, | |
run_queue: UnboundedQueue, | |
pub const Config = struct { | |
be_fair: bool, | |
max_workers: usize, | |
platform: *Platform, | |
}; | |
pub fn init(config: Config) Scheduler { | |
return .{ | |
.max_workers = std.math.min(config.max_workers, std.math.maxInt(IdleQueue.Count)), | |
.platform = config.platform, | |
.run_queue = UnboundedQueue.init(config.be_fair), | |
}; | |
} | |
pub fn getWorkerCount(self: *const Scheduler) usize { | |
const idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
return idle_queue.spawned; | |
} | |
pub fn getWorkerIter(self: *const Scheduler) Worker.Iter { | |
const spawned = @atomicLoad(?*Worker, &self.spawned, .Acquire); | |
return Worker.Iter{ .worker = spawned }; | |
} | |
pub fn schedule(self: *Scheduler, hints: SchedHints, group: Group) void { | |
if (group.len == 0) { | |
return; | |
} | |
if (hints.affinity) |worker| { | |
return worker.schedule(hints, group); | |
} | |
self.run_queue.push(group); | |
self.notify(false); | |
} | |
fn notify(self: *Scheduler, is_waking: bool) void { | |
@setCold(true); | |
const platform = self.platform; | |
const max_workers = self.max_workers; | |
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
while (true) { | |
const result = idle_queue.notify(.{ | |
.is_waking = is_waking, | |
.max_spawn = max_workers, | |
}) orelse return; | |
const new_idle_queue = switch (result) { | |
.changed => |q| q, | |
.resumed => |q| q, | |
.spawned => |q| q, | |
}; | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle_queue, | |
idle_queue.pack(), | |
new_idle_queue.pack(), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = IdleQueue.unpack(updated); | |
continue; | |
} | |
const do_notify = switch (result) { | |
.changed => return, | |
.resumed => true, | |
.spawned => false, | |
}; | |
if (do_notify) { | |
return platform.call(.{ | |
.WorkerSignal = .{ | |
.scheduler = self, | |
.max_notify = 1, | |
}, | |
}); | |
} | |
var spawned = false; | |
platform.call(.{ | |
.WorkerSpawn = .{ | |
.scheduler = self, | |
.spawned = &spawned, | |
}, | |
}); | |
if (!spawned) self.markWorkerShutdown(); | |
return; | |
} | |
} | |
fn wait(self: *Scheduler, worker: *Worker) void { | |
@setCold(true); | |
var is_waiting = false; | |
var state = Worker.State.unpack(worker.state); | |
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
while (true) { | |
const result = idle_queue.wait(blk: { | |
if (state.did_schedule) break :blk .notified; | |
if (state.is_waking) break :blk .waking; | |
if (is_waiting) break :blk .waiting; | |
break :blk .running; | |
}) orelse { | |
state.scheduler = null; | |
break; | |
}; | |
if (switch (result) { | |
.running => |q| @as(?IdleQueue, q), | |
.waking => |q| @as(?IdleQueue, q), | |
.waiting => |q| q, | |
}) |new_idle_queue| { | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle_queue, | |
idle_queue.pack(), | |
new_idle_queue.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = IdleQueue.unpack(updated); | |
continue; | |
} | |
} | |
if (switch (result) { | |
.running => false, | |
.waking => true, | |
.waiting => null, | |
}) |is_waking| { | |
state.is_waking = is_waking; | |
break; | |
} | |
is_waiting = true; | |
state.is_waking = false; | |
state.is_monitoring = true; | |
worker.state = state.pack(); | |
self.platform.call(.{ | |
.WorkerWait = .{ | |
.worker = worker, | |
}, | |
}); | |
state = Worker.State.unpack(worker.state); | |
std.debug.assert(state.scheduler == self); | |
std.debug.assert(state.is_monitoring); | |
std.debug.assert(!state.is_waking); | |
state.is_monitoring = false; | |
idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
} | |
} | |
pub fn shutdown(self: *Scheduler) void { | |
@setCold(true); | |
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
while (true) { | |
const result = idle_queue.shutdown() orelse return; | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle_queue, | |
idle_queue.pack(), | |
result.changed.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = IdleQueue.unpack(updated); | |
continue; | |
} | |
if (result.resumed == 0) return; | |
return self.platform.call(.{ | |
.WorkerSignal = .{ | |
.scheduler = self, | |
.max_notify = result.resumed, | |
}, | |
}); | |
} | |
} | |
fn markWorkerShutdown(self: *Scheduler) void { | |
@setCold(true); | |
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
while (true) { | |
const result = idle_queue.notifyShutdown(); | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle_queue, | |
idle_queue.pack(), | |
result.changed.pack(), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = IdleQueue.unpack(updated); | |
continue; | |
} | |
if (result.resumed == 0) return; | |
return self.platform.call(.{ | |
.ShutdownSignal = .{ | |
.scheduler = self, | |
.waiting = result.resumed, | |
}, | |
}); | |
} | |
} | |
pub fn join(self: *Scheduler) void { | |
@setCold(true); | |
self.shutdown(); | |
self.waitForWorkerShutdown(); | |
var workers = self.getWorkerIter(); | |
while (workers.next()) |worker| { | |
self.platform.call(.{ | |
.WorkerJoinNotify = .{ | |
.worker = worker, | |
.scheduler = self, | |
}, | |
}); | |
} | |
} | |
fn waitForWorkerShutdown(self: *Scheduler) void { | |
@setCold(true); | |
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic)); | |
while (true) { | |
const result = idle_queue.waitShutdown() orelse return; | |
if (@cmpxchgWeak( | |
usize, | |
&self.idle_queue, | |
idle_queue.pack(), | |
result.changed.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = IdleQueue.unpack(updated); | |
continue; | |
} | |
return self.platform.call(.{ | |
.ShutdownWait = .{ | |
.scheduler = self, | |
}, | |
}); | |
} | |
} | |
}; | |
const IdleQueue = struct { | |
idle: usize = 0, | |
spawned: usize = 0, | |
notified: bool = false, | |
state: State = .pending, | |
const count_bits = (std.meta.bitCount(usize) - 4) / 2; | |
const Count = std.meta.Int(.unsigned, count_bits); | |
const State = enum(u2) { | |
pending = 0, | |
waking, | |
notified, | |
shutdown, | |
}; | |
pub fn pack(self: IdleQueue) usize { | |
return ((@as(usize, @boolToInt(self.notified)) << 0) | | |
(@as(usize, @enumToInt(self.state)) << 1) | | |
(@as(usize, @intCast(Count, self.idle)) << 4) | | |
(@as(usize, @intCast(Count, self.spawned)) << (4 + count_bits))); | |
} | |
pub fn unpack(value: usize) IdleQueue { | |
return .{ | |
.notified = value & (1 << 0) != 0, | |
.state = @intToEnum(State, @truncate(u2, value >> 1)), | |
.idle = @truncate(Count, value >> 4), | |
.spawned = @truncate(Count, value >> (4 + count_bits)), | |
}; | |
} | |
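// Bit layout of the packed value: bit 0 = notified, bits 1-2 = state, bit 3 unused, then | |
// count_bits holding the idle count, and the top count_bits holding the spawned count. | |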
pub const Notify = union(enum) { | |
changed: IdleQueue, | |
resumed: IdleQueue, | |
spawned: IdleQueue, | |
pub const Context = struct { | |
is_waking: bool, | |
max_spawn: usize, | |
}; | |
}; | |
pub fn notify(self: IdleQueue, context: Notify.Context) ?Notify { | |
if (self.state == .shutdown) { | |
return null; | |
} | |
var updated = self; | |
updated.notified = true; | |
if (context.is_waking) { | |
std.debug.assert(self.state == .waking); | |
} | |
if (self.idle > 0 and (context.is_waking or self.state == .pending)) { | |
updated.state = .notified; | |
return Notify{ .resumed = updated }; | |
} | |
if (self.spawned < context.max_spawn and (context.is_waking or self.state == .pending)) { | |
updated.spawned += 1; | |
return Notify{ .spawned = updated }; | |
} | |
if (context.is_waking) { | |
updated.state = .pending; | |
} else if (self.notified) { | |
return null; | |
} | |
return Notify{ .changed = updated }; | |
} | |
pub const Wait = union(enum) { | |
running: IdleQueue, | |
waking: IdleQueue, | |
waiting: ?IdleQueue, | |
pub const Context = enum { | |
running, | |
waking, | |
waiting, | |
notified, | |
}; | |
}; | |
pub fn wait(self: IdleQueue, context: Wait.Context) ?Wait { | |
if (self.state == .shutdown) { | |
return null; | |
} | |
switch (context) { | |
.running => {}, | |
.waking => std.debug.assert(self.state == .waking), | |
.waiting, .notified => std.debug.assert(self.idle > 0), | |
} | |
var updated = self; | |
updated.notified = false; | |
if (self.notified or context == .notified) { | |
updated.notified = false; | |
if (context == .waiting) { | |
updated.idle -= 1; | |
} | |
if (context == .waking or self.state == .notified) { | |
updated.state = .waking; | |
return Wait{ .waking = updated }; | |
} | |
return Wait{ .running = updated }; | |
} | |
if (context == .waiting) { | |
return Wait{ .waiting = null }; | |
} | |
updated.idle += 1; | |
if (context == .waking) { | |
updated.state = .pending; | |
} | |
return Wait{ .waiting = updated }; | |
} | |
pub const Shutdown = struct { | |
changed: IdleQueue, | |
resumed: usize, | |
}; | |
pub fn shutdown(self: IdleQueue) ?Shutdown { | |
if (self.state == .shutdown) { | |
return null; | |
} | |
var updated = self; | |
updated.state = .shutdown; | |
updated.idle = 0; | |
updated.notified = true; | |
return Shutdown{ | |
.changed = updated, | |
.resumed = self.idle, | |
}; | |
} | |
pub const NotifyShutdown = struct { | |
changed: IdleQueue, | |
resumed: usize, | |
}; | |
pub fn notifyShutdown(self: IdleQueue) NotifyShutdown { | |
std.debug.assert(self.state == .shutdown); | |
std.debug.assert(self.spawned > 0); | |
var updated = self; | |
updated.spawned -= 1; | |
const resumed = self.spawned == 1 and self.idle > 0; | |
if (resumed) { | |
updated.idle = 0; | |
} | |
return NotifyShutdown{ | |
.changed = updated, | |
.resumed = if (resumed) self.idle else 0, | |
}; | |
} | |
pub const WaitShutdown = struct { | |
changed: IdleQueue, | |
}; | |
pub fn waitShutdown(self: IdleQueue) ?WaitShutdown { | |
std.debug.assert(self.state == .shutdown); | |
if (self.spawned == 0) { | |
return null; | |
} | |
var updated = self; | |
updated.idle += 1; | |
return WaitShutdown{ .changed = updated }; | |
} | |
}; | |
const UnboundedQueue = struct { | |
be_fair: bool, | |
queue: Queue, | |
pub fn init(be_fair: bool) UnboundedQueue { | |
return UnboundedQueue{ | |
.be_fair = be_fair, | |
.queue = switch (be_fair) { | |
true => .{ .fifo = .{} }, | |
else => .{ .lifo = .{} }, | |
}, | |
}; | |
} | |
pub fn push(self: *UnboundedQueue, group: Group) void { | |
const head = group.head; | |
const tail = group.tail; | |
std.debug.assert(group.len > 0); | |
std.debug.assert(group.len == 1 or head.next != null); | |
std.debug.assert(tail.next == null); | |
return switch (self.be_fair) { | |
true => self.queue.fifo.push(head, tail), | |
else => self.queue.lifo.push(head, tail), | |
}; | |
} | |
pub fn popUnchecked(self: *UnboundedQueue) ?*Task { | |
return switch (self.be_fair) { | |
true => self.queue.fifo.pop(null), | |
else => self.queue.lifo.pop(null), | |
}; | |
} | |
pub fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer { | |
const be_fair = self.be_fair; | |
const consumer = switch (be_fair) { | |
true => self.queue.fifo.acquireConsumer(), | |
else => self.queue.lifo.acquireConsumer(), | |
}; | |
return Consumer{ | |
.be_fair = be_fair, | |
.queue = &self.queue, | |
.consumer = consumer orelse return null, | |
}; | |
} | |
pub const Consumer = struct { | |
be_fair: bool, | |
queue: *Queue, | |
consumer: usize, | |
pub fn pop(self: *Consumer) ?*Task { | |
return switch (self.be_fair) { | |
true => self.queue.fifo.pop(&self.consumer), | |
else => self.queue.lifo.pop(&self.consumer), | |
}; | |
} | |
pub fn release(self: Consumer) void { | |
return switch (self.be_fair) { | |
true => self.queue.fifo.releaseConsumer(self.consumer), | |
else => self.queue.lifo.releaseConsumer(self.consumer), | |
}; | |
} | |
}; | |
const Queue = extern union { | |
fifo: FifoQueue, | |
lifo: LifoQueue, | |
}; | |
const LifoQueue = struct { | |
stack: usize = 0, | |
local: ?*Task = null, | |
const HAS_LOCAL: usize = 1 << 1; | |
const HAS_CONSUMER: usize = 1 << 0; | |
const PTR_MASK = ~(HAS_CONSUMER | HAS_LOCAL); | |
comptime { | |
std.debug.assert(@alignOf(Task) >= (~PTR_MASK + 1)); | |
} | |
fn push(self: *LifoQueue, head: *Task, tail: *Task) void { | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) { | |
tail.next = @intToPtr(?*Task, stack & PTR_MASK); | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
@ptrToInt(head) | (stack & ~PTR_MASK), | |
.Release, | |
.Monotonic, | |
) orelse return; | |
} | |
} | |
fn pop(self: *LifoQueue, consumer_ref: ?*usize) ?*Task { | |
const local_ptr = blk: { | |
const ref = consumer_ref orelse break :blk &self.local; | |
break :blk @ptrCast(*?*Task, ref); | |
}; | |
if (local_ptr.*) |task| { | |
local_ptr.* = task.next; | |
return task; | |
} | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
if (stack & PTR_MASK != 0) { | |
stack = @atomicRmw(usize, &self.stack, .Xchg, HAS_LOCAL | HAS_CONSUMER, .Acquire); | |
} | |
const task = @intToPtr(?*Task, stack & PTR_MASK) orelse return null; | |
local_ptr.* = task.next; | |
return task; | |
} | |
fn acquireConsumer(self: *LifoQueue) ?usize { | |
var stack = @atomicLoad(usize, &self.stack, .Monotonic); | |
while (true) { | |
if ((stack & ~HAS_CONSUMER == 0) or (stack & HAS_CONSUMER != 0)) { | |
return null; | |
} | |
var new_stack = stack | HAS_CONSUMER | HAS_LOCAL; | |
const take_stack = stack & HAS_LOCAL == 0; | |
if (take_stack) { | |
new_stack &= ~PTR_MASK; | |
} | |
stack = @cmpxchgWeak( | |
usize, | |
&self.stack, | |
stack, | |
new_stack, | |
.Acquire, | |
.Monotonic, | |
) orelse { | |
if (take_stack) return stack & PTR_MASK; | |
return @ptrToInt(self.local); | |
}; | |
} | |
} | |
fn releaseConsumer(self: *LifoQueue, consumer: usize) void { | |
var remove: usize = HAS_CONSUMER; | |
const local = @intToPtr(?*Task, consumer); | |
if (local == null) { | |
remove |= HAS_LOCAL; | |
} | |
self.local = local; | |
_ = @atomicRmw(usize, &self.stack, .Sub, remove, .Release); | |
} | |
}; | |
const FifoQueue = struct { | |
head: usize = 0, | |
tail: ?*Task = null, | |
stub_next: ?*Task = null, | |
fn stub(self: *FifoQueue) callconv(.Inline) *Task { | |
return @fieldParentPtr(Task, "next", &self.stub_next); | |
} | |
fn isEmpty(self: *const FifoQueue) callconv(.Inline) bool { | |
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic); | |
return (tail == null) or (tail == self.stub()); | |
} | |
fn push(self: *FifoQueue, head: *Task, tail: *Task) void { | |
const queue_tail = @atomicRmw(?*Task, &self.tail, .Xchg, tail, .AcqRel); | |
const queue_next = if (queue_tail) |t| &t.next else &self.stub_next; | |
@atomicStore(?*Task, queue_next, head, .Release); | |
} | |
fn pop(self: *FifoQueue, consumer_ref: ?*usize) ?*Task { | |
const stub = self.stub(); | |
const head_ref = blk: { | |
const ref = consumer_ref orelse &self.head; | |
break :blk @ptrCast(**Task, ref); | |
}; | |
var head = head_ref.*; | |
if (head == stub) { | |
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
head_ref.* = head; | |
} | |
if (@atomicLoad(?*Task, &head.next, .Acquire)) |next| { | |
head_ref.* = next; | |
return head; | |
} | |
const tail = @atomicLoad(?*Task, &self.tail, .Acquire); | |
if (tail != head) { | |
return null; | |
} | |
self.stub_next = null; | |
self.push(stub, stub); | |
const next = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
head_ref.* = next; | |
return head; | |
} | |
const HAS_CONSUMER: usize = 1 << 0; | |
fn acquireConsumer(self: *FifoQueue) ?usize { | |
if (self.isEmpty()) return null; | |
return self.acquireConsumerSlow(); | |
} | |
fn acquireConsumerSlow(self: *FifoQueue) ?usize { | |
@setCold(true); | |
var head = @atomicLoad(usize, &self.head, .Monotonic); | |
while (true) { | |
if ((head & HAS_CONSUMER != 0) or self.isEmpty()) { | |
return null; | |
} | |
head = @cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
head | HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse { | |
if (head == 0) return @ptrToInt(self.stub()); | |
return head; | |
}; | |
} | |
} | |
fn releaseConsumer(self: *FifoQueue, consumer: usize) void { | |
@atomicStore(usize, &self.head, consumer, .Release); | |
} | |
}; | |
}; | |
const BoundedQueue = struct { | |
head: Index = 0, | |
tail: Index = 0, | |
buffer: [capacity]*Task = undefined, | |
const capacity = 256; | |
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
comptime { | |
std.debug.assert(std.math.maxInt(Index) >= capacity); | |
} | |
pub fn push(self: *BoundedQueue, group: Group) ?Group { | |
std.debug.assert(group.len > 0); | |
if (group.len > capacity) { | |
return group; | |
} | |
var tail = self.tail; | |
const head = @atomicLoad(Index, &self.head, .Monotonic); | |
std.debug.assert(tail -% head <= capacity); | |
const free_slots = capacity - (tail -% head); | |
if (group.len > free_slots) { | |
return group; | |
} | |
var tasks = group; | |
while (tasks.pop()) |task| { | |
@atomicStore(*Task, &self.buffer[tail % capacity], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Index, &self.tail, tail, .Release); | |
return null; | |
} | |
pub fn pop(self: *BoundedQueue, be_fair: bool) ?*Task { | |
const tail = self.tail; | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
if (head == tail) { | |
return null; | |
} | |
while (be_fair) { | |
head = @cmpxchgWeak( | |
Index, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return self.buffer[head % capacity]; | |
if (head == tail) { | |
return null; | |
} | |
} | |
const new_tail = tail -% 1; | |
@atomicStore(Index, &self.tail, new_tail, .SeqCst); | |
head = @atomicLoad(Index, &self.head, .SeqCst); | |
std.debug.assert(tail -% head <= capacity); | |
var task: ?*Task = null; | |
if (head != tail) { | |
task = self.buffer[new_tail % capacity]; | |
if (head != new_tail) { | |
return task; | |
} else if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic) != null) { | |
task = null; | |
} | |
} | |
@atomicStore(Index, &self.tail, tail, .Monotonic); | |
return task; | |
} | |
pub const Steal = struct { | |
task: *Task, | |
pushed: bool = false, | |
}; | |
pub fn steal(noalias self: *BoundedQueue, noalias queue: anytype) ?Steal { | |
const is_unbounded = switch (@TypeOf(queue)) { | |
*BoundedQueue => false, | |
*UnboundedQueue => true, | |
else => |T| @compileError("Cannot steal from " ++ @typeName(T)), | |
}; | |
const tail = self.tail; | |
const head = @atomicLoad(Index, &self.head, .Unordered); | |
std.debug.assert(tail == head); | |
if (is_unbounded) { | |
var consumer = queue.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
var pushed: Index = 0; | |
while (pushed < capacity) : (pushed += 1) { | |
const task = consumer.pop() orelse break; | |
const index = (tail +% pushed) % capacity; | |
@atomicStore(*Task, &self.buffer[index], task, .Unordered); | |
} | |
var task = consumer.pop(); | |
if (task == null and pushed > 0) { | |
pushed -= 1; | |
task = self.buffer[(tail +% pushed) % capacity]; | |
} | |
if (pushed > 0) @atomicStore(Index, &self.tail, tail +% pushed, .Release); | |
return Steal{ | |
.task = task orelse return null, | |
.pushed = pushed > 0, | |
}; | |
} | |
var queue_head = @atomicLoad(Index, &queue.head, .SeqCst); | |
while (true) { | |
const queue_tail = @atomicLoad(Index, &queue.tail, .SeqCst); | |
if ((queue_head == queue_tail) or (queue_head == queue_tail -% 1)) { | |
return null; | |
} | |
std.debug.assert(queue_tail -% queue_head <= capacity); | |
const task = @atomicLoad(*Task, &queue.buffer[queue_head % capacity], .Unordered); | |
queue_head = @cmpxchgWeak( | |
Index, | |
&queue.head, | |
queue_head, | |
queue_head +% 1, | |
.SeqCst, | |
.SeqCst, | |
) orelse return Steal{ | |
.task = task, | |
.pushed = false, | |
}; | |
} | |
} | |
}; |
const std = @import("std"); | |
const Pool = @This(); | |
pub const Task = @import("./t2.zig"); | |
stack_size: usize, | |
optimize_for: Config.OptimizeFor, | |
scheduler: Task.Scheduler, | |
shutdown_event: ResetEvent = .{}, | |
start_thread: ?*Thread = null, | |
idle_queue: AbaAtomicUsize = .{ .value = 0 }, | |
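// Note: ResetEvent, AbaAtomicUsize (an ABA-counted atomic usize), and ThreadLocalUsize are | |
// referenced below but not defined in this snippet; they are assumed to come from elsewhere. | |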
pub const Config = struct { | |
optimize_for: OptimizeFor, | |
max_threads: usize, | |
stack_size: usize, | |
pub const OptimizeFor = enum{ | |
Latency, | |
Throughput, | |
}; | |
pub fn getDefault() Config { | |
return Config{ | |
.optimize_for = .Throughput, | |
.stack_size = 16 * 1024 * 1024, | |
.max_threads = blk: { | |
if (std.builtin.single_threaded) break :blk 1; | |
break :blk std.math.max(1, std.Thread.cpuCount() catch 1); | |
}, | |
}; | |
} | |
}; | |
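// Usage sketch (illustrative only; `myTaskCallback` is a hypothetical `fn(*Task) void`): | |
// | |
//     var pool = Pool.init(Pool.Config.getDefault()); | |
//     defer pool.deinit(); | |
//     pool.start(.{}); | |
//     var task = Pool.Task{ .onExecute = myTaskCallback }; | |
//     pool.schedule(&task); | |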
pub fn init(config: Config) Pool { | |
return .{ | |
.stack_size = std.math.max(64 * 1024, config.stack_size), | |
.optimize_for = config.optimize_for, | |
.scheduler = Task.Scheduler.init(.{ | |
.be_fair = config.optimize_for == .Latency, | |
.max_workers = std.math.max(1, config.max_threads), | |
.platform = &pool_platform, | |
}), | |
}; | |
} | |
pub fn deinit(self: *Pool) void { | |
self.scheduler.shutdown(); | |
self.scheduler.join(); | |
self.shutdown_event.deinit(); | |
self.* = undefined; | |
} | |
pub const StartConfig = struct { | |
use_caller_thread: bool = false, | |
initial_group: Task.Group = .{}, | |
fn noop(task: *Task) void { _ = task; } | |
}; | |
pub fn start(self: *Pool, config: StartConfig) void { | |
var group = config.initial_group; | |
var noop_task = Task{ .onExecute = StartConfig.noop }; | |
if (config.use_caller_thread) { | |
self.start_thread = @intToPtr(*Thread, @alignOf(Thread)); | |
group.push(Task.Group.from(&noop_task)); | |
} | |
if (group.len > 0) { | |
self.scheduler.schedule(.{}, group); | |
} | |
} | |
pub fn stop(self: *Pool) void { | |
self.scheduler.shutdown(); | |
} | |
pub fn schedule(self: *Pool, groupable: anytype) void { | |
const group = switch (@TypeOf(groupable)) { | |
Task.Group => groupable, | |
*Task => Task.Group.from(groupable), | |
else => |T| @compileError(@typeName(T) ++ " is not a Task.Group"), | |
}; | |
if (group.len == 0) { | |
return; | |
} | |
if (Thread.getCurrent()) |thread| { | |
thread.worker.schedule(.{}, group); | |
} else { | |
self.scheduler.schedule(.{}, group); | |
} | |
} | |
const IDLE_PTR = ~IDLE_NOTIFIED; | |
const IDLE_NOTIFIED: usize = 0b1; | |
const IDLE_SHUTDOWN = std.math.maxInt(usize); | |
fn idleWait(self: *Pool, thread: *Thread) void { | |
@setCold(true); | |
var idle_queue = self.idle_queue.load(.Monotonic); | |
while (true) { | |
if (idle_queue.value == IDLE_SHUTDOWN) { | |
return; | |
} | |
var new_idle_queue: usize = 0; | |
if (idle_queue.value != IDLE_NOTIFIED) { | |
const idle_next = @intToPtr(?*Thread, idle_queue.value & IDLE_PTR); | |
@atomicStore(?*Thread, &thread.idle_next, idle_next, .Unordered); | |
new_idle_queue = @ptrToInt(thread); | |
} | |
if (self.idle_queue.cmpxchgWeak( | |
idle_queue, | |
new_idle_queue, | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = updated; | |
continue; | |
} | |
if (idle_queue.value != IDLE_NOTIFIED) { | |
thread.idle_event.wait(); | |
} | |
return; | |
} | |
} | |
fn idleWake(self: *Pool, wake_all: bool) void { | |
@setCold(true); | |
var idle_threads: ?*Thread = null; | |
defer while (idle_threads) |thread| { | |
idle_threads = thread.idle_next; | |
thread.idle_event.set(); | |
} | |
if (wake_all) { | |
const idle_queue = @atomicRmw( | |
usize, | |
&self.idle_queue.value, | |
.Xchg, | |
IDLE_SHUTDOWN, | |
.Acquire, | |
); | |
return switch (idle_queue) { | |
IDLE_SHUTDOWN => unreachable, | |
IDLE_NOTIFIED => {}, | |
else => idle_threads = @intToPtr(?*Thread, idle_queue & IDLE_PTR), | |
}; | |
} | |
var idle_queue = self.idle_queue.load(.Acquire); | |
while (true) { | |
const idle_thread = switch (idle_queue.value) { | |
IDLE_SHUTDOWN => return, | |
IDLE_NOTIFIED => return, | |
else => @intToPtr(?*Thread, idle_queue.value & IDLE_PTR), | |
}; | |
var new_idle_queue = IDLE_NOTIFIED; | |
if (idle_thread) |thread| { | |
const next_thread = @atomicLoad(?*Thread, &thread.idle_next, .Unordered); | |
new_idle_queue = @ptrToInt(next_thread); | |
} | |
if (self.idle_queue.cmpxchgWeak( | |
idle_queue, | |
new_idle_queue, | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
idle_queue = updated; | |
continue; | |
} | |
const thread = idle_thread orelse return; | |
@atomicStore(?*Thread, &thread.idle_next, null, .Unordered); | |
idle_threads = thread; | |
return; | |
} | |
} | |
const Thread = struct { | |
worker: Task.Worker = undefined, | |
handle: ?*std.Thread, | |
idle_next: ?*Thread = null, | |
idle_event: ResetEvent = .{}, | |
shutdown_event: ResetEvent = .{}, | |
fn getCurrent() ?*Thread { | |
return @intToPtr(?*Thread, ThreadLocalUsize.get()); | |
} | |
fn spawn(pool: *Pool) !void { | |
const Spawner = struct { | |
put_event: ResetEvent = .{}, | |
got_event: ResetEvent = .{}, | |
handle: ?*std.Thread = undefined, | |
pool_ref: *Pool = undefined, | |
fn run(self: *@This()) void { | |
self.put_event.wait(); | |
const handle = self.handle; | |
const pool_ref = self.pool_ref; | |
self.got_event.set(); | |
return Thread.run(pool_ref, handle); | |
} | |
}; | |
var spawner = Spawner{}; | |
spawner.handle = try std.Thread.spawn(Spawner.run, &spawner); | |
spawner.pool_ref = pool; | |
spawner.put_event.set(); | |
spawner.got_event.wait(); | |
spawner.put_event.deinit(); | |
spawner.got_event.deinit(); | |
} | |
fn run(pool: *Pool, handle: ?*std.Thread) void { | |
var self = Thread{ .handle = handle }; | |
defer self.idle_event.deinit(); | |
if (handle == null) { | |
pool.start_thread = &self; | |
} | |
const old_tls = ThreadLocalUsize.get(); | |
ThreadLocalUsize.set(@ptrToInt(&self)); | |
defer ThreadLocalUsize.set(old_tls); | |
self.worker.run(.{ | |
.be_fair = pool.optimize_for == .Latency, | |
.scheduler = &pool.scheduler, | |
}); | |
if (handle == null) { | |
pool.scheduler.join(); | |
} | |
} | |
}; | |
var pool_platform = Task.Platform{ | |
.callFn = onPlatformCommand, | |
}; | |
fn onPlatformCommand(_platform: *Task.Platform, command: Task.Platform.Command) void { | |
// TODO: the command handling below is still being fleshed out. | |
switch (command) { | |
.WorkerWait => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
const pool = @fieldParentPtr(Pool, "scheduler", thread.worker.getScheduler()); | |
pool.idleWait(thread); | |
}, | |
.WorkerNotify => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
thread.idle_event.set(); | |
}, | |
.WorkerSignal => |cmd| { | |
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler); | |
const wake_all = cmd.max_notify > 1; | |
pool.idleWake(wake_all); | |
}, | |
.WorkerSpawn => |cmd| { | |
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler); | |
if (pool.start_thread == @intToPtr(*Thread, @alignOf(Thread))) { | |
pool.start_thread = null; | |
Thread.run(pool, null); | |
cmd.spawned.* = true; | |
return; | |
} | |
cmd.spawned.* = true; | |
Thread.spawn(pool) catch { | |
cmd.spawned.* = false; | |
}; | |
}, | |
.WorkerPollFirst => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
thread.idle_event.reset(); | |
const pool = @fieldParentPtr(Pool, "scheduler", thread.worker.getScheduler()); | |
cmd.be_fair.* = pool.optimize_for == .Latency; | |
cmd.attempts.* = switch (std.builtin.arch) { | |
.i386, .x86_64 => 16, | |
.aarch64 => 8, | |
else => 4, | |
}; | |
}, | |
.WorkerPollYield => |cmd| { | |
std.os.sched_yield() catch {}; | |
}, | |
.WorkerPollLast => |cmd| { | |
return; | |
}, | |
.WorkerExecute => |cmd| { | |
cmd.task.execute(); | |
}, | |
.WorkerJoinWait => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
if (thread.handle != null) { | |
thread.shutdown_event.wait(); | |
} | |
}, | |
.WorkerJoinNotify => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
if (thread.handle) |handle| { | |
thread.shutdown_event.set(); | |
handle.wait(); | |
} | |
}, | |
.ShutdownWait => |cmd| { | |
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler); | |
pool.shutdown_event.wait(); | |
}, | |
.ShutdownSignal => |cmd| { | |
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler); | |
pool.shutdown_event.set(); | |
}, | |
} | |
} | |
const AbaAtomicUsize = switch (std.builtin.arch) { | |
.i386, .x86_64 => extern struct { | |
const DoubleWord = std.meta.Int(.unsigned, std.meta.bitCount(usize) * 2); | |
value: usize align(@alignOf(DoubleWord)), | |
aba_id: usize = 0, | |
pub fn load( | |
self: *const AbaAtomicUsize, | |
comptime ordering: std.builtin.AtomicOrder, | |
) AbaAtomicUsize { | |
const value = @atomicLoad(usize, &self.value, ordering); | |
const aba_id = @atomicLoad(usize, &self.aba_id, .Unordered); | |
return .{ | |
.value = value, | |
.aba_id = aba_id, | |
}; | |
} | |
pub fn cmpxchgWeak( | |
self: *AbaAtomicUsize, | |
compare: AbaAtomicUsize, | |
exchange: usize, | |
comptime success: std.builtin.AtomicOrder, | |
comptime failure: std.builtin.AtomicOrder, | |
) ?AbaAtomicUsize { | |
const double_word = @cmpxchgWeak( | |
DoubleWord, | |
@ptrCast(*DoubleWord, self), | |
@bitCast(DoubleWord, compare), | |
@bitCast(DoubleWord, AbaAtomicUsize{ | |
.value = exchange, | |
.aba_id = compare.aba_id +% 1, | |
}), | |
success, | |
failure, | |
) orelse return null; | |
return @bitCast(AbaAtomicUsize, double_word); | |
} | |
}, | |
else => struct { | |
value: usize, | |
pub fn load( | |
self: *const AbaAtomicUsize, | |
comptime ordering: std.builtin.AtomicOrder, | |
) AbaAtomicUsize { | |
const value = @atomicLoad(usize, &self.value, ordering); | |
return .{ .value = value }; | |
} | |
pub fn cmpxchgWeak( | |
self: *AbaAtomicUsize, | |
compare: AbaAtomicUsize, | |
exchange: usize, | |
comptime success: std.builtin.AtomicOrder, | |
comptime failure: std.builtin.AtomicOrder, | |
) ?AbaAtomicUsize { | |
return .{ | |
.value = @cmpxchgWeak( | |
usize, | |
&self.value, | |
compare.value, | |
exchange, | |
success, | |
failure, | |
) orelse return null, | |
}; | |
} | |
}, | |
}; | |
const is_apple_silicon = std.builtin.os.tag == .macos and std.builtin.arch == .aarch64; | |
const ThreadLocalUsize = switch (is_apple_silicon) { | |
true => struct { | |
const pthread_key_t = c_ulong; | |
extern "c" fn pthread_key_create(key: *pthread_key_t, destructor: ?fn (value: *c_void) callconv(.C) void) c_int; | |
extern "c" fn pthread_getspecific(key: pthread_key_t) ?*c_void; | |
extern "c" fn pthread_setspecific(key: pthread_key_t, value: ?*c_void) c_int; | |
const dispatch_once_t = usize; | |
const dispatch_function_t = fn (?*c_void) callconv(.C) void; | |
extern "c" fn dispatch_once_f( | |
predicate: *dispatch_once_t, | |
context: ?*c_void, | |
function: dispatch_function_t, | |
) void; | |
var tls_once: dispatch_once_t = 0; | |
var tls_key: pthread_key_t = undefined; | |
fn tls_init(_: ?*c_void) callconv(.C) void { | |
std.debug.assert(pthread_key_create(&tls_key, null) == 0); | |
} | |
pub fn get() usize { | |
dispatch_once_f(&tls_once, null, tls_init); | |
return @ptrToInt(pthread_getspecific(tls_key)); | |
} | |
pub fn set(value: usize) void { | |
dispatch_once_f(&tls_once, null, tls_init); | |
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0); | |
} | |
}, | |
else => struct { | |
threadlocal var tls_usize: usize = 0; | |
pub fn get() usize { | |
return tls_usize; | |
} | |
pub fn set(value: usize) void { | |
tls_usize = value; | |
} | |
}, | |
}; | |
const ResetEvent = struct { | |
state: State = .empty, | |
futex: Futex = .{}, | |
const State = enum(u32) { | |
empty, | |
waiting, | |
notified, | |
}; | |
pub fn deinit(self: *ResetEvent) void { | |
self.futex.deinit(); | |
self.* = undefined; | |
} | |
pub fn wait(self: *ResetEvent) void { | |
while (true) { | |
var state = @atomicLoad(State, &self.state, .Acquire); | |
while (state == .empty) { | |
state = .waiting; | |
state = @cmpxchgWeak( | |
State, | |
&self.state, | |
.empty, | |
.waiting, | |
.Acquire, | |
.Acquire, | |
) orelse break; | |
} | |
if (state == .notified) return; | |
self.futex.wait( | |
@ptrCast(*const u32, &self.state), | |
@enumToInt(State.waiting), | |
); | |
} | |
} | |
pub fn set(self: *ResetEvent) void { | |
switch (@atomicRmw(State, &self.state, .Xchg, .notified, .Release)) { | |
.empty => {}, | |
.notified => {}, | |
.waiting => self.futex.wake(@ptrCast(*const u32, &self.state)), | |
} | |
} | |
pub fn reset(self: *ResetEvent) void { | |
@atomicStore(State, &self.state, .empty, .Monotonic); | |
} | |
const Futex = if (std.builtin.os.tag == .windows) | |
WindowsFutex | |
else if (std.builtin.link_libc) | |
PosixFutex | |
else if (std.builtin.os.tag == .linux) | |
LinuxFutex | |
else | |
@compileError("ResetEvent unavailable for platform"); | |
const LinuxFutex = struct { | |
pub fn deinit(self: *Futex) void { | |
self.* = undefined; | |
} | |
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void { | |
switch (std.os.linux.getErrno(std.os.linux.futex_wait( | |
@ptrCast(*const i32, ptr), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT, | |
@bitCast(i32, cmp), | |
null, | |
))) { | |
0 => {}, | |
std.os.EINTR => {}, | |
std.os.EAGAIN => {}, | |
std.os.ETIMEDOUT => unreachable, | |
else => unreachable, | |
} | |
} | |
pub fn wake(self: *Futex, ptr: *const u32) void { | |
switch (std.os.linux.getErrno(std.os.linux.futex_wake( | |
@ptrCast(*const i32, ptr), | |
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE, | |
std.math.maxInt(i32), | |
))) { | |
0 => {}, | |
std.os.EINVAL => {}, | |
std.os.EACCES => {}, | |
std.os.EFAULT => {}, | |
else => unreachable, | |
} | |
} | |
}; | |
const WindowsFutex = struct { | |
lock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT, | |
cond: std.os.windows.CONDITION_VARIABLE = std.os.windows.CONDITION_VARIABLE_INIT, | |
pub fn deinit(self: *Futex) void { | |
self.* = undefined; | |
} | |
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
if (@atomicLoad(u32, ptr, .SeqCst) == cmp) { | |
_ = std.os.windows.kernel32.SleepConditionVariableSRW( | |
&self.cond, | |
&self.lock, | |
std.os.windows.INFINITE, | |
0, | |
); | |
} | |
} | |
pub fn wake(self: *Futex, ptr: *const u32) void { | |
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock); | |
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock); | |
std.os.windows.kernel32.WakeAllConditionVariable(&self.cond); | |
} | |
}; | |
const PosixFutex = struct { | |
mutex: std.c.pthread_mutex_t = .{}, | |
cond: std.c.pthread_cond_t = .{}, | |
pub fn deinit(self: *Futex) void { | |
const rc = std.c.pthread_cond_destroy(&self.cond); | |
std.debug.assert(rc == 0 or rc == std.os.EINVAL); | |
const rm = std.c.pthread_mutex_destroy(&self.mutex); | |
std.debug.assert(rm == 0 or rm == std.os.EINVAL); | |
} | |
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
if (@atomicLoad(u32, ptr, .SeqCst) == cmp) { | |
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0); | |
} | |
} | |
pub fn wake(self: *Futex, ptr: *const u32) void { | |
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0); | |
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0); | |
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0); | |
} | |
}; | |
}; |
const std = @import("std"); | |
const Task = @This(); | |
next: ?*Task = undefined, | |
pub const Batch = struct { | |
head: ?*Task = null, | |
tail: *Task = undefined, | |
pub fn isEmpty(self: Batch) bool { | |
return self.head == null; | |
} | |
pub fn from(task: *Task) Batch { | |
task.next = null; | |
return Batch{ | |
.head = task, | |
.tail = task, | |
}; | |
} | |
pub const push = pushBack; | |
pub fn pushBack(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
self.tail.next = batch.head; | |
self.tail = batch.tail; | |
} | |
} | |
pub fn pushFront(self: *Batch, batch: Batch) void { | |
if (self.isEmpty()) { | |
self.* = batch; | |
} else if (!batch.isEmpty()) { | |
batch.tail.next = self.head; | |
self.head = batch.head; | |
} | |
} | |
pub const pop = popFront; | |
pub fn popFront(self: *Batch) ?*Task { | |
const task = self.head orelse return null; | |
self.head = task.next; | |
return task; | |
} | |
}; | |
pub const System = struct { | |
callFn: fn(*System, Command) void, | |
pub fn call(self: *System, command: Command) void { | |
return (self.callFn)(self, command); | |
} | |
pub const Command = union(enum) { | |
Spawn: struct { | |
scheduler: *Scheduler, | |
spawned: *bool, | |
}, | |
Join: union(enum) { | |
Wait: *Scheduler, | |
Wake: *Scheduler, | |
}, | |
Resume: union(enum) { | |
Exit: *Worker, | |
Signal: *Scheduler, | |
Broadcast: *Scheduler, | |
}, | |
Suspend: union(enum) { | |
Exit: *Worker, | |
Wait: struct { | |
worker: *Worker, | |
scheduled: *bool, | |
}, | |
}, | |
Poll: struct { | |
worker: *Worker, | |
attempt: usize, | |
task: *?*Task, | |
scheduled: *bool, | |
targets: *?[]const PollTarget, | |
}, | |
Execute: struct { | |
worker: *Worker, | |
task: *Task, | |
}, | |
pub const PollTarget = union(enum) { | |
Local: bool, // be_fair: bool | |
Global, | |
Remote: *Worker, | |
}; | |
}; | |
}; | |
pub const Scheduler = struct { | |
idle: usize = 0, | |
system: *System, | |
max_workers: Idle.Count, | |
spawned: ?*Worker = null, | |
run_queue: GlobalQueue = .{}, | |
pub const Config = struct { | |
system: *System, | |
max_workers: usize, | |
}; | |
pub fn init(config: Config) Scheduler { | |
return .{ | |
.system = config.system, | |
.max_workers = blk: { | |
const num_workers = std.math.max(1, config.max_workers); | |
const max_workers = std.math.cast(Idle.Count, num_workers); | |
break :blk max_workers catch std.math.maxInt(Idle.Count); | |
}, | |
}; | |
} | |
pub fn schedule(self: *Scheduler, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
self.run_queue.push(batch); | |
self.notify(false); | |
} | |
fn notify(self: *Scheduler, is_waking: bool) void { | |
@setCold(true); | |
const result = Idle.notify(&self.idle, .{ | |
.is_waking = is_waking, | |
.max_spawn = self.max_workers, | |
}) orelse return; | |
if (result == .signal) { | |
self.syscall(.Resume, .{ .Signal = self }); | |
return; | |
} | |
var spawned = false; | |
self.syscall(.Spawn, .{ | |
.scheduler = self, | |
.spawned = &spawned, | |
}); | |
if (!spawned) { | |
self.endSpawn(); | |
} | |
} | |
fn wait(self: *Scheduler, is_waking: bool, worker: *Worker) ?bool { | |
@setCold(true); | |
var context = Idle.Wait.Context{ | |
.is_waking = is_waking, | |
.is_waiting = false, | |
.is_notified = false, | |
}; | |
while (true) { | |
const result = Idle.wait(&self.idle, context) orelse return null; | |
switch (result) { | |
.waking => return true, | |
.running => return false, | |
.waiting => {}, | |
}; | |
context = .{ | |
.is_waking = false, | |
.is_waiting = true, | |
.is_notified = false, | |
}; | |
self.syscall(.Suspend, .{ | |
.Wait = .{ | |
.worker = worker, | |
.scheduled = &context.is_notified, | |
}, | |
}); | |
} | |
} | |
fn beginSpawn(self: *Scheduler, worker: *Worker) ?bool { | |
@setCold(true); | |
const is_waking = Idle.beginSpawn(&self.idle) orelse return null; | |
worker.next = @atomicLoad(?*Worker, &self.spawned, .Monotonic); | |
while (true) { | |
worker.next = @cmpxchgWeak( | |
?*Worker, | |
&self.spawned, | |
worker.next, | |
worker, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
return is_waking; | |
} | |
fn endSpawn(self: *Scheduler) void { | |
@setCold(true); | |
const pending = Idle.endSpawn(&self.idle) orelse return; | |
if (pending > 0) { | |
self.syscall(.Join, .{ .Wake = self }); | |
} | |
} | |
pub fn getWorkerIter(self: *const Scheduler) Worker.Iter { | |
const workers = @atomicLoad(?*Worker, &self.spawned, .Acquire); | |
return Worker.Iter{ .workers = workers }; | |
} | |
pub fn getWorkerCount(self: *const Scheduler) Worker.Count { | |
const idle = Idle.unpack(@atomicLoad(usize, &self.idle, .Monotonic)); | |
return Worker.Count{ | |
.idle = idle.waiting, | |
.spawned = idle.spawned, | |
.max = self.max_workers, | |
}; | |
} | |
pub fn shutdown(self: *Scheduler) void { | |
@setCold(true); | |
const pending = Idle.shutdown(&self.idle) orelse return; | |
if (pending > 0) { | |
self.syscall(.Resume, .{ .Broadcast = self }); | |
} | |
} | |
pub fn deinit(self: *Scheduler) void { | |
@setCold(true); | |
self.shutdown(); | |
if (Idle.join(&self.idle)) |_| { | |
self.syscall(.Join, .{ .Wait = self }); | |
} | |
var workers = self.getWorkerIter(); | |
while (workers.next()) |worker| { | |
self.syscall(.Resume, .{ .Exit = worker }); | |
} | |
} | |
fn syscall( | |
self: Scheduler, | |
comptime cmd: std.meta.TagType(System.Command), | |
payload: @TypeOf(@field(@as(System.Command, undefined), @tagName(cmd))), | |
) callconv(.Inline) void { | |
const command = @unionInit(System.Command, @tagName(cmd), payload); | |
return self.system.call(command); | |
} | |
}; | |
pub const Worker = struct { | |
scheduler: *Scheduler, | |
next: ?*Worker = null, | |
run_queue: LocalQueue = .{}, | |
pub const Count = struct { | |
idle: usize = 0, | |
spawned: usize = 0, | |
max: usize = 0, | |
}; | |
pub const Iter = struct { | |
workers: ?*Worker = null, | |
pub fn next(self: *Iter) ?*Worker { | |
const worker = self.workers orelse return null; | |
self.workers = worker.next; | |
return worker; | |
} | |
}; | |
pub fn run(self: *Worker, scheduler: *Scheduler) void { | |
self.* = .{ .scheduler = scheduler }; | |
var is_waking = scheduler.beginSpawn(self) orelse { | |
scheduler.endSpawn(); | |
return; | |
}; | |
while (true) { | |
const stole = self.poll(scheduler) orelse { | |
is_waking = scheduler.wait(is_waking, self) orelse break; | |
continue; | |
}; | |
if (is_waking or stole.pushed) { | |
scheduler.notify(is_waking); | |
is_waking = false; | |
} | |
scheduler.syscall(.Execute, .{ | |
.worker = self, | |
.task = stole.task, | |
}); | |
} | |
scheduler.endSpawn(); | |
scheduler.syscall(.Suspend, .{ .Exit = self }); | |
} | |
pub fn getScheduler(self: Worker) *Scheduler { | |
return self.scheduler; | |
} | |
pub fn schedule(self: *Worker, batch: Batch) void { | |
if (batch.isEmpty()) { | |
return; | |
} | |
self.run_queue.push(batch); | |
self.scheduler.notify(false); | |
} | |
fn poll(self: *Worker, scheduler: *Scheduler) ?LocalQueue.Stole { | |
var attempt: usize = 0; | |
while (true) : (attempt += 1) { | |
var scheduled = false; | |
var polled: ?*Task = null; | |
var targets: ?[]const System.Command.PollTarget = null; | |
scheduler.syscall(.Poll, .{ | |
.worker = self, | |
.attempt = attempt, | |
.task = &polled, | |
.scheduled = &scheduled, | |
.targets = &targets, | |
}); | |
if (polled) |task| { | |
return LocalQueue.Stole{ | |
.task = task, | |
.pushed = scheduled, | |
}; | |
} | |
const poll_targets = targets orelse { | |
std.debug.assert(!scheduled); | |
return null; | |
}; | |
for (poll_targets) |poll_target| { | |
if (switch (poll_target) { | |
.Local => |be_fair| self.run_queue.pop(be_fair), | |
.Global => self.run_queue.steal(&scheduler.run_queue), | |
.Remote => |worker| blk: { | |
if (worker == self) | |
break :blk self.run_queue.pop(false); | |
break :blk self.run_queue.steal(&worker.run_queue); | |
}, | |
}) |stole| { | |
return stole; | |
} | |
} | |
} | |
} | |
}; | |
//////////////////////////////////////////////////////////// | |
const Idle = struct { | |
notified: bool = false, | |
state: State = .pending, | |
waiting: usize = 0, | |
spawned: usize = 0, | |
const count_bits = @divFloor(std.meta.bitCount(usize) - 3, 2); | |
const Count = std.meta.Int(.unsigned, count_bits); | |
const state_bits = 2; | |
const State = enum(std.meta.Int(.unsigned, state_bits)) { | |
pending = 0, | |
waking, | |
signaled, | |
shutdown, | |
}; | |
fn pack(self: Idle) usize { | |
var value = @as(usize, @boolToInt(self.notified)); | |
value |= @as(usize, @enumToInt(self.state)) << 1; | |
value |= @as(usize, @intCast(Count, self.waiting)) << (1 + state_bits); | |
value |= @as(usize, @intCast(Count, self.spawned)) << (1 + state_bits + count_bits); | |
return value; | |
} | |
fn unpack(value: usize) Idle { | |
return Idle{ | |
.notified = value & (1 << 0) != 0, | |
.state = @intToEnum(State, @truncate(std.meta.TagType(State), value >> 1)), | |
.waiting = @truncate(Count, value >> (1 + state_bits)), | |
.spawned = @truncate(Count, value >> (1 + state_bits + count_bits)), | |
}; | |
} | |
const Notify = enum { | |
signal, | |
spawn, | |
const Context = struct { | |
is_waking: bool, | |
max_spawn: Count, | |
}; | |
}; | |
fn notify(ptr: *usize, context: Notify.Context) ?Notify { | |
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic)); | |
while (true) { | |
if (idle.state == .shutdown) { | |
return null; | |
} | |
const can_wake = context.is_waking or idle.state == .pending; | |
if (context.is_waking) { | |
std.debug.assert(idle.state == .waking); | |
} | |
var new_idle = idle; | |
new_idle.notified = true; | |
if (idle.waiting > 0 and can_wake) { | |
new_idle.state = .signaled; | |
} else if (idle.spawned < context.max_spawn and can_wake) { | |
new_idle.state = .signaled; | |
new_idle.spawned += 1; | |
} else if (context.is_waking) { | |
new_idle.state = .pending; | |
} else if (idle.notified) { | |
return null; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
ptr, | |
idle.pack(), | |
new_idle.pack(), | |
.Release, | |
.Monotonic, | |
)) |updated| { | |
idle = unpack(updated); | |
continue; | |
} | |
if (idle.waiting > 0 and can_wake) { | |
return Notify.signal; | |
} | |
if (idle.spawned < context.max_spawn and can_wake) { | |
return Notify.spawn; | |
} | |
return null; | |
} | |
} | |
fn beginSpawn(ptr: *usize) ?bool { | |
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic)); | |
while (true) { | |
const can_wake = switch (idle.state) { | |
.pending => false, | |
.waking => return false, | |
.signaled => true, | |
.shutdown => return null, | |
}; | |
if (can_wake or idle.notified) { | |
var new_idle = idle; | |
new_idle.notified = false; | |
if (can_wake) { | |
new_idle.state = .waking; | |
} | |
if (@cmpxchgWeak( | |
usize, | |
ptr, | |
idle.pack(), | |
new_idle.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
idle = unpack(updated); | |
continue; | |
} | |
} | |
return can_wake; | |
} | |
} | |
fn endSpawn(ptr: *usize) ?usize { | |
const idle = unpack(@atomicRmw( | |
usize, | |
ptr, | |
.Sub, | |
(Idle{ .spawned = 1 }).pack(), | |
.Release, | |
)); | |
std.debug.assert(idle.spawned > 0); | |
if (idle.state == .shutdown and idle.spawned == 1) { | |
return idle.waiting; | |
} | |
return null; | |
} | |
const Wait = enum { | |
waking, | |
waiting, | |
running, | |
const Context = struct { | |
is_waking: bool, | |
is_waiting: bool, | |
is_notified: bool, | |
}; | |
}; | |
fn wait(ptr: *usize, context: Wait.Context) ?Wait { | |
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic)); | |
while (true) { | |
if (idle.state == .shutdown) { | |
return null; | |
} | |
const can_wake = context.is_notified or idle.notified; | |
if (context.is_waking) { | |
std.debug.assert(idle.state == .waking); | |
} | |
var new_idle = idle; | |
if (context.is_waiting and can_wake) { | |
new_idle.waiting -= 1; | |
} else if (context.is_waiting) { | |
return Wait.waiting; | |
} else { | |
std.debug.assert(!context.is_notified); | |
} | |
if (can_wake) { | |
new_idle.notified = false; | |
if (context.is_waking or idle.state == .signaled) { | |
new_idle.state = .waking; | |
} | |
} else { | |
new_idle.waiting += 1; | |
if (context.is_waking) { | |
new_idle.state = .pending; | |
} | |
} | |
if (@cmpxchgWeak( | |
usize, | |
ptr, | |
idle.pack(), | |
new_idle.pack(), | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
idle = unpack(updated); | |
continue; | |
} | |
if (!can_wake) { | |
return Wait.waiting; | |
} | |
if (context.is_waking or idle.state == .signaled) { | |
return Wait.waking; | |
} | |
return Wait.running; | |
} | |
} | |
fn shutdown(ptr: *usize) ?usize { | |
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic)); | |
while (true) { | |
if (idle.state == .shutdown) { | |
return null; | |
} | |
var new_idle = idle; | |
new_idle.state = .shutdown; | |
new_idle.waiting = 0; | |
if (@cmpxchgWeak( | |
usize, | |
ptr, | |
idle.pack(), | |
new_idle.pack(), | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
idle = unpack(updated); | |
continue; | |
} | |
return idle.waiting; | |
} | |
} | |
fn join(ptr: *usize) ?void { | |
var idle = unpack(@atomicLoad(usize, ptr, .Acquire)); | |
while (true) { | |
std.debug.assert(idle.state == .shutdown); | |
if (idle.spawned == 0) { | |
return null; | |
} | |
var new_idle = idle; | |
new_idle.waiting += 1; | |
if (@cmpxchgWeak( | |
usize, | |
ptr, | |
idle.pack(), | |
new_idle.pack(), | |
.Acquire, | |
.Acquire, | |
)) |updated| { | |
idle = unpack(updated); | |
continue; | |
} | |
return; | |
} | |
} | |
}; | |
const GlobalQueue = struct { | |
head: usize = 0, | |
tail: ?*Task = null, | |
stub: Task = .{ .next = null }, | |
fn isEmpty(self: *const GlobalQueue) bool { | |
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic); | |
return (tail == null) or (tail == &self.stub); | |
} | |
fn push(self: *GlobalQueue, batch: Batch) void { | |
std.debug.assert(!batch.isEmpty()); | |
const tail = @atomicRmw(?*Task, &self.tail, .Xchg, batch.tail, .AcqRel); | |
const prev = tail orelse &self.stub; | |
@atomicStore(?*Task, &prev.next, batch.head, .Release); | |
} | |
const Consumer = struct { | |
queue: *GlobalQueue, | |
head: *Task, | |
const IS_CONSUMING: usize = 0b1; | |
fn tryAcquire(queue: *GlobalQueue) ?Consumer { | |
if (queue.isEmpty()) { | |
return null; | |
} else { | |
return tryAcquireSlow(queue); | |
} | |
} | |
fn tryAcquireSlow(queue: *GlobalQueue) ?Consumer { | |
@setCold(true); | |
while (true) { | |
const head = @atomicLoad(usize, &queue.head, .Monotonic); | |
if (head & IS_CONSUMING != 0) { | |
return null; | |
} | |
_ = @cmpxchgWeak( | |
usize, | |
&queue.head, | |
head, | |
head | IS_CONSUMING, | |
.Acquire, | |
.Monotonic, | |
) orelse return Consumer{ | |
.queue = queue, | |
.head = @intToPtr(?*Task, head) orelse &queue.stub, | |
}; | |
if (queue.isEmpty()) { | |
return null; | |
} | |
} | |
} | |
fn release(self: Consumer) void { | |
@atomicStore( | |
usize, | |
&self.queue.head, | |
@ptrToInt(self.head) & ~IS_CONSUMING, | |
.Release, | |
); | |
} | |
fn pop(self: *Consumer) ?*Task { | |
var head = self.head; | |
const stub = &self.queue.stub; | |
if (head == stub) { | |
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = head; | |
} | |
if (@atomicLoad(?*Task, &head.next, .Acquire)) |new_head| { | |
self.head = new_head; | |
return head; | |
} | |
const tail = @atomicLoad(?*Task, &self.queue.tail, .Monotonic); | |
if (head != tail) { | |
return null; | |
} | |
const batch = Batch.from(stub); | |
self.queue.push(batch); | |
const new_head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = new_head; | |
return head; | |
} | |
}; | |
}; | |
const LocalQueue = struct { | |
head: Index = 0, | |
tail: Index = 0, | |
overflow: GlobalQueue = .{}, | |
buffer: [capacity]*Task = undefined, | |
const capacity = 256; | |
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
comptime { | |
std.debug.assert(std.math.maxInt(Index) >= capacity); | |
} | |
fn push(self: *LocalQueue, batch: Batch) void { | |
const task = batch.head orelse unreachable; | |
if (task != batch.tail) { | |
self.overflow.push(batch); | |
return; | |
} | |
var tail = self.tail; | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (true) { | |
const size = tail -% head; | |
std.debug.assert(size <= self.buffer.len); | |
if (size < self.buffer.len) { | |
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered); | |
@atomicStore(Index, &self.tail, tail +% 1, .Release); | |
return; | |
} | |
var migrate: Index = self.buffer.len / 2; | |
if (@cmpxchgWeak( | |
Index, | |
&self.head, | |
head, | |
head +% migrate, | |
.Acquire, | |
.Monotonic, | |
)) |updated| { | |
head = updated; | |
continue; | |
} | |
var overflowed = Batch{}; | |
while (migrate > 0) : (migrate -= 1) { | |
const migrated = self.buffer[(head +% (migrate - 1)) % self.buffer.len]; | |
overflowed.pushFront(Batch.from(migrated)); | |
} | |
overflowed.pushBack(batch); | |
self.overflow.push(overflowed); | |
return; | |
} | |
} | |
const Scope = enum { | |
lifo, | |
fifo, | |
remote, | |
global, | |
}; | |
fn pop(self: *LocalQueue, be_fair: bool) ?Stole { | |
var scopes = [_]Scope{ .lifo, .global }; | |
if (be_fair) { | |
scopes = [_]Scope{ .global, .fifo }; | |
} | |
for (scopes) |scope| { | |
if (switch (scope) { | |
.lifo => self.steal(Scope.lifo), | |
.fifo => self.steal(Scope.fifo), | |
.global => self.steal(&self.overflow), | |
.remote => null, | |
}) |stole| { | |
return stole; | |
} | |
} | |
return null; | |
} | |
const Stole = struct { | |
task: *Task, | |
pushed: bool, | |
}; | |
fn steal(self: *LocalQueue, target: anytype) ?Stole { | |
const tail = self.tail; | |
const scope = switch (@TypeOf(target)) { | |
Scope => target, | |
*LocalQueue => .remote, | |
*GlobalQueue => .global, | |
else => |T| @compileError(@typeName(T) ++ " is not stealable"), | |
}; | |
switch (scope) { | |
.global => { | |
var consumer = GlobalQueue.Consumer.tryAcquire(target) orelse return null; | |
defer consumer.release(); | |
const free_slots = blk: { | |
const head = @atomicLoad(Index, &self.head, .Monotonic); | |
const size = tail -% head; | |
std.debug.assert(size <= self.buffer.len); | |
break :blk self.buffer.len - size; | |
}; | |
var pushed: Index = 0; | |
while (pushed < free_slots) { | |
const task = consumer.pop() orelse break; | |
@atomicStore(*Task, &self.buffer[(tail +% pushed) % self.buffer.len], task, .Unordered); | |
pushed += 1; | |
} | |
var task = consumer.pop(); | |
if (task == null and pushed > 0) { | |
pushed -= 1; | |
task = self.buffer[(tail +% pushed) % self.buffer.len]; | |
} | |
if (pushed > 0) { | |
@atomicStore(Index, &self.tail, tail +% pushed, .Release); | |
} | |
return Stole{ | |
.task = task orelse return null, | |
.pushed = pushed > 0, | |
}; | |
}, | |
.remote => { | |
if (self.steal(&target.overflow)) |stole| { | |
return stole; | |
} | |
var target_head = @atomicLoad(Index, &target.head, .SeqCst); | |
while (true) { | |
const target_tail = @atomicLoad(Index, &target.tail, .SeqCst); | |
const target_size = target_tail -% target_head; | |
if (target_size == 0 or target_size > self.buffer.len) { | |
return null; | |
} | |
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered); | |
target_head = @cmpxchgWeak( | |
Index, | |
&target.head, | |
target_head, | |
target_head +% 1, | |
.SeqCst, | |
.SeqCst, | |
) orelse return Stole{ .task = task, .pushed = false }; | |
} | |
}, | |
.fifo => { | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
while (true) { | |
if (head == tail) { | |
return null; | |
} | |
const task = self.buffer[head % self.buffer.len]; | |
head = @cmpxchgStrong( | |
Index, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return Stole{ .task = task, .pushed = false }; | |
} | |
}, | |
.lifo => { | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
std.debug.assert(tail -% head <= self.buffer.len); | |
if (head == tail) { | |
return null; | |
} | |
const new_tail = tail -% 1; | |
@atomicStore(Index, &self.tail, new_tail, .SeqCst); | |
head = @atomicLoad(Index, &self.head, .SeqCst); | |
std.debug.assert(tail -% head <= self.buffer.len); | |
const popped = blk: { | |
if (head == tail) break :blk false; | |
if (head != new_tail) break :blk true; | |
break :blk @cmpxchgStrong( | |
Index, | |
&self.head, | |
head, | |
tail, | |
.SeqCst, | |
.Monotonic, | |
) == null; | |
}; | |
if (popped) { | |
return Stole{ | |
.task = self.buffer[new_tail % self.buffer.len], | |
.pushed = false, | |
}; | |
} | |
@atomicStore(Index, &self.tail, tail, .Monotonic); | |
return null; | |
}, | |
} | |
} | |
}; |
const std = @import("std"); | |
const Task = @This(); | |
/// Internal state used by the scheduler | |
next: ?*Task = undefined, | |
/// Callback function invoked when the task is executed | |
executeFn: fn(*Task) void, | |
pub fn execute(self: *Task) void { | |
return (self.executeFn)(self); | |
} | |
/// An ordered set of Tasks used to schedule them in batch. | |
pub const Group = struct { | |
len: usize = 0, | |
head: *Task = undefined, | |
tail: *Task = undefined, | |
/// Create a Group from a Task which can then be pushed to other groups. | |
pub fn from(task: *Task) Group { | |
task.next = null; | |
return Group{ | |
.len = 1, | |
.head = task, | |
.tail = task, | |
}; | |
} | |
/// Push a group into this group using FIFO ordering. | |
pub const push = pushBack; | |
/// Pop a task from this group using FIFO ordering. | |
pub const pop = popFront; | |
/// Push a group into this group using FIFO ordering. | |
pub fn pushBack(self: *Group, group: Group) callconv(.Inline) void { | |
return self.pushMany(group, .fifo); | |
} | |
/// Push a group into this group using LIFO ordering. | |
pub fn pushFront(self: *Group, group: Group) callconv(.Inline) void { | |
return self.pushMany(group, .lifo); | |
} | |
fn pushMany(self: *Group, group: Group, comptime order: enum{fifo, lifo}) void { | |
if (group.len == 0) { | |
return; | |
} | |
if (self.len == 0) { | |
self.* = group; | |
return; | |
} | |
self.len += group.len; | |
switch (order) { | |
.fifo => { | |
self.tail.next = group.head; | |
self.tail = group.tail; | |
}, | |
.lifo => { | |
group.tail.next = self.head; | |
self.head = group.head; | |
}, | |
} | |
} | |
/// Pop a task from this group using FIFO ordering. | |
pub fn popFront(self: *Group) ?*Task { | |
if (self.len == 0) { | |
return null; | |
} | |
const task = self.head; | |
self.head = task.next orelse undefined; | |
self.len -= 1; | |
return task; | |
} | |
}; | |
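// Added sketch (not part of the original gist): a tiny sanity check of the FIFO | |
// push/pop behaviour documented above. executeFn is left undefined because the | |
// tasks are never executed here. | |
test "Group preserves FIFO order" { | |
var a = Task{ .executeFn = undefined }; | |
var b = Task{ .executeFn = undefined }; | |
var group = Group.from(&a); | |
group.push(Group.from(&b)); | |
std.debug.assert(group.pop().? == &a); | |
std.debug.assert(group.pop().? == &b); | |
std.debug.assert(group.pop() == null); | |
} | |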
pub const Worker = struct { | |
state: usize, | |
next: ?*Worker = null, | |
run_queue: struct { | |
direct: LifoQueue = .{}, | |
overflow: FifoQueue = .{}, | |
buffer: BufferQueue = .{}, | |
} = .{}, | |
const State = struct { | |
mode: Mode, | |
scheduled: bool, | |
lifo_queue: bool, | |
scheduler: *Scheduler, | |
const Mode = enum(u2) { | |
waking = 0, | |
polling, | |
running, | |
shutdown, | |
}; | |
comptime { | |
std.debug.assert(@alignOf(Scheduler) >= 0b10000); | |
} | |
fn pack(self: State) usize { | |
return @ptrToInt(self.scheduler) | |
| (@as(usize, @boolToInt(self.scheduled)) << 0) | |
| (@as(usize, @boolToInt(self.lifo_queue)) << 1) | |
| (@as(usize, @enumToInt(self.mode)) << 2); | |
} | |
fn unpack(value: usize) State { | |
return .{ | |
.scheduler = @intToPtr(*Scheduler, value & ~@as(usize, 0b1111)), | |
.scheduled = value & (1 << 0) != 0, | |
.lifo_queue = value & (1 << 1) != 0, | |
.mode = @intToEnum(Mode, @truncate(u2, value >> 2)), | |
}; | |
} | |
}; | |
// The type of queue/queue-operation for a Worker's local queue. | |
pub const LocalQueueOrder = enum{ | |
/// Dequeue and steal from the opposite side that Tasks are scheduled. | |
Fifo, | |
/// Dequeue and try to steal from the same side that Tasks are scheduled. | |
Lifo, | |
}; | |
/// Run a Worker under the given Scheduler that it was spawned from. | |
/// The LocalQueueOrder influences what type of local queue is used for the Worker. | |
pub fn run(self: *Worker, scheduler: *Scheduler, queue_order: LocalQueueOrder) void { | |
self.* = Worker{ | |
.state = (State{ | |
.mode = .waking, | |
.scheduled = false, | |
.lifo_queue = queue_order == .Lifo, | |
.scheduler = scheduler, | |
}).pack(), | |
}; | |
// Register the worker into the scheduler as a spawned | |
self.next = @atomicLoad(?*Worker, &scheduler.spawned, .Monotonic); | |
while (true) { | |
self.next = @cmpxchgWeak( | |
?*Worker, | |
&scheduler.spawned, | |
self.next, | |
self, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
// Worker event loop | |
while (running: { | |
const state = State.unpack(@ptrCast(*volatile usize, &self.state).*); | |
break :running switch (state.mode) { | |
.running, .waking => true, | |
.polling => unreachable, | |
.shutdown => false, | |
}; | |
}) { | |
const task = self.poll(scheduler) orelse { | |
scheduler.wait(self); | |
continue; | |
}; | |
scheduler.syscall(.Execute, .{ | |
.worker = self, | |
.task = task, | |
}); | |
} | |
// Wait for the worker to be joined. | |
// All workers are joined when the scheduler is shut down. | |
// This is needed due to the Workers being intrusive and possibly referencing other shutdown workers. | |
scheduler.syscall(.JoinWait, .{ .worker = self }); | |
self.* = undefined; | |
} | |
/// Schedule a Group of Tasks to run on some Worker in the Scheduler. | |
/// Can only be called by the thread which owns this Worker. | |
pub fn schedule(self: *Worker, group: Group) void { | |
if (group.len == 0) return; | |
if (self.run_queue.buffer.push(group)) |overflowed| { | |
self.run_queue.overflow.push(overflowed); | |
} | |
var state = State.unpack(self.state); | |
std.debug.assert(state.mode != .shutdown); | |
if (state.mode != .polling) { | |
return state.scheduler.notify(false); | |
} | |
if (!state.scheduled) { | |
state.scheduled = true; | |
self.state = state.pack(); | |
} | |
} | |
/// Schedule a Group of Tasks to run only on this Worker. | |
/// Can be called by any thread, not only the Worker's thread. | |
pub fn scheduleDirectly(self: *Worker, group: Group) void { | |
if (group.len == 0) return; | |
self.run_queue.direct.push(group); | |
const scheduler = self.getScheduler(); | |
scheduler.syscall(.Wake, .{ | |
.scheduler = scheduler, | |
.worker = .{ .direct = self }, | |
}); | |
} | |
/// Get a reference to the Scheduler this thread was spawned by. | |
pub fn getScheduler(self: Worker) callconv(.Inline) *Scheduler { | |
return State.unpack(self.state).scheduler; | |
} | |
/// Returns true if the Worker has local Tasks it could be processing. | |
pub fn hasLocalTasks(self: *const Worker) bool { | |
return !self.run_queue.direct.isEmpty() | |
or !self.run_queue.buffer.isEmpty() | |
or !self.run_queue.overflow.isEmpty(); | |
} | |
fn poll(self: *Worker, scheduler: *Scheduler) ?*Task { | |
var state = State.unpack(self.state); | |
std.debug.assert(state.mode != .shutdown); | |
std.debug.assert(state.mode != .polling); | |
std.debug.assert(!state.scheduled); | |
// Put the Worker in a `polling` state. | |
// This causes any subsequent Worker.schedule()s to set `.scheduled` instead of doing a notification. | |
const old_mode = state.mode; | |
state.mode = .polling; | |
self.state = state.pack(); | |
// Poll for a task, recording whether it pushed/scheduled some more tasks. | |
const polled_task = self.pollForTask(scheduler, &state); | |
const poll_scheduled = state.scheduled; | |
// Re-check the state to see if any syscalls made in `pollForTask()` did any Worker.schedule()s. | |
// Volatile read to ensure that the state is actually re-read to just be on the safe side. | |
state = State.unpack(@ptrCast(*volatile usize, &self.state).*); | |
std.debug.assert(state.mode == .polling); | |
// If we're the "waking" Worker then we need to transfer "waking" status & resume another worker. | |
// If we pushed tasks or did anything akin to a Worker.schedule(), we need to make other workers aware of that. | |
const is_waking = old_mode == .waking; | |
if (is_waking or state.scheduled or poll_scheduled) { | |
scheduler.notify(is_waking); | |
} | |
// Reset everything back and return the polled task. | |
state.mode = old_mode; | |
state.scheduled = false; | |
self.state = state.pack(); | |
return polled_task; | |
} | |
pub const PollType = union(enum) { | |
/// Poll for Tasks on the Worker's local queue. | |
/// Any Tasks schedule()'d or stolen from other queues are in the Worker's local queue. | |
/// The provided queue order is used as a hint for how the Worker should dequeue from its local queue. | |
Local: LocalQueueOrder, | |
/// Poll for Tasks on the Scheduler's queue. | |
/// Any extra Tasks stolen from the Scheduler's queue are moved to the Worker's local queue. | |
Global, | |
/// Poll for Tasks on the Worker's direct queue. | |
/// Any tasks scheduleDirectly()'d to the Worker are in it's direct queue. | |
/// A Worker's direct queue cannot be stolen from by other Workers. | |
Direct, | |
/// Poll for Tasks by stealing from other Worker local queues. | |
/// Any extra Tasks stolen from other Worker local queues are moved to this Worker's local queue. | |
Steal, | |
}; | |
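// Hypothetical example (added, not in the original gist): a System.Poll handler could | |
// hand a slice like this back through `order` to make the Worker drain its direct | |
// queue first, then its local buffer, then steal from siblings, then hit the global queue. | |
const example_poll_order = [_]PollType{ | |
.{ .Direct = {} }, | |
.{ .Local = .Lifo }, | |
.{ .Steal = {} }, | |
.{ .Global = {} }, | |
}; | |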
fn pollForTask(self: *Worker, scheduler: *Scheduler, state: *State) ?*Task { | |
var poll_attempt: usize = 0; | |
var poll_task: ?*Task = null; | |
var poll_order: ?[]const PollType = null; | |
while (true) : (poll_attempt +%= 1) { | |
scheduler.syscall(.Poll, .{ | |
.worker = self, | |
.attempt = poll_attempt, | |
.order = &poll_order, | |
.task = &poll_task, | |
}); | |
if (poll_task) |task| { | |
return task; | |
} | |
const poll_types = poll_order orelse return null; | |
polling: for (poll_types) |poll_type| { | |
const result = switch (poll_type) { | |
.Local => |order| blk: { | |
const pop_context = BufferQueue.Pop.Context{ | |
.be_fair = order == .Fifo, | |
.is_lifo = state.lifo_queue, | |
}; | |
break :blk self.run_queue.buffer.pop(pop_context) | |
orelse self.run_queue.buffer.steal(&self.run_queue.overflow, pop_context) | |
orelse continue; | |
}, | |
.Global => blk: { | |
break :blk self.run_queue.buffer.steal( | |
&scheduler.run_queue, | |
.{ .is_lifo = state.lifo_queue }, | |
) orelse continue; | |
}, | |
.Direct => blk: { | |
const task = self.run_queue.direct.pop() orelse continue; | |
break :blk BufferQueue.Pop{ .task = task }; | |
}, | |
.Steal => blk: { | |
var steal_attempt: usize = 0; | |
var steal_target: ?*Worker = null; | |
while (true) : (steal_attempt += 1) { | |
scheduler.syscall(.Steal, .{ | |
.worker = self, | |
.attempt = steal_attempt, | |
.target = &steal_target, | |
.task = &poll_task, | |
}); | |
if (poll_task) |task| { | |
return task; | |
} | |
const target_worker = steal_target orelse continue :polling; | |
if (target_worker == self) { | |
break :blk self.run_queue.buffer.pop(.{ .is_lifo = state.lifo_queue }) orelse continue; | |
} | |
const target_is_lifo = State.unpack(target_worker.state).lifo_queue; | |
const target_context = BufferQueue.Pop.Context{ .is_lifo = target_is_lifo }; | |
break :blk self.run_queue.buffer.steal(&target_worker.run_queue.overflow, target_context) | |
orelse self.run_queue.buffer.steal(&target_worker.run_queue.buffer, target_context) | |
orelse continue; | |
} | |
}, | |
}; | |
state.scheduled = result.pushed; | |
return result.task; | |
} | |
} | |
} | |
}; | |
pub const Scheduler = struct { | |
config: Config align(std.math.max(16, @alignOf(Config))), | |
idle_queue: usize = 0, | |
spawned: ?*Worker = null, | |
run_queue: FifoQueue = .{}, | |
pub const Config = struct { | |
/// System which is called by the Scheduler/Worker when necessary for the implementation. | |
system: *System, | |
/// Maximum amount of workers that can be spawned on this Scheduler. | |
max_workers: usize, | |
}; | |
/// Create a Scheduler using the provided Config. | |
/// This can be called statically and at comptime. | |
pub fn init(config: Config) Scheduler { | |
return .{ .config = config }; | |
} | |
/// Schedules a Group of Tasks to eventually be executed on a Worker in the Scheduler. | |
/// Can be called by any thread. | |
pub fn schedule(self: *Scheduler, group: Group) void { | |
if (group.len == 0) return; | |
self.run_queue.push(group); | |
self.notify(false); | |
} | |
fn notify(self: *Scheduler, is_waking: bool) void { | |
@setCold(true); | |
const result = IdleQueue.notify(&self.idle_queue, .{ | |
.is_waking = is_waking, | |
.max_spawn = self.config.max_workers, | |
}); | |
switch (result) { | |
.updated, .ignored => return, | |
.resumed => { | |
self.syscall(.Wake, .{ | |
.scheduler = self, | |
.worker = .{ .waiters = 1 }, | |
}); | |
}, | |
.spawned => { | |
var spawned = false; | |
self.syscall(.Spawn, .{ | |
.scheduler = self, | |
.spawned = &spawned, | |
}); | |
if (!spawned) { | |
self.markShutdown(); | |
} | |
}, | |
}; | |
} | |
fn wait(self: *Scheduler, worker: *Worker) void { | |
@setCold(true); | |
var wait_state: ?bool = null; | |
var worker_state = Worker.State.unpack(worker.state); | |
while (true) { | |
const result = IdleQueue.wait(&self.idle_queue, blk: { | |
if (worker_state.mode == .waking) break :blk .waking; | |
const resumed = wait_state orelse break :blk .running; | |
if (resumed) break :blk .waiting; | |
break :blk .notified; | |
}); | |
notified: { | |
worker_state.mode = switch (result) { | |
.running => .running, | |
.waking => .waking, | |
.waiting => break :notified, | |
.shutdown => .shutdown, | |
}; | |
} | |
if (worker_state.mode == .waking) { | |
worker_state.mode = .running; | |
worker.state = worker_state.pack(); | |
} | |
var resumed = true; | |
defer wait_state = resumed; | |
self.syscall(.Wait, .{ | |
.scheduler = self, | |
.worker = worker, | |
.resumed = &resumed, | |
}); | |
} | |
} | |
/// Mark the Scheduler as shutdown, waking up any idle Workers in the process. | |
/// Workers will eventually see that the Scheduler is shut down and enter a Join state. | |
pub fn shutdown(self: *Scheduler) void { | |
@setCold(true); | |
if (IdleQueue.beginShutdown(&self.idle_queue)) |idle_waiters| { | |
self.syscall(.Wake, .{ | |
.scheduler = self, | |
.worker = .{ .waiters = idle_waiters }, | |
}); | |
} | |
} | |
fn markShutdown(self: *Scheduler) void { | |
@setCold(true); | |
if (IdleQueue.notifyOneShutdown(&self.idle_queue)) |shutdown_waiters| { | |
self.syscall(.ShutdownNotify, .{ | |
.scheduler = self, | |
.waiters = shutdown_waiters, | |
}); | |
} | |
} | |
/// Waits for the Scheduler to be shut down and for all Workers to enter a Join state. | |
/// Once all workers are in the Join state, it joins them effectively releasing them from the Scheduler. | |
/// | |
/// After a call to join(), the Scheduler is considered `undefined` and further use is illegal behavior. | |
/// Can only be called by one thread and only once. | |
pub fn join(self: *Scheduler) void { | |
@setCold(true); | |
if (IdleQueue.waitForShutdown(&self.idle_queue)) { | |
self.syscall(.ShutdownWait, .{ .scheduler = self }); | |
} | |
var spawned_workers = @atomicLoad(?*Worker, &self.spawned, .Acquire); | |
while (spawned_workers) |worker| { | |
spawned_workers = worker.next; | |
self.syscall(.JoinNotify, .{ .worker = worker }); | |
} | |
} | |
fn syscall(self: Scheduler, comptime opcode: std.meta.TagType(System.Command), payload: anytype) callconv(.Inline) void { | |
const command = @unionInit(System.Command, @tagName(opcode), payload); | |
return self.config.system.call(command); | |
} | |
}; | |
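// Hypothetical lifecycle sketch (added, not part of the original gist). It assumes | |
// `system` points at a fully implemented System; the call order mirrors the doc | |
// comments above: schedule work, then shutdown() once no more tasks will arrive, | |
// then join() to release the Workers before the Scheduler goes out of scope. | |
fn exampleSchedulerLifecycle(system: *System, task: *Task) void { | |
var scheduler = Scheduler.init(.{ | |
.system = system, | |
.max_workers = 4, | |
}); | |
scheduler.schedule(Group.from(task)); | |
scheduler.shutdown(); | |
scheduler.join(); | |
} | |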
pub const System = struct { | |
callFn: fn(*System, Command) void, | |
pub fn call(self: *System, command: Command) callconv(.Inline) void { | |
return (self.callFn)(self, command); | |
} | |
pub const Command = union(enum) { | |
Spawn: struct { | |
scheduler: *Scheduler, | |
spawned: *bool, | |
}, | |
Wait: struct { | |
scheduler: *Scheduler, | |
worker: *Worker, | |
resumed: *bool, | |
}, | |
Wake: struct { | |
scheduler: *Scheduler, | |
worker: union(enum){ | |
direct: *Worker, | |
waiters: usize, | |
}, | |
}, | |
Poll: struct { | |
worker: *Worker, | |
attempt: usize, | |
order: *?[]const Worker.PollType, | |
task: *?*Task, | |
}, | |
Steal: struct { | |
worker: *Worker, | |
attempt: usize, | |
target: *?*Worker, | |
task: *?*Task, | |
}, | |
Execute: struct { | |
worker: *Worker, | |
task: *Task, | |
}, | |
JoinWait: struct { | |
worker: *Worker, | |
}, | |
JoinNotify: struct { | |
worker: *Worker, | |
}, | |
ShutdownWait: struct { | |
scheduler: *Scheduler, | |
}, | |
ShutdownNotify: struct { | |
scheduler: *Scheduler, | |
waiters: usize, | |
}, | |
}; | |
}; | |
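// Hypothetical sketch (added, not part of the original gist) of a bare-bones System: | |
// it recovers its containing struct with @fieldParentPtr, runs .Execute inline, and | |
// reports "no poll order" so a Worker gives up polling. A real implementation must | |
// also service the Spawn/Wait/Wake/Steal and Join/Shutdown commands. | |
const ExampleSystem = struct { | |
system: System = .{ .callFn = onCall }, | |
fn onCall(system: *System, command: System.Command) void { | |
const self = @fieldParentPtr(ExampleSystem, "system", system); | |
_ = self; // a real implementation would keep its state here | |
switch (command) { | |
.Execute => |cmd| cmd.task.execute(), | |
.Poll => |cmd| { | |
cmd.order.* = null; | |
}, | |
else => {}, | |
} | |
} | |
}; | |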
/// Lock-free state manager system which coordinates the sleeping and waking of worker threads. | |
/// The algorithm was developed as a way to amortize the cost of thread wake ups and synchronize shutdown. | |
/// | |
/// The idea is to have the concept of a "waking" thread which is either resumed from idle state or freshly spawned. | |
/// There can be only one "waking" thread at any given moment and notifications in the mean time just leave a token. | |
/// If the "waking" thread finds work, then it transfers its "waking" status to another thread via resuming/spawning one. | |
/// This has the effect of a smooth ramp up in threads looking for work to decrease contention. | |
/// It also amortizes the resume/spawn cost of multiple notifications in succession which is common when one thread is producing work. | |
/// | |
/// When any thread fails to find work, it transitions to the idle state. | |
/// If the thread sees a notification token left behind, it consumes it and tries to search for work again. | |
/// If the "waking" thread consumes the notification token, it remains "waking". If not, it relinquishes its "waking" status to be taken. | |
/// The notification token ensures that notifications after a "waking" thread was chosen do not go unseen. | |
/// The ability for any thread (not only the "waking" thread) to grab the token improves latency of a task being processed sooner. | |
/// | |
/// Finally, the state also acts as a way to synchronize shutting down all of the worker threads. | |
/// On shutdown, the state is unconditionally transitioned to `.shutdown` and all idle threads are woken up. | |
/// Any subsequent notifications are ignored and ongoing idle transitions are cancelled and direct the thread to shut down. | |
/// When a thread shuts down, it decrements the `spawned` count by one. When `spawned` reaches 0, all worker threads have shut down. | |
/// If there are outside threads waiting for all spawned threads to shut down, they can increment `idle` while `spawned` isn't 0. | |
/// The last worker thread to decrement `spawned` to 0 should wake up any outside threads waiting for shutdown sync by checking the `idle` count. | |
/// (A small pack()/unpack() sanity test follows the struct below.) | |
const IdleQueue = struct { | |
idle: usize = 0, | |
spawned: usize = 0, | |
notified: bool = false, | |
state: State = .pending, | |
const count_bits = (std.meta.bitCount(usize) - 4) / 2; | |
const Count = std.meta.Int(.unsigned, count_bits); | |
const State = enum(u2) { | |
pending = 0, | |
waking, | |
notified, | |
shutdown, | |
}; | |
fn pack(self: IdleQueue) usize { | |
return ((@as(usize, @boolToInt(self.notified)) << 0) | | |
(@as(usize, @enumToInt(self.state)) << 1) | | |
(@as(usize, @intCast(Count, self.idle)) << 4) | | |
(@as(usize, @intCast(Count, self.spawned)) << (4 + count_bits))); | |
} | |
fn unpack(value: usize) IdleQueue { | |
return .{ | |
.notified = value & (1 << 0) != 0, | |
.state = @intToEnum(State, @truncate(u2, value >> 1)), | |
.idle = @truncate(Count, value >> 4), | |
.spawned = @truncate(Count, value >> (4 + count_bits)), | |
}; | |
} | |
pub const Notify = enum { | |
/// The IdleQueue state was just updated. Nothing left to do | |
updated, | |
/// A "waking" notification was performed indicating that an idle thread should be resumed | |
resumed, | |
/// A "waking" notification was performed indicating that a new thread should be spawned | |
spawned, | |
/// The notification request was ignored. Either due to a previous shutdown or notif token already being set. | |
ignored, | |
pub const Context = struct { | |
/// True if the caller is the "waking" thread who found work and is trying to transfer "waking" status. | |
is_waking: bool, | |
/// Max amount of threads being spawned at once if a spawn may occur. | |
max_spawn: usize, | |
}; | |
}; | |
pub fn notify(ptr: *usize, context: Notify.Context) Notify { | |
var ptr_value = @atomicLoad(usize, ptr, .Monotonic); | |
while (true) { | |
const self = IdleQueue.unpack(ptr_value); | |
if (self.state == .shutdown) { | |
return .ignored; | |
} | |
const can_wake = context.is_waking or self.state == .pending; | |
if (context.is_waking) { | |
std.debug.assert(self.state == .waking); | |
} | |
var result = Notify.updated; | |
var new_self = self; | |
new_self.notified = true; | |
if (self.idle > 0 and can_wake) { | |
new_self.state = .notified; | |
result = .resumed; | |
} else if (self.spawned < context.max_spawn and can_wake) { | |
new_self.spawned += 1; | |
result = .spawned; | |
} else if (context.is_waking) { | |
new_self.state = .pending; // if "waking" thread can't wake, relinquish "waking" status. | |
} else if (self.notified) { | |
return .ignored; // if notification token already set, nothing to do. | |
} | |
// Release barrier synchronises with Acquire in wait(), | |
// so the possibly resumed waiter can see any tasks made available before this notify(). | |
ptr_value = @cmpxchgWeak( | |
usize, | |
ptr, | |
self.pack(), | |
new_self.pack(), | |
.Release, | |
.Monotonic, | |
) orelse return result; | |
} | |
} | |
pub const Wait = enum { | |
/// The caller was resumed/notified without being a "waking" thread. | |
running, | |
/// The caller was resumed/notified as the "waking" thread. | |
waking, | |
/// The caller failed to consume a "waking" or notification token and is now idle | |
waiting, | |
/// The idle/wait state transition was cancelled as a shutdown() was previously called. | |
shutdown, | |
pub const Context = enum { | |
/// The caller is a normal thread which didn't find any work. | |
running, | |
/// The caller is the "waking" thread which didn't find any work. | |
waking, | |
/// The caller was resumed having been in the idle/wait state. | |
waiting, | |
/// The caller's idle/wait state was interrupted as it thinks there are tasks available. | |
notified, | |
}; | |
}; | |
pub fn wait(ptr: *usize, context: Wait.Context) Wait { | |
var ptr_value = @atomicLoad(usize, ptr, .Monotonic); | |
while (true) { | |
const self = IdleQueue.unpack(ptr_value); | |
if (self.state == .shutdown) { | |
return .shutdown; | |
} | |
const is_waiting = switch (context) { | |
.running => false, | |
.waking => blk: { | |
std.debug.assert(self.state == .waking); | |
break :blk false; | |
}, | |
.waiting, .notified => blk: { | |
std.debug.assert(self.idle > 0); | |
break :blk true; | |
}, | |
}; | |
var new_self = self; | |
var result: Wait = undefined; | |
if (self.notified or context == .notified) { | |
new_self.notified = false; | |
if (is_waiting) { | |
new_self.idle -= 1; | |
} | |
result = .running; | |
if (context == .waking or self.state == .notified) { | |
new_self.state = .waking; | |
result = .waking; | |
} | |
} else if (!is_waiting) { | |
result = .waiting; | |
new_self.idle += 1; | |
if (context == .waking) { | |
new_self.state = .pending; | |
} | |
} else { | |
return .waiting; | |
} | |
// Acquire barrier which synchronizes with the Release in notify() | |
// In order to see any tasks made available by the matching notify(). | |
ptr_value = @cmpxchgWeak( | |
usize, | |
ptr, | |
self.pack(), | |
new_self.pack(), | |
.Acquire, | |
.Monotonic, | |
) orelse return result; | |
} | |
} | |
pub fn beginShutdown(ptr: *usize) ?usize { | |
var ptr_value = @atomicLoad(usize, ptr, .Monotonic); | |
while (true) { | |
const self = IdleQueue.unpack(ptr_value); | |
if (self.state == .shutdown) { | |
return null; | |
} | |
var new_self = self; | |
new_self.idle = 0; | |
new_self.state = .shutdown; | |
ptr_value = @cmpxchgWeak( | |
usize, | |
ptr, | |
self.pack(), | |
new_self.pack(), | |
.AcqRel, | |
.Monotonic, | |
) orelse { | |
if (self.idle == 0) return null; | |
return self.idle; | |
}; | |
} | |
} | |
pub fn waitForShutdown(ptr: *usize) bool { | |
var ptr_value = @atomicLoad(usize, ptr, .Acquire); | |
while (true) { | |
const self = IdleQueue.unpack(ptr_value); | |
std.debug.assert(self.state == .shutdown); | |
var new_self = self; | |
new_self.idle += 1; | |
if (self.spawned == 0) { | |
return false; | |
} | |
ptr_value = @cmpxchgWeak( | |
usize, | |
ptr, | |
self.pack(), | |
new_self.pack(), | |
.Acquire, | |
.Acquire, | |
) orelse return true; | |
} | |
} | |
pub fn notifyOneShutdown(ptr: *usize) ?usize { | |
const new_self = blk: { | |
const sub_self = IdleQueue{ .spawned = 1 }; | |
const ptr_value = @atomicRmw(usize, ptr, .Sub, sub_self.pack(), .Release); | |
break :blk IdleQueue.unpack(ptr_value - sub_self.pack()); | |
}; | |
if (new_self.spawned == 0) { | |
return new_self.idle; | |
} else { | |
return null; | |
} | |
} | |
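// Shutdown protocol, as implied by the functions above: beginShutdown() flips the state to | |
// .shutdown, clears the idle count, and reports how many idle workers were sleeping so they can | |
// be woken (null if none, or if shutdown was already requested). Each exiting worker then calls | |
// notifyOneShutdown() to drop the spawned count; the one that drops it to zero learns how many | |
// joiners (registered via waitForShutdown()) still need to be woken. | |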
}; | |
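// A hedged, non-compiled sketch of how a worker is expected to combine the wait()/notify() | |
// primitives above. `notify`, `idle_ptr`, `found_work`, `block_until_woken`, and `max_threads` | |
// are illustrative placeholders, not declarations from this file: | |
// | |
//     var wait_ctx: Wait.Context = .waking; // e.g. a freshly spawned worker starts as "waking" | |
//     while (true) { | |
//         if (found_work()) { | |
//             // every push of work is paired with a notify() so a sleeping worker can pick it up | |
//             _ = notify(idle_ptr, .{ .is_waking = wait_ctx == .waking, .max_spawn = max_threads }); | |
//             wait_ctx = .running; | |
//             continue; | |
//         } | |
//         switch (wait(idle_ptr, wait_ctx)) { | |
//             .running => wait_ctx = .running, | |
//             .waking => wait_ctx = .waking, | |
//             .waiting => { block_until_woken(); wait_ctx = .waiting; }, | |
//             .shutdown => break, | |
//         } | |
//     } | |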
/// Unbounded Multi-Producer Single-Consumer LIFO queue for Tasks. | |
/// | |
/// This acts as a generic sink for tasks produced by other threads. | |
/// It also amortizes synchronization for the consumer: pop() claims the whole shared stack with a | |
/// single atomic exchange, then drains it from a thread-local list (a usage sketch follows this struct). | |
const LifoQueue = struct { | |
stack: ?*Task = null, | |
local: ?*Task = null, | |
pub fn isEmpty(self: *const LifoQueue) callconv(.Inline) bool { | |
if (self.local != null) return false; | |
return @atomicLoad(?*Task, &self.stack, .Monotonic) == null; | |
} | |
pub fn push(self: *LifoQueue, group: Group) void { | |
std.debug.assert(group.len > 0); | |
const head = group.head; | |
const tail = group.tail; | |
tail.next = @atomicLoad(?*Task, &self.stack, .Monotonic); | |
while (true) { | |
tail.next = @cmpxchgWeak( | |
?*Task, | |
&self.stack, | |
tail.next, | |
head, | |
.Release, | |
.Monotonic, | |
) orelse break; | |
} | |
} | |
pub fn pop(self: *LifoQueue) ?*Task { | |
if (self.local) |task| { | |
self.local = task.next; | |
return task; | |
} | |
var stack = @atomicLoad(?*Task, &self.stack, .Monotonic); | |
if (stack != null) { | |
stack = @atomicRmw(?*Task, &self.stack, .Xchg, null, .Acquire); | |
std.debug.assert(stack != null); | |
} | |
const task = stack orelse return null; | |
self.local = task.next; | |
return task; | |
} | |
}; | |
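// Hedged, non-compiled usage sketch for LifoQueue; `lifo`, `task`, and `runTask` are placeholders: | |
// | |
//     // Any thread may push concurrently: | |
//     lifo.push(Group.from(task)); | |
// | |
//     // Only the owning worker pops. The first pop() after remote pushes claims the whole shared | |
//     // stack with one Xchg; the following pops drain `local` without touching shared memory. | |
//     while (lifo.pop()) |popped| runTask(popped); | |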
/// Unbounded Multi-Producer Multi-Consumer FIFO queue for Tasks. | |
/// Based on Dmitry Vyukov's intrusive mpsc: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue | |
/// | |
/// The consumer side is made multi-consumer by guarding it with non-blocking mutual exclusion (an atomic HAS_CONSUMER flag). | |
/// This is preferable to blocking on the queue: a thread that fails to acquire the consumer can check other threads' queues in the meantime. | |
/// | |
/// The point in the algorithm at which pop() may falsely return null due to a preempted push() thread is also accounted for. | |
/// A disconnected pop() is simply treated as empty; in the worst case the popping thread sees an empty queue and goes to sleep via wait(). | |
/// This is fine because every push() is paired with a notify(), which ensures a waiting thread eventually starts consuming or observes the queue as empty. | |
/// This effectively delegates consumer-side blocking to the external wait()/notify() mechanism, which can "block" more efficiently. | |
/// A usage sketch of the consumer protocol follows this struct. | |
const FifoQueue = struct { | |
head: usize = 0, | |
tail: ?*Task = null, | |
stub: Task = Task{ | |
.next = null, | |
.executeFn = undefined, | |
}, | |
const HAS_CONSUMER: usize = 1 << 0; | |
/// Returns true if the queue is currently empty. | |
/// `head` and `tail` default to null for static initialization. | |
/// The algorithms account for this by treating null == &self.stub | |
pub fn isEmpty(self: *const FifoQueue) callconv(.Inline) bool { | |
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic); | |
return (tail == null) or (tail == &self.stub); | |
} | |
pub fn push(self: *FifoQueue, group: Group) void { | |
std.debug.assert(group.len > 0); | |
const tail = @atomicRmw(?*Task, &self.tail, .Xchg, group.tail, .AcqRel); | |
const prev = tail orelse &self.stub; | |
@atomicStore(?*Task, &prev.next, group.head, .Release); | |
} | |
pub fn tryAcquireConsumer(self: *FifoQueue) callconv(.Inline) ?Consumer { | |
// Fast path check of tail here improves perf by decreasing i-cache hit | |
// as most queues will be empty when worker threads are polling them. | |
if (self.isEmpty()) { | |
return null; | |
} | |
return self.tryAcquireConsumerSlow(); | |
} | |
fn tryAcquireConsumerSlow(self: *FifoQueue) ?Consumer { | |
@setCold(true); | |
while (true) { | |
const head = @atomicLoad(usize, &self.head, .Monotonic); | |
if (head & HAS_CONSUMER != 0) { | |
return null; | |
} | |
_ = @cmpxchgWeak( | |
usize, | |
&self.head, | |
head, | |
head | HAS_CONSUMER, | |
.Acquire, | |
.Monotonic, | |
) orelse return Consumer{ | |
.queue = self, | |
.head = @intToPtr(?*Task, head) orelse &self.stub, | |
}; | |
// re-check the head under contention in case the queue became empty | |
if (self.isEmpty()) { | |
return null; | |
} | |
} | |
} | |
pub const Consumer = struct { | |
queue: *FifoQueue, | |
head: *Task, | |
pub fn release(self: Consumer) void { | |
// Update the head with the pop()-updated head | |
// while at the same time unsetting the HAS_CONSUMER bit. | |
const new_head = @ptrToInt(self.head); | |
@atomicStore(usize, &self.queue.head, new_head, .Release); | |
} | |
pub fn pop(self: *Consumer) ?*Task { | |
var head = self.head; | |
const stub = &self.queue.stub; | |
if (head == stub) { | |
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = head; | |
} | |
if (@atomicLoad(?*Task, &head.next, .Acquire)) |next| { | |
self.head = next; | |
return head; | |
} | |
const tail = @atomicLoad(?*Task, &self.queue.tail, .Acquire); | |
if (tail != head) { | |
return null; | |
} | |
self.queue.push(Group.from(stub)); | |
const next = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null; | |
self.head = next; | |
return head; | |
} | |
}; | |
}; | |
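// Hedged, non-compiled sketch of the consumer protocol; `fifo` and `runTask` are placeholders: | |
// | |
//     var consumer = fifo.tryAcquireConsumer() orelse { | |
//         // Another thread holds the consumer side (or the queue is empty); steal elsewhere instead of blocking. | |
//         return; | |
//     }; | |
//     defer consumer.release(); | |
//     while (consumer.pop()) |task| runTask(task); | |
//     // A null pop() may also mean a producer was preempted mid-push(); its follow-up notify() | |
//     // guarantees some thread re-checks this queue later. | |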
/// Bounded Single-Producer, Multi-Consumer queue for Tasks. | |
/// Based on the Go scheduler's runq: https://tokio.rs/blog/2019-10-scheduler#a-better-run-queue | |
/// | |
/// This acts as the primary queue for workers when pushing/popping Tasks. | |
/// The capacity was chosen as a reasonable trade-off between throughput and memory usage under heavy task load. | |
/// | |
/// push() is cheap: at most it performs a Release store, which is free on x86 but slightly costly on other platforms. | |
/// Since the queue is bounded, push() returns the Group of overflowing tasks when it cannot fit them all in its buffer. | |
/// steal() from another bounded queue grabs half of its tasks at once, which amortizes the contention with the queue owner's pop(). | |
/// | |
/// Unlike the original algorithm, push() overflow does not migrate half of the buffer; | |
/// it was measured that handling overflow via a FifoQueue is cheaper than linking the migrated tasks together, due to: | |
/// - the assumed cache effects of re-accessing the migrated tasks (this is still a major slowdown for steal(*FifoQueue)); | |
/// - push()ing to a worker-local, wait-free FifoQueue being cheaper than pushing to a shared Mutex-protected injector queue. | |
/// A usage sketch follows this struct. | |
const BufferQueue = extern struct { | |
head: Index = 0, | |
tail: Index = 0, | |
buffer: [capacity]*Task = undefined, | |
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2); | |
const capacity = 256; | |
comptime { | |
std.debug.assert(std.math.maxInt(Index) >= capacity); | |
} | |
pub fn isEmpty(self: *const BufferQueue) bool { | |
var head: Index = undefined; | |
var tail: Index = undefined; | |
// For platforms known to support it, try to load both the head & tail in one atomic load instead of two. | |
switch (std.builtin.cpu.arch) { | |
.i386, .x86_64, .aarch64 => { | |
comptime std.debug.assert(@byteOffsetOf(BufferQueue, "head") == 0); | |
comptime std.debug.assert(@byteOffsetOf(BufferQueue, "tail") == @sizeOf(Index)); | |
const head_tail = @bitCast([2]Index, @atomicLoad(usize, @ptrCast(*const usize, self), .Monotonic)); | |
head = head_tail[0]; | |
tail = head_tail[1]; | |
}, | |
else => { | |
head = @atomicLoad(Index, &self.head, .Acquire); | |
tail = @atomicLoad(Index, &self.tail, .Monotonic); | |
}, | |
} | |
return (tail == head) or (tail == (head -% 1)); | |
} | |
pub fn push(self: *BufferQueue, group: Group) ?Group { | |
// Preemptively check if group is too much without actually looking at current size | |
std.debug.assert(group.len > 0); | |
if (group.len > capacity) { | |
return group; | |
} | |
var tail = self.tail; | |
const head = @atomicLoad(Index, &self.head, .Monotonic); | |
const size = tail -% head; | |
std.debug.assert(size <= capacity); | |
// Overflow if we cant fit the whole group in one go. | |
// This avoids the cache effects of accessing the tasks if some will be overflowed anyway. | |
const free_slots = capacity - size; | |
if (group.len > free_slots) { | |
return group; | |
} | |
var tasks = group; | |
while (tasks.pop()) |task| { | |
@atomicStore(*Task, &self.buffer[tail % capacity], task, .Unordered); | |
tail +%= 1; | |
} | |
@atomicStore(Index, &self.tail, tail, .Release); | |
return null; | |
} | |
pub const Pop = struct { | |
task: *Task, | |
pushed: bool = false, | |
pub const Context = struct { | |
be_fair: bool = false, | |
is_lifo: bool = false, | |
}; | |
}; | |
pub fn pop(self: *BufferQueue, context: Pop.Context) ?Pop { | |
const tail = self.tail; | |
var head = @atomicLoad(Index, &self.head, .Monotonic); | |
if (head == tail) { | |
return null; | |
} | |
const be_fair = context.be_fair or !context.is_lifo; | |
while (be_fair) { | |
if (head == tail) { | |
return null; | |
} | |
head = @cmpxchgWeak( | |
Index, | |
&self.head, | |
head, | |
head +% 1, | |
.Acquire, | |
.Monotonic, | |
) orelse return Pop{ | |
.task = self.buffer[head % capacity], | |
}; | |
} | |
const new_tail = tail -% 1; | |
@atomicStore(Index, &self.tail, new_tail, .SeqCst); | |
head = @atomicLoad(Index, &self.head, .SeqCst); | |
std.debug.assert(tail -% head <= capacity); | |
var task: ?*Task = null; | |
if (head != tail) { | |
task = self.buffer[new_tail % capacity]; | |
// More than one task remains, so no stealer can race us for this slot. | |
if (head != new_tail) { | |
return Pop{ .task = task.? }; | |
} | |
// This was the last task: race the stealers for it by advancing head past it. | |
if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic)) |new_head| { | |
task = null; | |
std.debug.assert(new_head == tail); | |
} | |
} | |
// Either the queue was already drained or the single remaining task was claimed above; | |
// restore the tail so that head == tail again. | |
@atomicStore(Index, &self.tail, tail, .Monotonic); | |
const popped = task orelse return null; | |
return Pop{ .task = popped }; | |
} | |
pub fn steal(noalias self: *BufferQueue, noalias target: anytype, context: Pop.Context) ?Pop { | |
const is_unbounded = switch (@TypeOf(target)) { | |
*BufferQueue => false, | |
*FifoQueue => true, | |
else => |T| @compileError("Cannot steal from " ++ @typeName(T)), | |
}; | |
if (is_unbounded) { | |
var consumer = target.tryAcquireConsumer() orelse return null; | |
defer consumer.release(); | |
// TODO: Figure out a good limit to the max amount of Tasks | |
// that can be popped from the consumer to reduce the cache | |
// polution effects of iterating an intrusive linked list. | |
const MAX_PUSH = std.math.maxInt(usize); | |
const tail = self.tail; | |
const max_push = blk: { | |
const head = @atomicLoad(Index, &self.head, .Unordered); | |
const size = tail -% head; | |
std.debug.assert(size <= capacity); | |
const free_slots = capacity - size; | |
break :blk std.math.min(free_slots, MAX_PUSH); | |
}; | |
var pushed: Index = 0; | |
while (pushed < max_push) : (pushed += 1) { | |
const task = consumer.pop() orelse break; | |
const index = (tail +% pushed) % capacity; | |
@atomicStore(*Task, &self.buffer[index], task, .Unordered); | |
} | |
var task = consumer.pop(); | |
if (task == null and pushed > 0) { | |
pushed -= 1; | |
task = self.buffer[(tail +% pushed) % capacity]; | |
} | |
if (pushed > 0) @atomicStore(Index, &self.tail, tail +% pushed, .Release); | |
// Nothing was stolen at all: report empty instead of returning a null task. | |
const stolen = task orelse return null; | |
return Pop{ | |
.task = stolen, | |
.pushed = pushed > 0, | |
}; | |
} | |
const tail = self.tail; | |
const head = @atomicLoad(Index, &self.head, .Monotonic); | |
const size = tail -% head; | |
std.debug.assert(size == 0); | |
var target_head = @atomicLoad(Index, &target.head, .SeqCst); | |
while (true) { | |
const target_tail = @atomicLoad(Index, &target.tail, .SeqCst); | |
if (context.is_lifo and (target_tail == (target_head -% 1))) { | |
return null; | |
} | |
const target_size = target_tail -% target_head; | |
if (target_size == 0) { | |
return null; | |
} | |
var target_steal = target_size - (target_size / 2); | |
if (context.is_lifo) target_steal = 1; | |
if (target_steal > capacity / 2) { | |
target_head = @atomicLoad(Index, &target.head, .Acquire); | |
continue; | |
} | |
var pushed: Index = 0; | |
while (pushed < target_steal) : (pushed += 1) { | |
const task = @atomicLoad(*Task, &target.buffer[(target_head +% pushed) % capacity], .Unordered); | |
@atomicStore(*Task, &self.buffer[(tail +% pushed) % capacity], task, .Unordered); | |
} | |
if (@cmpxchgStrong( | |
Index, | |
&target.head, | |
target_head, | |
target_head +% target_steal, | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
target_head = updated; | |
continue; | |
} | |
pushed -= 1; | |
if (pushed > 0) { | |
@atomicStore(Index, &self.tail, tail +% pushed, .Release); | |
} | |
return Pop{ | |
.task = self.buffer[(tail +% pushed) % capacity], | |
.pushed = pushed > 0, | |
}; | |
} | |
} | |
}; |
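// Hedged, non-compiled sketch of how a worker drives its BufferQueue; `buffer`, `injected`, | |
// `other`, `group`, and `tick` are placeholders: | |
// | |
//     // Producer side (owner only): overflow falls back to the shared FifoQueue. | |
//     if (buffer.push(group)) |overflowed| injected.push(overflowed); | |
// | |
//     // Consumer side: pop LIFO for cache locality, but force a fair FIFO pop | |
//     // every so often to avoid starving older tasks. | |
//     if (buffer.pop(.{ .is_lifo = true, .be_fair = tick % 61 == 0 })) |popped| return popped.task; | |
// | |
//     // Otherwise steal: half of another worker's buffer, or a batch from the shared FifoQueue. | |
//     if (buffer.steal(&other.buffer, .{})) |stolen| return stolen.task; | |
//     if (buffer.steal(&injected, .{})) |stolen| return stolen.task; | |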
const std = @import("std"); | |
const Task = @import("./Task.zig"); | |
const ThreadPool = @This(); | |
scheduler: Task.Scheduler, | |
optimize_for: ?Config.OptimizeFor, | |
use_caller_thread: bool, | |
idle_queue: AtomicUsize = .{}, | |
join_event: std.Thread.StaticResetEvent = .{}, | |
pub const Config = struct { | |
max_threads: ?usize = null, | |
optimize_for: ?OptimizeFor = null, | |
pub const OptimizeFor = enum { | |
Latency, | |
Throughput, | |
}; | |
}; | |
pub fn init(config: Config) ThreadPool { | |
const max_threads = if (std.builtin.single_threaded) @as(usize, 1) else blk: { | |
const max_threads = config.max_threads orelse std.Thread.cpuCount() catch 1; | |
break :blk std.math.max(1, max_threads); | |
}; | |
return ThreadPool{ | |
.scheduler = Task.Scheduler.init(.{ | |
.system = &system_impl, | |
.max_workers = max_threads, | |
}), | |
.optimize_for = config.optimize_for, | |
.use_caller_thread = false, | |
}; | |
} | |
pub fn deinit(self: *ThreadPool) void { | |
self.scheduler.deinit(); | |
self.join_event.deinit(); | |
self.* = undefined; | |
} | |
pub const StartConfig = struct { | |
use_caller_thread: bool = std.builtin.single_threaded, | |
first_runnable: ?*Runnable = null, | |
}; | |
pub fn start(self: *ThreadPool, config: StartConfig) void { | |
self.use_caller_thread = std.builtin.single_threaded or config.use_caller_thread; | |
if (self.use_caller_thread) { | |
std.debug.assert(config.first_runnable != null); | |
} | |
if (config.first_runnable) |runnable| { | |
self.scheduler.schedule(Task.Batch.from(&runnable.task)); | |
} | |
} | |
pub fn stop(self: *ThreadPool) void { | |
self.scheduler.shutdown(); | |
} | |
pub const Runnable = struct { | |
task: Task = undefined, | |
runFn: fn(*Runnable) void, | |
}; | |
pub fn schedule(self: *ThreadPool, runnable: *Runnable) void { | |
const batch = Task.Batch.from(&runnable.task); | |
if (Thread.getCurrent()) |thread| { | |
thread.worker.schedule(batch); | |
} else { | |
self.scheduler.schedule(batch); | |
} | |
} | |
pub const SpawnConfig = struct { | |
allocator: *std.mem.Allocator, | |
}; | |
pub fn spawn( | |
self: *ThreadPool, | |
config: SpawnConfig, | |
comptime function: anytype, | |
parameters: anytype, | |
) !void { | |
const Params = @TypeOf(parameters); | |
const Closure = struct { | |
params: Params, | |
allocator: *std.mem.Allocator, | |
runnable: Runnable = .{ .runFn = run }, | |
fn run(runnable: *Runnable) void { | |
const self = @fieldParentPtr(@This(), "runnable", runnable); | |
const result = @call(.{}, function, self.params); | |
self.allocator.destroy(self); | |
} | |
}; | |
const allocator = config.allocator; | |
const closure = try allocator.create(Closure); | |
errdefer allocator.destroy(closure); | |
closure.* = .{ | |
.params = parameters, | |
.allocator = allocator, | |
}; | |
return self.schedule(&closure.runnable); | |
} | |
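// Illustrative usage of spawn(); `pool` is assumed to be an initialized, started ThreadPool: | |
// | |
//     fn sayHello(name: []const u8) void { | |
//         std.debug.print("hello {s}\n", .{name}); | |
//     } | |
// | |
//     try pool.spawn(.{ .allocator = std.heap.page_allocator }, sayHello, .{"world"}); | |
// | |
// The closure is heap-allocated because the Runnable may outlive the caller's stack frame; | |
// it destroys itself after the wrapped function returns. | |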
const IDLE_QUEUE_EMPTY: usize = 0; | |
const IDLE_QUEUE_NOTIFIED: usize = 1; | |
const IDLE_QUEUE_SHUTDOWN: usize = 2; | |
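// idle_queue encodes the idle-worker state in a single usize (paired with an ABA tag on x86 via | |
// AtomicUsize below): 0 means no idle threads, 1 means a wake-up token is pending with no sleeper | |
// yet to consume it, 2 means the pool is shutting down, and any other value is the address of the | |
// top *Thread of an intrusive stack of sleeping threads linked through `idle_next`. | |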
fn idleWait(self: *ThreadPool, thread: *Thread) void { | |
var idle_queue = self.idle_queue.load(.Monotonic); | |
while (true) { | |
if (idle_queue.value == IDLE_QUEUE_SHUTDOWN) { | |
return; | |
} | |
var new_idle_queue: usize = undefined; | |
if (idle_queue.value == IDLE_QUEUE_NOTIFIED) { | |
new_idle_queue = IDLE_QUEUE_EMPTY; | |
} else { | |
const next = @intToPtr(?*Thread, idle_queue.value); | |
@atomicStore(?*Thread, &thread.idle_next, next, .Unordered); | |
new_idle_queue = @ptrToInt(thread); | |
} | |
if (self.idle_queue.tryCompareAndSwap( | |
idle_queue, | |
new_idle_queue, | |
.AcqRel, | |
.Monotonic, | |
)) |updated| { | |
idle_queue = updated; | |
continue; | |
} | |
if (idle_queue.value != IDLE_QUEUE_NOTIFIED) { | |
thread.idle_event.wait(); | |
thread.idle_event.reset(); | |
} | |
return; | |
} | |
} | |
fn idleWake(self: *ThreadPool, shutdown: bool) void { | |
if (shutdown) { | |
const idle_queue = @atomicRmw( | |
usize, | |
&self.idle_queue.value, | |
.Xchg, | |
IDLE_QUEUE_SHUTDOWN, | |
.AcqRel, | |
); | |
std.debug.assert(idle_queue != IDLE_QUEUE_SHUTDOWN); | |
if (idle_queue != IDLE_QUEUE_NOTIFIED) { | |
var idle_threads = @intToPtr(?*Thread, idle_queue); | |
while (idle_threads) |thread| { | |
idle_threads = thread.idle_next; | |
thread.idle_event.set(); | |
} | |
} | |
return; | |
} | |
var idle_queue = self.idle_queue.load(.Acquire); | |
while (true) { | |
const new_idle_queue = switch (idle_queue.value) { | |
IDLE_QUEUE_EMPTY => IDLE_QUEUE_NOTIFIED, | |
IDLE_QUEUE_SHUTDOWN => return, | |
IDLE_QUEUE_NOTIFIED => return, | |
else => |thread_ptr| blk: { | |
const thread = @intToPtr(*Thread, thread_ptr); | |
const next = @atomicLoad(?*Thread, &thread.idle_next, .Unordered); | |
break :blk @ptrToInt(next); | |
}, | |
}; | |
if (self.idle_queue.tryCompareAndSwap( | |
idle_queue, | |
new_idle_queue, | |
.AcqRel, | |
.Acquire, | |
)) |updated| { | |
idle_queue = updated; | |
continue; | |
} | |
if (new_idle_queue != IDLE_QUEUE_NOTIFIED) { | |
const thread = @intToPtr(*Thread, idle_queue.value); | |
thread.idle_event.set(); | |
} | |
return; | |
} | |
} | |
const Thread = struct { | |
worker: Task.Worker, | |
handle: ?*std.Thread, | |
idle_next: ?*Thread = null, | |
idle_event: std.Thread.StaticResetEvent = .{}, | |
exit_event: std.Thread.StaticResetEvent = .{}, | |
sched_tick: u8 = 0, | |
tick_trigger: ?u8 = null, | |
iter: Task.Worker.Iter = .{}, | |
fn getCurrent() ?*Thread { | |
return @intToPtr(?*Thread, ThreadLocal.get()); | |
} | |
fn trySpawn(pool: *ThreadPool) bool { | |
if (pool.use_caller_thread) { | |
pool.use_caller_thread = false; | |
Thread.run(pool, null); | |
return true; | |
} | |
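// Handshake with the spawned thread: the spawner publishes the *std.Thread handle and signals | |
// put_event; the new thread copies the pool and handle onto its own stack and signals got_event, | |
// after which the stack-allocated Spawner may safely go out of scope. | |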
const Spawner = struct { | |
_pool: *ThreadPool, | |
_handle: *std.Thread = undefined, | |
put_event: std.Thread.StaticResetEvent = .{}, | |
got_event: std.Thread.StaticResetEvent = .{}, | |
fn run(self: *@This()) void { | |
self.put_event.wait(); | |
const _pool = self._pool; | |
const _handle = self._handle; | |
self.got_event.set(); | |
Thread.run(_pool, _handle); | |
} | |
}; | |
var spawner = Spawner{ ._pool = pool }; | |
spawner._handle = std.Thread.spawn(Spawner.run, &spawner) catch return false; | |
spawner.put_event.set(); | |
spawner.got_event.wait(); | |
spawner.put_event.deinit(); | |
spawner.got_event.deinit(); | |
return true; | |
} | |
fn run(pool: *ThreadPool, handle: ?*std.Thread) void { | |
var self = Thread{ | |
.worker = undefined, | |
.handle = handle, | |
.tick_trigger = blk: { | |
const optimize_for = pool.optimize_for orelse break :blk 61; | |
break :blk switch (optimize_for) { | |
.Latency => 0, | |
.Throughput => null, | |
}; | |
}, | |
}; | |
ThreadLocal.set(@ptrToInt(&self)); | |
self.worker.run(&pool.scheduler); | |
} | |
}; | |
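// Thread-local storage for the current *Thread. On Darwin this goes through a pthread key that is | |
// lazily created exactly once via dispatch_once_f; everywhere else a plain `threadlocal` variable | |
// is used. | |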
const ThreadLocal = if (std.Target.current.isDarwin()) | |
struct { | |
const pthread_key_t = c_ulong; | |
extern "c" fn pthread_key_create(key: *pthread_key_t, destructor: ?fn (value: *c_void) callconv(.C) void) c_int; | |
extern "c" fn pthread_getspecific(key: pthread_key_t) ?*c_void; | |
extern "c" fn pthread_setspecific(key: pthread_key_t, value: ?*c_void) c_int; | |
const dispatch_once_t = usize; | |
const dispatch_function_t = fn (?*c_void) callconv(.C) void; | |
extern "c" fn dispatch_once_f(predicate: *dispatch_once_t, context: ?*c_void, function: dispatch_function_t) void; | |
var tls_once: dispatch_once_t = 0; | |
var tls_key: pthread_key_t = undefined; | |
fn tls_init(_: ?*c_void) callconv(.C) void { | |
std.debug.assert(pthread_key_create(&tls_key, null) == 0); | |
} | |
fn get() usize { | |
dispatch_once_f(&tls_once, null, tls_init); | |
return @ptrToInt(pthread_getspecific(tls_key)); | |
} | |
fn set(value: usize) void { | |
dispatch_once_f(&tls_once, null, tls_init); | |
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0); | |
} | |
} | |
else | |
struct { | |
threadlocal var tls_value: usize = 0; | |
fn get() usize { | |
return tls_value; | |
} | |
fn set(value: usize) void { | |
tls_value = value; | |
} | |
}; | |
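// A usize that, on x86, is compare-and-swapped together with a generation counter using a | |
// double-word cmpxchg, protecting the intrusive idle-thread stack from the ABA problem. | |
// Other architectures fall back to a plain single-word CAS. | |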
const AtomicUsize = switch (std.Target.current.cpu.arch) { | |
.i386, .x86_64 => extern struct { | |
const DoubleWord = std.meta.Int(.unsigned, std.meta.bitCount(usize) * 2); | |
value: usize align(@alignOf(DoubleWord)) = 0, | |
aba_tag: usize = 0, | |
pub fn load( | |
self: *const AtomicUsize, | |
comptime order: std.builtin.AtomicOrder, | |
) AtomicUsize { | |
return AtomicUsize{ | |
.value = @atomicLoad(usize, &self.value, order), | |
.aba_tag = @atomicLoad(usize, &self.aba_tag, .Unordered), | |
}; | |
} | |
pub fn tryCompareAndSwap( | |
self: *AtomicUsize, | |
compare: AtomicUsize, | |
exchange: usize, | |
comptime success: std.builtin.AtomicOrder, | |
comptime failure: std.builtin.AtomicOrder, | |
) ?AtomicUsize { | |
const double_word = @cmpxchgWeak( | |
DoubleWord, | |
@ptrCast(*DoubleWord, self), | |
@bitCast(DoubleWord, compare), | |
@bitCast(DoubleWord, AtomicUsize{ | |
.value = exchange, | |
.aba_tag = compare.aba_tag +% 1, | |
}), | |
success, | |
failure, | |
) orelse return null; | |
return @bitCast(AtomicUsize, double_word); | |
} | |
}, | |
else => extern struct { | |
value: usize = 0, | |
pub fn load( | |
self: *const AtomicUsize, | |
comptime order: std.builtin.AtomicOrder, | |
) AtomicUsize { | |
const value = @atomicLoad(usize, &self.value, order); | |
return AtomicUsize{ .value = value }; | |
} | |
pub fn tryCompareAndSwap( | |
self: *AtomicUsize, | |
compare: AtomicUsize, | |
exchange: usize, | |
comptime success: std.builtin.AtomicOrder, | |
comptime failure: std.builtin.AtomicOrder, | |
) ?AtomicUsize { | |
const value = @cmpxchgWeak( | |
usize, | |
&self.value, | |
compare.value, | |
exchange, | |
success, | |
failure, | |
) orelse return null; | |
return AtomicUsize{ .value = value }; | |
} | |
}, | |
}; | |
var system_impl = Task.System{ .callFn = systemCall }; | |
fn systemCall(_: *Task.System, command: Task.System.Command) void { | |
switch (command) { | |
.Spawn => |cmd| { | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", cmd.scheduler); | |
cmd.spawned.* = Thread.trySpawn(pool); | |
}, | |
.Join => |cmd| switch (cmd) { | |
.Wait => |scheduler| { | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler); | |
pool.join_event.wait(); | |
}, | |
.Wake => |scheduler| { | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler); | |
pool.join_event.set(); | |
}, | |
}, | |
.Resume => |cmd| switch (cmd) { | |
.Exit => |worker| { | |
const thread = @fieldParentPtr(Thread, "worker", worker); | |
if (thread.handle) |handle| { | |
thread.exit_event.set(); | |
handle.wait(); | |
} | |
}, | |
.Signal => |scheduler| { | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler); | |
pool.idleWake(false); | |
}, | |
.Broadcast => |scheduler| { | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler); | |
pool.idleWake(true); | |
}, | |
}, | |
.Suspend => |cmd| switch (cmd) { | |
.Exit => |worker| { | |
const thread = @fieldParentPtr(Thread, "worker", worker); | |
if (thread.handle) |handle| { | |
thread.exit_event.wait(); | |
} | |
}, | |
.Wait => |wait| { | |
const thread = @fieldParentPtr(Thread, "worker", wait.worker); | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", wait.worker.getScheduler()); | |
wait.scheduled.* = false; | |
pool.idleWait(thread); | |
}, | |
}, | |
.Poll => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
const pool = @fieldParentPtr(ThreadPool, "scheduler", cmd.worker.getScheduler()); | |
const max_attempts: usize = blk: { | |
const optimize_for = pool.optimize_for orelse break :blk 8; | |
break :blk switch (optimize_for) { | |
.Latency => 4, | |
.Throughput => 32, | |
}; | |
}; | |
if (cmd.attempt >= max_attempts) { | |
cmd.targets.* = null; | |
return; | |
} | |
const be_fair: bool = blk: { | |
const optimize_for = pool.optimize_for orelse break :blk (thread.sched_tick % 61 == 0); | |
break :blk switch (optimize_for) { | |
.Latency => true, | |
.Throughput => false, | |
}; | |
}; | |
}, | |
.Execute => |cmd| { | |
const thread = @fieldParentPtr(Thread, "worker", cmd.worker); | |
const runnable = @fieldParentPtr(Runnable, "task", cmd.task); | |
thread.sched_tick +%= 1; | |
(runnable.runFn)(runnable); | |
}, | |
} | |
} |