// Gist by @kprotty, last active July 24, 2023.
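// First of several iterations in this gist: a work-stealing thread pool built around
// an intrusive Task (a `next` link plus a `runFn` callback). Groups batch tasks, the
// Scheduler hands them to Worker threads, and each Worker owns a bounded ring Buffer
// backed by an unbounded overflow List.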
const std = @import("std");
const Futex = std.Thread.Futex;
const assert = std.debug.assert;
const Atomic = std.atomic.Atomic;
const Task = @This();
next: ?*Task = null,
runFn: fn(*Task) void,
pub const Group = struct {
head: ?*Task = null,
tail: ?*Task = null,
pub fn from(task: *Task) Group {
return .{ .head = task, .tail = task };
}
pub fn push(self: *Group, group: Group) void {
const tail = group.tail orelse return;
tail.next = self.head;
self.head = group.head;
self.tail = self.tail orelse tail;
}
pub fn pop(self: *Group) ?*Task {
const task = self.head orelse return null;
self.head = task.next;
if (self.head == null) self.tail = null;
return task;
}
};
pub const Scheduler = struct {
max_threads: u14,
stack_size: usize,
runnable: List = .{},
workers: Atomic(?*Worker) = Atomic(?*Worker).init(null),
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
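// Sync packs the whole scheduler state into one u32 so it can be read, CAS-updated,
// and waited on atomically through Futex operations on `sync` above.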
const Sync = packed struct {
idle: u14 = 0,
spawned: u14 = 0,
reserved: bool = false,
notified: bool = false,
state: enum(u2) {
pending = 0,
waking,
signaled,
shutdown,
},
};
pub fn schedule(self: *Scheduler, group: Group) error{Shutdown}!void {
assert(group.head != null);
assert(group.tail != null);
var maybe_worker = Worker.current;
if (maybe_worker) |worker| {
worker.push(group);
} else {
self.runnable.push(group, .multi_producer); // any thread may schedule onto the shared list
}
return self.notifyWith(maybe_worker);
}
fn notifyWith(
noalias self: *Scheduler,
noalias worker: ?*Worker,
) error{Shutdown}!void {
var max_spawned = self.max_threads;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
var is_waking = if (worker) |w| w.state == .waking else false;
while (true) {
if (sync.state == .shutdown) return error.Shutdown;
if (is_waking) assert(sync.state == .waking);
const can_wake = is_waking or sync.state == .pending;
var new_sync = sync;
new_sync.notified = true;
if (can_wake and sync.idle > 0) {
new_sync.state = .signaled;
} else if (can_wake and sync.spawned < max_spawned) {
new_sync.state = .signaled;
new_sync.spawned += 1;
} else if (is_waking) {
new_sync.state = .pending;
} else if (sync.notified) {
break;
}
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Release,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (can_wake and sync.idle > 0) {
Futex.wake(&self.sync, 1);
} else if (can_wake and sync.spawned < max_spawned) {
Worker.spawn(self) catch self.complete();
}
return;
}
}
fn waitWith(
noalias self: *Scheduler,
noalias worker: *Worker,
) error{Shutdown}!void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
wait_loop: while (true) {
if (sync.state == .shutdown) {
worker.state = .shutdown;
self.complete();
return error.Shutdown;
}
const can_wake = sync.notified or sync.state == .signaled;
if (can_wake or worker.state != .idle) {
var new_sync = sync;
new_sync.notified = false;
if (can_wake) {
if (sync.state == .signaled) new_sync.state = .waking;
if (worker.state == .idle) new_sync.idle -= 1;
} else {
if (worker.state == .waking) new_sync.state = .pending;
new_sync.idle += 1;
}
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Acquire,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (can_wake) {
if (worker.state == .idle) worker.state = .running;
if (sync.state == .signaled) worker.state = .waking;
return;
}
sync = new_sync;
worker.state = .idle;
}
var spin: u8 = 10;
while (spin > 0) : (spin -= 1) {
std.atomic.spinLoopHint();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
if (sync.notified or sync.state == .shutdown) {
continue :wait_loop;
}
}
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable;
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
}
pub fn shutdown(self: *Scheduler) void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (sync.state != .shutdown) {
var new_sync = sync;
new_sync.idle = 0;
new_sync.notified = false;
new_sync.state = .shutdown;
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.AcqRel,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (sync.idle > 0) Futex.wake(&self.sync, 1);
return;
}
}
pub fn join(self: *Scheduler) void {
completed: {
var spin: u8 = 10;
var is_waiting = false;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
assert(sync.state == .shutdown);
if (sync.spawned == 0) {
break :completed;
}
if (spin > 0) {
spin -= 1;
std.atomic.spinLoopHint();
sync = @bitCast(Sync, self.sync.load(.Monotonic));
continue;
}
if (!is_waiting) {
var new_sync = sync;
new_sync.idle += 1;
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Monotonic,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
sync = new_sync;
is_waiting = true;
}
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable;
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
}
var worker = self.workers.load(.Monotonic) orelse return;
worker = self.workers.swap(null, .Acquire) orelse return;
worker.complete();
}
fn complete(self: *Scheduler) void {
var sync = Sync{ .spawned = 1 };
sync = @bitCast(Sync, self.sync.fetchSub(@bitCast(u32, sync), .AcqRel));
assert(sync.state == .shutdown);
assert(sync.spawned >= 1);
if (sync.spawned == 1 and sync.idle > 0) {
Futex.wake(&self.sync, sync.idle);
}
}
};
const Worker = struct {
next: ?*Worker = null,
buffer: Buffer = .{},
runnable: List = .{},
target: ?*Worker = null,
joined: Atomic(u32) = Atomic(u32).init(0),
state: enum {
running,
waking,
idle,
shutdown,
} = .running,
threadlocal var current: ?*Worker = null;
fn spawn(noalias scheduler: *Scheduler) !void {
const thread = try std.Thread.spawn(
.{ .stack_size = scheduler.stack_size },
Worker.run,
.{ scheduler },
);
thread.detach();
}
fn run(noalias scheduler: *Scheduler) void {
var self = Worker{};
current = &self;
var workers = scheduler.workers.load(.Monotonic);
while (true) {
self.next = workers;
workers = scheduler.workers.tryCompareAndSwap(
workers,
&self,
.Release,
.Monotonic,
) orelse break;
}
while (true) {
const result = self.pop(scheduler) orelse {
scheduler.waitWith(&self) catch return self.join();
continue;
};
if (result.pushed or self.state == .waking) {
scheduler.notifyWith(&self) catch {};
}
self.state = .running;
(result.task.runFn)(result.task);
}
}
fn join(self: *Worker) void {
while (self.joined.load(.Acquire) == 0) {
Futex.wait(&self.joined, 0, null) catch unreachable;
}
const next_worker = self.next orelse return;
next_worker.complete();
}
fn complete(self: *Worker) void {
self.joined.store(1, .Release);
Futex.wake(&self.joined, 1);
}
fn push(self: *Worker, group: Group) void {
if (self.buffer.push(group)) |overflowed| {
self.runnable.push(overflowed, .single_producer); // only the owning worker pushes to its own list
}
}
fn pop(noalias self: *Worker, noalias scheduler: *Scheduler) ?Buffer.Pop {
var check_runnable = true;
if (self.buffer.pop()) |result| {
return result;
}
var attempts: u8 = 4;
while (attempts > 0) : (attempts -= 1) {
if (check_runnable) blk: {
return self.buffer.steal(&self.runnable) catch |err| {
check_runnable = err != error.Empty;
break :blk;
};
}
var was_contended = false;
defer if (was_contended) {
attempts += 1;
};
var num_workers = @bitCast(Sync, scheduler.sync.load(.Monotonic)).spawned;
while (num_workers > 0) : (num_workers -= 1) {
const target = self.target orelse blk: {
self.target = scheduler.workers.load(.Acquire);
break :blk self.target.?;
};
if (target != self) blk: {
return self.buffer.steal(&target.runnable) catch |err| {
if (err == error.Contended) was_contended = true;
return self.buffer.steal(&target.buffer) catch |run_err| {
if (run_err == error.Contended) was_contended = true;
break :blk;
};
};
}
self.target = target.next;
std.atomic.spinLoopHint();
}
return self.buffer.steal(&scheduler.runnable) catch |err| {
if (err == error.Contended) was_contended = true;
continue;
};
}
return null;
}
};
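// Unbounded MPSC list of tasks: producers prepend onto the `input` stack with a CAS
// loop, while a single consumer claims `output` (tagging it IS_OUTPUT_CONSUMING) and
// drains it, swapping in the input stack once the output runs dry.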
const List = struct {
input: Atomic(?*Task) = Atomic(?*Task).init(null),
output: Atomic(usize) = Atomic(usize).init(0),
const IS_OUTPUT_EMPTY: usize = 0;
const IS_OUTPUT_CONSUMING: usize = 1;
fn push(
self: *List,
group: Group,
comptime producer_type: enum{ single_producer, multi_producer },
) void {
const head = group.head orelse unreachable;
const tail = group.tail orelse unreachable;
var input = self.input.load(.Monotonic);
while (true) {
tail.next = input;
if (producer_type == .single_producer and input == null) {
self.input.store(head, .Release);
break;
}
input = self.input.tryCompareAndSwap(
input,
head,
.Release,
.Monotonic,
) orelse break;
}
}
fn consume(self: *List) error{Empty, Contended}!Consumer {
const output = self.output.load(.Monotonic);
if (output == IS_OUTPUT_CONSUMING) {
return error.Contended;
}
if (output == IS_OUTPUT_EMPTY) {
if (self.input.load(.Monotonic) == null) {
return error.Empty;
}
}
if (self.output.compareAndSwap(
output,
IS_OUTPUT_CONSUMING,
.Acquire,
.Monotonic,
)) |updated| {
return error.Contended;
}
return Consumer{
.output = @intToPtr(?*Task, output),
.list = self,
};
}
const Consumer = struct {
output: ?*Task,
list: *List,
fn pop(self: *Consumer) ?*Task {
if (self.output) |task| {
self.output = task.next;
return task;
}
var input = self.list.input.load(.Monotonic) orelse return null;
input = self.list.input.swap(null, .Acquire) orelse unreachable;
self.output = input.next;
return input;
}
fn release(self: Consumer) void {
assert(self.list.output.loadUnchecked() == IS_OUTPUT_CONSUMING);
const output = @ptrToInt(self.output);
self.list.output.store(output, .Release);
}
};
};
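// Bounded ring buffer of 256 tasks owned by one worker: the owner pushes and pops at
// the tail, other workers steal from the head, and on overflow half of the buffer is
// migrated out as a Group.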
const Buffer = struct {
head: Atomic(i32) = Atomic(i32).init(0),
tail: Atomic(i32) = Atomic(i32).init(0),
array: [256]Atomic(*Task) = undefined,
fn push(self: *Buffer, group: Group) ?Group {
const task = group.head orelse unreachable;
if (group.tail != task) {
return group;
}
var tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
while (true) {
if (tail -% head < self.array.len) {
self.array[tail % self.array.len].store(task, .Unordered);
self.tail.store(tail +% 1, .Release);
return null;
}
var migrate = @intCast(i32, self.array.len / 2);
if (self.head.tryCompareAndSwap(
head,
head +% migrate,
.Acquire,
.Monotonic,
)) |updated| {
head = updated;
continue;
}
head +%= migrate - 1;
var overflowed = Group.from(task);
while (migrate > 0) : (migrate -= 1) {
const index = head % self.array.len;
const migrated = self.array[index].loadUnchecked();
overflowed.push(Group.from(migrated));
head -%= 1;
}
return overflowed;
}
}
const Pop = struct {
task: *Task,
pushed: bool = false,
};
fn pop(self: *Buffer) ?Pop {
const tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
if (head == tail) {
return null;
}
const new_tail = tail -% 1;
self.tail.store(new_tail, .SeqCst);
head = self.head.load(.SeqCst);
if (head != tail) {
const task = self.array[new_tail % self.array.len].loadUnchecked();
// More than one task left: the tail slot cannot be raced by stealers.
if (head != new_tail) {
return Pop{ .task = task };
}
// Exactly one task left: race any stealers for it by bumping head past it.
_ = self.head.compareAndSwap(
head,
tail,
.SeqCst,
.Monotonic,
) orelse return Pop{ .task = task };
}
self.tail.store(tail, .Monotonic);
return null;
}
fn steal(
noalias self: *Buffer,
noalias target: anytype,
) error{Empty, Contended}!Pop {
const is_buffer = switch (@TypeOf(target)) {
*List => false,
*Buffer => true,
else => |t| @compileError("Can't steal from " ++ @typeName(t)),
};
if (is_buffer) {
const head = target.head.load(.Acquire);
const tail = target.tail.load(.Acquire);
if (tail -% head <= 0) {
return error.Empty;
}
const index = head % target.array.len;
const task = target.array[index].load(.Unordered);
_ = target.head.compareAndSwap(
head,
head +% 1,
.SeqCst,
.Monotonic,
) orelse return Pop{ .task = task };
return error.Contended;
}
var consumer = try target.consume();
defer consumer.release();
const tail = self.tail.loadUnchecked();
var new_tail = tail;
if (std.debug.runtime_safety) {
const head = self.head.load(.Monotonic);
assert(tail == head);
}
var free_slots = self.array.len;
while (free_slots > 0) : (free_slots -= 1) {
const task = consumer.pop() orelse break;
self.array[new_tail % self.array.len].store(task, .Unordered);
new_tail +%= 1;
}
const task = consumer.pop() orelse blk: {
if (new_tail == tail) break :blk null;
new_tail -%= 1;
break :blk self.array[new_tail % self.array.len].loadUnchecked();
};
const pushed = new_tail != tail;
if (pushed) {
self.tail.store(new_tail, .Release);
}
return Pop{
.task = task orelse unreachable, // consume() never returns an empty Consumer
.pushed = pushed,
};
}
};
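// Second iteration: the same design reworked as a ThreadPool driven by a Config, with
// explicit Producer/Consumer handles on the queues and per-architecture tuning of the
// spin counts and memory fences.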
const std = @import("std");
const Futex = std.Thread.Futex;
const assert = std.debug.assert;
const Atomic = std.atomic.Atomic;
const arch = std.Target.current.cpu.arch;
const ThreadPool = @This();
config: Config,
runnable: List = .{},
workers: Atomic(?*Worker) = Atomic(?*Worker).init(null),
sync: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Sync{})),
pub const Config = struct {
max_threads: u14,
stack_size: usize = (std.Thread.SpawnConfig{}).stack_size,
};
pub fn init(config: Config) ThreadPool {
return .{ .config = config };
}
pub fn deinit(self: *ThreadPool) void {
return self.waitForShutdown();
}
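// A minimal usage sketch (the `work` callback below is hypothetical, not part of the gist):
//
//   fn work(runnable: *Runnable, pool: *ThreadPool) void {
//       _ = runnable;
//       _ = pool;
//   }
//
//   var pool = ThreadPool.init(.{ .max_threads = 4 });
//   defer pool.deinit();
//   var runnable = Runnable{ .runFn = work };
//   var scheduler = pool.getScheduler();
//   scheduler.submit(&runnable);
//   scheduler.schedule();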
pub const Runnable = struct {
next: ?*Runnable = undefined,
runFn: fn(*Runnable, *ThreadPool) void,
};
pub const Batch = struct {
head: ?*Runnable = null,
tail: *Runnable = undefined,
pub fn from(runnable: *Runnable) Batch {
runnable.next = null;
return Batch{
.head = runnable,
.tail = runnable,
};
}
pub fn isEmpty(self: Batch) bool {
return self.head == null;
}
pub fn push(noalias self: *Batch, batch: Batch) void {
if (batch.isEmpty()) return;
if (self.isEmpty()) {
self.* = batch;
} else {
batch.tail.next = self.head;
self.head = batch.head;
}
}
};
pub fn getScheduler(self: *ThreadPool) Scheduler {
return Scheduler {
.thread_pool = self,
.producer = blk: {
if (Worker.tls_current) |worker| {
break :blk .{ .local = worker.getProducer() };
}
const is_single_producer = false;
break :blk .{ .remote = self.runnable.getProducer(is_single_producer) };
},
};
}
pub const Scheduler = struct {
thread_pool: *ThreadPool,
producer: union(enum) {
local: Worker.Producer,
remote: List.Producer,
},
pub fn submit(
noalias self: *Scheduler,
noalias runnable: *Runnable,
) void {
return self.submitBatch(Batch.from(runnable));
}
pub fn submitBatch(noalias self: *Scheduler, batch: Batch) void {
if (batch.isEmpty()) return;
switch (self.producer) {
.local => |*producer| producer.push(batch),
.remote => |*producer| producer.push(batch),
}
}
pub fn schedule(self: Scheduler) void {
if (switch (self.producer) {
.local => |producer| producer.commit(),
.remote => |producer| producer.commit(),
}) {
const is_waking = false;
self.thread_pool.notify(is_waking);
}
}
};
const Spin = struct {
count: u8 = if (arch.isX86()) 100 else 10,
fn yield(self: *Spin) bool {
if (self.count == 0) return false;
std.atomic.spinLoopHint();
self.count -= 1;
return true;
}
};
const Sync = packed struct {
idle: u14 = 0,
spawned: u14 = 0,
_reserved: u1 = 0,
notified: bool = false,
state: enum(u2) {
pending = 0,
waking,
signaled,
shutdown,
},
};
fn register(noalias self: *ThreadPool, noalias worker: *Worker) bool {
var workers = self.workers.load(.Monotonic);
while (true) {
worker.next = workers;
workers = self.workers.tryCompareAndSwap(
workers,
worker,
.Release,
.Monotonic,
) orelse break;
}
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
if (sync.state == .shutdown) {
return false;
}
var new_sync = sync;
new_sync.notified = false;
if (sync.state == .signaled) {
new_sync.state = .waking;
} else if (sync.notified) {
return false;
}
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Acquire,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
const is_waking = sync.state == .signaled;
return is_waking;
}
}
fn unregister(noalias self: *ThreadPool, noalias maybe_worker: ?*Worker) void {
const remove = @bitCast(u32, Sync{ .spawned = 1 });
const updated = self.sync.fetchSub(remove, .AcqRel);
const sync = @bitCast(Sync, updated);
assert(sync.state == .shutdown);
assert(sync.spawned >= 1);
if (sync.spawned == 1 and sync.idle != 0) {
const notify_all = std.math.maxInt(u32);
Futex.wake(&self.sync, notify_all);
}
const worker = maybe_worker orelse return;
worker.join();
const next_worker = worker.next orelse return;
next_worker.shutdown();
}
pub fn isShutdown(self: *const ThreadPool) bool {
const sync = @bitCast(Sync, self.sync.load(.Acquire));
return sync.state == .shutdown;
}
pub fn shutdown(self: *ThreadPool) void {
@setCold(true);
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (sync.state != .shutdown) {
var new_sync = sync;
new_sync.idle = 0;
new_sync.notified = true;
new_sync.state = .shutdown;
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.AcqRel,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (sync.idle > 0) {
const notify_all = std.math.maxInt(u32);
Futex.wake(&self.sync, notify_all);
}
return;
}
}
fn join(noalias self: *ThreadPool) void {
@setCold(true);
var spin = Spin{};
var is_waiting = false;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
assert(sync.state == .shutdown);
if (sync.spawned == 0) {
break;
}
if (spin.yield()) {
sync = @bitCast(Sync, self.sync.load(.Monotonic));
continue;
}
if (!is_waiting) {
var old_sync = sync;
sync.idle += 1;
if (self.sync.tryCompareAndSwap(
@bitCast(u32, old_sync),
@bitCast(u32, sync),
.Monotonic,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
}
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable;
sync = @bitCast(Sync, self.sync.load(.Monotonic));
is_waiting = true;
}
const worker = self.workers.load(.Acquire) orelse return;
worker.shutdown();
}
fn notify(noalias self: *ThreadPool, is_waking: bool) callconv(.Inline) void {
const sync = @bitCast(Sync, self.sync.load(.Monotonic));
if (!sync.notified) {
self.notifySlow(is_waking);
}
}
fn notifySlow(noalias self: *ThreadPool, is_waking: bool) void {
@setCold(true);
const max_spawn = self.config.max_threads;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
if (sync.state == .shutdown) return;
if (is_waking) assert(sync.state == .waking);
const can_wake = is_waking or (sync.state == .pending);
var new_sync = sync;
new_sync.notified = true;
if (sync.idle > 0 and can_wake) {
new_sync.state = .signaled;
} else if (sync.spawned < max_spawn and can_wake) {
new_sync.state = .signaled;
new_sync.spawned += 1;
} else if (is_waking) {
new_sync.state = .pending;
} else if (sync.notified) {
return;
}
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Release,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (sync.idle > 0 and can_wake) {
Futex.wake(&self.sync, 1);
} else if (sync.spawned < max_spawn and can_wake) {
Worker.spawn(self) catch self.unregister(null);
}
return;
}
}
fn wait(noalias self: *ThreadPool, is_waking: bool) error{Shutdown}!bool {
@setCold(true);
var spin = Spin{};
var is_waiting = false;
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
if (sync.state == .shutdown) {
return error.Shutdown;
}
if (sync.notified or !is_waiting) {
var new_sync = sync;
new_sync.notified = false;
if (sync.notified) {
if (sync.state == .signaled) new_sync.state = .waking;
if (is_waiting) new_sync.idle -= 1;
} else {
if (is_waking) new_sync.state = .pending;
new_sync.idle += 1;
}
if (self.sync.tryCompareAndSwap(
@bitCast(u32, sync),
@bitCast(u32, new_sync),
.Acquire,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
const was_waiting = is_waiting;
is_waiting = !sync.notified;
if (!is_waiting) {
if (sync.state == .signaled) return true;
if (was_waiting) return false;
return is_waking;
}
}
if (spin.yield()) {
sync = @bitCast(Sync, self.sync.load(.Monotonic));
continue;
}
Futex.wait(&self.sync, @bitCast(u32, sync), null) catch unreachable;
sync = @bitCast(Sync, self.sync.load(.Monotonic));
}
}
const Worker = struct {
runnable: List = .{},
buffer: Buffer = .{},
next: ?*Worker = undefined,
state: Atomic(State) = Atomic(State).init(.running),
const State = enum(u32) {
running,
joining,
shutdown,
};
threadlocal var tls_current: ?*Worker = null;
fn spawn(noalias thread_pool: *ThreadPool) !void {
const thread = try std.Thread.spawn(
.{ .stack_size = thread_pool.config.stack_size },
Worker.run,
.{ thread_pool },
);
thread.detach();
}
fn run(noalias thread_pool: *ThreadPool) void {
var self = Worker{};
tls_current = &self;
var is_waking = thread_pool.register(&self);
defer thread_pool.unregister(&self);
var steal_target: ?*Worker = null;
while (true) {
const popped = self.pop(thread_pool, &steal_target) orelse {
is_waking = thread_pool.wait(is_waking) catch break;
continue;
};
if (popped.pushed or is_waking) {
thread_pool.notify(is_waking);
}
is_waking = false;
(popped.runnable.runFn)(popped.runnable, thread_pool);
}
}
fn getProducer(noalias self: *Worker) Producer {
const is_single_producer = true;
return Producer{
.buffer_producer = self.buffer.getProducer(),
.list_producer = self.runnable.getProducer(is_single_producer),
};
}
const Producer = struct {
buffer_producer: Buffer.Producer,
list_producer: List.Producer,
fn push(noalias self: *Producer, batch: Batch) void {
const head = batch.head orelse unreachable;
if (batch.tail == head) {
self.pushRunnable(head);
} else {
self.list_producer.push(batch);
}
}
fn pushRunnable(noalias self: *Producer, noalias runnable: *Runnable) void {
const overflowed = self.buffer_producer.push(runnable) orelse return;
self.list_producer.push(overflowed);
}
fn commit(self: Producer) bool {
const buffer_committed = self.buffer_producer.commit();
const list_committed = self.list_producer.commit();
return buffer_committed or list_committed;
}
};
fn pop(
self: *Worker,
noalias thread_pool: *ThreadPool,
noalias steal_target_ptr: *?*Worker,
) callconv(.Inline) ?Buffer.Popped {
if (self.buffer.pop()) |runnable| {
return Buffer.Popped{ .runnable = runnable };
}
return self.steal(thread_pool, steal_target_ptr);
}
fn steal(
self: *Worker,
noalias thread_pool: *ThreadPool,
noalias steal_target_ptr: *?*Worker,
) ?Buffer.Popped {
@setCold(true);
var attempts: u8 = if (arch.isX86()) 32 else 8;
while (attempts > 0) : (attempts -= 1) {
if (self.buffer.consume(&self.runnable)) |popped| {
return popped;
}
var num_workers: u16 = @bitCast(Sync, thread_pool.sync.load(.Monotonic)).spawned;
while (num_workers > 0) : (num_workers -= 1) {
const target_worker = steal_target_ptr.* orelse blk: {
const new_target = thread_pool.workers.load(.Acquire) orelse unreachable;
break :blk new_target;
};
steal_target_ptr.* = target_worker.next;
if (target_worker != self) {
if (self.buffer.consume(&target_worker.runnable)) |popped| {
return popped;
} else if (target_worker.buffer.steal()) |runnable| {
return Buffer.Popped{ .runnable = runnable };
}
}
}
if (self.buffer.consume(&thread_pool.runnable)) |popped| {
return popped;
}
std.atomic.spinLoopHint();
}
return null;
}
fn shutdown(noalias self: *Worker) void {
const state = self.state.swap(.shutdown, .Release);
const ptr = @ptrCast(*const Atomic(u32), &self.state);
switch (state) {
.running => {},
.joining => Futex.wake(ptr, 1),
.shutdown => unreachable,
}
}
fn join(noalias self: *Worker) void {
var spin = Spin{};
while (true) {
const state = self.state.load(.Acquire);
if (state == .shutdown) {
return;
}
if (state == .running) {
if (spin.yield()) {
continue;
}
if (self.state.compareAndSwap(
.running,
.joining,
.Acquire,
.Acquire,
)) |updated| {
assert(updated == .shutdown);
return;
}
}
Futex.wait(
@ptrCast(*const Atomic(u32), &self.state),
@enumToInt(State.joining),
null,
) catch unreachable;
}
}
};
const List = struct {
input: Atomic(?*Runnable) = Atomic(?*Runnable).init(null),
output: Atomic(usize) = Atomic(usize).init(IS_EMPTY),
const IS_EMPTY: usize = 0b0;
const IS_CONSUMING: usize = 0b1;
comptime {
assert(@alignOf(Runnable) >= (IS_CONSUMING << 1));
}
fn getProducer(noalias self: *List, is_single_producer: bool) Producer {
return Producer{
.list = self,
.is_single_producer = is_single_producer,
};
}
const Producer = struct {
list: *List,
batch: Batch = .{},
is_single_producer: bool,
fn push(noalias self: *Producer, batch: Batch) void {
self.batch.push(batch);
}
fn commit(self: Producer) bool {
const head = self.batch.head orelse return false;
const tail = self.batch.tail;
var input = self.list.input.load(.Monotonic);
while (true) {
tail.next = input;
if (input == null and self.is_single_producer) {
self.list.input.store(head, .Release);
return true;
}
input = self.list.input.tryCompareAndSwap(
input,
head,
.Release,
.Monotonic,
) orelse return true;
}
}
};
fn getConsumer(noalias self: *List) ?Consumer {
var output = self.output.load(.Monotonic);
if (output == IS_CONSUMING) return null;
if (output == 0 and self.input.load(.Monotonic) == null) {
return null;
}
acquired: {
if (comptime arch.isX86()) {
output = self.output.swap(IS_CONSUMING, .Acquire);
if (output == IS_CONSUMING) return null;
break :acquired;
}
while (true) {
output = self.output.tryCompareAndSwap(
output,
IS_CONSUMING,
.Acquire,
.Monotonic,
) orelse break :acquired;
if (output == IS_CONSUMING) return null;
if (output == 0 and self.input.load(.Monotonic) == null) {
return null;
}
}
}
return Consumer{
.list = self,
.output = @intToPtr(?*Runnable, output),
};
}
const Consumer = struct {
list: *List,
output: ?*Runnable,
fn pop(self: *Consumer) ?*Runnable {
if (self.output) |runnable| {
self.output = runnable.next;
return runnable;
}
var input = self.list.input.load(.Monotonic) orelse return null;
input = self.list.input.swap(null, .Acquire) orelse unreachable;
self.output = input.next;
return input;
}
fn release(self: Consumer) void {
const output = @ptrToInt(self.output);
self.list.output.store(output, .Release);
}
};
};
const Buffer = struct {
head: Atomic(Index) align(std.atomic.cache_line) = Atomic(Index).init(0),
tail: Atomic(Index) align(std.atomic.cache_line) = Atomic(Index).init(0),
array: [capacity]Atomic(*Runnable) = undefined,
const Index = i32;
const capacity = 256;
comptime {
assert(@typeInfo(Index).Int.signedness == .signed);
assert(std.math.maxInt(Index) >= capacity);
}
fn getProducer(self: *Buffer) Producer {
const tail = self.tail.loadUnchecked();
const head = self.head.load(.Monotonic);
const size = tail -% head;
assert(size >= 0 and size <= capacity);
return Producer {
.buffer = self,
.tail = tail,
.pushed = 0,
.remaining = capacity - size,
};
}
const Producer = struct {
buffer: *Buffer,
tail: Index,
pushed: Index,
remaining: Index,
fn push(noalias self: *Producer, noalias runnable: *Runnable) ?Batch {
return switch (self.remaining) {
0 => self.pushOverflow(runnable),
else => self.pushRunnable(runnable),
};
}
fn pushRunnable(noalias self: *Producer, noalias runnable: *Runnable) ?Batch {
const index = self.tail +% self.pushed;
runnable.next = null;
if (self.pushed > 0) {
runnable.next = self.buffer.array[(index -% 1) % capacity].loadUnchecked();
}
self.pushed += 1;
self.buffer.array[index % capacity].store(runnable, .Unordered);
self.remaining -= 1;
return null;
}
fn pushOverflow(noalias self: *Producer, noalias runnable: *Runnable) ?Batch {
@setCold(true);
const head = self.buffer.head.load(.Monotonic);
const size = (self.tail +% self.pushed) -% head;
assert(size >= 0 and size <= capacity);
self.remaining = capacity - size;
if (self.remaining != 0) {
return self.pushRunnable(runnable);
}
const migrate = capacity / 2;
if (self.buffer.head.compareAndSwap(
head,
head +% migrate,
.Acquire,
.Monotonic,
)) |updated| {
self.remaining = capacity - ((self.tail +% self.pushed) -% updated);
return self.pushRunnable(runnable);
}
const first = runnable;
first.next = self.buffer.array[(head +% (migrate - 1)) % capacity].loadUnchecked();
const last = self.buffer.array[head % capacity].loadUnchecked();
last.next = null;
return Batch{
.head = first,
.tail = last,
};
}
fn commit(self: Producer) bool {
if (self.pushed == 0) {
return false;
}
const tail = self.tail +% self.pushed;
self.buffer.tail.store(tail, .Release);
return true;
}
};
fn pop(noalias self: *Buffer) ?*Runnable {
const tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
if (head == tail) {
return null;
}
const new_tail = tail -% 1;
head = switch (arch) {
.i386, .x86_64 => blk: {
_ = self.tail.fetchSub(1, .SeqCst);
break :blk self.head.load(.SeqCst);
},
.arm, .armeb, .thumb, .thumbeb => blk: {
self.tail.store(new_tail, .Monotonic);
std.atomic.fence(.SeqCst);
break :blk self.head.load(.Monotonic);
},
else => blk: {
self.tail.store(new_tail, .SeqCst);
break :blk self.head.load(.SeqCst);
},
};
const size = tail -% head;
assert(size <= capacity);
if (size >= 1) {
const runnable = self.array[new_tail % capacity].loadUnchecked();
if (size > 1) {
return runnable;
}
_ = self.head.compareAndSwap(
head,
tail,
.Acquire,
.Monotonic,
) orelse return runnable;
}
self.tail.store(tail, .Monotonic);
return null;
}
fn steal(noalias self: *Buffer) ?*Runnable {
const use_fence = switch (arch) {
.arm, .armeb, .thumb, .thumbeb => true,
else => false,
};
var head = self.head.load(if (use_fence) .Monotonic else .Acquire);
while (true) {
if (use_fence) std.atomic.fence(.SeqCst);
const tail = self.tail.load(if (use_fence) .Monotonic else .Acquire);
const size = tail -% head;
if (size <= 0) {
return null;
}
const runnable = self.array[head % capacity].load(.Unordered);
head = self.head.tryCompareAndSwap(
head,
head +% 1,
.SeqCst,
.Monotonic,
) orelse return runnable;
std.atomic.spinLoopHint();
}
}
const Popped = struct {
runnable: *Runnable,
pushed: bool = false,
};
fn consume(noalias self: *Buffer, noalias list: *List) ?Popped {
const tail = self.tail.loadUnchecked();
if (std.debug.runtime_safety) {
assert(tail == self.head.load(.Monotonic));
}
var consumer = list.getConsumer() orelse return null;
defer consumer.release();
var pushed: i32 = 0;
while (pushed < capacity) : (pushed += 1) {
const runnable = consumer.pop() orelse break;
self.array[(tail +% pushed) % capacity].store(runnable, .Unordered);
runnable.next = null;
if (pushed != 0) {
runnable.next = self.array[(tail +% (pushed - 1)) % capacity].loadUnchecked();
}
}
const popped = consumer.pop() orelse blk: {
if (pushed == 0) break :blk null;
pushed -= 1;
break :blk self.array[(tail +% pushed) % capacity].loadUnchecked();
};
if (pushed > 0) self.tail.store(tail +% pushed, .Release);
return Popped {
.runnable = popped orelse unreachable, // the acquired consumer is never empty
.pushed = pushed > 0,
};
}
};
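// Third iteration: workers are registered into a caller-provided slot array, idle
// threads block on a Semaphore instead of a Futex, and the queues are split into an
// UnboundedQueue (MPSC stack) and a BoundedQueue (stealable ring buffer).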
const std = @import("std");
const assert = std.debug.assert;
const arch = std.Target.current.cpu.arch;
const Thread = std.Thread;
const Atomic = std.atomic.Atomic;
const Semaphore = std.Thread.Semaphore;
pub const ThreadPool = struct {
workers: []Atomic(usize),
idle_sema: Semaphore = .{},
run_queue: UnboundedQueue = .{},
waker: Atomic(u32) = Atomic(u32).init(@bitCast(u32, Waker{})),
pub fn init(threads: []usize) ThreadPool {
const workers = @ptrCast([*]Atomic(usize), threads.ptr)[0..threads.len];
for (workers) |*ptr| ptr.* = Atomic(usize).init(0);
return .{ .workers = workers };
}
pub fn deinit(self: *ThreadPool) void {
self.shutdown();
self.join();
self.* = undefined;
}
pub const Runnable = struct {
runFn: fn(*Runnable) void,
next: ?*Runnable = undefined,
pub const Group = struct {
len: usize = 0,
head: *Runnable = undefined,
tail: *Runnable = undefined,
pub fn from(runnable: *Runnable) Group {
runnable.next = null;
return .{
.len = 1,
.head = runnable,
.tail = runnable,
};
}
pub fn push(self: *Group, group: Group) void {
if (group.len == 0) return;
if (self.len == 0) {
self.* = group;
} else {
self.tail.next = group.head;
self.tail = group.tail;
self.len += group.len;
}
}
pub fn pop(self: *Group) ?*Runnable {
if (self.len == 0) return null;
const runnable = self.head;
self.head = runnable.next orelse undefined;
self.len -= 1;
return runnable;
}
};
};
const Group = Runnable.Group;
pub fn schedule(self: *ThreadPool, group: Runnable.Group) void {
if (group.len == 0) {
return;
}
if (Worker.getCurrent()) |worker| {
if (worker.buffer.push(group)) |overflowed| {
worker.queue.push(overflowed, true); // only the owning worker pushes to its own queue
}
} else {
self.run_queue.push(group, false); // any thread may push to the shared run queue
}
self.notify(false);
}
pub fn shutdown(self: *ThreadPool) void {
var waker = @bitCast(Waker, self.waker.load(.Monotonic));
while (waker.state != .shutdown) {
var new_waker = waker;
new_waker.state = .shutdown;
new_waker.waiting = false;
if (self.waker.tryCompareAndSwap(
@bitCast(u32, waker),
@bitCast(u32, new_waker),
.Acquire,
.Monotonic,
)) |updated| {
waker = @bitCast(Waker, updated);
continue;
}
while (waker.waiting and waker.active > 0) {
self.idle_sema.post();
waker.active -= 1;
}
return;
}
}
const State = enum(u2) {
pending = 0,
signaled,
waking,
shutdown,
};
const Waker = packed struct {
state: State = .pending,
notified: bool = false,
waiting: bool = false,
active: u28 = 0,
};
fn notify(self: *ThreadPool, is_waking: bool) void {
var waker = @bitCast(Waker, self.waker.load(.Monotonic));
while (true) {
if (waker.state == .shutdown) return;
if (is_waking) assert(waker.state == .waking);
const can_wake = is_waking or waker.state == .pending;
var new_waker = waker;
new_waker.notified = true;
if (waker.waiting and can_wake) {
new_waker.waiting = false;
new_waker.state = .signaled;
} else if (waker.active < self.workers.len and can_wake) {
new_waker.active += 1;
new_waker.state = .signaled;
} else if (is_waking) {
new_waker.state = .pending;
} else if (waker.notified) {
return;
}
if (self.waker.tryCompareAndSwap(
@bitCast(u32, waker),
@bitCast(u32, new_waker),
.Release,
.Monotonic,
)) |updated| {
waker = @bitCast(Waker, updated);
continue;
}
if (waker.waiting and can_wake) {
self.idle_sema.post();
return;
}
if (waker.active < self.workers.len and can_wake) {
if (Worker.spawn(self, waker.active)) return;
self.unregister(null, undefined);
}
return;
}
}
fn spawned(self: *ThreadPool) error{Shutdown}!bool {
var waker = @bitCast(Waker, self.waker.load(.Monotonic));
while (true) {
if (waker.state == .shutdown) return error.Shutdown;
if (!waker.notified) return false;
var new_waker = waker;
new_waker.notified = false;
if (waker.state == .signaled) {
new_waker.state = .waking;
}
if (self.waker.tryCompareAndSwap(
@bitCast(u32, waker),
@bitCast(u32, new_waker),
.Acquire,
.Monotonic,
)) |updated| {
waker = @bitCast(Waker, updated);
continue;
}
return waker.state == .waking;
}
}
fn wait(self: *ThreadPool, _is_waking: bool) error{Shutdown}!bool {
var is_waiting = false;
var is_waking = _is_waking;
var waker = @bitCast(Waker, self.waker.load(.Monotonic));
while (true) {
if (waker.state == .shutdown) return error.Shutdown;
if (is_waking) assert(waker.state == .waking);
const is_notified = waker.notified or waker.state == .signaled;
if (is_notified or !is_waiting) {
var new_waker = waker;
new_waker.notified = false;
if (is_notified) {
if (is_waking or waker.state == .signaled) {
new_waker.state = .waking;
}
if (is_waiting) {
new_waker.waiting = true;
}
} else {
new_waker.waiting = true;
if (is_waking) {
new_waker.state = .pending;
}
}
if (@bitCast(u32, waker) != @bitCast(u32, new_waker)) {
if (self.waker.tryCompareAndSwap(
@bitCast(u32, waker),
@bitCast(u32, new_waker),
.Acquire,
.Monotonic,
)) |updated| {
waker = @bitCast(Waker, updated);
continue;
}
if (is_notified) {
return is_waking or waker.state == .signaled;
}
}
}
is_waking = false;
is_waiting = true;
self.idle_sema.wait();
waker = @bitCast(Waker, self.waker.load(.Monotonic));
}
}
fn register(
noalias self: *ThreadPool,
noalias worker: *Worker,
worker_index: u32,
) void {
const ptr = &self.workers[worker_index];
assert(ptr.loadUnchecked() == 0);
comptime assert(@alignOf(Worker) >= 0b10);
const worker_ptr = @ptrToInt(worker) | 1;
ptr.store(worker_ptr, .Release);
}
fn unregister(
noalias self: *ThreadPool,
noalias maybe_worker: ?*Worker,
worker_index: u32,
) void {
if (maybe_worker) |worker| {
const ptr = &self.workers[worker_index];
assert(ptr.loadUnchecked() == @ptrToInt(worker) | 1);
const thread_ptr = @ptrToInt(worker.thread);
ptr.store(thread_ptr, .Release);
}
var inactive = @bitCast(u32, Waker{ .active = 1 });
var waker = @bitCast(Waker, self.waker.fetchSub(inactive, .Release));
assert(waker.state == .shutdown);
assert(waker.active >= 1);
if (waker.active == 1 and waker.waiting) {
self.idle_sema.post();
}
}
fn join(self: *ThreadPool) void {
var spin: usize = 10;
var is_waiting = false;
var waker = @bitCast(Waker, self.waker.load(.Acquire));
while (waker.active > 0) {
assert(waker.state == .shutdown);
if (!waker.waiting) {
if (spin > 0) {
spin -= 1;
std.atomic.spinLoopHint();
waker = @bitCast(Waker, self.waker.load(.Acquire));
continue;
}
var new_waker = waker;
new_waker.waiting = true;
if (self.waker.tryCompareAndSwap(
@bitCast(u32, waker),
@bitCast(u32, new_waker),
.Acquire,
.Acquire,
)) |updated| {
waker = @bitCast(Waker, updated);
continue;
}
}
self.idle_sema.wait();
waker = @bitCast(Waker, self.waker.load(.Acquire));
}
for (self.workers) |*ptr| {
const thread_ptr = ptr.load(.Acquire);
if (thread_ptr == 0) {
continue;
}
assert(thread_ptr & 1 != 0);
const thread = @intToPtr(*Thread, thread_ptr);
thread.wait();
}
}
const Worker = struct {
thread: *Thread,
buffer: BoundedQueue = .{},
queue: UnboundedQueue = .{},
threadlocal var tls_current: ?*Worker = null;
fn getCurrent() ?*Worker {
return tls_current;
}
fn spawn(pool: *ThreadPool, index: u32) bool {
const Spawner = struct {
_pool: *ThreadPool,
_index: u32,
_thread: *Thread = undefined,
_put_sema: Semaphore = .{},
_got_sema: Semaphore = .{},
fn entry(self: *@This()) void {
self._put_sema.wait();
const copy = self;
self._got_sema.post();
var worker: Worker = undefined;
return worker.run(
copy._pool,
copy._thread,
copy._index,
);
}
};
var spawner = Spawner{
._pool = pool,
._index = index,
};
spawner._thread = Thread.spawn(Spawner.entry, &spawner) catch return false;
spawner._put_sema.post();
spawner._got_sema.wait();
return true;
}
fn run(
noalias self: *Worker,
noalias pool: *ThreadPool,
noalias thread: *Thread,
index: u32,
) void {
tls_current = self;
self.* = .{ .thread = thread };
pool.register(self, index);
defer pool.unregister(self, index);
var is_waking = pool.spawned() catch return;
var prng = @truncate(u32, @ptrToInt(self) *% 31) | 1;
while (true) {
var did_push = false;
const runnable = self.pop(pool, &prng, &did_push) orelse {
is_waking = pool.wait(is_waking) catch return;
continue;
};
if (is_waking or did_push) {
pool.notify(is_waking);
}
is_waking = false;
(runnable.runFn)(runnable);
}
}
fn pop(
noalias self: *Worker,
noalias pool: *ThreadPool,
noalias prng: *u32,
noalias did_push: *bool,
) ?*Runnable {
if (self.buffer.pop()) |runnable| {
return runnable;
}
var attempts: usize = switch (arch) {
.i386, .x86_64 => 8,
else => 4,
};
while (attempts > 0) : (attempts -= 1) {
var was_contended = false;
if (self.buffer.consume(&self.queue, did_push) catch blk: {
was_contended = true;
break :blk null;
}) |runnable| {
return runnable;
}
const num_workers = @intCast(u32, pool.workers.len);
var target_index = blk: {
var rng = prng.*;
rng ^= rng << 13;
rng ^= rng >> 17;
rng ^= rng << 7;
prng.* = rng;
break :blk rng % num_workers;
};
var worker_iter = num_workers;
while (worker_iter > 0) : (worker_iter -= 1) {
const target_ptr = &pool.workers[target_index];
target_index = (target_index + 1) % num_workers;
const target = blk: {
const ptr = target_ptr.load(.Acquire);
if (ptr & 1 == 0) {
continue;
}
const worker = @intToPtr(*Worker, ptr);
if (worker == self) {
continue;
}
break :blk worker;
};
if (self.buffer.consume(&target.queue, did_push) catch blk: {
was_contended = true;
break :blk null;
}) |runnable| {
return runnable;
}
if (target.buffer.steal() catch blk: {
was_contended = true;
break :blk null;
}) |runnable| {
return runnable;
}
}
if (self.buffer.consume(&pool.run_queue, did_push) catch blk: {
was_contended = true;
break :blk null;
}) |runnable| {
return runnable;
}
if (was_contended) {
attempts += 1;
} else {
std.os.sched_yield() catch {};
}
}
return null;
}
};
const UnboundedQueue = struct {
input: Atomic(?*Runnable) = Atomic(?*Runnable).init(null),
output: Atomic(usize) = Atomic(usize).init(0),
fn push(
self: *UnboundedQueue,
group: Group,
comptime single_producer: bool,
) void {
assert(group.len > 0);
var input = self.input.load(.Monotonic);
while (true) {
group.tail.next = input;
if (input == null and single_producer) {
self.input.store(group.head, .Release);
return;
}
input = self.input.tryCompareAndSwap(
input,
group.head,
.Release,
.Monotonic,
) orelse break;
}
}
const HAS_CONSUMER: usize = 0b1;
comptime {
const alignment = ~HAS_CONSUMER + 1;
assert(@alignOf(Runnable) >= alignment);
}
fn acquire(self: *UnboundedQueue) error{Contended}!?Consumer {
var output = self.output.load(.Monotonic);
if (output == HAS_CONSUMER) {
return error.Contended;
}
if (output == 0 and self.input.load(.Monotonic) == null) {
return null;
}
acquired: {
if (arch.isX86() or arch.isRISCV()) {
output = self.output.swap(HAS_CONSUMER, .Acquire);
if (output == HAS_CONSUMER) return error.Contended;
break :acquired;
}
const is_aarch64 = arch == .aarch64 or arch == .aarch64_be or arch == .aarch64_32;
if (arch.isARM() or arch.isThumb() or arch.isPPC() or arch.isPPC64() or arch.isMIPS() or is_aarch64) {
while (true) {
output = self.output.tryCompareAndSwap(
output,
HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse break :acquired;
if (output == HAS_CONSUMER) {
return error.Contended;
}
if (output == 0 and self.input.load(.Monotonic) == null) {
return null;
}
}
}
_ = self.output.compareAndSwap(
output,
HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse break :acquired;
return error.Contended;
}
assert(output & HAS_CONSUMER == 0);
return Consumer{
.queue = self,
.output = @intToPtr(?*Runnable, output),
};
}
const Consumer = struct {
queue: *UnboundedQueue,
output: ?*Runnable,
fn pop(self: *Consumer) ?*Runnable {
if (self.output) |runnable| {
self.output = runnable.next;
return runnable;
}
var input = self.queue.input.load(.Monotonic);
if (input == null) {
return null;
}
input = self.queue.input.swap(null, .Acquire);
const runnable = input orelse unreachable;
self.output = runnable.next;
return runnable;
}
fn release(self: Consumer) void {
const output = @ptrToInt(self.output);
self.queue.output.store(output, .Release);
}
};
};
const BoundedQueue = struct {
head: Atomic(isize) = Atomic(isize).init(0),
tail: Atomic(isize) = Atomic(isize).init(0),
array: [capacity]Atomic(*Runnable) = undefined,
const capacity = 256;
fn push(self: *BoundedQueue, group: Group) ?Group {
assert(group.len > 0);
if (group.len > 1) {
return group;
}
var mutable_group = group;
const runnable = mutable_group.pop() orelse unreachable;
assert(mutable_group.len == 0);
var tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
while (true) {
const size = tail -% head;
assert(size >= 0);
assert(size <= capacity);
if (size < capacity) {
self.array[tail % capacity].store(runnable, .Unordered);
self.tail.store(tail +% 1, .Release);
return null;
}
const new_head = head +% (capacity / 2);
if (self.head.tryCompareAndSwap(head, new_head, .Acquire, .Monotonic)) |updated| {
head = updated;
continue;
}
var overflowed = Group.from(runnable);
while (head != new_head) : (head +%= 1) {
const migrated = self.array[head % capacity].loadUnchecked();
overflowed.push(Group.from(migrated));
}
return overflowed;
}
}
fn pop(self: *BoundedQueue) ?*Runnable {
const tail = self.tail.loadUnchecked();
var head = self.head.load(.Unordered);
if (head == tail) {
return null;
}
const new_tail = tail -% 1;
self.tail.store(new_tail, .SeqCst);
head = self.head.load(.SeqCst);
const size = tail -% head;
assert(size >= 0);
assert(size <= capacity);
if (head == tail) {
self.tail.store(tail, .Monotonic);
return null;
}
const runnable = self.array[new_tail % capacity].loadUnchecked();
if (head != new_tail) {
return runnable;
}
if (self.head.compareAndSwap(head, tail, .SeqCst, .Monotonic)) |_| {
self.tail.store(tail, .Monotonic);
return null;
}
return runnable;
}
fn steal(self: *BoundedQueue) error{Contended}!?*Runnable {
const head = self.head.load(.Acquire);
const tail = self.tail.load(.Acquire);
const size = tail -% head;
if (size <= 0) {
return null;
}
const runnable = self.array[head % capacity].load(.Unordered);
if (self.head.compareAndSwap(head, head +% 1, .SeqCst, .Monotonic)) |_| {
return error.Contended;
}
return runnable;
}
fn consume(
noalias self: *BoundedQueue,
noalias unbounded: *UnboundedQueue,
noalias did_push: *bool,
) error{Contended}!?*Runnable {
var consumer = (try unbounded.acquire()) orelse return null;
defer consumer.release();
var pushed = false;
var runnable = consumer.pop();
var tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
while (true) {
const size = tail -% head;
assert(size >= 0);
assert(size <= capacity);
if (size == capacity) {
if (pushed) {
head = self.head.load(.Monotonic);
if (head != tail) {
pushed = false;
continue;
}
}
break;
}
const consumed = consumer.pop() orelse break;
self.array[tail % capacity].store(consumed, .Unordered);
pushed = true;
tail +%= 1;
}
if (runnable == null) runnable = consumer.pop();
if (runnable == null and tail != head) {
tail -%= 1;
runnable = self.array[tail % capacity].loadUnchecked();
}
if (tail != head) {
did_push.* = true;
self.tail.store(tail, .Release);
}
return runnable;
}
};
};
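// Final sketch: the pool is factored into a Scheduler, a Registry of worker Nodes
// (status packed into the low bits of an intrusive pointer), and a Queue combining a
// private local list, a stealable backlog, and a bounded ring buffer.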
const Scheduler = struct {
registry: Registry,
injected: Queue.List = .{},
pub fn init() void {
}
pub const Resumed = enum {
Signaled,
Spawned,
};
pub const Notified = struct {
resumed: Resumed,
scheduler: *Scheduler,
pub fn complete(self: Notified, succeeded: bool) void {
if (self.resumed == .Spawned and !succeeded) {
self.scheduler.registry.unregister(null) catch {};
}
}
};
pub fn schedule(self: *Scheduler, batch: Queue.Batch) void {
self.injected.push(batch);
_ = self.registry.notify() catch return;
}
pub fn shutdown(self: *Scheduler) void {
self.registry.shutdown();
}
};
const Worker = struct {
node: Registry.Node = undefined,
scheduler: *Scheduler,
queue: Queue = .{},
nodes: Registry.Iter = .{ .node = null }, // steal-target iterator used by pop()
pub fn register(
noalias self: *Worker,
noalias scheduler: *Scheduler,
) error{Shutdown}!void {
}
pub fn shutdown(self: *Worker) error{Ready}!void {
self.scheduler.no
}
pub const Poll = struct {
node: *Queue.Node,
notified: ?Scheduler.Notified,
};
pub fn poll(self: *Worker) error{Shutdown}!?Poll {
switch (self.node.getStatus()) {
.uninit => unreachable,
.waiting => unreachable,
.waking, .running => {},
}
const result = self.pop() orelse return null;
const notify = try self.scheduler.registry.processed(
&self.node,
result.pushed,
);
var notified: ?Scheduler.Notified = null;
if (notify) |notification| {
notified = .{
.scheduler = self.scheduler,
.resumed = switch (notification) {
.Signal => .Signaled,
.Register => .Spawned,
},
};
}
return Poll{
.node = result.node,
.notified = notified,
};
}
pub fn wait(self: *Worker, externally_notified: bool) error{Ready, Shutdown}!void {
return self.scheduler.registry.wait(
&self.node,
externally_notified,
);
}
fn pop(self: *Worker) ?Queue.PopResult {
var local_queue_empty = false;
var attempts: usize = switch (arch) {
.i386, .x86_64 => 8,
else => 4,
};
const scheduler = self.scheduler;
while (true) {
var was_contended = contended: {
if (local_queue_empty) {
break :contended false;
}
return self.queue.pop() catch |err| {
local_queue_empty = err == error.Empty;
break :contended (err == error.Contended);
};
};
var num_workers = scheduler.registry.count();
while (num_workers > 0) : (num_workers -= 1) {
const target_node = self.nodes.next() orelse blk: {
self.nodes = scheduler.registry.iter();
break :blk self.nodes.next() orelse unreachable;
};
const target_worker = @fieldParentPtr(Worker, "node", target_node);
if (target_worker == self) {
continue;
}
return self.queue.steal(&target_worker.queue) catch |err| {
was_contended = was_contended or (err == error.Contended);
continue;
};
}
was_contended = contended: {
return self.queue.consume(&scheduler.injected) catch |err| {
break :contended (was_contended or (err == error.Contended));
};
};
if (was_contended) {
continue;
}
attempts -= 1;
if (attempts == 0) {
return null;
}
}
}
};
const Registry = struct {
pub const Node = struct {
data: Atomic(usize) align(std.math.max(@alignOf(usize), 4)),
pub const Status = enum(u2) {
uninit,
waiting,
waking,
running,
};
fn getData(self: *const Node) usize {
return self.data.load(.Unordered);
}
fn setData(self: *Node, data: usize) void {
self.data.store(data, .Unordered);
}
pub fn getStatus(self: Node) Status {
return @intToEnum(Status, @truncate(u2, self.getData()));
}
fn setStatus(self: *Node, status: Status) void {
self.setData((self.getData() & ~@as(usize, 0b11)) | @enumToInt(status));
}
fn getNext(self: Node) ?*Node {
return @intToPtr(?*Node, self.getData() & ~@as(usize, 0b11));
}
fn setNext(self: *Node, next: ?*Node) void {
self.setData((self.getData() & 0b11) | @ptrToInt(next));
}
};
const State = enum(u2) {
pending,
signaled,
waking,
shutdown,
};
const Sync = packed struct {
state: State = .pending,
waiting: bool = false,
notified: bool = false,
active: NodeCount = 0,
};
pub const NodeCount = std.meta.Int(
.unsigned,
std.meta.bitCount(usize) - 2 - 1 - 1,
);
max_nodes: NodeCount,
active: Atomic(?*Node) = Atomic(?*Node).init(null),
sync: Atomic(usize) = Atomic(usize).init(@bitCast(usize, Sync{})),
pub const Notify = union(enum) {
Signal,
Register,
};
pub fn notify(self: *Registry) error{Pending, Shutdown}!Notify {
const sync = @bitCast(Sync, self.sync.load(.Monotonic));
if (sync.notified) {
return error.Pending;
}
return self.@"resume"(false);
}
fn @"resume"(
self: *Registry,
is_waking: bool,
) error{Pending, Shutdown}!Notify {
@setCold(true);
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
const can_wake = switch (sync.state) {
.pending => if (is_waking) unreachable else true,
.waking => is_waking,
.signaled => if (is_waking) unreachable else false,
.shutdown => return error.Shutdown,
};
var new_sync = sync;
new_sync.notified = true;
if (sync.waiting and can_wake) {
new_sync.waiting = false;
new_sync.state = .signaled;
} else if (sync.active < self.max_nodes and can_wake) {
new_sync.active += 1;
new_sync.state = .signaled;
} else if (is_waking) {
new_sync.state = .pending;
} else if (sync.notified) {
return error.Pending;
}
if (self.sync.tryCompareAndSwap(
@bitCast(usize, sync),
@bitCast(usize, new_sync),
.Release,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (sync.waiting and can_wake) {
return .Signal;
} else if (sync.active < self.max_nodes and can_wake) {
return .Register;
} else {
return error.Pending;
}
}
}
pub fn register(
noalias self: *Registry,
noalias node: *Node,
) error{Shutdown}!void {
var active = self.active.load(.Monotonic);
while (true) {
node.setNext(active);
node.setStatus(.running);
active = self.active.tryCompareAndSwap(
active,
node,
.Release,
.Monotonic,
) orelse break;
}
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
var new_sync = sync;
new_sync.notified = false;
switch (sync.state) {
.pending, .waking => if (!sync.notified) return,
.signaled => new_sync.state = .waking,
.shutdown => return error.Shutdown,
}
if (self.sync.tryCompareAndSwap(
@bitCast(usize, sync),
@bitCast(usize, new_sync),
.Acquire,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
return node.setStatus(switch (sync.state) {
.signaled => .waking,
else => .running,
});
}
}
pub fn unregister(
noalias self: *Registry,
noalias registered_node: ?*Node,
) error{Pending}!void {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
assert(sync.state == .shutdown);
assert(sync.active > 0);
sync = @bitCast(Sync, blk: {
const inactive = @bitCast(usize, Sync{ .active = 1 });
break :blk self.sync.fetchSub(inactive, .AcqRel);
});
assert(sync.state == .shutdown);
assert(sync.active > 0);
if (sync.active > 1) {
return error.Pending;
}
}
pub fn processed(
noalias self: *Registry,
noalias node: *Node,
should_notify: bool,
) error{Shutdown}!?Notify {
const is_waking = switch (node.getStatus()) {
.uninit => unreachable,
.waiting => unreachable,
.waking => true,
.running => false,
};
var notified: ?Notify = null;
if (should_notify or is_waking) {
notified = try self.@"resume"(is_waking);
}
node.setStatus(.running);
return notified;
}
pub fn count(self: *const Registry) NodeCount {
const sync_value = self.sync.load(.Monotonic);
const sync = @bitCast(Sync, sync_value);
return sync.active;
}
pub fn iter(self: *const Registry) Iter {
const top = self.active.load(.Acquire);
return Iter{ .node = top };
}
pub const Iter = struct {
node: ?*Node,
pub fn next(self: *Iter) ?*Node {
const node = self.node orelse return null;
self.node = node.getNext();
return node;
}
};
pub fn wait(
noalias self: *Registry,
noalias node: *Node,
is_notified: bool,
) error{Ready, Shutdown}!void {
@setCold(true);
const status = node.getStatus();
assert(status != .uninit);
const is_waking = status == .waking;
const is_waiting = status == .waiting;
self.@"suspend"(is_waking, is_waiting, is_notified);
}
fn @"suspend"(
self: *Registry,
is_waking: bool,
is_waiting: bool,
is_notified: bool,
) error{Ready, Shutdown}!void {
@setCold(true);
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
const signaled = switch (sync.state) {
.pending => if (is_waking) unreachable else false,
.waking => false,
.signaled => if (is_waking) unreachable else true,
.shutdown => return error.Shutdown,
};
var new_sync = sync;
new_sync.notified = false;
const was_notified = signaled or sync.notified or is_notified;
if (was_notified) {
if (signaled or is_waking) new_sync.state = .waking;
if (is_waiting) new_sync.waiting = true;
} else if (!sync.waiting or is_waking) {
if (is_waking) new_sync.state = .pending;
new_sync.waiting = true;
} else {
node.setStatus(.waiting);
return;
}
if (self.sync.tryCompareAndSwap(
@bitCast(usize, sync),
@bitCast(usize, new_sync),
.Acquire,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
if (was_notified) {
const new_status: Node.Status = if (signaled or is_waking) .waking else .running;
node.setStatus(new_status);
return error.Ready;
}
node.setStatus(.waiting);
return;
}
}
pub fn shutdown(self: *Registry) bool {
var sync = @bitCast(Sync, self.sync.load(.Monotonic));
while (true) {
assert(sync.state != .shutdown);
var new_sync = sync;
new_sync.state = .shutdown;
new_sync.waiting = false;
if (self.sync.tryCompareAndSwap(
@bitCast(usize, sync),
@bitCast(usize, new_sync),
.AcqRel,
.Monotonic,
)) |updated| {
sync = @bitCast(Sync, updated);
continue;
}
return sync.waiting;
}
}
};
const Queue = struct {
pub const Node = struct {
next: ?*Node,
};
/// An ordered set of *Nodes forming a linked list.
pub const Batch = struct {
head: ?*Node = null,
tail: *Node = undefined,
pub fn from(node: *Node) Batch {
node.next = null;
return .{
.head = node,
.tail = node,
};
}
pub fn push(self: *Batch, other: Batch) void {
if (self.head == null) {
self.* = other;
} else if (other.head != null) {
self.tail.next = other.head;
self.tail = other.tail;
}
}
pub fn pop(self: *Batch) ?*Node {
const node = self.head orelse return null;
self.head = node.next;
return node;
}
};
/// An unbounded MPSC of Nodes that are only available to this Queue
local: List = .{},
/// An unbounded, non-blocking MPMC of Nodes that can be stolen from by other Queues
backlog: List = .{},
/// A bounded, SPMC, fast-access ring-buffer of *Nodes for `backlog` that can be stolen from by other Queues
buffer: Buffer = .{},
/// The context in which Queue.push() is being called under
pub const PushContext = enum {
/// Push to the queue's local list. This can be used by any thread.
local,
/// Push to the queue's backlog list. This can be used by any thread, generally not the Queue's thread.
remote,
/// Push to either the queue's buffer or backlog. Can be called only by the thread which owns the Queue.
buffered,
};
pub fn push(
self: *Queue,
batch: Batch,
comptime context: PushContext,
) void {
switch (context) {
.local => self.local.push(batch),
.remote => self.backlog.push(batch),
.buffered => {
const overflowed = self.buffer.push(batch) orelse return;
self.backlog.pushUnchecked(overflowed);
},
}
}
pub const PopResult = struct {
/// The *Node which was dequeued from the Queue
node: *Node,
/// True if the dequeuing of this *Node resulted in pushing extra *Nodes
pushed: bool = false,
};
pub fn pop(self: *Queue) error{Empty, Contended}!PopResult {
// Nodes local to this Queue take priority over everything else.
if (self.local.popUnchecked()) |node| {
return PopResult{ .node = node };
}
// Check the buffer first before the backlog as it's much faster to access.
if (self.buffer.pop()) |node| {
return PopResult{ .node = node };
}
// The buffer is empty, try to pop *Nodes from the backlog and refill the buffer in the process.
var consumer = try self.backlog.tryAcquireConsumer();
var pushed: bool = false;
const node = self.buffer.consume(&consumer, &pushed);
consumer.release();
return PopResult{
.node = node,
.pushed = pushed,
};
}
pub fn consume(
noalias self: *Queue,
noalias list: *List,
) error{Empty, Contended}!PopResult {
var consumer = try list.tryAcquireConsumer();
defer consumer.release();
var pushed: bool = false;
const node = self.buffer.consume(consumer, &pushed);
return PopResult{
.node = node,
.pushed = pushed,
};
}
pub fn steal(
noalias self: *Queue,
noalias target: *Queue,
) error{Empty, Contended}!PopResult {
// Check the target Queue's backlog before the buffer.
// This minimizes contention with the Queue's pop() thread.
var was_contended = blk: {
return self.consume(&target.backlog) catch |err| {
break :blk (err == error.Contended);
};
};
// Try to steal from the target Queue's buffer if the backlog is unavailable.
// Never steal from its local List as that is private to the target Queue.
was_contended = blk: {
const node = target.buffer.steal() catch |err| switch (err) {
error.Empty => break :blk was_contended,
error.Contended => break :blk true,
};
return PopResult{ .node = node };
};
if (was_contended) return error.Contended;
return error.Empty;
}
/// Unbounded, Multi-Producer-(Single | non-blocking-Multi)-Consumer queue of *Nodes
const List = struct {
input: Atomic(?*Node) = Atomic(?*Node).init(null),
output: Atomic(usize) = Atomic(usize).init(0),
// Use the low bit of a node pointer to indicate that the output has an active consumer.
const HAS_CONSUMER: usize = 0b1;
comptime {
const consumer_alignment = HAS_CONSUMER + 1;
assert(@alignOf(Node) >= consumer_alignment);
}
pub fn push(self: *List, batch: Batch) void {
return self.pushToStack(false, batch);
}
pub fn pushUnchecked(self: *List, batch: Batch) void {
return self.pushToStack(true, batch);
}
fn pushToStack(self: *List, comptime is_only_producer: bool, batch: Batch) void {
const head = batch.head orelse return;
const tail = batch.tail;
var stack = self.input.load(.Monotonic);
while (true) {
// Prepend our batch on top of the existing stack if any
tail.next = stack;
// Optimization to avoid the RMW operation below if there is only one producer
if (stack == null and is_only_producer) {
self.input.store(head, .Release);
break;
}
// Update the stack with our prepended batch.
// Release barrier so that Acquire swap in Consumer.pop() sees well-formed .next links.
stack = self.input.tryCompareAndSwap(
stack,
head,
.Release,
.Monotonic,
) orelse break;
}
}
pub fn popUnchecked(self: *List) ?*Node {
// Effectively the following, but without consumer-side synchronization:
//
// var consumer = self.tryAcquireConsumer() catch unreachable;
// defer consumer.release();
// return consumer.pop();
//
var output = self.output.loadUnchecked();
assert(output & HAS_CONSUMER == 0);
self.output.storeUnchecked(HAS_CONSUMER);
var consumer = Consumer{
.stack = @intToPtr(?*Node, output),
.list = self,
};
const node = consumer.pop();
output = @ptrToInt(consumer.stack); // & ~HAS_CONSUMER
self.output.storeUnchecked(output);
return node;
}
pub fn tryAcquireConsumer(self: *List) error{Empty, Contended}!Consumer {
// Check if the List can be consumed without invalidating its cache-lines with an RMW op
var output = self.output.load(.Monotonic);
if (output == HAS_CONSUMER) {
return error.Contended;
}
if (output == 0 and self.input.load(.Monotonic) == null) {
return error.Empty;
}
// Select the best strategy to use for acquiring the Consumer based on the architecture.
const Strategy = enum { xchg, cmpxchg, ll_sc };
comptime var strategy = Strategy.cmpxchg;
inline for (.{
.{Strategy.xchg, .{"x86", "riscv"}},
.{Strategy.ll_sc, .{"arm", "aarch64", "mips", "powerpc"}},
}) |cpu_strategy| {
inline for (cpu_strategy[1]) |cpu_name| {
if (comptime std.mem.eql(u8, comptime arch.genericName(), cpu_name)) {
strategy = cpu_strategy[0];
}
}
}
switch (strategy) {
// Optimized for platforms where swap is fast or a single instruction
.xchg => {
output = self.output.swap(HAS_CONSUMER, .Acquire);
if (output == HAS_CONSUMER) {
return error.Contended;
}
},
// The default strategy
.cmpxchg => acquired: {
_ = self.output.compareAndSwap(
output,
HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse break :acquired {};
return error.Contended;
},
// Optimized for platforms where the cmpxchg default strategy is a loop
// in which case the cmpxchg loop can be implemented more efficiently.
.ll_sc => while (true) {
output = self.output.tryCompareAndSwap(
output,
HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse break;
if (output == HAS_CONSUMER) {
return error.Contended;
}
if (output == 0 and self.input.load(.Monotonic) == null) {
return error.Empty;
}
},
}
return Consumer{
.stack = @intToPtr(?*Node, output & ~HAS_CONSUMER),
.list = self,
};
}
pub const Consumer = struct {
stack: ?*Node,
list: *List,
pub fn pop(self: *Consumer) ?*Node {
// Check the local stack of Nodes first.
if (self.stack) |node| {
self.stack = node.next;
return node;
}
// Check if there are any Nodes pushed without doing the RMW operation below.
var input = self.list.input.load(.Monotonic);
if (input == null) {
return null;
}
// Steal any pushed Nodes and use the remaining for the local stack
const node = self.list.input.swap(null, .Acquire) orelse unreachable;
self.stack = node.next;
return node;
}
pub fn release(self: Consumer) void {
// Update the List's output with the remaining Nodes
// while also unsetting the HAS_CONSUMER bit for other threads to start dequeueing.
const new_output = @ptrToInt(self.stack); // & ~HAS_CONSUMER
self.list.output.store(new_output, .Release);
}
};
};
// Bounded, Single-Producer-Multi-Consumer, Ring-Buffer of *Nodes
const Buffer = struct {
head: Atomic(Index) = Atomic(Index).init(0),
tail: Atomic(Index) = Atomic(Index).init(0),
array: [capacity]Atomic(*Node) = undefined,
// Found to be the point of diminishing returns in terms of scheduler throughput.
// Appears to also be the queue size used in Go's scheduler:
// https://github.com/golang/go/blob/master/src/runtime/runtime2.go#L626
const capacity = 256;
// Ensure that the head and tail indices have valid integer types.
const Index = std.meta.Int(.signed, std.meta.bitCount(usize) / 2);
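// A signed Index is used so that `tail -% head` can momentarily go negative:
// pop() speculatively decrements the tail past the head while the ring is empty,
// and steal() detects that case below with a simple `<= 0` check.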
comptime {
assert(std.math.maxInt(Index) >= capacity);
assert(@typeInfo(Index).Int.signedness == .signed);
}
pub fn push(self: *Buffer, batch: Batch) ?Batch {
// Make sure there is a node to push
const node = batch.head orelse return null;
// Don't push more than one node to the queue.
// Batches of multiple nodes should be pushed to unbounded queues instead.
if (node.next != null) {
return batch;
}
// Since we're the producer, the tail can't change so it doesn't need atomics.
// Other threads also aren't publishing memory to us so Monotonic is fine for the head.
const tail = self.tail.loadUnchecked();
var head = self.head.load(.Monotonic);
while (true) {
const size = tail -% head;
assert(size <= capacity);
// Push to the array if it's not full.
// The array slot must be written to atomically since there could be a concurrent reader on wrap around.
// The tail must be updated with a Release barrier so that stealing threads see the node written into the array on tail Acquire.
if (size < capacity) {
self.array[tail % capacity].store(node, .Unordered);
self.tail.store(tail +% 1, .Release);
return null;
}
// The array is full, try to move some node out to make space for future push() operations.
// We migrate only half of the array's nodes so that subsequent pop() operations don't fail either.
// Acquire barrier to ensure that the Node modifications done below only happen on a successful steal and are not reordered before it.
var migrate = size / 2;
if (self.head.tryCompareAndSwap(
head,
head +% migrate,
.Acquire,
.Monotonic,
)) |updated| {
head = updated;
continue;
}
// Create a batch of the migrated nodes to report as overflowing.
// Iterate backwards in the array as the most recent nodes are more likely to be in cache.
// NOTE: Creating this batch already thrashes the cache by dereferencing the intrusive nodes...
var overflowed = Batch.from(node);
head +%= (migrate - 1);
while (migrate > 0) : (migrate -= 1) {
const migrated = self.array[head % capacity].loadUnchecked();
overflowed.push(Batch.from(migrated));
head -%= 1;
}
return overflowed;
}
}
/// A SeqCst fence is slower on most architectures that are either
/// strongly ordered (e.g. x86's mfence) or have ordered load/store instructions (e.g. aarch64's ldar/stlr).
///
/// On Armv7 and lower, however, a standalone fence doesn't generate as many barriers
/// as two SeqCst operations would, so it's more efficient there while remaining correct:
/// https://fzn.fr/readings/ppopp13.pdf
const use_seq_cst_fence = arch.isARM() or arch.isThumb();
pub fn pop(self: *Buffer) ?*Node {
// Read the tail without synchronization as we're the only thread that can update it.
const tail = self.tail.loadUnchecked();
// Bail if the array is empty, without invalidating its cache-line with the tail update below.
var head = self.head.load(.Monotonic);
if (tail == head) {
return null;
}
// Speculatively update the tail with the assumption that we dequeued from the array.
// The tail must be updated before observing the head which can't be ensured without SeqCst:
//
// reads / writes -----------
// ^ |
// | store(tail, Release) -| |
// x | | x
// | |- load(head, Acquire) |
// | v
// |----- reads / writes
//
// SeqCst *on both* the store() and the load() ensures a total ordering between
// the two atomics, which prevents them from being reordered around each other.
if (use_seq_cst_fence) {
self.tail.store(tail -% 1, .Monotonic);
std.atomic.fence(.SeqCst);
head = self.head.load(.Monotonic);
} else {
self.tail.store(tail -% 1, .SeqCst);
head = self.head.load(.SeqCst);
}
// If the array becomes empty (head was updated after store() but before load())
// then we need to restore the tail to its original value so that head == tail.
if (head == tail) {
self.tail.store(tail, .Monotonic);
return null;
}
// Updating the tail speculatively means that the node is already claimed.
// However, the last node in the array must be handled specially below.
const node = self.array[(tail -% 1) % capacity].loadUnchecked();
if (tail -% head > 1) {
return node;
}
// A steal() thread may see a valid head/tail and get preempted before it updates the head.
// If we don't update the head on the last node, the preempted stealer could update it and assume it owns the node.
//
// Update the head with SeqCst which synchronizes with the ordering of the store/load above:
// - it also implies Acquire to prevent node updates from being reordered before the "local steal"
// Restore the tail on failure to do so.
if (self.head.compareAndSwap(
head,
tail,
.SeqCst,
.Monotonic,
)) |updated| {
self.tail.store(tail, .Monotonic);
return null;
}
return node;
}
pub fn steal(self: *Buffer) error{Empty, Contended}!*Node {
// Acquire barrier on head to prevent the tail load below from being reordered before it.
const head = self.head.load(.Acquire);
if (use_seq_cst_fence) {
std.atomic.fence(.SeqCst);
}
// Acquire barrier on tail load to ensure we see the Released node in the array.
const tail = self.tail.load(.Acquire);
// The tail can be smaller than (a predecessor of) the head when the pop() thread
// speculatively updates the tail while the queue is empty (head == tail).
if (tail -% head <= 0) {
return error.Empty;
}
// Read the node from the array before actually performing the steal.
// If read after, the node could have been written over by a push() and give an unstolen node.
// Also read from the array atomically given a push() could write over it with a valid node on wrap.
const node = self.array[head % capacity].load(.Unordered);
// Try to advance the head, thereby claiming the read node above.
// SeqCst on success to synchronize with the ordering of store() and CAS() in push():
// - it also implies Release to prevent the node read from being reordered after the steal.
// - it also implies Acquire to prevent node updates from being reordered before the steal.
if (self.head.compareAndSwap(
head,
head +% 1,
.SeqCst,
.Monotonic,
)) |updated| {
return error.Contended;
}
return node;
}
pub fn consume(
noalias self: *Buffer,
noalias consumer: *List.Consumer,
noalias pushed_to_buffer: *bool,
) ?*Node {
var pushed: usize = 0;
var popped = consumer.pop();
const tail = self.tail.loadUnchecked();
while (true) {
// Reload the head in case any stealer threads updated it.
// Relaxed ordering as it's not being used to synchronize any memory between threads.
const head = self.head.load(.Monotonic);
// Bail if the buffer is as full as it can get.
const size = (tail +% pushed) -% head;
assert(size <= capacity);
if (size == capacity) {
break;
}
// Try to dequeue a node from the Consumer and push it to the array.
// Writing into the array must be done atomically due to concurrent stealing threads reading from it.
const node = consumer.pop() orelse break;
assert(pushed < capacity);
self.array[(tail +% pushed) % capacity].store(node, .Unordered);
pushed += 1;
}
// Use one of the array's nodes as the return value if the first consumer.pop() failed.
if (popped == null and pushed > 0) {
pushed -= 1;
popped = self.array[(tail +% pushed) % capacity].loadUnchecked();
}
// Update the tail if there were nodes pushed into the array.
// Release ordering on tail update so stealers see the pushed nodes in the array with an Acquire tail load.
if (pushed > 0) {
pushed_to_buffer.* = true;
self.tail.store(tail +% pushed, .Release);
}
return popped;
}
};
};
const std = @import("std");
const Task = @This();
next: ?*Task align(std.math.max(4, @alignOf(usize))) = undefined,
onExecute: fn(*Task) void,
pub fn execute(self: *Task) void {
return (self.onExecute)(self);
}
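// Platform is a user-provided vtable: the scheduler calls back into it for everything
// OS-specific (spawning, blocking, waking, and polling hooks) via the Command union below.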
pub const Platform = struct {
callFn: fn(*Platform, Command) void,
pub fn call(self: *Platform, command: Command) void {
return (self.callFn)(self, command);
}
pub const Command = union(enum) {
WorkerWait: struct {
worker: *Worker,
},
WorkerNotify: struct {
worker: *Worker,
},
WorkerSpawn: struct {
scheduler: *Scheduler,
spawned: *bool,
},
WorkerSignal: struct {
scheduler: *Scheduler,
max_notify: usize,
},
WorkerPollFirst: struct {
worker: *Worker,
be_fair: *bool,
result: *?*Task,
attempts: *usize,
},
WorkerPollYield: struct {
worker: *Worker,
be_fair: *bool,
attempt: *usize,
},
WorkerPollLast: struct {
worker: *Worker,
result: *?*Task,
},
WorkerExecute: struct {
worker: *Worker,
task: *Task,
},
WorkerJoinWait: struct {
worker: *Worker,
scheduler: *Scheduler,
},
WorkerJoinNotify: struct {
worker: *Worker,
scheduler: *Scheduler,
},
ShutdownWait: struct {
scheduler: *Scheduler,
},
ShutdownSignal: struct {
scheduler: *Scheduler,
waiting: usize,
},
};
};
pub const Group = struct {
len: usize = 0,
head: *Task = undefined,
tail: *Task = undefined,
pub fn from(task: *Task) Group {
task.next = null;
return .{
.len = 1,
.head = task,
.tail = task,
};
}
pub fn push(self: *Group, group: Group) void {
if (self.len == 0) {
self.* = group;
} else if (group.len > 0) {
self.tail.next = group.head;
self.tail = group.tail;
self.len += group.len;
}
}
pub fn pop(self: *Group) ?*Task {
if (self.len == 0) {
return null;
} else {
const task = self.head;
self.head = task.next orelse undefined;
self.len -= 1;
return task;
}
}
};
pub const SchedHints = struct {
affinity: ?*Worker = null,
};
pub const Worker = struct {
state: usize,
next_target: ?*Worker = null,
next_spawned: ?*Worker = null,
run_queue: struct {
local: UnboundedQueue,
overflow: UnboundedQueue,
buffer: BoundedQueue,
},
const State = struct {
scheduler: ?*Scheduler = null,
is_waking: bool = false,
did_schedule: bool = false,
is_monitoring: bool = false,
comptime {
const alignment = 0b111 + 1;
std.debug.assert(@alignOf(Scheduler) >= alignment);
}
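// A *Scheduler pointer and three flags packed into a single usize:
// bit 0 = is_waking, bit 1 = did_schedule, bit 2 = is_monitoring,
// with the pointer occupying the remaining bits (hence the alignment check above).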
fn pack(self: State) usize {
return @ptrToInt(self.scheduler)
| (@as(usize, @boolToInt(self.is_waking)) << 0)
| (@as(usize, @boolToInt(self.did_schedule)) << 1)
| (@as(usize, @boolToInt(self.is_monitoring)) << 2);
}
fn unpack(state: usize) State {
return .{
.scheduler = @intToPtr(?*Scheduler, state & ~@as(usize, 0b111)),
.is_waking = state & (1 << 0) != 0,
.did_schedule = state & (1 << 1) != 0,
.is_monitoring = state & (1 << 2) != 0,
};
}
};
pub const Iter = struct {
worker: ?*Worker,
pub fn next(self: *Iter) ?*Worker {
const worker = self.worker orelse return null;
self.worker = worker.next_spawned;
return worker;
}
};
pub const Config = struct {
be_fair: bool,
scheduler: *Scheduler,
};
pub fn run(self: *Worker, config: Config) void {
const scheduler = config.scheduler;
self.* = .{
.state = (State{
.scheduler = scheduler,
.is_waking = true,
}).pack(),
.run_queue = .{
.local = UnboundedQueue.init(config.be_fair),
.overflow = UnboundedQueue.init(config.be_fair),
.buffer = .{},
},
};
self.next_spawned = @atomicLoad(?*Worker, &scheduler.spawned, .Monotonic);
while (true) {
self.next_spawned = @cmpxchgWeak(
?*Worker,
&scheduler.spawned,
self.next_spawned,
self,
.Release,
.Monotonic,
) orelse break;
}
self.process();
scheduler.markWorkerShutdown();
scheduler.platform.call(.{
.WorkerJoinWait = .{
.worker = self,
.scheduler = scheduler,
},
});
}
pub fn getScheduler(self: Worker) *Scheduler {
const state = State.unpack(self.state);
return state.scheduler orelse unreachable;
}
pub fn schedule(self: *Worker, hints: SchedHints, group: Group) void {
if (group.len == 0) {
return;
}
if (hints.affinity) |worker| {
worker.run_queue.local.push(group);
return self.getScheduler().platform.call(.{
.WorkerNotify = .{
.worker = worker,
},
});
}
if (self.run_queue.buffer.push(group)) |overflowed_group| {
self.run_queue.overflow.push(overflowed_group);
}
var state = State.unpack(self.state);
if (state.is_monitoring) {
state.did_schedule = true;
self.state = state.pack();
return;
}
const scheduler = state.scheduler orelse unreachable;
scheduler.notify(false);
}
fn process(self: *Worker) void {
while (blk: {
const state = State.unpack(self.state);
break :blk state.scheduler;
}) |scheduler| {
const task = self.poll(scheduler) orelse {
scheduler.wait(self);
continue;
};
scheduler.platform.call(.{
.WorkerExecute = .{
.worker = self,
.task = task,
},
});
}
}
fn poll(self: *Worker, scheduler: *Scheduler) ?*Task {
var state = State.unpack(self.state);
std.debug.assert(!state.did_schedule);
std.debug.assert(!state.is_monitoring);
state.is_monitoring = true;
self.state = state.pack();
const polled_task = self.pollTask(scheduler, &state.did_schedule);
const poll_scheduled = state.did_schedule;
state = State.unpack(self.state);
std.debug.assert(state.is_monitoring);
if (polled_task) |task| {
if (state.is_waking or state.did_schedule or poll_scheduled) {
scheduler.notify(state.is_waking);
}
}
state = State{ .scheduler = scheduler };
self.state = state.pack();
return polled_task;
}
fn pollTask(self: *Worker, scheduler: *Scheduler, did_schedule: *bool) ?*Task {
var be_fair = false;
var poll_attempts: usize = 0;
var poll_result: ?*Task = null;
scheduler.platform.call(.{
.WorkerPollFirst = .{
.worker = self,
.be_fair = &be_fair,
.result = &poll_result,
.attempts = &poll_attempts,
},
});
if (poll_result) |task| return task;
if (be_fair) {
if (scheduler.run_queue.tryAcquireConsumer()) |*consumer| {
defer consumer.release();
if (consumer.pop()) |task| {
return task;
}
}
}
var attempt: usize = 0;
while (attempt < std.math.max(1, poll_attempts)) : (attempt += 1) {
const QueueTarget = enum{ local, buffer, overflow };
var queue_targets = [_]QueueTarget{ .local, .buffer, .overflow, };
if (be_fair) {
queue_targets = [_]QueueTarget{ .buffer, .overflow, .local };
}
for (queue_targets) |queue_target| {
if (switch (queue_target) {
.local => self.run_queue.local.popUnchecked(),
.buffer => self.run_queue.buffer.pop(be_fair),
.overflow => blk: {
const stole = self.run_queue.buffer.steal(&self.run_queue.overflow) orelse break :blk null;
did_schedule.* = stole.pushed;
break :blk stole.task;
},
}) |task| {
return task;
}
}
if (self.run_queue.buffer.steal(&scheduler.run_queue)) |stole| {
did_schedule.* = stole.pushed;
return stole.task;
}
var worker_iter = scheduler.getWorkerCount();
while (worker_iter > 0) : (worker_iter -= 1) {
const target = self.next_target orelse self;
if (target != self) {
var steal_targets = [_]QueueTarget{ .overflow, .buffer };
if (be_fair and target.run_queue.overflow.be_fair) {
steal_targets = [_]QueueTarget{ .buffer, .overflow };
}
for (steal_targets) |steal_target| {
if (switch (steal_target) {
.local => unreachable,
.buffer => self.run_queue.buffer.steal(&target.run_queue.buffer),
.overflow => self.run_queue.buffer.steal(&target.run_queue.overflow),
}) |stole| {
did_schedule.* = stole.pushed;
return stole.task;
}
}
}
self.next_target = target.next_spawned
orelse scheduler.getWorkerIter().worker
orelse unreachable;
}
scheduler.platform.call(.{
.WorkerPollYield = .{
.worker = self,
.be_fair = &be_fair,
.attempt = &attempt,
},
});
}
scheduler.platform.call(.{
.WorkerPollLast = .{
.worker = self,
.result = &poll_result,
},
});
return poll_result;
}
};
pub const Scheduler = struct {
max_workers: usize,
platform: *Platform,
spawned: ?*Worker = null,
idle_queue: usize = 0,
run_queue: UnboundedQueue,
pub const Config = struct {
be_fair: bool,
max_workers: usize,
platform: *Platform,
};
pub fn init(config: Config) Scheduler {
return .{
.max_workers = std.math.min(config.max_workers, std.math.maxInt(IdleQueue.Count)),
.platform = config.platform,
.run_queue = UnboundedQueue.init(config.be_fair),
};
}
pub fn getWorkerCount(self: *const Scheduler) usize {
const idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
return idle_queue.spawned;
}
pub fn getWorkerIter(self: *const Scheduler) Worker.Iter {
const spawned = @atomicLoad(?*Worker, &self.spawned, .Acquire);
return Worker.Iter{ .worker = spawned };
}
pub fn schedule(self: *Scheduler, hints: SchedHints, group: Group) void {
if (group.len == 0) {
return;
}
if (hints.affinity) |worker| {
return worker.schedule(hints, group);
}
self.run_queue.push(group);
self.notify(false);
}
fn notify(self: *Scheduler, is_waking: bool) void {
@setCold(true);
const platform = self.platform;
const max_workers = self.max_workers;
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
while (true) {
const result = idle_queue.notify(.{
.is_waking = is_waking,
.max_spawn = max_workers,
}) orelse return;
const new_idle_queue = switch (result) {
.changed => |q| q,
.resumed => |q| q,
.spawned => |q| q,
};
if (@cmpxchgWeak(
usize,
&self.idle_queue,
idle_queue.pack(),
new_idle_queue.pack(),
.Release,
.Monotonic,
)) |updated| {
idle_queue = IdleQueue.unpack(updated);
continue;
}
const do_notify = switch (result) {
.changed => return,
.resumed => true,
.spawned => false,
};
if (do_notify) {
return platform.call(.{
.WorkerSignal = .{
.scheduler = self,
.max_notify = 1,
},
});
}
var spawned = false;
platform.call(.{
.WorkerSpawn = .{
.scheduler = self,
.spawned = &spawned,
},
});
if (!spawned) self.markWorkerShutdown();
return;
}
}
fn wait(self: *Scheduler, worker: *Worker) void {
@setCold(true);
var is_waiting = false;
var state = Worker.State.unpack(worker.state);
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
while (true) {
const result = idle_queue.wait(blk: {
if (state.did_schedule) break :blk .notified;
if (state.is_waking) break :blk .waking;
if (is_waiting) break :blk .waiting;
break :blk .running;
}) orelse {
state.scheduler = null;
break;
};
if (switch (result) {
.running => |q| @as(?IdleQueue, q),
.waking => |q| @as(?IdleQueue, q),
.waiting => |q| q,
}) |new_idle_queue| {
if (@cmpxchgWeak(
usize,
&self.idle_queue,
idle_queue.pack(),
new_idle_queue.pack(),
.Acquire,
.Monotonic,
)) |updated| {
idle_queue = IdleQueue.unpack(updated);
continue;
}
}
if (switch (result) {
.running => false,
.waking => true,
.waiting => null,
}) |is_waking| {
state.is_waking = is_waking;
break;
}
is_waiting = true;
state.is_waking = false;
state.is_monitoring = true;
worker.state = state.pack();
self.platform.call(.{
.WorkerWait = .{
.worker = worker,
},
});
state = Worker.State.unpack(worker.state);
std.debug.assert(state.scheduler == self);
std.debug.assert(state.is_monitoring);
std.debug.assert(!state.is_waking);
state.is_monitoring = false;
idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
}
}
pub fn shutdown(self: *Scheduler) void {
@setCold(true);
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
while (true) {
const result = idle_queue.shutdown() orelse return;
if (@cmpxchgWeak(
usize,
&self.idle_queue,
idle_queue.pack(),
result.changed.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
idle_queue = IdleQueue.unpack(updated);
continue;
}
if (result.resumed == 0) return;
return self.platform.call(.{
.WorkerSignal = .{
.scheduler = self,
.max_notify = result.resumed,
},
});
}
}
fn markWorkerShutdown(self: *Scheduler) void {
@setCold(true);
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
while (true) {
const result = idle_queue.notifyShutdown();
if (@cmpxchgWeak(
usize,
&self.idle_queue,
idle_queue.pack(),
result.changed.pack(),
.Release,
.Monotonic,
)) |updated| {
idle_queue = IdleQueue.unpack(updated);
continue;
}
if (result.resumed == 0) return;
return self.platform.call(.{
.ShutdownSignal = .{
.scheduler = self,
.waiting = result.resumed,
},
});
}
}
pub fn join(self: *Scheduler) void {
@setCold(true);
self.shutdown();
self.waitForWorkerShutdown();
var workers = self.getWorkerIter();
while (workers.next()) |worker| {
self.platform.call(.{
.WorkerJoinNotify = .{
.worker = worker,
.scheduler = self,
},
});
}
}
fn waitForWorkerShutdown(self: *Scheduler) void {
@setCold(true);
var idle_queue = IdleQueue.unpack(@atomicLoad(usize, &self.idle_queue, .Monotonic));
while (true) {
const result = idle_queue.waitShutdown() orelse return;
if (@cmpxchgWeak(
usize,
&self.idle_queue,
idle_queue.pack(),
result.changed.pack(),
.Acquire,
.Monotonic,
)) |updated| {
idle_queue = IdleQueue.unpack(updated);
continue;
}
return self.platform.call(.{
.ShutdownWait = .{
.scheduler = self,
},
});
}
}
};
const IdleQueue = struct {
idle: usize = 0,
spawned: usize = 0,
notified: bool = false,
state: State = .pending,
const count_bits = (std.meta.bitCount(usize) - 4) / 2;
const Count = std.meta.Int(.unsigned, count_bits);
const State = enum(u2) {
pending = 0,
waking,
notified,
shutdown,
};
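// Bit layout (low to high): notified (1 bit), state (2 bits), one reserved bit,
// idle (count_bits), spawned (count_bits).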
pub fn pack(self: IdleQueue) usize {
return ((@as(usize, @boolToInt(self.notified)) << 0) |
(@as(usize, @enumToInt(self.state)) << 1) |
(@as(usize, @intCast(Count, self.idle)) << 4) |
(@as(usize, @intCast(Count, self.spawned)) << (4 + count_bits)));
}
pub fn unpack(value: usize) IdleQueue {
return .{
.notified = value & (1 << 0) != 0,
.state = @intToEnum(State, @truncate(u2, value >> 1)),
.idle = @truncate(Count, value >> 4),
.spawned = @truncate(Count, value >> (4 + count_bits)),
};
}
pub const Notify = union(enum) {
changed: IdleQueue,
resumed: IdleQueue,
spawned: IdleQueue,
pub const Context = struct {
is_waking: bool,
max_spawn: usize,
};
};
pub fn notify(self: IdleQueue, context: Notify.Context) ?Notify {
if (self.state == .shutdown) {
return null;
}
var updated = self;
updated.notified = true;
if (context.is_waking) {
std.debug.assert(self.state == .waking);
}
if (self.idle > 0 and (context.is_waking or self.state == .pending)) {
updated.state = .notified;
return Notify{ .resumed = updated };
}
if (self.spawned < context.max_spawn and (context.is_waking or self.state == .pending)) {
updated.spawned += 1;
return Notify{ .spawned = updated };
}
if (context.is_waking) {
updated.state = .pending;
} else if (self.notified) {
return null;
}
return Notify{ .changed = updated };
}
pub const Wait = union(enum) {
running: IdleQueue,
waking: IdleQueue,
waiting: ?IdleQueue,
pub const Context = enum {
running,
waking,
waiting,
notified,
};
};
pub fn wait(self: IdleQueue, context: Wait.Context) ?Wait {
if (self.state == .shutdown) {
return null;
}
switch (context) {
.running => {},
.waking => std.debug.assert(self.state == .waking),
.waiting, .notified => std.debug.assert(self.idle > 0),
}
var updated = self;
updated.notified = false;
if (self.notified or context == .notified) {
updated.notified = false;
if (context == .waiting) {
updated.idle -= 1;
}
if (context == .waking or self.state == .notified) {
updated.state = .waking;
return Wait{ .waking = updated };
}
return Wait{ .running = updated };
}
if (context == .waiting) {
return Wait{ .waiting = null };
}
updated.idle += 1;
if (context == .waking) {
updated.state = .pending;
}
return Wait{ .waiting = updated };
}
pub const Shutdown = struct {
changed: IdleQueue,
resumed: usize,
};
pub fn shutdown(self: IdleQueue) ?Shutdown {
if (self.state == .shutdown) {
return null;
}
var updated = self;
updated.state = .shutdown;
updated.idle = 0;
updated.notified = true;
return Shutdown{
.changed = updated,
.resumed = self.idle,
};
}
pub const NotifyShutdown = struct {
changed: IdleQueue,
resumed: usize,
};
pub fn notifyShutdown(self: IdleQueue) NotifyShutdown {
std.debug.assert(self.state == .shutdown);
std.debug.assert(self.spawned > 0);
var updated = self;
updated.spawned -= 1;
const resumed = self.spawned == 1 and self.idle > 0;
if (resumed) {
updated.idle = 0;
}
return NotifyShutdown{
.changed = updated,
.resumed = if (resumed) self.idle else 0,
};
}
pub const WaitShutdown = struct {
changed: IdleQueue,
};
pub fn waitShutdown(self: IdleQueue) ?WaitShutdown {
std.debug.assert(self.state == .shutdown);
if (self.spawned == 0) {
return null;
}
var updated = self;
updated.idle += 1;
return WaitShutdown{ .changed = updated };
}
};
const UnboundedQueue = struct {
be_fair: bool,
queue: Queue,
pub fn init(be_fair: bool) UnboundedQueue {
return UnboundedQueue{
.be_fair = be_fair,
.queue = switch (be_fair) {
true => .{ .fifo = .{} },
else => .{ .lifo = .{} },
},
};
}
pub fn push(self: *UnboundedQueue, group: Group) void {
const head = group.head;
const tail = group.tail;
std.debug.assert(group.len > 0);
std.debug.assert(group.len == 1 or head.next != null);
std.debug.assert(tail.next == null);
return switch (self.be_fair) {
true => self.queue.fifo.push(head, tail),
else => self.queue.lifo.push(head, tail),
};
}
pub fn popUnchecked(self: *UnboundedQueue) ?*Task {
return switch (self.be_fair) {
true => self.queue.fifo.pop(null),
else => self.queue.lifo.pop(null),
};
}
pub fn tryAcquireConsumer(self: *UnboundedQueue) ?Consumer {
const be_fair = self.be_fair;
const consumer = switch (be_fair) {
true => self.queue.fifo.acquireConsumer(),
else => self.queue.lifo.acquireConsumer(),
};
return Consumer{
.be_fair = be_fair,
.queue = &self.queue,
.consumer = consumer orelse return null,
};
}
pub const Consumer = struct {
be_fair: bool,
queue: *Queue,
consumer: usize,
pub fn pop(self: *Consumer) ?*Task {
return switch (self.be_fair) {
true => self.queue.fifo.pop(&self.consumer),
else => self.queue.lifo.pop(&self.consumer),
};
}
pub fn release(self: Consumer) void {
return switch (self.be_fair) {
true => self.queue.fifo.releaseConsumer(self.consumer),
else => self.queue.lifo.releaseConsumer(self.consumer),
};
}
};
const Queue = extern union {
fifo: FifoQueue,
lifo: LifoQueue,
};
const LifoQueue = struct {
stack: usize = 0,
local: ?*Task = null,
const HAS_LOCAL: usize = 1 << 1;
const HAS_CONSUMER: usize = 1 << 0;
const PTR_MASK = ~(HAS_CONSUMER | HAS_LOCAL);
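// The low two bits of `stack` are tag bits: HAS_CONSUMER marks an active consumer and
// HAS_LOCAL marks that a consumer's leftover nodes live in `local`; the remaining bits
// hold the top-of-stack *Task pointer.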
comptime {
std.debug.assert(@alignOf(Task) >= ~PTR_MASK + 1);
}
fn push(self: *LifoQueue, head: *Task, tail: *Task) void {
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) {
tail.next = @intToPtr(?*Task, stack & PTR_MASK);
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
@ptrToInt(head) | (stack & ~PTR_MASK),
.Release,
.Monotonic,
) orelse return;
}
}
fn pop(self: *LifoQueue, consumer_ref: ?*usize) ?*Task {
const local_ptr = blk: {
const ref = consumer_ref orelse break :blk &self.local;
break :blk @ptrCast(*?*Task, ref);
};
if (local_ptr.*) |task| {
local_ptr.* = task.next;
return task;
}
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
if (stack & PTR_MASK != 0) {
stack = @atomicRmw(usize, &self.stack, .Xchg, HAS_LOCAL | HAS_CONSUMER, .Acquire);
}
const task = @intToPtr(?*Task, stack & PTR_MASK) orelse return null;
local_ptr.* = task.next;
return task;
}
fn acquireConsumer(self: *LifoQueue) ?usize {
var stack = @atomicLoad(usize, &self.stack, .Monotonic);
while (true) {
if ((stack & ~HAS_CONSUMER == 0) or (stack & HAS_CONSUMER != 0)) {
return null;
}
var new_stack = stack | HAS_CONSUMER | HAS_LOCAL;
const take_stack = stack & HAS_LOCAL == 0;
if (take_stack) {
new_stack &= ~PTR_MASK;
}
stack = @cmpxchgWeak(
usize,
&self.stack,
stack,
new_stack,
.Acquire,
.Monotonic,
) orelse {
if (take_stack) return stack & PTR_MASK;
return @ptrToInt(self.local);
};
}
}
fn releaseConsumer(self: *LifoQueue, consumer: usize) void {
var remove: usize = HAS_CONSUMER;
const local = @intToPtr(?*Task, consumer);
if (local == null) {
remove |= HAS_LOCAL;
}
self.local = local;
_ = @atomicRmw(usize, &self.stack, .Sub, remove, .Release);
}
};
const FifoQueue = struct {
head: usize = 0,
tail: ?*Task = null,
stub_next: ?*Task = null,
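// Intrusive MPSC queue with a stub node (in the style of Vyukov's intrusive MPSC queue):
// producers swap the tail and link through `next`, while a single consumer advances `head`.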
fn stub(self: *FifoQueue) callconv(.Inline) *Task {
return @fieldParentPtr(Task, "next", &self.stub_next);
}
fn isEmpty(self: *const FifoQueue) callconv(.Inline) bool {
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic);
return (tail == null) or (tail == self.stub());
}
fn push(self: *FifoQueue, head: *Task, tail: *Task) void {
const queue_tail = @atomicRmw(?*Task, &self.tail, .Xchg, tail, .AcqRel);
const queue_next = if (queue_tail) |t| &t.next else &self.stub_next;
@atomicStore(?*Task, queue_next, head, .Release);
}
fn pop(self: *FifoQueue, consumer_ref: ?*usize) ?*Task {
const stub = self.stub();
const head_ref = blk: {
const ref = consumer_ref orelse &self.head;
break :blk @ptrCast(**Task, ref);
};
var head = head_ref.*;
if (head == stub) {
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
head_ref.* = head;
}
if (@atomicLoad(?*Task, &head.next, .Acquire)) |next| {
head_ref.* = next;
return head;
}
const tail = @atomicLoad(?*Task, &self.tail, .Acquire);
if (tail != head) {
return null;
}
self.stub_next = null;
self.push(stub, stub);
const next = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
head_ref.* = next;
return head;
}
const HAS_CONSUMER: usize = 1 << 0;
fn acquireConsumer(self: *FifoQueue) ?usize {
if (self.isEmpty()) return null;
return self.acquireConsumerSlow();
}
fn acquireConsumerSlow(self: *FifoQueue) ?usize {
@setCold(true);
var head = @atomicLoad(usize, &self.head, .Monotonic);
while (true) {
if ((head & HAS_CONSUMER != 0) or self.isEmpty()) {
return null;
}
head = @cmpxchgWeak(
usize,
&self.head,
head,
head | HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse {
if (head == 0) return @ptrToInt(self.stub());
return head;
};
}
}
fn releaseConsumer(self: *FifoQueue, consumer: usize) void {
@atomicStore(usize, &self.head, consumer, .Release);
}
};
};
const BoundedQueue = struct {
head: Index = 0,
tail: Index = 0,
buffer: [capacity]*Task = undefined,
const capacity = 256;
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2);
comptime {
std.debug.assert(std.math.maxInt(Index) >= capacity);
}
pub fn push(self: *BoundedQueue, group: Group) ?Group {
std.debug.assert(group.len > 0);
if (group.len > capacity) {
return group;
}
var tail = self.tail;
const head = @atomicLoad(Index, &self.head, .Monotonic);
std.debug.assert(tail -% head <= capacity);
const free_slots = capacity - (tail -% head);
if (group.len > free_slots) {
return group;
}
var tasks = group;
while (tasks.pop()) |task| {
@atomicStore(*Task, &self.buffer[tail % capacity], task, .Unordered);
tail +%= 1;
}
@atomicStore(Index, &self.tail, tail, .Release);
return null;
}
pub fn pop(self: *BoundedQueue, be_fair: bool) ?*Task {
const tail = self.tail;
var head = @atomicLoad(Index, &self.head, .Monotonic);
if (head == tail) {
return null;
}
while (be_fair) {
head = @cmpxchgWeak(
Index,
&self.head,
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return self.buffer[head % capacity];
if (head == tail) {
return null;
}
}
const new_tail = tail -% 1;
@atomicStore(Index, &self.tail, new_tail, .SeqCst);
head = @atomicLoad(Index, &self.head, .SeqCst);
std.debug.assert(tail -% head <= capacity);
var task: ?*Task = null;
if (head != tail) {
task = self.buffer[new_tail % capacity];
if (head != new_tail) {
return task;
} else if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic) != null) {
task = null;
}
}
@atomicStore(Index, &self.tail, tail, .Monotonic);
return task;
}
pub const Steal = struct {
task: *Task,
pushed: bool = false,
};
pub fn steal(noalias self: *BoundedQueue, noalias queue: anytype) ?Steal {
const is_unbounded = switch (@TypeOf(queue)) {
*BoundedQueue => false,
*UnboundedQueue => true,
else => |T| @compileError("Cannot steal from " ++ @typeName(T)),
};
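// Stealing from an UnboundedQueue drains as many tasks as fit into our (empty) buffer and returns one of them;
// stealing from another BoundedQueue claims a single task by advancing that queue's head.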
const tail = self.tail;
const head = @atomicLoad(Index, &self.head, .Unordered);
std.debug.assert(tail == head);
if (is_unbounded) {
var consumer = queue.tryAcquireConsumer() orelse return null;
defer consumer.release();
var pushed: Index = 0;
while (pushed < capacity) : (pushed += 1) {
const task = consumer.pop() orelse break;
const index = (tail +% pushed) % capacity;
@atomicStore(*Task, &self.buffer[index], task, .Unordered);
}
var task = consumer.pop();
if (task == null and pushed > 0) {
pushed -= 1;
task = self.buffer[(tail +% pushed) % capacity];
}
if (pushed > 0) @atomicStore(Index, &self.tail, tail +% pushed, .Release);
return Steal{
.task = task orelse return null,
.pushed = pushed > 0,
};
}
var queue_head = @atomicLoad(Index, &queue.head, .SeqCst);
while (true) {
const queue_tail = @atomicLoad(Index, &queue.tail, .SeqCst);
if ((queue_head == queue_tail) or (queue_head == queue_tail -% 1)) {
return null;
}
std.debug.assert(queue_tail -% queue_head <= capacity);
const task = @atomicLoad(*Task, &queue.buffer[queue_head % capacity], .Unordered);
queue_head = @cmpxchgWeak(
Index,
&queue.head,
queue_head,
queue_head +% 1,
.SeqCst,
.SeqCst,
) orelse return Steal{
.task = task,
.pushed = false,
};
}
}
};
const std = @import("std");
const Pool = @This();
pub const Task = @import("./t2.zig");
stack_size: usize,
optimize_for: Config.OptimizeFor,
scheduler: Task.Scheduler,
shutdown_event: ResetEvent = .{},
start_thread: ?*Thread = null,
idle_queue: AbaAtomicUsize = .{ .value = 0 },
pub const Config = struct {
optimize_for: OptimizeFor,
max_threads: usize,
stack_size: usize,
pub const OptimizeFor = enum{
Latency,
Throughput,
};
pub fn getDefault() Config {
return Config{
.optimize_for = .Throughput,
.stack_size = 16 * 1024 * 1024,
.max_threads = blk: {
if (std.builtin.single_threaded) break :blk 1;
break :blk std.math.max(1, std.Thread.cpuCount() catch 1);
},
};
}
};
pub fn init(config: Config) Pool {
return .{
.stack_size = std.math.max(64 * 1024, config.stack_size),
.optimize_for = config.optimize_for,
.scheduler = Task.Scheduler.init(.{
.be_fair = config.optimize_for == .Latency,
.max_workers = std.math.max(1, config.max_threads),
.platform = &pool_platform,
}),
};
}
pub fn deinit(self: *Pool) void {
self.scheduler.shutdown();
self.scheduler.join();
self.shutdown_event.deinit();
self.* = undefined;
}
pub const StartConfig = struct {
use_caller_thread: bool = false,
initial_group: Task.Group = .{},
fn noop(task: *Task) void {}
};
pub fn start(self: *Pool, config: StartConfig) void {
var group = config.initial_group;
var noop_task = Task{ .onExecute = StartConfig.noop };
if (config.use_caller_thread) {
self.start_thread = @intToPtr(*Thread, @alignOf(Thread));
group.push(Task.Group.from(&noop_task));
}
if (group.len > 0) {
self.scheduler.schedule(.{}, group);
}
}
pub fn stop(self: *Pool) void {
self.scheduler.shutdown();
}
pub fn schedule(self: *Pool, groupable: anytype) void {
const group = switch (@TypeOf(groupable)) {
Task.Group => groupable,
*Task => Task.Group.from(groupable),
else => |T| @compileError(@typeName(T) ++ " is not a Task.Group"),
};
if (group.len == 0) {
return;
}
if (Thread.getCurrent()) |thread| {
thread.worker.schedule(.{}, group);
} else {
self.scheduler.schedule(.{}, group);
}
}
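// Illustrative usage sketch (not part of the original gist; assumes this file is
// imported as `Pool` and `onRun` is a user-provided `fn(*Pool.Task) void`):
//
//     var pool = Pool.init(Pool.Config.getDefault());
//     defer pool.deinit();
//     pool.start(.{});
//     var task = Pool.Task{ .onExecute = onRun };
//     pool.schedule(&task);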
const IDLE_PTR = ~IDLE_NOTIFIED;
const IDLE_NOTIFIED: usize = 0b1;
const IDLE_SHUTDOWN = std.math.maxInt(usize);
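// The idle_queue word is either IDLE_SHUTDOWN, IDLE_NOTIFIED (a pending wakeup token),
// or the head of an intrusive stack of idle Threads linked through `idle_next`
// (the low IDLE_NOTIFIED bit is masked off with IDLE_PTR when reading the pointer).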
fn idleWait(self: *Pool, thread: *Thread) void {
@setCold(true);
var idle_queue = self.idle_queue.load(.Monotonic);
while (true) {
if (idle_queue.value == IDLE_SHUTDOWN) {
return;
}
var new_idle_queue: usize = 0;
if (idle_queue.value != IDLE_NOTIFIED) {
const idle_next = @intToPtr(?*Thread, idle_queue.value & IDLE_PTR);
@atomicStore(?*Thread, &thread.idle_next, idle_next, .Unordered);
new_idle_queue = @ptrToInt(thread);
}
if (self.idle_queue.cmpxchgWeak(
idle_queue,
new_idle_queue,
.AcqRel,
.Monotonic,
)) |updated| {
idle_queue = updated;
continue;
}
if (idle_queue.value != IDLE_NOTIFIED) {
thread.idle_event.wait();
}
return;
}
}
fn idleWake(self: *Pool, wake_all: bool) void {
@setCold(true);
var idle_threads: ?*Thread = null;
defer while (idle_threads) |thread| {
idle_threads = thread.idle_next;
thread.idle_event.set();
}
if (wake_all) {
const idle_queue = @atomicRmw(
usize,
&self.idle_queue.value,
.Xchg,
IDLE_SHUTDOWN,
.Acquire,
);
return switch (idle_queue) {
IDLE_SHUTDOWN => unreachable,
IDLE_NOTIFIED => {},
else => idle_threads = @intToPtr(?*Thread, idle_queue & IDLE_PTR),
};
}
var idle_queue = self.idle_queue.load(.Acquire);
while (true) {
const idle_thread = switch (idle_queue.value) {
IDLE_SHUTDOWN => return,
IDLE_NOTIFIED => return,
else => @intToPtr(?*Thread, idle_queue.value & IDLE_PTR),
};
var new_idle_queue = IDLE_NOTIFIED;
if (idle_thread) |thread| {
const next_thread = @atomicLoad(?*Thread, &thread.idle_next, .Unordered);
new_idle_queue = @ptrToInt(next_thread);
}
if (self.idle_queue.cmpxchgWeak(
idle_queue,
new_idle_queue,
.AcqRel,
.Acquire,
)) |updated| {
idle_queue = updated;
continue;
}
const thread = idle_thread orelse return;
@atomicStore(?*Thread, &thread.idle_next, null, .Unordered);
idle_threads = thread;
return;
}
}
const Thread = struct {
worker: Task.Worker = undefined,
handle: ?*std.Thread,
idle_next: ?*Thread = null,
idle_event: ResetEvent = .{},
shutdown_event: ResetEvent = .{},
fn spawn(pool: *Pool) !void {
const Spawner = struct {
put_event: ResetEvent = .{},
got_event: ResetEvent = .{},
handle: ?*std.Thread = undefined,
pool_ref: *Pool = undefined,
fn run(self: *@This()) void {
self.put_event.wait();
const handle = self.handle;
const pool_ref = self.pool_ref;
self.got_event.set();
return Thread.run(pool_ref, handle);
}
};
var spawner = Spawner{};
spawner.handle = try std.Thread.spawn(Spawner.run, &spawner);
spawner.pool_ref = pool;
spawner.put_event.set();
spawner.got_event.wait();
spawner.put_event.deinit();
spawner.got_event.deinit();
}
fn run(pool: *Pool, handle: ?*std.Thread) void {
var self = Thread{ .handle = handle };
defer self.idle_event.deinit();
if (handle == null) {
pool.start_thread = &self;
}
const old_tls = ThreadLocalUsize.get();
ThreadLocalUsize.set(@ptrToInt(&self));
defer ThreadLocalUsize.set(old_tls);
self.worker.run(.{
.be_fair = pool.optimize_for == .Latency,
.scheduler = &pool.scheduler,
});
if (handle == null) {
pool.scheduler.join();
}
}
};
var pool_platform = Task.Platform{
.callFn = onPlatformCommand,
};
fn onPlatformCommand(_platform: *Task.Platform, command: Task.Platform.Command) void {
switch (command) {
.WorkerWait => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
const pool = @fieldParentPtr(Pool, "scheduler", thread.worker.getScheduler());
pool.idleWait(thread);
},
.WorkerNotify => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
thread.idle_event.set();
},
.WorkerSignal => |cmd| {
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler);
const wake_all = cmd.max_notify > 1;
pool.idleWake(wake_all);
},
.WorkerSpawn => |cmd| {
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler);
if (pool.start_thread == @intToPtr(*Thread, @alignOf(Thread))) {
pool.start_thread = null;
Thread.run(pool, null);
cmd.spawned.* = true;
return;
}
cmd.spawned.* = true;
Thread.spawn(pool) catch {
cmd.spawned.* = false;
};
},
.WorkerPollFirst => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
thread.idle_event.reset();
const pool = @fieldParentPtr(Pool, "scheduler", thread.worker.getScheduler());
cmd.be_fair.* = pool.optimize_for == .Latency;
cmd.attempts.* = switch (std.builtin.arch) {
.i386, .x86_64 => 16,
.aarch64 => 8,
else => 4,
};
},
.WorkerPollYield => |cmd| {
std.os.sched_yield() catch {};
},
.WorkerPollLast => |cmd| {
return;
},
.WorkerExecute => |cmd| {
cmd.task.execute();
},
.WorkerJoinWait => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
if (thread.handle != null) {
thread.shutdown_event.wait();
}
},
.WorkerJoinNotify => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
if (thread.handle) |handle| {
thread.shutdown_event.set();
handle.wait();
}
},
.ShutdownWait => |cmd| {
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler);
pool.shutdown_event.wait();
},
.ShutdownSignal => |cmd| {
const pool = @fieldParentPtr(Pool, "scheduler", cmd.scheduler);
pool.shutdown_event.set();
},
}
}
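// On x86, AbaAtomicUsize pairs the value with a generation counter and uses a double-word
// compare-and-swap so that popping from the idle stack can't suffer from ABA when a thread
// is popped and re-pushed between the load and the CAS; other architectures fall back to a plain usize CAS.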
const AbaAtomicUsize = switch (std.builtin.arch) {
.i386, .x86_64 => extern struct {
const DoubleWord = std.meta.Int(.unsigned, std.meta.bitCount(usize) * 2);
value: usize align(@alignOf(DoubleWord)),
aba_id: usize = 0,
pub fn load(
self: *const AbaAtomicUsize,
comptime ordering: std.builtin.AtomicOrder,
) AbaAtomicUsize {
const value = @atomicLoad(usize, &self.value, ordering);
const aba_id = @atomicLoad(usize, &self.aba_id, .Unordered);
return .{
.value = value,
.aba_id = aba_id,
};
}
pub fn cmpxchgWeak(
self: *AbaAtomicUsize,
compare: AbaAtomicUsize,
exchange: usize,
comptime success: std.builtin.AtomicOrder,
comptime failure: std.builtin.AtomicOrder,
) ?AbaAtomicUsize {
const double_word = @cmpxchgWeak(
DoubleWord,
@ptrCast(*DoubleWord, self),
@bitCast(DoubleWord, compare),
@bitCast(DoubleWord, AbaAtomicUsize{
.value = exchange,
.aba_id = compare.aba_id +% 1,
}),
success,
failure,
) orelse return null;
return @bitCast(AbaAtomicUsize, double_word);
}
},
else => struct {
value: usize,
pub fn load(
self: *const AbaAtomicUsize,
comptime ordering: std.builtin.AtomicOrder,
) AbaAtomicUsize {
const value = @atomicLoad(usize, &self.value, ordering);
return .{ .value = value };
}
pub fn cmpxchgWeak(
self: *AbaAtomicUsize,
compare: AbaAtomicUsize,
exchange: usize,
comptime success: std.builtin.AtomicOrder,
comptime failure: std.builtin.AtomicOrder,
) ?AbaAtomicUsize {
return .{
.value = @cmpxchgWeak(
usize,
&self.value,
compare.value,
exchange,
success,
failure,
) orelse return null,
};
}
},
};
const is_apple_silicon = std.builtin.os.tag == .macos and std.builtin.arch == .aarch64;
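// On Apple Silicon this presumably works around issues with Zig's native `threadlocal`
// at the time, using a lazily created pthread TLS key (guarded by dispatch_once_f) instead.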
const ThreadLocalUsize = switch (is_apple_silicon) {
true => struct {
const pthread_key_t = c_ulong;
extern "c" fn pthread_key_create(key: *pthread_key_t, destructor: ?fn (value: *c_void) callconv(.C) void) c_int;
extern "c" fn pthread_getspecific(key: pthread_key_t) ?*c_void;
extern "c" fn pthread_setspecific(key: pthread_key_t, value: ?*c_void) c_int;
const dispatch_once_t = usize;
const dispatch_function_t = fn (?*c_void) callconv(.C) void;
extern "c" fn dispatch_once_f(
predicate: *dispatch_once_t,
context: ?*c_void,
function: dispatch_function_t,
) void;
var tls_once: dispatch_once_t = 0;
var tls_key: pthread_key_t = undefined;
fn tls_init(_: ?*c_void) callconv(.C) void {
std.debug.assert(pthread_key_create(&tls_key, null) == 0);
}
pub fn get() usize {
dispatch_once_f(&tls_once, null, tls_init);
return @ptrToInt(pthread_getspecific(tls_key));
}
pub fn set(value: usize) void {
dispatch_once_f(&tls_once, null, tls_init);
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0);
}
},
else => struct {
threadlocal var tls_usize: usize = 0;
pub fn get() usize {
return tls_usize;
}
pub fn set(value: usize) void {
tls_usize = value;
}
},
};
const ResetEvent = struct {
state: State = .empty,
futex: Futex = .{},
const State = enum(u32) {
empty,
waiting,
notified,
};
pub fn deinit(self: *ResetEvent) void {
self.futex.deinit();
self.* = undefined;
}
pub fn wait(self: *ResetEvent) void {
while (true) {
var state = @atomicLoad(State, &self.state, .Acquire);
while (state == .empty) {
state = .waiting;
state = @cmpxchgWeak(
State,
&self.state,
.empty,
.waiting,
.Acquire,
.Acquire,
) orelse break;
}
if (state == .notified) return;
self.futex.wait(
@ptrCast(*const u32, &self.state),
@enumToInt(State.waiting),
);
}
}
pub fn set(self: *ResetEvent) void {
switch (@atomicRmw(State, &self.state, .Xchg, .notified, .Release)) {
.empty => {},
.notified => {},
.waiting => self.futex.wake(@ptrCast(*const u32, &self.state)),
}
}
pub fn reset(self: *ResetEvent) void {
@atomicStore(State, &self.state, .empty, .Monotonic);
}
const Futex = if (std.builtin.os.tag == .windows)
WindowsFutex
else if (std.builtin.link_libc)
PosixFutex
else if (std.builtin.os.tag == .linux)
LinuxFutex
else
@compileError("ResetEvent unavailable for platform");
const LinuxFutex = struct {
pub fn deinit(self: *Futex) void {
self.* = undefined;
}
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void {
switch (std.os.linux.getErrno(std.os.linux.futex_wait(
@ptrCast(*const i32, ptr),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAIT,
@bitCast(i32, cmp),
null,
))) {
0 => {},
std.os.EINTR => {},
std.os.EAGAIN => {},
std.os.ETIMEDOUT => unreachable,
else => unreachable,
}
}
pub fn wake(self: *Futex, ptr: *const u32) void {
switch (std.os.linux.getErrno(std.os.linux.futex_wake(
@ptrCast(*const i32, ptr),
std.os.linux.FUTEX_PRIVATE_FLAG | std.os.linux.FUTEX_WAKE,
std.math.maxInt(i32),
))) {
0 => {},
std.os.EINVAL => {},
std.os.EACCES => {},
std.os.EFAULT => {},
else => unreachable,
}
}
};
const WindowsFutex = struct {
lock: std.os.windows.SRWLOCK = std.os.windows.SRWLOCK_INIT,
cond: std.os.windows.CONDITION_VARIABLE = std.os.windows.CONDITION_VARIABLE_INIT,
pub fn deinit(self: *Futex) void {
self.* = undefined;
}
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void {
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock);
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock);
if (@atomicLoad(u32, ptr, .SeqCst) == cmp) {
_ = std.os.windows.kernel32.SleepConditionVariableSRW(
&self.cond,
&self.lock,
std.os.windows.INFINITE,
0,
);
}
}
pub fn wake(self: *Futex, ptr: *const u32) void {
std.os.windows.kernel32.AcquireSRWLockExclusive(&self.lock);
defer std.os.windows.kernel32.ReleaseSRWLockExclusive(&self.lock);
std.os.windows.kernel32.WakeAllConditionVariable(&self.cond);
}
};
const PosixFutex = struct {
mutex: std.c.pthread_mutex_t = .{},
cond: std.c.pthread_cond_t = .{},
pub fn deinit(self: *Futex) void {
const rc = std.c.pthread_cond_destroy(&self.cond);
std.debug.assert(rc == 0 or rc == std.os.EINVAL);
const rm = std.c.pthread_mutex_destroy(&self.mutex);
std.debug.assert(rm == 0 or rm == std.os.EINVAL);
}
pub fn wait(self: *Futex, ptr: *const u32, cmp: u32) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
if (@atomicLoad(u32, ptr, .SeqCst) == cmp) {
std.debug.assert(std.c.pthread_cond_wait(&self.cond, &self.mutex) == 0);
}
}
pub fn wake(self: *Futex, ptr: *const u32) void {
std.debug.assert(std.c.pthread_mutex_lock(&self.mutex) == 0);
defer std.debug.assert(std.c.pthread_mutex_unlock(&self.mutex) == 0);
std.debug.assert(std.c.pthread_cond_signal(&self.cond) == 0);
}
};
};
const std = @import("std");
const Task = @This();
next: ?*Task = undefined,
pub const Batch = struct {
head: ?*Task = null,
tail: *Task = undefined,
pub fn isEmpty(self: Batch) bool {
return self.head == null;
}
pub fn from(task: *Task) Batch {
task.next = null;
return Batch{
.head = task,
.tail = task,
};
}
pub const push = pushBack;
pub fn pushBack(self: *Batch, batch: Batch) void {
if (self.isEmpty()) {
self.* = batch;
} else if (!batch.isEmpty()) {
self.tail.next = batch.head;
self.tail = batch.tail;
}
}
pub fn pushFront(self: *Batch, batch: Batch) void {
if (self.isEmpty()) {
self.* = batch;
} else if (!batch.isEmpty()) {
batch.tail.next = self.head;
self.head = batch.head;
}
}
pub const pop = popFront;
pub fn popFront(self: *Batch) ?*Task {
const task = self.head orelse return null;
self.head = task.next;
return task;
}
};
pub const System = struct {
callFn: fn(*System, Command) void,
pub fn call(self: *System, command: Command) void {
return (self.callFn)(self, command);
}
pub const Command = union(enum) {
Spawn: struct {
scheduler: *Scheduler,
spawned: *bool,
},
Join: union(enum) {
Wait: *Scheduler,
Wake: *Scheduler,
},
Resume: union(enum) {
Exit: *Worker,
Signal: *Scheduler,
Broadcast: *Scheduler,
},
Suspend: union(enum) {
Exit: *Worker,
Wait: struct {
worker: *Worker,
scheduled: *bool,
},
},
Poll: struct {
worker: *Worker,
attempt: usize,
task: *?*Task,
scheduled: *bool,
targets: *?[]const PollTarget,
},
Execute: struct {
worker: *Worker,
task: *Task,
},
pub const PollTarget = union(enum) {
Local: bool, // be_fair: bool
Global,
Remote: *Worker,
};
};
};
pub const Scheduler = struct {
idle: usize = 0,
system: *System,
max_workers: Idle.Count,
spawned: ?*Worker = null,
run_queue: GlobalQueue = .{},
pub const Config = struct {
system: *System,
max_workers: usize,
};
pub fn init(config: Config) Scheduler {
return .{
.system = config.system,
.max_workers = blk: {
const num_workers = std.math.max(1, config.max_workers);
const max_workers = std.math.cast(Idle.Count, num_workers);
break :blk max_workers catch std.math.maxInt(Idle.Count);
},
};
}
pub fn schedule(self: *Scheduler, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
self.run_queue.push(batch);
self.notify(false);
}
fn notify(self: *Scheduler, is_waking: bool) void {
@setCold(true);
const result = Idle.notify(&self.idle, .{
.is_waking = is_waking,
.max_spawn = self.max_workers,
}) orelse return;
if (result == .signal) {
self.syscall(.Resume, .{ .Signal = self });
return;
}
var spawned = false;
self.syscall(.Spawn, .{
.scheduler = self,
.spawned = &spawned,
});
if (!spawned) {
self.endSpawn();
}
}
fn wait(self: *Scheduler, is_waking: bool, worker: *Worker) ?bool {
@setCold(true);
var context = Idle.Wait.Context{
.is_waking = is_waking,
.is_waiting = false,
.is_notified = false,
};
while (true) {
const result = Idle.wait(&self.idle, context) orelse return null;
switch (result) {
.waking => return true,
.running => return false,
.waiting => {},
};
context = .{
.is_waking = false,
.is_waiting = true,
.is_notified = false,
};
self.syscall(.Suspend, .{
.Wait = .{
.worker = worker,
.scheduled = &context.is_notified,
},
});
}
}
fn beginSpawn(self: *Scheduler, worker: *Worker) ?bool {
@setCold(true);
const is_waking = Idle.beginSpawn(&self.idle) orelse return null;
worker.next = @atomicLoad(?*Worker, &self.spawned, .Monotonic);
while (true) {
worker.next = @cmpxchgWeak(
?*Worker,
&self.spawned,
worker.next,
worker,
.Release,
.Monotonic,
) orelse break;
}
return is_waking;
}
fn endSpawn(self: *Scheduler) void {
@setCold(true);
const pending = Idle.endSpawn(&self.idle) orelse return;
if (pending > 0) {
self.syscall(.Join, .{ .Wake = self });
}
}
pub fn getWorkerIter(self: *const Scheduler) Worker.Iter {
const workers = @atomicLoad(?*Worker, &self.spawned, .Acquire);
return Worker.Iter{ .workers = workers };
}
pub fn getWorkerCount(self: *const Scheduler) Worker.Count {
const idle = Idle.unpack(@atomicLoad(usize, &self.idle, .Monotonic));
return Worker.Count{
.idle = idle.waiting,
.spawned = idle.spawned,
.max = self.max_workers,
};
}
pub fn shutdown(self: *Scheduler) void {
@setCold(true);
const pending = Idle.shutdown(&self.idle) orelse return;
if (pending > 0) {
self.syscall(.Resume, .{ .Broadcast = self });
}
}
pub fn deinit(self: *Scheduler) void {
@setCold(true);
self.shutdown();
if (Idle.join(&self.idle)) |_| {
self.syscall(.Join, .{ .Wait = self });
}
var workers = self.getWorkerIter();
while (workers.next()) |worker| {
self.syscall(.Resume, .{ .Exit = worker });
}
}
fn syscall(
self: Scheduler,
comptime cmd: std.meta.TagType(System.Command),
payload: @TypeOf(@field(@as(System.Command, undefined), @tagName(cmd))),
) callconv(.Inline) void {
const command = @unionInit(System.Command, @tagName(cmd), payload);
return self.system.call(command);
}
};
pub const Worker = struct {
scheduler: *Scheduler,
next: ?*Worker = null,
run_queue: LocalQueue = .{},
pub const Count = struct {
idle: usize = 0,
spawned: usize = 0,
max: usize = 0,
};
pub const Iter = struct {
workers: ?*Worker = null,
pub fn next(self: *Iter) ?*Worker {
const worker = self.workers orelse return null;
self.workers = worker.next;
return worker;
}
};
pub fn run(self: *Worker, scheduler: *Scheduler) void {
self.* = .{ .scheduler = scheduler };
var is_waking = scheduler.beginSpawn(self) orelse {
scheduler.endSpawn();
return;
};
while (true) {
const stole = self.poll(scheduler) orelse {
is_waking = scheduler.wait(is_waking, self) orelse break;
continue;
};
if (is_waking or stole.pushed) {
scheduler.notify(is_waking);
is_waking = false;
}
scheduler.syscall(.Execute, .{
.worker = self,
.task = stole.task,
});
}
scheduler.endSpawn();
scheduler.syscall(.Suspend, .{ .Exit = self });
}
pub fn getScheduler(self: Worker) *Scheduler {
return self.scheduler;
}
pub fn schedule(self: *Worker, batch: Batch) void {
if (batch.isEmpty()) {
return;
}
self.run_queue.push(batch);
self.scheduler.notify(false);
}
fn poll(self: *Worker, scheduler: *Scheduler) ?LocalQueue.Stole {
var attempt: usize = 0;
while (true) : (attempt += 1) {
var scheduled = false;
var polled: ?*Task = null;
var targets: ?[]const System.Command.PollTarget = null;
scheduler.syscall(.Poll, .{
.worker = self,
.attempt = attempt,
.task = &polled,
.scheduled = &scheduled,
.targets = &targets,
});
if (polled) |task| {
return LocalQueue.Stole{
.task = task,
.pushed = scheduled,
};
}
const poll_targets = targets orelse {
std.debug.assert(!scheduled);
return null;
};
for (poll_targets) |poll_target| {
if (switch (poll_target) {
.Local => |be_fair| self.run_queue.pop(be_fair),
.Global => self.run_queue.steal(&scheduler.run_queue),
.Remote => |worker| blk: {
if (worker == self)
break :blk self.run_queue.pop(false);
break :blk self.run_queue.steal(&worker.run_queue);
},
}) |stole| {
return stole;
}
}
}
}
};
////////////////////////////////////////////////////////////
const Idle = struct {
notified: bool = false,
state: State = .pending,
waiting: usize = 0,
spawned: usize = 0,
const count_bits = @divFloor(std.meta.bitCount(usize) - 3, 2);
const Count = std.meta.Int(.unsigned, count_bits);
const state_bits = 2;
const State = enum(std.meta.Int(.unsigned, state_bits)) {
pending = 0,
waking,
signaled,
shutdown,
};
fn pack(self: Idle) usize {
var value = @as(usize, @boolToInt(self.notified));
value |= @as(usize, @enumToInt(self.state)) << 1;
value |= @as(usize, @intCast(Count, self.waiting)) << (1 + state_bits);
value |= @as(usize, @intCast(Count, self.spawned)) << (1 + state_bits + count_bits);
return value;
}
fn unpack(value: usize) Idle {
return Idle{
.notified = value & (1 << 0) != 0,
.state = @intToEnum(State, @truncate(std.meta.TagType(State), value >> 1)),
.waiting = @truncate(Count, value >> (1 + state_bits)),
.spawned = @truncate(Count, value >> (1 + state_bits + count_bits)),
};
}
const Notify = enum {
signal,
spawn,
const Context = struct {
is_waking: bool,
max_spawn: usize,
};
};
fn notify(ptr: *usize, context: Notify.Context) ?Notify {
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic));
while (true) {
if (idle.state == .shutdown) {
return null;
}
const can_wake = context.is_waking or idle.state == .pending;
if (context.is_waking) {
std.debug.assert(idle.state == .waking);
}
var new_idle = idle;
new_idle.notified = true;
if (idle.waiting > 0 and can_wake) {
new_idle.state = .signaled;
} else if (idle.spawned < context.max_spawn and can_wake) {
new_idle.state = .signaled;
new_idle.spawned += 1;
} else if (context.is_waking) {
new_idle.state = .pending;
} else if (idle.notified) {
return null;
}
if (@cmpxchgWeak(
usize,
ptr,
idle.pack(),
new_idle.pack(),
.Release,
.Monotonic,
)) |updated| {
idle = unpack(updated);
continue;
}
if (idle.waiting > 0 and can_wake) {
return Notify.signal;
}
if (idle.spawned < context.max_spawn and can_wake) {
return Notify.spawn;
}
return null;
}
}
fn beginSpawn(ptr: *usize) ?bool {
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic));
while (true) {
const can_wake = switch (idle.state) {
.pending => false,
.waking => return false,
.signaled => true,
.shutdown => return null,
};
if (can_wake or idle.notified) {
var new_idle = idle;
new_idle.notified = false;
if (can_wake) {
new_idle.state = .waking;
}
if (@cmpxchgWeak(
usize,
ptr,
idle.pack(),
new_idle.pack(),
.Acquire,
.Monotonic,
)) |updated| {
idle = unpack(updated);
continue;
}
}
return can_wake;
}
}
fn endSpawn(ptr: *usize) ?usize {
const idle = unpack(@atomicRmw(
usize,
ptr,
.Sub,
(Idle{ .spawned = 1 }).pack(),
.Release,
));
std.debug.assert(idle.spawned > 0);
if (idle.state == .shutdown and idle.spawned == 1) {
return idle.waiting;
}
return null;
}
const Wait = enum {
waking,
waiting,
running,
const Context = struct {
is_waking: bool,
is_waiting: bool,
is_notified: bool,
};
};
fn wait(ptr: *usize, context: Wait.Context) ?Wait {
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic));
while (true) {
if (idle.state == .shutdown) {
return null;
}
const can_wake = context.is_notified or idle.notified;
if (context.is_waking) {
std.debug.assert(idle.state == .waking);
}
var new_idle = idle;
if (context.is_waiting and can_wake) {
new_idle.waiting -= 1;
} else if (context.is_waiting) {
return Wait.waiting;
} else {
std.debug.assert(!context.is_notified);
}
if (can_wake) {
new_idle.notified = false;
if (context.is_waking or idle.state == .signaled) {
new_idle.state = .waking;
}
} else {
new_idle.waiting += 1;
if (context.is_waking) {
new_idle.state = .pending;
}
}
if (@cmpxchgWeak(
usize,
ptr,
idle.pack(),
new_idle.pack(),
.Acquire,
.Monotonic,
)) |updated| {
idle = unpack(updated);
continue;
}
if (!can_wake) {
return Wait.waiting;
}
if (context.is_waking or idle.state == .signaled) {
return Wait.waking;
}
return Wait.running;
}
}
fn shutdown(ptr: *usize) ?usize {
var idle = unpack(@atomicLoad(usize, ptr, .Monotonic));
while (true) {
if (idle.state == .shutdown) {
return null;
}
var new_idle = idle;
new_idle.state = .shutdown;
new_idle.waiting = 0;
if (@cmpxchgWeak(
usize,
ptr,
idle.pack(),
new_idle.pack(),
.AcqRel,
.Monotonic,
)) |updated| {
idle = unpack(updated);
continue;
}
return idle.waiting;
}
}
fn join(ptr: *usize) ?void {
var idle = unpack(@atomicLoad(usize, ptr, .Acquire));
while (true) {
std.debug.assert(idle.state == .shutdown);
if (idle.spawned == 0) {
return null;
}
var new_idle = idle;
new_idle.waiting += 1;
if (@cmpxchgWeak(
usize,
ptr,
idle.pack(),
new_idle.pack(),
.Acquire,
.Acquire,
)) |updated| {
idle = unpack(updated);
continue;
}
return;
}
}
};
const GlobalQueue = struct {
head: usize = 0,
tail: ?*Task = null,
stub: Task = .{ .next = null },
fn isEmpty(self: *const GlobalQueue) bool {
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic);
return (tail == null) or (tail == &self.stub);
}
fn push(self: *GlobalQueue, batch: Batch) void {
std.debug.assert(!batch.isEmpty());
const tail = @atomicRmw(?*Task, &self.tail, .Xchg, batch.tail, .AcqRel);
const prev = tail orelse &self.stub;
@atomicStore(?*Task, &prev.next, batch.head, .Release);
}
const Consumer = struct {
queue: *GlobalQueue,
head: *Task,
const IS_CONSUMING: usize = 0b1;
fn tryAcquire(queue: *GlobalQueue) ?Consumer {
if (queue.isEmpty()) {
return null;
} else {
return tryAcquireSlow(queue);
}
}
fn tryAcquireSlow(queue: *GlobalQueue) ?Consumer {
@setCold(true);
while (true) {
const head = @atomicLoad(usize, &queue.head, .Monotonic);
if (head & IS_CONSUMING != 0) {
return null;
}
_ = @cmpxchgWeak(
usize,
&queue.head,
head,
head | IS_CONSUMING,
.Acquire,
.Monotonic,
) orelse return Consumer{
.queue = queue,
.head = @intToPtr(?*Task, head) orelse &queue.stub,
};
if (queue.isEmpty()) {
return null;
}
}
}
fn release(self: Consumer) void {
@atomicStore(
usize,
&self.queue.head,
@ptrToInt(self.head) & ~IS_CONSUMING,
.Release,
);
}
fn pop(self: *Consumer) ?*Task {
var head = self.head;
const stub = &self.queue.stub;
if (head == stub) {
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
self.head = head;
}
if (@atomicLoad(?*Task, &head.next, .Acquire)) |new_head| {
self.head = new_head;
return head;
}
const tail = @atomicLoad(?*Task, &self.queue.tail, .Monotonic);
if (head != tail) {
return null;
}
const batch = Batch.from(stub);
self.queue.push(batch);
const new_head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
self.head = new_head;
return head;
}
};
};
const LocalQueue = struct {
head: Index = 0,
tail: Index = 0,
overflow: GlobalQueue = .{},
buffer: [capacity]*Task = undefined,
const capacity = 256;
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2);
comptime {
std.debug.assert(std.math.maxInt(Index) >= capacity);
}
fn push(self: *LocalQueue, batch: Batch) void {
const task = batch.head orelse unreachable;
// Multi-task batches go straight to the overflow queue; the buffer stores one task per slot.
if (task != batch.tail) {
self.overflow.push(batch);
return;
}
var tail = self.tail;
var head = @atomicLoad(Index, &self.head, .Monotonic);
while (true) {
const size = tail -% head;
std.debug.assert(size <= self.buffer.len);
if (size < self.buffer.len) {
@atomicStore(*Task, &self.buffer[tail % self.buffer.len], task, .Unordered);
@atomicStore(Index, &self.tail, tail +% 1, .Release);
return;
}
var migrate: Index = self.buffer.len / 2;
if (@cmpxchgWeak(
Index,
&self.head,
head,
head +% migrate,
.Acquire,
.Monotonic,
)) |updated| {
head = updated;
continue;
}
var overflowed = Batch{};
while (migrate > 0) : (migrate -= 1) {
const migrated = self.buffer[(head +% (migrate - 1)) % self.buffer.len];
overflowed.pushFront(Batch.from(migrated));
}
overflowed.pushBack(batch);
self.overflow.push(overflowed);
return;
}
}
const Scope = enum {
lifo,
fifo,
remote,
global,
};
fn pop(self: *LocalQueue, be_fair: bool) ?Stole {
var scopes = [_]Scope{ .lifo, .global };
if (be_fair) {
scopes = [_]Scope{ .global, .fifo };
}
for (scopes) |scope| {
if (switch (scope) {
.lifo => self.steal(Scope.lifo),
.fifo => self.steal(Scope.fifo),
.global => self.steal(&self.overflow),
.remote => null,
}) |stole| {
return stole;
}
}
return null;
}
const Stole = struct {
task: *Task,
pushed: bool,
};
fn steal(self: *LocalQueue, target: anytype) ?Stole {
const tail = self.tail;
const scope = switch (@TypeOf(target)) {
Scope => target,
*LocalQueue => .remote,
*GlobalQueue => .global,
else => |T| @compileError(@typeName(T) ++ " is not stealable"),
};
switch (scope) {
.global => {
var consumer = GlobalQueue.Consumer.tryAcquire(target) orelse return null;
defer consumer.release();
const free_slots = blk: {
const head = @atomicLoad(Index, &self.head, .Monotonic);
const size = tail -% head;
std.debug.assert(size <= self.buffer.len);
break :blk self.buffer.len - size;
};
var pushed: Index = 0;
while (pushed < free_slots) {
const task = consumer.pop() orelse break;
@atomicStore(*Task, &self.buffer[(tail +% pushed) % self.buffer.len], task, .Unordered);
pushed += 1;
}
var task = consumer.pop();
if (task == null and pushed > 0) {
pushed -= 1;
task = self.buffer[(tail +% pushed) % self.buffer.len];
}
if (pushed > 0) {
@atomicStore(Index, &self.tail, tail +% pushed, .Release);
}
return Stole{
.task = task orelse return null,
.pushed = pushed > 0,
};
},
.remote => {
if (self.steal(&target.overflow)) |stole| {
return stole;
}
var target_head = @atomicLoad(Index, &target.head, .SeqCst);
while (true) {
const target_tail = @atomicLoad(Index, &target.tail, .SeqCst);
const target_size = target_tail -% target_head;
if (target_size == 0 or target_size > self.buffer.len) {
return null;
}
const task = @atomicLoad(*Task, &target.buffer[target_head % target.buffer.len], .Unordered);
target_head = @cmpxchgWeak(
Index,
&target.head,
target_head,
target_head +% 1,
.SeqCst,
.SeqCst,
) orelse return Stole{ .task = task, .pushed = false };
}
},
.fifo => {
var head = @atomicLoad(Index, &self.head, .Monotonic);
while (true) {
if (head == tail) {
return null;
}
const task = self.buffer[head % self.buffer.len];
head = @cmpxchgStrong(
Index,
&self.head,
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return Stole{ .task = task, .pushed = false };
}
},
.lifo => {
var head = @atomicLoad(Index, &self.head, .Monotonic);
std.debug.assert(tail -% head <= self.buffer.len);
if (head == tail) {
return null;
}
const new_tail = tail -% 1;
@atomicStore(Index, &self.tail, new_tail, .SeqCst);
head = @atomicLoad(Index, &self.head, .SeqCst);
std.debug.assert(tail -% head <= self.buffer.len);
const popped = blk: {
if (head == tail) break :blk false;
if (head != new_tail) break :blk true;
break :blk @cmpxchgStrong(
Index,
&self.head,
head,
tail,
.SeqCst,
.Monotonic,
) == null;
};
if (popped) {
return Stole{
.task = self.buffer[new_tail % self.buffer.len],
.pushed = false,
};
}
@atomicStore(Index, &self.tail, tail, .Monotonic);
return null;
},
}
}
};
const std = @import("std");
const Task = @This();
/// Internal state used by the scheduler
next: ?*Task = undefined,
/// Callback function invoked when the task is executed
executeFn: fn(*Task) void,
pub fn execute(self: *Task) void {
return (self.executeFn)(self);
}
/// An ordered set of Tasks used to schedule them in batch.
pub const Group = struct {
len: usize = 0,
head: *Task = undefined,
tail: *Task = undefined,
/// Create a Group from a Task which can then be pushed to other groups.
pub fn from(task: *Task) Group {
task.next = null;
return Group{
.len = 1,
.head = task,
.tail = task,
};
}
/// Push a group into this group using FIFO ordering.
pub const push = pushBack;
/// Pop a task from this group using FIFO ordering.
pub const pop = popFront;
/// Push a group into this group using FIFO ordering.
pub fn pushBack(self: *Group, group: Group) callconv(.Inline) void {
return self.pushMany(group, .fifo);
}
/// Push a group into this group using LIFO ordering.
pub fn pushFront(self: *Group, group: Group) callconv(.Inline) void {
return self.pushMany(group, .lifo);
}
fn pushMany(self: *Group, group: Group, comptime order: enum{fifo, lifo}) void {
if (group.len == 0) {
return;
}
if (self.len == 0) {
self.* = group;
return;
}
self.len += group.len;
switch (order) {
.fifo => {
self.tail.next = group.head;
self.tail = group.tail;
},
.lifo => {
group.tail.next = self.head;
self.head = group.head;
},
}
}
/// Pop a task from this group using FIFO ordering.
pub fn popFront(self: *Group) ?*Task {
if (self.len == 0) {
return null;
}
const task = self.head;
self.head = task.next orelse undefined;
self.len -= 1;
return task;
}
};
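// Illustrative only: a minimal sanity check of Group ordering using a no-op executeFn.
// It runs single-threaded; the Group itself performs no synchronization.
test "Group push/pop keeps FIFO order" {
const noop = struct {
fn execute(task: *Task) void {
_ = task;
}
}.execute;
var a = Task{ .executeFn = noop };
var b = Task{ .executeFn = noop };
var group = Group{};
group.push(Group.from(&a));
group.push(Group.from(&b));
std.debug.assert(group.pop() == &a);
std.debug.assert(group.pop() == &b);
std.debug.assert(group.pop() == null);
}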
pub const Worker = struct {
state: usize,
next: ?*Worker = null,
run_queue: struct {
direct: LifoQueue = .{},
overflow: FifoQueue = .{},
buffer: BufferQueue = .{},
},
const State = struct {
mode: Mode,
scheduled: bool,
lifo_queue: bool,
scheduler: *Scheduler,
const Mode = enum(u2) {
waking = 0,
polling,
running,
shutdown,
};
comptime {
std.debug.assert(@alignOf(Scheduler) >= 0b10000);
}
fn pack(self: State) usize {
return @ptrToInt(self.scheduler)
| (@as(usize, @boolToInt(self.scheduled)) << 0)
| (@as(usize, @boolToInt(self.lifo_queue)) << 1)
| (@as(usize, @enumToInt(self.mode)) << 2);
}
fn unpack(value: usize) State {
return .{
.scheduler = @intToPtr(*Scheduler, value & ~@as(usize, 0b1111)),
.scheduled = value & (1 << 0) != 0,
.lifo_queue = value & (1 << 1) != 0,
.mode = @intToEnum(Mode, @truncate(u2, value >> 2)),
};
}
};
// The type of queue/queue-operation for a Worker's local queue.
pub const LocalQueueOrder = enum{
/// Dequeue and steal from the opposite side that Tasks are scheduled.
Fifo,
/// Dequeue and try to steal from the same side that Tasks are scheduled.
Lifo,
};
/// Run a Worker under the given Scheduler that it was spawned from.
/// The LocalQueueOrder influences what type of local queue is used for the Worker.
pub fn run(self: *Worker, scheduler: *Scheduler, queue_order: LocalQueueOrder) void {
self.* = Worker{
.state = (State{
.mode = .waking,
.scheduled = false,
.lifo_queue = queue_order == .Lifo,
.scheduler = scheduler,
}).pack(),
};
// Register the worker into the scheduler as a spawned
self.next = @atomicLoad(?*Worker, &scheduler.spawned, .Monotonic);
while (true) {
self.next = @cmpxchgWeak(
?*Worker,
&scheduler.spawned,
self.next,
self,
.Release,
.Monotonic,
) orelse break;
}
// Worker event loop
while (running: {
const state = State.unpack(@ptrCast(*volatile usize, &self.state).*);
break :running switch (state.mode) {
.running, .waking => true,
.polling => unreachable,
.shutdown => false,
};
}) {
const task = self.poll(scheduler) orelse {
scheduler.wait(self);
continue;
};
scheduler.syscall(.Execute, .{
.worker = self,
.task = task,
});
}
// Wait for the worker to be joined.
// All workers are joined when the scheduler is shut down.
// This is needed due to the Workers being intrusive and possibly referencing other shutdown workers.
scheduler.syscall(.JoinWait, .{ .worker = self });
self.* = undefined;
}
/// Schedule a Group of Tasks to run on some Worker in the Scheduler.
/// Can only be called by the thread which owns this Worker.
pub fn schedule(self: *Worker, group: Group) void {
if (group.len == 0) return;
if (self.run_queue.buffer.push(group)) |overflowed| {
self.run_queue.overflow.push(overflowed);
}
var state = State.unpack(self.state);
std.debug.assert(state.mode != .shutdown);
if (state.mode != .polling) {
return state.scheduler.notify(false);
}
if (!state.scheduled) {
state.scheduled = true;
self.state = state.pack();
}
}
/// Schedule a Group of Tasks to run only on this Worker.
/// Can be called by any thread, not only the Worker's thread.
pub fn scheduleDirectly(self: *Worker, group: Group) void {
if (group.len == 0) return;
self.run_queue.direct.push(group);
const scheduler = self.getScheduler();
scheduler.syscall(.Wake, .{
.scheduler = scheduler,
.worker = .{ .direct = self },
});
}
/// Get a reference to the Scheduler this thread was spawned by.
pub fn getScheduler(self: Worker) callconv(.Inline) *Scheduler {
return State.unpack(self.state).scheduler;
}
/// Returns true if the Worker has local Tasks it could be processing.
pub fn hasLocalTasks(self: *const Worker) bool {
return !self.run_queue.direct.isEmpty()
or !self.run_queue.buffer.isEmpty()
or !self.run_queue.overflow.isEmpty();
}
fn poll(self: *Worker, scheduler: *Scheduler) ?*Task {
var state = State.unpack(self.state);
std.debug.assert(state.mode != .shutdown);
std.debug.assert(state.mode != .polling);
std.debug.assert(!state.scheduled);
// Put the Worker in a `polling` state.
// This causes any subsequent Worker.schedule()s to set `.scheduled` instead of doing a notification.
const old_mode = state.mode;
state.mode = .polling;
self.state = state.pack();
// Poll for a task, recording whether it pushed/scheduled some more tasks.
const polled_task = self.pollForTask(scheduler, &state);
const poll_scheduled = state.scheduled;
// Re-check the state to see if any syscalls made in `pollForTask()` did any Worker.schedule()s.
// Volatile read to ensure that the state is actually re-read to just be on the safe side.
state = State.unpack(@ptrCast(*volatile usize, &self.state).*);
std.debug.assert(state.mode == .polling);
// If we're the "waking" Worker then we need to transfer "waking" status & resume another worker.
// If we pushed tasks or did anything akin to a Worker.schedule(), we need to make other workers aware of that.
const is_waking = old_mode == .waking;
if (is_waking or state.scheduled or poll_scheduled) {
scheduler.notify(is_waking);
}
// Reset everything back and return the polled task.
state.mode = old_mode;
state.scheduled = false;
self.state = state.pack();
return polled_task;
}
pub const PollType = union(enum) {
/// Poll for Tasks on the Worker's local queue.
/// Any Tasks schedule()'d or stolen from other queues are in the Worker's local queue.
/// The provided queue order is used as a hint for how the Worker should dequeue from its local queue.
Local: LocalQueueOrder,
/// Poll for Tasks on the Scheduler's queue.
/// Any extra Tasks stolen from the Scheduler's queue are moved to the Worker's local queue.
Global,
/// Poll for Tasks on the Worker's direct queue.
/// Any tasks scheduleDirectly()'d to the Worker are in it's direct queue.
/// A Worker's direct queue cannot be stolen from by other Workers.
Direct,
/// Poll for Tasks by stealing from other Worker local queues.
/// Any extra Tasks stolen from other Worker local queues are moved to this Worker's local queue.
Steal,
};
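// Illustrative only: one possible poll order a System implementation could hand back
// through the `.Poll` syscall's `order` pointer (e.g. `cmd.order.* = &example_poll_order;`):
// direct queue first, then the local buffer, then the shared scheduler queue, then stealing.
const example_poll_order = [_]PollType{
PollType{ .Direct = {} },
PollType{ .Local = .Lifo },
PollType{ .Global = {} },
PollType{ .Steal = {} },
};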
fn pollForTask(self: *Worker, scheduler: *Scheduler, state: *State) ?*Task {
var poll_attempt: usize = 0;
var poll_task: ?*Task = null;
var poll_order: ?[]const PollType = null;
while (true) : (poll_attempt +%= 1) {
scheduler.syscall(.Poll, .{
.worker = self,
.attempt = poll_attempt,
.order = &poll_order,
.task = &poll_task,
});
if (poll_task) |task| {
return task;
}
const poll_types = poll_order orelse return null;
polling: for (poll_types) |poll_type| {
const result = switch (poll_type) {
.Local => |order| blk: {
const pop_context = BufferQueue.Pop.Context{
.be_fair = order == .Fifo,
.is_lifo = state.lifo_queue,
};
break :blk self.run_queue.buffer.pop(pop_context)
orelse self.run_queue.buffer.steal(&self.run_queue.overflow, pop_context)
orelse continue;
},
.Global => blk: {
break :blk self.run_queue.buffer.steal(
&scheduler.run_queue,
.{ .is_lifo = state.lifo_queue },
) orelse continue;
},
.Direct => blk: {
const task = self.run_queue.direct.pop() orelse continue;
break :blk BufferQueue.Pop{ .task = task };
},
.Steal => blk: {
var steal_attempt: usize = 0;
var steal_target: ?*Worker = null;
while (true) : (steal_attempt += 1) {
scheduler.syscall(.Steal, .{
.worker = self,
.attempt = steal_attempt,
.target = &steal_target,
.task = &poll_task,
});
if (poll_task) |task| {
break :blk BufferQueue.Pop{ .task = task };
}
const target_worker = steal_target orelse continue :polling;
if (target_worker == self) {
break :blk self.run_queue.buffer.pop(.{ .is_lifo = state.lifo_queue }) orelse continue;
}
const target_is_lifo = State.unpack(target_worker.state).lifo_queue;
const target_context = BufferQueue.Pop.Context{ .is_lifo = target_is_lifo };
break :blk self.run_queue.buffer.steal(&target_worker.run_queue.overflow, target_context)
orelse self.run_queue.buffer.steal(&target_worker.run_queue.buffer, target_context)
orelse continue;
}
},
};
state.scheduled = result.pushed;
return result.task;
}
}
}
};
pub const Scheduler = struct {
config: Config align(std.math.max(16, @alignOf(Config))),
idle_queue: usize = 0,
spawned: ?*Worker = null,
run_queue: FifoQueue = .{},
pub const Config = struct {
/// System which is called by the Scheduler/Worker when necessary for the implementation.
system: *System,
/// Maximum amount of workers that can be spawned on this Scheduler.
max_workers: usize,
};
/// Create a Scheduler using the provided Config.
/// This can be called statically and at comptime.
pub fn init(config: Config) Scheduler {
return .{ .config = config };
}
/// Schedules a Group of Tasks to eventually be executed on a Worker in the Scheduler.
/// Can be called by any thread.
pub fn schedule(self: *Scheduler, group: Group) void {
if (group.len == 0) return;
self.run_queue.push(group);
self.notify(false);
}
fn notify(self: *Scheduler, is_waking: bool) void {
@setCold(true);
const result = IdleQueue.notify(&self.idle_queue, .{
.is_waking = is_waking,
.max_spawn = self.config.max_workers,
});
switch (result) {
.updated, .ignored => return,
.resumed => {
self.syscall(.Wake, .{
.scheduler = self,
.worker = .{ .waiters = 1 },
});
},
.spawned => {
var spawned = false;
self.syscall(.Spawn, .{
.scheduler = self,
.spawned = &spawned,
});
if (!spawned) {
self.markShutdown();
}
},
};
}
fn wait(self: *Scheduler, worker: *Worker) void {
@setCold(true);
var wait_state: ?bool = null;
var worker_state = Worker.State.unpack(worker.state);
while (true) {
const result = IdleQueue.wait(&self.idle_queue, blk: {
if (worker_state.mode == .waking) break :blk .waking;
const resumed = wait_state orelse break :blk .running;
if (resumed) break :blk .waiting;
break :blk .notified;
});
notified: {
worker_state.mode = switch (result) {
.running => .running,
.waking => .waking,
.waiting => break :notified,
.shutdown => .shutdown,
};
// Resumed (or shutting down): publish the new mode and let the Worker's run loop take over.
worker.state = worker_state.pack();
return;
}
// Going idle: relinquish any "waking" status before suspending on the Wait syscall below.
if (worker_state.mode == .waking) {
worker_state.mode = .running;
worker.state = worker_state.pack();
}
var resumed = true;
defer wait_state = resumed;
self.syscall(.Wait, .{
.scheduler = self,
.worker = worker,
.resumed = &resumed,
});
}
}
/// Mark the Scheduler as shutdown, waking up any idle Workers in the process.
/// Workers will eventually see that the Scheduler is shut down and enter a Join state.
pub fn shutdown(self: *Scheduler) void {
@setCold(true);
if (IdleQueue.beginShutdown(&self.idle_queue)) |idle_waiters| {
self.syscall(.Wake, .{
.scheduler = self,
.worker = .{ .waiters = idle_waiters },
});
}
}
fn markShutdown(self: *Scheduler) void {
@setCold(true);
if (IdleQueue.notifyOneShutdown(&self.idle_queue)) |shutdown_waiters| {
self.syscall(.ShutdownNotify, .{
.scheduler = self,
.waiters = shutdown_waiters,
});
}
}
/// Waits for the Scheduler to be shut down and for all Workers to enter a Join state.
/// Once all workers are in the Join state, it joins them effectively releasing them from the Scheduler.
///
/// After a call to join(), the Scheduler is considered `undefined` and further use is illegal behavior.
/// Can only be called by one thread and only once.
pub fn join(self: *Scheduler) void {
@setCold(true);
if (IdleQueue.waitForShutdown(&self.idle_queue)) {
self.syscall(.ShutdownWait, .{ .scheduler = self });
}
var workers = Worker.Iter{ .workers = @atomicLoad(?*Worker, &self.spawned, .Acquire) };
while (workers.next()) |worker| {
self.syscall(.JoinNotify, .{ .worker = worker });
}
}
fn syscall(self: Scheduler, comptime opcode: std.meta.TagType(System.Command), payload: anytype) callconv(.Inline) void {
const command = @unionInit(System.Command, @tagName(opcode), payload);
return self.config.system.call(command);
}
};
pub const System = struct {
callFn: fn(*System, Command) void,
pub fn call(self: *System, command: Command) callconv(.Inline) void {
return (self.callFn)(self, command);
}
pub const Command = union(enum) {
Spawn: struct {
scheduler: *Scheduler,
spawned: *bool,
},
Wait: struct {
scheduler: *Scheduler,
worker: *Worker,
resumed: *bool,
},
Wake: struct {
scheduler: *Scheduler,
worker: union(enum){
direct: *Worker,
waiters: usize,
},
},
Poll: struct {
worker: *Worker,
attempt: usize,
order: *?[]const Worker.PollType,
task: *?*Task,
},
Steal: struct {
worker: *Worker,
attempt: usize,
target: *?*Worker,
task: *?*Task,
},
Execute: struct {
worker: *Worker,
task: *Task,
},
JoinWait: struct {
worker: *Worker,
},
JoinNotify: struct {
worker: *Worker,
},
ShutdownWait: struct {
scheduler: *Scheduler,
},
ShutdownNotify: struct {
scheduler: *Scheduler,
waiters: usize,
},
};
};
/// Lock-free state manager system which coordinates the sleeping and waking of worker threads.
/// The algorithm was developed as a way to amortize the cost of thread wake ups and synchronize shutdown.
///
/// The idea is to have the concept of a "waking" thread which is either resumed from idle state or freshly spawned.
/// There can be only one "waking" thread at any given moment and notifications in the mean time just leave a token.
/// If the "waking" thread finds work, then it transfers its "waking" status to another thread via resuming/spawning one.
/// This has the effect of a smooth ramp up in threads looking for work to decrease contention.
/// It also amortizes the resume/spawn cost of multiple notifications in succession which is common when one thread is producing work.
///
/// When any thread fails to find work, it transitions to the idle state.
/// If the thread sees a notification token left behind, it consumes it and tries to search for work again.
/// If the "waking" thread consumes the notification token, it remains "waking". If not, it relinquishes its "waking" status to be taken.
/// The notification token ensures that notifications after a "waking" thread was chosen do not go unseen.
/// The ability for any thread (not only the "waking" thread) to grab the token improves latency of a task being processed sooner.
///
/// Finally, the state also acts as a way to synchronize shutting down all of the worker threads.
/// On shutdown, the state is unconditionally transitioned to `.shutdown` and all idle threads are woken up.
/// Any subsequent notifications are ignored and ongoing idle transitions are cancelled and direct the thread to shut down.
/// When a thread shuts down, it decreases the `spawned` count by one. When `spawned` reaches 0, all worker threads have shut down.
/// If there are outside threads waiting for all spawned threads to shut down, they can increment `idle` while `spawned` isn't 0.
/// The last worker thread to decrement `spawned` to 0 should wake up any outside threads waiting for shut down sync by checking `idle` count.
const IdleQueue = struct {
idle: usize = 0,
spawned: usize = 0,
notified: bool = false,
state: State = .pending,
const count_bits = (std.meta.bitCount(usize) - 4) / 2;
const Count = std.meta.Int(.unsigned, count_bits);
const State = enum(u2) {
pending = 0,
waking,
notified,
shutdown,
};
fn pack(self: IdleQueue) usize {
return ((@as(usize, @boolToInt(self.notified)) << 0) |
(@as(usize, @enumToInt(self.state)) << 1) |
(@as(usize, @intCast(Count, self.idle)) << 4) |
(@as(usize, @intCast(Count, self.spawned)) << (4 + count_bits)));
}
fn unpack(value: usize) IdleQueue {
return .{
.notified = value & (1 << 0) != 0,
.state = @intToEnum(State, @truncate(u2, value >> 1)),
.idle = @truncate(Count, value >> 4),
.spawned = @truncate(Count, value >> (4 + count_bits)),
};
}
pub const Notify = enum {
/// The IdleQueue state was just updated. Nothing left to do
updated,
/// A "waking" notification was performed indicating that an idle thread should be resumed
resumed,
/// A "waking" notification was performed indicating that a new thread should be spawned
spawned,
/// The notification request was ignored. Either due to a previous shutdown or notif token already being set.
ignored,
pub const Context = struct {
/// True if the caller is the "waking" thread who found work and is trying to transfer "waking" status.
is_waking: bool,
/// Max amount of threads being spawned at once if a spawn may occur.
max_spawn: usize,
};
};
pub fn notify(ptr: *usize, context: Notify.Context) Notify {
var ptr_value = @atomicLoad(usize, ptr, .Monotonic);
while (true) {
const self = IdleQueue.unpack(ptr_value);
if (self.state == .shutdown) {
return .ignored;
}
const can_wake = context.is_waking or self.state == .pending;
if (context.is_waking) {
std.debug.assert(self.state == .waking);
}
var result = Notify.updated;
var new_self = self;
new_self.notified = true;
if (self.idle > 0 and can_wake) {
new_self.state = .notified;
result = .resumed;
} else if (self.spawned < context.max_spawn and can_wake) {
new_self.spawned += 1;
result = .spawned;
} else if (context.is_waking) {
new_self.state = .pending; // if "waking" thread can't wake, relinquish "waking" status.
} else if (self.notified) {
return .ignored; // if notification token already set, nothing to do.
}
// Release barrier synchronises with Acquire in wait(),
// so the possibly resumed waiter can see any tasks made available before this notify().
ptr_value = @cmpxchgWeak(
usize,
ptr,
self.pack(),
new_self.pack(),
.Release,
.Monotonic,
) orelse return result;
}
}
pub const Wait = enum {
/// The caller was resumed/notified without being a "waking" thread.
running,
/// The caller was resumed/notified as the "waking" thread.
waking,
/// The caller failed to consume a "waking" or notification token and is now idle
waiting,
/// The idle/wait state transition was cancelled as a shutdown() was previously called.
shutdown,
pub const Context = enum {
/// The caller is a normal thread which didn't find any work.
running,
/// The caller is the "waking" thread which didn't find any work.
waking,
/// The caller was resumed having been in the idle/wait state.
waiting,
/// The caller's idle/wait state was interrupted as it thinks there are tasks available.
notified,
};
};
pub fn wait(ptr: *usize, context: Wait.Context) Wait {
var ptr_value = @atomicLoad(usize, ptr, .Monotonic);
while (true) {
const self = IdleQueue.unpack(ptr_value);
if (self.state == .shutdown) {
return .shutdown;
}
const is_waiting = switch (context) {
.running => false,
.waking => blk: {
std.debug.assert(self.state == .waking);
break :blk false;
},
.waiting, .notified => blk: {
std.debug.assert(self.idle > 0);
break :blk true;
},
};
var new_self = self;
var result: Wait = undefined;
if (self.notified or context == .notified) {
new_self.notified = false;
if (is_waiting) {
new_self.idle -= 1;
}
result = .running;
if (context == .waking or self.state == .notified) {
new_self.state = .waking;
result = .waking;
}
} else if (!is_waiting) {
result = .waiting;
new_self.idle += 1;
if (context == .waking) {
new_self.state = .pending;
}
} else {
return .waiting;
}
// Acquire barrier which synchronizes with the Release in notify()
// In order to see any tasks made available by the matching notify().
ptr_value = @cmpxchgWeak(
usize,
ptr,
self.pack(),
new_self.pack(),
.Acquire,
.Monotonic,
) orelse return result;
}
}
pub fn beginShutdown(ptr: *usize) ?usize {
var ptr_value = @atomicLoad(usize, ptr, .Monotonic);
while (true) {
const self = IdleQueue.unpack(ptr_value);
if (self.state == .shutdown) {
return null;
}
var new_self = self;
new_self.idle = 0;
new_self.state = .shutdown;
ptr_value = @cmpxchgWeak(
usize,
ptr,
self.pack(),
new_self.pack(),
.AcqRel,
.Monotonic,
) orelse {
if (self.idle == 0) return null;
return self.idle;
};
}
}
pub fn waitForShutdown(ptr: *usize) bool {
var ptr_value = @atomicLoad(usize, ptr, .Acquire);
while (true) {
const self = IdleQueue.unpack(ptr_value);
std.debug.assert(self.state == .shutdown);
var new_self = self;
new_self.idle += 1;
if (self.spawned == 0) {
return false;
}
ptr_value = @cmpxchgWeak(
usize,
ptr,
self.pack(),
new_self.pack(),
.Acquire,
.Acquire,
) orelse return true;
}
}
pub fn notifyOneShutdown(ptr: *usize) ?usize {
const new_self = blk: {
const sub_self = IdleQueue{ .spawned = 1 };
const ptr_value = @atomicRmw(usize, ptr, .Sub, sub_self.pack(), .Release);
break :blk IdleQueue.unpack(ptr_value - sub_self.pack());
};
if (new_self.spawned == 0) {
return new_self.idle;
} else {
return null;
}
}
};
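// Illustrative only: a sanity check that the bit packing described above roundtrips.
// The counts used here are arbitrary.
test "IdleQueue pack/unpack roundtrips" {
const value = (IdleQueue{
.idle = 3,
.spawned = 5,
.notified = true,
.state = .waking,
}).pack();
const unpacked = IdleQueue.unpack(value);
std.debug.assert(unpacked.idle == 3);
std.debug.assert(unpacked.spawned == 5);
std.debug.assert(unpacked.notified);
std.debug.assert(unpacked.state == .waking);
}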
/// Unbounded Multi-Producer Single-Consumer LIFO queue for Tasks.
///
/// This acts as a generic sink for tasks produced by other threads.
/// It also has the property that it amortizes synchronization for the consumer.
const LifoQueue = struct {
stack: ?*Task = null,
local: ?*Task = null,
pub fn isEmpty(self: *const LifoQueue) callconv(.Inline) bool {
if (self.local != null) return false;
return @atomicLoad(?*Task, &self.stack, .Monotonic) == null;
}
pub fn push(self: *LifoQueue, group: Group) void {
std.debug.assert(group.len > 0);
const head = group.head;
const tail = group.tail;
tail.next = @atomicLoad(?*Task, &self.stack, .Monotonic);
while (true) {
tail.next = @cmpxchgWeak(
?*Task,
&self.stack,
tail.next,
head,
.Release,
.Monotonic,
) orelse break;
}
}
pub fn pop(self: *LifoQueue) ?*Task {
if (self.local) |task| {
self.local = task.next;
return task;
}
var stack = @atomicLoad(?*Task, &self.stack, .Monotonic);
if (stack != null) {
stack = @atomicRmw(?*Task, &self.stack, .Xchg, null, .Acquire);
std.debug.assert(stack != null);
}
const task = stack orelse return null;
self.local = task.next;
return task;
}
};
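// Illustrative only: single-threaded push/pop through the LifoQueue with a no-op
// executeFn; the multi-producer side is not exercised here.
test "LifoQueue push then pop" {
const noop = struct {
fn execute(task: *Task) void {
_ = task;
}
}.execute;
var a = Task{ .executeFn = noop };
var queue = LifoQueue{};
queue.push(Group.from(&a));
std.debug.assert(queue.pop() == &a);
std.debug.assert(queue.pop() == null);
}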
/// Unbounded Multi-Producer Multi-Consumer FIFO queue for Tasks.
/// Based on Dmitry Vyukov's intrusive mpsc: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
///
/// The consumer-side is made multi by employing non-blocking mutual exclusion using an atomic flag.
/// It's better this way as, instead of blocking on the queue, threads can check other thread queues in the mean time.
///
/// The point in the algorithm in which pop() may falsely return null due to a preempted push() thread is also taken into account.
/// A disconnected pop() is just treated as empty since worst case is that the queue would be empty and the popping thread will fall asleep via wait().
/// This is fine as a push() is always followed by a notify() which ensures a thread waiting on the queue will eventually start consuming or see it empty.
/// This effectively delegates the consumer side blocking of the queue to the external wait()/notify() mechanisms which can do more efficient "blocking".
const FifoQueue = struct {
head: usize = 0,
tail: ?*Task = null,
stub: Task = Task{
.next = null,
.executeFn = undefined,
},
const HAS_CONSUMER: usize = 1 << 0;
/// Returns true if the queue is currently empty.
/// `head` and `tail` default to null for static initialization.
/// The algorithms account for this by treating null == &self.stub
pub fn isEmpty(self: *const FifoQueue) callconv(.Inline) bool {
const tail = @atomicLoad(?*Task, &self.tail, .Monotonic);
return (tail == null) or (tail == &self.stub);
}
pub fn push(self: *FifoQueue, group: Group) void {
std.debug.assert(group.len > 0);
const tail = @atomicRmw(?*Task, &self.tail, .Xchg, group.tail, .AcqRel);
const prev = tail orelse &self.stub;
@atomicStore(?*Task, &prev.next, group.head, .Release);
}
pub fn tryAcquireConsumer(self: *FifoQueue) callconv(.Inline) ?Consumer {
// Fast path check of the tail here improves perf by reducing i-cache pressure,
// as most queues will be empty when worker threads are polling them.
if (self.isEmpty()) {
return null;
}
return self.tryAcquireConsumerSlow();
}
fn tryAcquireConsumerSlow(self: *FifoQueue) ?Consumer {
@setCold(true);
while (true) {
const head = @atomicLoad(usize, &self.head, .Monotonic);
if (head & HAS_CONSUMER != 0) {
return null;
}
_ = @cmpxchgWeak(
usize,
&self.head,
head,
head | HAS_CONSUMER,
.Acquire,
.Monotonic,
) orelse return Consumer{
.queue = self,
.head = @intToPtr(?*Task, head) orelse &self.stub,
};
// re-check the head under contention in case the queue became empty
if (self.isEmpty()) {
return null;
}
}
}
pub const Consumer = struct {
queue: *FifoQueue,
head: *Task,
pub fn release(self: Consumer) void {
// Update the head with the pop()-updated head
// while at the same time unsetting the HAS_CONSUMER bit.
const new_head = @ptrToInt(self.head);
@atomicStore(usize, &self.queue.head, new_head, .Release);
}
pub fn pop(self: *Consumer) ?*Task {
var head = self.head;
const stub = &self.queue.stub;
if (head == stub) {
head = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
self.head = head;
}
if (@atomicLoad(?*Task, &head.next, .Acquire)) |next| {
self.head = next;
return head;
}
const tail = @atomicLoad(?*Task, &self.queue.tail, .Acquire);
if (tail != head) {
return null;
}
self.queue.push(Group.from(stub));
const next = @atomicLoad(?*Task, &head.next, .Acquire) orelse return null;
self.head = next;
return head;
}
};
};
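// Illustrative only: single-threaded FifoQueue usage; push a Group, acquire the
// Consumer, and drain it. Concurrent producers are not exercised here.
test "FifoQueue push and consume" {
const noop = struct {
fn execute(task: *Task) void {
_ = task;
}
}.execute;
var a = Task{ .executeFn = noop };
var queue = FifoQueue{};
queue.push(Group.from(&a));
var consumer = queue.tryAcquireConsumer() orelse unreachable;
defer consumer.release();
std.debug.assert(consumer.pop() == &a);
std.debug.assert(consumer.pop() == null);
}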
/// Bounded Single-Producer, Multi-Consumer queue for Tasks.
/// Based on the Go scheduler's runq: https://tokio.rs/blog/2019-10-scheduler#a-better-run-queue
///
/// This acts as the primary queue for workers when pushing/popping Tasks.
/// The capacity was chosen to be a nice tradeoff for throughput vs memory usage under heavy task load.
///
/// push() is cheap in that it only does a store(Release) at most which is free on x86 but slightly costly on other platforms.
/// Given the queue is bounded, it returns the Group of tasks which overflow if it is unable to fill its buffer.
/// steal() from other bounded queues grab half of the tasks at once which amortizes the contention with the queue owner's pop().
///
/// It was opted not to migrate half of the buffer on push() overflow like the original algorithm
/// as it was measured that handling overflow via a FifoQueue was cheaper than having to link the migrated tasks together due to:
/// - assumed cache effects of re-accessing the migrated tasks. This is still a major slowdown for steal(*FifoQueue).
/// - push()ing to worker-local wait-free FifoQueue being cheaper than pushing to shared Mutex protected injector queue.
const BufferQueue = extern struct {
head: Index = 0,
tail: Index = 0,
buffer: [capacity]*Task = undefined,
const Index = std.meta.Int(.unsigned, std.meta.bitCount(usize) / 2);
const capacity = 256;
comptime {
std.debug.assert(std.math.maxInt(Index) >= capacity);
}
pub fn isEmpty(self: *const BufferQueue) bool {
var head: Index = undefined;
var tail: Index = undefined;
// For platforms known to support it, try to load both the head & tail in one atomic load instead of two.
switch (std.builtin.cpu.arch) {
.i386, .x86_64, .aarch64 => {
comptime std.debug.assert(@byteOffsetOf(BufferQueue, "head") == 0);
comptime std.debug.assert(@byteOffsetOf(BufferQueue, "tail") == @sizeOf(Index));
const head_tail = @bitCast([2]Index, @atomicLoad(usize, @ptrCast(*const usize, self), .Monotonic));
head = head_tail[0];
tail = head_tail[1];
},
else => {
head = @atomicLoad(Index, &self.head, .Acquire);
tail = @atomicLoad(Index, &self.tail, .Monotonic);
},
}
return (tail == head) or (tail == (head -% 1));
}
pub fn push(self: *BufferQueue, group: Group) ?Group {
// Preemptively check if group is too much without actually looking at current size
std.debug.assert(group.len > 0);
if (group.len > capacity) {
return group;
}
var tail = self.tail;
const head = @atomicLoad(Index, &self.head, .Monotonic);
const size = tail -% head;
std.debug.assert(size <= capacity);
// Overflow if we cant fit the whole group in one go.
// This avoids the cache effects of accessing the tasks if some will be overflowed anyway.
const free_slots = capacity - size;
if (group.len > free_slots) {
return group;
}
var tasks = group;
while (tasks.pop()) |task| {
@atomicStore(*Task, &self.buffer[tail % capacity], task, .Unordered);
tail +%= 1;
}
@atomicStore(Index, &self.tail, tail, .Release);
return null;
}
pub const Pop = struct {
task: *Task,
pushed: bool = false,
pub const Context = struct {
be_fair: bool = false,
is_lifo: bool = false,
};
};
pub fn pop(self: *BufferQueue, context: Pop.Context) ?Pop {
const tail = self.tail;
var head = @atomicLoad(Index, &self.head, .Monotonic);
if (head == tail) {
return null;
}
const be_fair = context.be_fair or !context.is_lifo;
while (be_fair) {
if (head == tail) {
return null;
}
head = @cmpxchgWeak(
Index,
&self.head,
head,
head +% 1,
.Acquire,
.Monotonic,
) orelse return Pop{
.task = self.buffer[head % capacity],
};
}
const new_tail = tail -% 1;
@atomicStore(Index, &self.tail, new_tail, .SeqCst);
head = @atomicLoad(Index, &self.head, .SeqCst);
std.debug.assert(tail -% head <= capacity);
var task: ?*Task = null;
if (head != tail) {
task = self.buffer[new_tail % capacity];
if (head != new_tail) {
return Pop{ .task = task.? };
}
if (@cmpxchgStrong(Index, &self.head, head, tail, .SeqCst, .Monotonic)) |new_head| {
task = null;
std.debug.assert(new_head == tail);
}
}
@atomicStore(Index, &self.tail, tail, .Monotonic);
return Pop{ .task = task orelse return null };
}
pub fn steal(noalias self: *BufferQueue, noalias target: anytype, context: Pop.Context) ?Pop {
const is_unbounded = switch (@TypeOf(target)) {
*BufferQueue => false,
*FifoQueue => true,
else => |T| @compileError("Cannot steal from " ++ @typeName(T)),
};
if (is_unbounded) {
var consumer = target.tryAcquireConsumer() orelse return null;
defer consumer.release();
// TODO: Figure out a good limit to the max amount of Tasks
// that can be popped from the consumer to reduce the cache
// pollution effects of iterating an intrusive linked list.
const MAX_PUSH = std.math.maxInt(usize);
const tail = self.tail;
const max_push = blk: {
const head = @atomicLoad(Index, &self.head, .Unordered);
const size = tail -% head;
std.debug.assert(size <= capacity);
const free_slots = capacity - size;
break :blk std.math.min(free_slots, MAX_PUSH);
};
var pushed: Index = 0;
while (pushed < max_push) : (pushed += 1) {
const task = consumer.pop() orelse break;
const index = (tail +% pushed) % capacity;
@atomicStore(*Task, &self.buffer[index], task, .Unordered);
}
var task = consumer.pop();
if (task == null and pushed > 0) {
pushed -= 1;
task = self.buffer[(tail +% pushed) % capacity];
}
if (pushed > 0) @atomicStore(Index, &self.tail, tail +% pushed, .Release);
return Pop{
.task = task orelse return null,
.pushed = pushed > 0,
};
}
const tail = self.tail;
const head = @atomicLoad(usize, &self.head, .Monotonic);
const size = tail -% head;
std.debug.assert(size == 0);
var target_head = @atomicLoad(Index, &target.head, .SeqCst);
while (true) {
const target_tail = @atomicLoad(Index, &target.tail, .SeqCst);
if (context.is_lifo and (target_tail == (target_head -% 1))) {
return null;
}
const target_size = target_tail -% target_head;
if (target_size == 0) {
return null;
}
var target_steal = target_size - (target_size / 2);
if (context.is_lifo) target_steal = 1;
if (target_steal > capacity / 2) {
target_head = @atomicLoad(Index, &target.head, .Acquire);
continue;
}
var pushed: usize = 0;
while (pushed < target_steal) : (pushed += 1) {
const task = @atomicLoad(*Task, &target.buffer[(target_head +% pushed) % capacity], .Unordered);
@atomicStore(*Task, &self.buffer[(tail +% pushed) % capacity], task, .Unordered);
}
if (@cmpxchgStrong(
Index,
&target.head,
target_head,
target_head +% target_steal,
.AcqRel,
.Acquire,
)) |updated| {
target_head = updated;
continue;
}
pushed -= 1;
if (pushed > 0) {
@atomicStore(Index, &self.tail, tail +% pushed, .Release);
}
return Pop{
.task = self.buffer[(tail +% pushed) % capacity],
.pushed = pushed > 0,
};
}
}
};
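// Illustrative only: single-threaded BufferQueue usage; the owning-thread push is
// followed by a fair (FIFO-side) pop using an explicit Pop.Context.
test "BufferQueue push then pop" {
const noop = struct {
fn execute(task: *Task) void {
_ = task;
}
}.execute;
var a = Task{ .executeFn = noop };
var queue = BufferQueue{};
std.debug.assert(queue.push(Group.from(&a)) == null);
const popped = queue.pop(.{ .be_fair = true }) orelse unreachable;
std.debug.assert(popped.task == &a);
std.debug.assert(queue.pop(.{ .be_fair = true }) == null);
}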
const std = @import("std");
const Task = @import("./Task.zig");
const ThreadPool = @This();
scheduler: Task.Scheduler,
optimize_for: ?Config.OptimizeFor,
use_caller_thread: bool,
idle_queue: AtomicUsize = .{},
join_event: std.Thread.StaticResetEvent = .{},
pub const Config = struct {
max_threads: ?usize = null,
optimize_for: ?OptimizeFor = null,
pub const OptimizeFor = enum {
Latency,
Throughput,
};
};
pub fn init(config: Config) ThreadPool {
const max_threads = if (std.builtin.single_threaded) @as(usize, 1) else blk: {
const max_threads = config.max_threads orelse std.Thread.cpuCount() catch 1;
break :blk std.math.max(1, max_threads);
};
return ThreadPool{
.scheduler = Task.Scheduler.init(.{
.system = &system_impl,
.max_workers = max_threads,
}),
.optimize_for = config.optimize_for,
.use_caller_thread = false,
};
}
pub fn deinit(self: *ThreadPool) void {
self.scheduler.deinit();
self.join_event.deinit();
self.* = undefined;
}
pub const StartConfig = struct {
use_caller_thread: bool = std.builtin.single_threaded,
first_runnable: ?*Runnable = null,
};
pub fn start(self: *ThreadPool, config: StartConfig) void {
self.use_caller_thread = std.builtin.single_threaded or config.use_caller_thread;
if (self.use_caller_thread) {
std.debug.assert(config.first_runnable != null);
}
if (config.first_runnable) |runnable| {
self.scheduler.schedule(Task.Batch.from(&runnable.task));
}
}
pub fn stop(self: *ThreadPool) void {
self.scheduler.shutdown();
}
pub const Runnable = struct {
task: Task = undefined,
runFn: fn(*Runnable) void,
};
pub fn schedule(self: *ThreadPool, runnable: *Runnable) void {
const batch = Task.Batch.from(&runnable.task);
if (Thread.getCurrent()) |thread| {
thread.worker.schedule(batch);
} else {
self.scheduler.schedule(batch);
}
}
pub const SpawnConfig = struct {
allocator: *std.mem.Allocator,
};
pub fn spawn(
self: *ThreadPool,
config: SpawnConfig,
comptime function: anytype,
parameters: anytype,
) !void {
const Params = @TypeOf(parameters);
const Closure = struct {
params: Params,
allocator: *std.mem.Allocator,
runnable: Runnable = .{ .runFn = run },
fn run(runnable: *Runnable) void {
const self = @fieldParentPtr(@This(), "runnable", runnable);
const result = @call(.{}, function, self.params);
self.allocator.destroy(self);
}
};
const allocator = config.allocator;
const closure = try allocator.create(Closure);
errdefer allocator.destroy(closure);
closure.* = .{
.params = parameters,
.allocator = allocator,
};
return self.schedule(&closure.runnable);
}
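// A hypothetical usage sketch, not called anywhere in this file: it assumes a ThreadPool
// that has already been init()'d and start()'d plus a general purpose allocator, and the
// `work` function and its argument are examples only.
fn exampleSpawnUsage(pool: *ThreadPool, allocator: *std.mem.Allocator) !void {
const work = struct {
fn run(value: usize) void {
std.debug.print("processed {}\n", .{value});
}
}.run;
try pool.spawn(.{ .allocator = allocator }, work, .{@as(usize, 42)});
}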
const IDLE_QUEUE_EMPTY: usize = 0;
const IDLE_QUEUE_NOTIFIED: usize = 1;
const IDLE_QUEUE_SHUTDOWN: usize = 2;
fn idleWait(self: *ThreadPool, thread: *Thread) void {
var idle_queue = self.idle_queue.load(.Monotonic);
while (true) {
if (idle_queue.value == IDLE_QUEUE_SHUTDOWN) {
return;
}
var new_idle_queue: usize = undefined;
if (idle_queue.value == IDLE_QUEUE_NOTIFIED) {
new_idle_queue = IDLE_QUEUE_EMPTY;
} else {
const next = @intToPtr(?*Thread, idle_queue.value);
@atomicStore(?*Thread, &thread.idle_next, next, .Unordered);
new_idle_queue = @ptrToInt(thread);
}
if (self.idle_queue.tryCompareAndSwap(
idle_queue,
new_idle_queue,
.AcqRel,
.Monotonic,
)) |updated| {
idle_queue = updated;
continue;
}
if (idle_queue.value != IDLE_QUEUE_NOTIFIED) {
thread.idle_event.wait();
thread.idle_event.reset();
}
return;
}
}
fn idleWake(self: *ThreadPool, shutdown: bool) void {
if (shutdown) {
const idle_queue = @atomicRmw(
usize,
&self.idle_queue.value,
.Xchg,
IDLE_QUEUE_SHUTDOWN,
.AcqRel,
);
std.debug.assert(idle_queue != IDLE_QUEUE_SHUTDOWN);
if (idle_queue != IDLE_QUEUE_NOTIFIED) {
var idle_threads = @intToPtr(?*Thread, idle_queue);
while (idle_threads) |thread| {
idle_threads = thread.idle_next;
thread.idle_event.set();
}
}
return;
}
var idle_queue = self.idle_queue.load(.Acquire);
while (true) {
const new_idle_queue = switch (idle_queue.value) {
IDLE_QUEUE_EMPTY => IDLE_QUEUE_NOTIFIED,
IDLE_QUEUE_SHUTDOWN => return,
IDLE_QUEUE_NOTIFIED => return,
else => |thread_ptr| blk: {
const thread = @intToPtr(*Thread, thread_ptr);
const next = @atomicLoad(?*Thread, &thread.idle_next, .Unordered);
break :blk @ptrToInt(next);
},
};
if (self.idle_queue.tryCompareAndSwap(
idle_queue,
new_idle_queue,
.AcqRel,
.Acquire,
)) |updated| {
idle_queue = updated;
continue;
}
if (new_idle_queue != IDLE_QUEUE_NOTIFIED) {
const thread = @intToPtr(*Thread, idle_queue.value);
thread.idle_event.set();
}
return;
}
}
const Thread = struct {
worker: Task.Worker,
handle: ?*std.Thread,
idle_event: std.Thread.StaticResetEvent = .{},
exit_event: std.Thread.StaticResetEvent = .{},
sched_tick: u8 = 0,
tick_trigger: ?u8 = null,
iter: Task.Worker.Iter = .{},
fn getCurrent() ?*Thread {
return @intToPtr(?*Thread, ThreadLocal.get());
}
fn trySpawn(pool: *ThreadPool) bool {
if (pool.use_caller_thread) {
pool.use_caller_thread = false;
Thread.run(pool, null);
return true;
}
const Spawner = struct {
_pool: *ThreadPool,
_handle: *std.Thread = undefined,
put_event: std.Thread.StaticResetEvent = .{},
got_event: std.Thread.StaticResetEvent = .{},
fn run(self: *@This()) void {
self.put_event.wait();
const _pool = self._pool;
const _handle = self._handle;
self.got_event.set();
Thread.run(_pool, _handle);
}
};
var spawner = Spawner{ ._pool = pool };
spawner._handle = std.Thread.spawn(Spawner.run, &spawner) catch return false;
spawner.put_event.set();
spawner.got_event.wait();
spawner.put_event.deinit();
spawner.got_event.deinit();
return true;
}
fn run(pool: *ThreadPool, handle: ?*std.Thread) void {
var self = Thread{
.worker = undefined,
.handle = handle,
.tick_trigger = blk: {
const optimize_for = pool.optimize_for orelse break :blk 61;
break :blk switch (optimize_for) {
.Latency => 0,
.Throughput => null,
};
},
};
ThreadLocal.set(@ptrToInt(&self));
self.worker.run(&pool.scheduler);
}
};
const ThreadLocal = if (std.Target.current.isDarwin())
struct {
const pthread_key_t = c_ulong;
extern "c" fn pthread_key_create(key: *pthread_key_t, destructor: ?fn (value: *c_void) callconv(.C) void) c_int;
extern "c" fn pthread_getspecific(key: pthread_key_t) ?*c_void;
extern "c" fn pthread_setspecific(key: pthread_key_t, value: ?*c_void) c_int;
const dispatch_once_t = usize;
const dispatch_function_t = fn (?*c_void) callconv(.C) void;
extern "c" fn dispatch_once_f(predicate: *dispatch_once_t, context: ?*c_void, function: dispatch_function_t) void;
var tls_once: dispatch_once_t = 0;
var tls_key: pthread_key_t = undefined;
fn tls_init(_: ?*c_void) callconv(.C) void {
std.debug.assert(pthread_key_create(&tls_key, null) == 0);
}
fn get() usize {
dispatch_once_f(&tls_once, null, tls_init);
return @ptrToInt(pthread_getspecific(tls_key));
}
fn set(value: usize) void {
dispatch_once_f(&tls_once, null, tls_init);
std.debug.assert(pthread_setspecific(tls_key, @intToPtr(?*c_void, value)) == 0);
}
}
else
struct {
threadlocal var tls_value: usize = 0;
fn get() usize {
return tls_value;
}
fn set(value: usize) void {
tls_value = value;
}
};
const AtomicUsize = switch (std.Target.current.cpu.arch) {
.i386, .x86_64 => extern struct {
const DoubleWord = std.meta.Int(.unsigned, std.meta.bitCount(usize) * 2);
value: usize align(@alignOf(DoubleWord))= 0,
aba_tag: usize = 0,
pub fn load(
self: *const AtomicUsize,
comptime order: std.builtin.AtomicOrder,
) AtomicUsize {
return AtomicUsize{
.value = @atomicLoad(usize, &self.value, order),
.aba_tag = @atomicLoad(usize, &self.aba_tag, .Unordered),
};
}
pub fn tryCompareAndSwap(
self: *AtomicUsize,
compare: AtomicUsize,
exchange: usize,
comptime success: std.builtin.AtomicOrder,
comptime failure: std.builtin.AtomicOrder,
) ?AtomicUsize {
const double_word = @cmpxchgWeak(
DoubleWord,
@ptrCast(*DoubleWord, self),
@bitCast(DoubleWord, compare),
@bitCast(DoubleWord, AtomicUsize{
.value = exchange,
.aba_tag = compare.aba_tag +% 1,
}),
success,
failure,
) orelse return null;
return @bitCast(AtomicUsize, double_word);
}
},
else => extern struct {
value: usize = 0,
pub fn load(
self: *const AtomicUsize,
comptime order: std.builtin.AtomicOrder,
) AtomicUsize {
const value = @atomicLoad(usize, &self.value, order);
return AtomicUsize{ .value = value };
}
pub fn tryCompareAndSwap(
self: *AtomicUsize,
compare: AtomicUsize,
exchange: usize,
comptime success: std.builtin.AtomicOrder,
comptime failure: std.builtin.AtomicOrder,
) ?AtomicUsize {
const value = @cmpxchgWeak(
usize,
&self.value,
compare.value,
exchange,
success,
failure,
) orelse return null;
return AtomicUsize{ .value = value };
}
},
};
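// Illustrative only: basic load/tryCompareAndSwap usage of AtomicUsize. On x86 the
// value carries an ABA tag that is bumped on every successful swap; the retry loop
// also tolerates spurious failures of the weak compare-and-swap.
test "AtomicUsize tryCompareAndSwap" {
var counter = AtomicUsize{};
var current = counter.load(.Monotonic);
while (counter.tryCompareAndSwap(current, 42, .Monotonic, .Monotonic)) |updated| {
current = updated;
}
std.debug.assert(counter.load(.Monotonic).value == 42);
}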
var system_impl = Task.System{ .callFn = systemCall };
fn systemCall(_: *Task.System, command: Task.System.Command) void {
switch (command) {
.Spawn => |cmd| {
const pool = @fieldParentPtr(ThreadPool, "scheduler", cmd.scheduler);
cmd.spawned.* = Thread.trySpawn(pool);
},
.Join => |cmd| switch (cmd) {
.Wait => |scheduler| {
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler);
pool.join_event.wait();
},
.Wake => |scheduler| {
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler);
pool.join_event.set();
},
},
.Resume => |cmd| switch (cmd) {
.Exit => |worker| {
const thread = @fieldParentPtr(Thread, "worker", worker);
if (thread.handle) |handle| {
thread.exit_event.set();
handle.wait();
}
},
.Signal => |scheduler| {
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler);
pool.idleWake(false);
},
.Broadcast => |scheduler| {
const pool = @fieldParentPtr(ThreadPool, "scheduler", scheduler);
pool.idleWake(true);
},
},
.Suspend => |cmd| switch (cmd) {
.Exit => |worker| {
const thread = @fieldParentPtr(Thread, "worker", worker);
if (thread.handle) |handle| {
thread.exit_event.wait();
}
},
.Wait => |wait| {
const thread = @fieldParentPtr(Thread, "worker", wait.worker);
const pool = @fieldParentPtr(ThreadPool, "scheduler", wait.worker.getScheduler());
wait.scheduled.* = false;
pool.idleWait(thread);
},
},
.Poll => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
const pool = @fieldParentPtr(ThreadPool, "scheduler", cmd.worker.getScheduler());
const max_attempts: usize = blk: {
const optimize_for = pool.optimize_for orelse break :blk 8;
break :blk switch (optimize_for) {
.Latency => 4,
.Throughput => 32,
};
};
if (cmd.attempt >= max_attempts) {
cmd.targets.* = null;
return;
}
const be_fair: bool = blk: {
const optimize_for = pool.optimize_for orelse break :blk thread.sched_tick % 61 == 0;
break :blk switch (optimize_for) {
.Latency => true,
.Throughput => false,
};
};
},
.Execute => |cmd| {
const thread = @fieldParentPtr(Thread, "worker", cmd.worker);
const runnable = @fieldParentPtr(Runnable, "task", cmd.task);
thread.sched_tick +%= 1;
(runnable.runFn)(runnable);
},
}
}