@marler8997
Last active December 16, 2020 01:16
const std = @import("std");
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
const Mutex = std.Mutex;
const Thread = std.Thread;
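// Sketch of a dynamic thread-pool scheduler: tasks are added with
// Scheduler.schedule(), worker threads are spawned on demand up to
// max_spawn_count, and idle workers wait for tasks to be sent to them.
// The spawn/sendTask/receiveTask functions below are the platform/messaging
// layer that is still TODO.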
fn log(comptime fmt: []const u8, args: anytype) void {
    std.debug.warn(fmt ++ "\n", args);
}
pub const Task = struct {
    placeholder: u32,
    pub fn run(self: *Task, scheduler: *Scheduler) anyerror!?*Task {
        log("TODO: implement Task.run (placeholder={})", .{self.placeholder});
        //return error.NotImpl;
        return null;
    }
};
// TODO: functions to implement!!!
fn spawn(func: *const fn(*Scheduler,?*Task) void, scheduler: *Scheduler, task: ?*Task) void {
log("TODO: implement spawn task={}", .{task});
}
// This function sends a task to a worker
fn sendTask(tid: Thread.Id, task: ?*Task) void {
log("TODO: implement send tid={} task={}", .{tid, task});
}
// This function blocks a worker until it receives a task; returning null indicates the worker is done
fn receiveTask(comptime T: type) ?*T {
log("TODO: implement receiveTask", .{});
return null;
}
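// Illustrative sketch (not part of the original gist): one possible backing for
// sendTask/receiveTask above: a small global mailbox table keyed by Thread.Id,
// protected by its own Mutex, with the receiver polling.  The names (Mailbox,
// Slot, max_mailboxes) and the polling approach are assumptions made for this
// example; a real implementation would block on a per-thread event or futex
// instead of sleeping in a loop, and spawn() would wrap std.Thread.spawn with a
// small context struct carrying the scheduler and initial task.
const Mailbox = struct {
    const Slot = struct {
        tid: Thread.Id,
        task: ?*Task,
        filled: bool,
    };
    const max_mailboxes = 64;
    var table_mutex = Mutex{};
    var slots = [_]?Slot{null} ** max_mailboxes;

    /// Deliver a task (or null, meaning "stop") to the worker identified by tid.
    fn post(tid: Thread.Id, task: ?*Task) void {
        const held = table_mutex.acquire();
        defer held.release();
        // reuse the slot already assigned to this thread, if any
        for (slots) |*slot| {
            if (slot.*) |*s| {
                if (s.tid == tid) {
                    s.task = task;
                    s.filled = true;
                    return;
                }
            }
        }
        // otherwise claim a free slot
        for (slots) |*slot| {
            if (slot.* == null) {
                slot.* = Slot{ .tid = tid, .task = task, .filled = true };
                return;
            }
        }
        unreachable; // more threads than max_mailboxes; acceptable for a sketch
    }

    /// Block (by polling) until a task arrives for the calling thread.
    fn wait() ?*Task {
        const tid = Thread.getCurrentId();
        while (true) {
            {
                const held = table_mutex.acquire();
                defer held.release();
                for (slots) |*slot| {
                    if (slot.*) |*s| {
                        if (s.tid == tid and s.filled) {
                            s.filled = false;
                            return s.task;
                        }
                    }
                }
            }
            std.time.sleep(1000 * 1000); // 1ms; a real version would block on an event
        }
    }
};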
/// A parallel scheduler. Call schedule() to add a task to be executed.
/// Threads will be spawned dynamically as they are needed.
/// This is generic enough to be a candidate for the standard library.
pub const Scheduler = struct {
    /// owner is the thread id of the thread that is responsible for cleaning up the scheduler.
    /// The scheduler cannot be cleaned up until all threads have exited or at least
    /// dropped their references to the mutex (see mutex_borrow_count).
    owner: Thread.Id,
    /// the maximum number of threads that can be spawned to handle scheduled tasks
    max_spawn_count: u32,
    mutex: Mutex,
    shared_locked_data: LockedData,
    // While the scheduler is running, this data must only be accessed while holding the mutex
    const LockedData = struct {
        /// saves the first uncaught error from any of the threads
        uncaught_error: ?anyerror = null,
        /// when true, the scheduler will stop itself once there are no active threads
        stop_when_zero_active_threads: bool = false,
        /// while stopping, mutex_borrow_count cannot be incremented, new scheduled tasks
        /// are ignored, and threads cannot be added to idle.
        /// stopping should be checked each time a thread enters the lock.
        stopping: bool = false,
        pending_tasks: ArrayList(*Task),
        /// The number of threads that have been spawned by the scheduler. This is only used to
        /// limit how many new threads are spawned; it does not necessarily match the number of
        /// threads currently running.
        spawn_count: usize = 0,
        /// The number of non-owner threads that have a reference to the mutex and can lock on it.
        /// The owner cannot exit/clean up the scheduler until this reaches 0. Also, while 'stopping',
        /// this value should never be incremented.
        mutex_borrow_count: usize = 0,
        /// The number of threads that are active. When this drops to 0 the scheduler will initiate
        /// the stop sequence, so long as stop_when_zero_active_threads is true.
        active_thread_count: usize = 0,
        /// The set of threads that are currently waiting for a task. Send a null task to stop them.
        idle: ArrayList(Thread.Id),
        /// Set `stopping` to true to enter stopping mode, and signal all the idle threads to stop.
        pub fn stop(self: *LockedData, owner: Thread.Id) void {
            std.debug.assert(!self.stopping); // must not already be stopping
            for (self.idle.items) |tid| {
                // do not stop the owner until all other threads are stopped
                if (tid != owner) {
                    sendTask(tid, null);
                }
            }
            self.stopping = true;
        }
    };
    pub fn init(allocator: *Allocator, owner: Thread.Id, max_spawn_count: u32) Scheduler {
        return .{
            .owner = owner,
            .max_spawn_count = max_spawn_count,
            .mutex = .{},
            .shared_locked_data = .{
                .pending_tasks = ArrayList(*Task).init(allocator),
                .idle = ArrayList(Thread.Id).init(allocator),
            },
        };
    }
    pub fn deinit(self: Scheduler) void {
        self.shared_locked_data.pending_tasks.deinit();
        self.shared_locked_data.idle.deinit();
    }
    pub const Lock = struct {
        data: *LockedData,
        held: Mutex.Held,
        pub fn unlock(self: Lock) void {
            self.held.release();
        }
    };
    /// lock the scheduler to access shared data
    pub fn lock(self: *Scheduler) Lock
    {
        return .{ .data = &self.shared_locked_data, .held = self.mutex.acquire() };
    }
    /// schedule a task to be executed
    pub fn schedule(self: *Scheduler, task: *Task) !void {
        const locked = self.lock();
        defer locked.unlock();
        if (locked.data.stopping) return; // ignore new tasks when stopping
        if (locked.data.idle.popOrNull()) |tid| {
            sendTask(tid, task);
            locked.data.active_thread_count += 1; // indicates work is still being done
            return;
        }
        if (locked.data.spawn_count < self.max_spawn_count)
        {
            spawn(&threadLoop, self, task);
            locked.data.spawn_count += 1;
            // this new thread cannot be the owner because it was just created
            locked.data.mutex_borrow_count += 1;
            locked.data.active_thread_count += 1; // indicates work is still being done
        }
        else
        {
            // save the task to be executed when a thread becomes available
            try locked.data.pending_tasks.append(task);
        }
    }
    /// Service the scheduler with the current thread. If this thread is the owner then it will
    /// not return until all other threads have exited or at least dropped their reference to
    /// the mutex. This means if this thread is the owner, it is safe to clean up the scheduler's
    /// memory once this function returns.
    pub fn runOnCurrentThread(self: *Scheduler, enable_stop_when_zero_active_threads: bool, initial_task: ?*Task) error{OutOfMemory}!void {
        var stopping: bool = false;
        {
            const locked = self.lock();
            defer locked.unlock();
            stopping = locked.data.stopping;
            if (!stopping)
            {
                if (enable_stop_when_zero_active_threads)
                {
                    locked.data.stop_when_zero_active_threads = true;
                    if (initial_task == null and locked.data.active_thread_count == 0)
                    {
                        locked.data.stop(self.owner);
                        stopping = true;
                    }
                }
                if (!stopping)
                {
                    if (Thread.getCurrentId() != self.owner)
                        locked.data.mutex_borrow_count += 1; // this thread can reference the mutex
                    if (initial_task != null)
                        locked.data.active_thread_count += 1;
                }
            }
        }
        if (stopping) {
            if (Thread.getCurrentId() == self.owner) {
                std.debug.assert(null == receiveTask(Task));
            }
        } else {
            threadLoop(self, initial_task);
        }
    }
    pub fn runTasks(self: *Scheduler, task: *Task) !void {
        var optional_next_task = try task.run(self);
        while (true) {
            if (optional_next_task) |next_task| {
                optional_next_task = try next_task.run(self);
            } else break;
        }
    }
    fn threadLoopInner(scheduler: *Scheduler, optional_initial_task: ?*Task, active: *bool) anyerror!void {
        if (optional_initial_task) |initial_task|
            try scheduler.runTasks(initial_task);
        while (true) {
            // handle all pending_tasks first
            while (true) {
                const next_task = init: {
                    const locked = scheduler.lock();
                    defer locked.unlock();
                    if (locked.data.stopping) return;
                    if (locked.data.pending_tasks.popOrNull()) |next_task| break :init next_task;
                    std.debug.assert(active.*);
                    std.debug.assert(locked.data.active_thread_count > 0);
                    locked.data.active_thread_count -= 1;
                    active.* = false;
                    if (locked.data.active_thread_count == 0 and locked.data.stop_when_zero_active_threads)
                    {
                        locked.data.stop(scheduler.owner);
                        return;
                    }
                    try locked.data.idle.append(Thread.getCurrentId());
                    break;
                };
                try scheduler.runTasks(next_task);
            }
            if (receiveTask(Task)) |task| {
                // in this case, the active_thread_count will have been incremented for us
                active.* = true;
                try scheduler.runTasks(task);
            } else break;
        }
    }
    // NOTE: The mutex_borrow_count will have been incremented for every thread that enters this function,
    // except the owner thread. It is this function's responsibility to decrement the mutex_borrow_count
    // on exit and notify the owner thread if it reaches 0.
    // Also, if the initial_task is non-null, then the active_thread_count will also have been incremented
    // and it is this function's responsibility to decrement it when it is no longer active.
    fn threadLoop(scheduler: *Scheduler, optional_initial_task: ?*Task) void {
        // indicates this thread is contributing to the active_thread_count
        var active = optional_initial_task != null;
        threadLoopInner(scheduler, optional_initial_task, &active) catch |err| {
            const locked = scheduler.lock();
            defer locked.unlock();
            if (locked.data.uncaught_error == null)
                locked.data.uncaught_error = err;
        };
        const current_mutex_borrow_count = init: {
            const locked = scheduler.lock();
            defer locked.unlock();
            if (Thread.getCurrentId() != scheduler.owner)
                locked.data.mutex_borrow_count -= 1;
            const save_mutex_borrow_count = locked.data.mutex_borrow_count;
            if (active) {
                std.debug.assert(locked.data.active_thread_count > 0);
                locked.data.active_thread_count -= 1;
                if (locked.data.active_thread_count == 0 and !locked.data.stopping)
                {
                    // we stop here whether or not stop_when_zero_active_threads is enabled,
                    // because reaching this point while still active means threadLoopInner exited on an error path
                    locked.data.stop(scheduler.owner);
                }
            }
            break :init save_mutex_borrow_count;
        };
        if (Thread.getCurrentId() == scheduler.owner) {
            if (current_mutex_borrow_count > 0)
            {
                // flush any pending tasks
                while (true) {
                    if (receiveTask(Task)) |_| {
                    } else break;
                }
            }
        }
        else
        {
            if (current_mutex_borrow_count == 0)
                sendTask(scheduler.owner, null);
        }
    }
};
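// Illustrative usage sketch (not part of the original gist): how an owner
// thread might drive the scheduler until all scheduled work has drained,
// assuming the TODO messaging functions above are implemented.  The function
// name exampleMain is made up for this example.
fn exampleMain(allocator: *Allocator) !void {
    var scheduler = Scheduler.init(allocator, Thread.getCurrentId(), 4);
    defer scheduler.deinit();
    var first_task = Task{ .placeholder = 42 };
    // Passing true for enable_stop_when_zero_active_threads asks the scheduler
    // to shut itself down once no thread has work left, so this call returns
    // only after everything scheduled from first_task has finished.
    try scheduler.runOnCurrentThread(true, &first_task);
}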
test "scheduler" {
log("", .{});
const max_job_count = 10;
var scheduler = Scheduler.init(std.testing.allocator, Thread.getCurrentId(), max_job_count);
defer scheduler.deinit();
var tasks = [_]Task {
.{ .placeholder = 0 },
.{ .placeholder = 1 },
.{ .placeholder = 2 },
};
for (tasks[1..]) |*task| {
try scheduler.schedule(task);
}
try scheduler.runOnCurrentThread(false,&tasks[0]);
{
const locked = scheduler.lock();
defer locked.unlock();
if (locked.data.uncaught_error) |err|
return err;
}
}