Last active
December 16, 2020 01:16
-
-
Save marler8997/236ec0612db78752fea6c8c278333313 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const Allocator = std.mem.Allocator; | |
const ArrayList = std.ArrayList; | |
const Mutex = std.Mutex; | |
const Thread = std.Thread; | |
/// Print a formatted message, followed by a newline, to stderr.
/// Used for the TODO/progress output of this prototype.
fn log(comptime fmt: []const u8, args: anytype) void {
    // std.debug.print is the replacement for the deprecated (and later removed) std.debug.warn.
    std.debug.print(fmt ++ "\n", args);
}
/// A unit of work executed by the Scheduler. Currently only a placeholder.
pub const Task = struct {
    /// placeholder payload; only used for logging until Task is implemented
    placeholder: u32,

    /// Run this task. Returns the next task to run on the same thread, or null when there
    /// is no follow-up work (Scheduler.runTasks keeps running returned tasks until null).
    /// TODO: not implemented yet; currently only logs and returns null.
    pub fn run(self: *Task, scheduler: *Scheduler) anyerror!?*Task {
        _ = scheduler; // unused until run() is implemented
        log("TODO: implement Task.run (placeholder={})", .{self.placeholder});
        return null;
    }
};
// TODO: spawn, sendTask and receiveTask below are stubs that still need real implementations.

/// Intended to start a new thread that runs `func(scheduler, task)`; the Scheduler passes
/// its `threadLoop` as `func`.
/// TODO: not implemented yet; currently only logs the task.
fn spawn(func: *const fn (*Scheduler, ?*Task) void, scheduler: *Scheduler, task: ?*Task) void {
    // unused until spawn() is implemented
    _ = func;
    _ = scheduler;
    log("TODO: implement spawn task={}", .{task});
}
/// Stub: deliver `task` to the worker thread `tid`.
/// The scheduler sends a null task to tell a worker it is done.
fn sendTask(tid: Thread.Id, task: ?*Task) void {
    const args = .{ tid, task };
    log("TODO: implement send tid={} task={}", args);
}
/// Stub: intended to block the calling worker until a task of type `T` arrives for it.
/// A null result tells the worker that it is done.
fn receiveTask(comptime T: type) ?*T {
    const nothing: ?*T = null;
    log("TODO: implement receiveTask", .{});
    return nothing;
}
/// A parallel scheduler. Call schedule() to add a task to be executed.
/// Threads will be spawned dynamically as they are needed.
/// This is generic enough to be a candidate for the standard library.
pub const Scheduler = struct {
    /// owner is the thread id of the thread that is responsible for cleaning the scheduler.
    /// The scheduler cannot be cleaned up until all threads have exited or at least
    /// dropped their references to the mutex (see mutex_borrow_count).
    owner: Thread.Id,
    /// the maximum number of threads that can be spawned to handle scheduled tasks
    max_spawn_count: u32,
    /// guards shared_locked_data; acquire it through lock()
    mutex: Mutex,
    /// state shared between threads; only touch it while holding the lock (see lock())
    shared_locked_data: LockedData,

    /// This data must be used within the mutex lock while the scheduler is running.
    const LockedData = struct {
        /// saves the first uncaught error from any of the threads
        uncaught_error: ?anyerror = null,
        /// when true, the scheduler will stop itself once there are no active threads
        stop_when_zero_active_threads: bool = false,
        /// while stopping, mutex_borrow_count cannot be incremented, new scheduled tasks
        /// will be ignored and threads cannot be added to idle.
        /// stopping should be checked each time a thread enters the lock.
        stopping: bool = false,
        /// tasks waiting for a free thread; appended and popped at the end (LIFO order)
        pending_tasks: ArrayList(*Task),
        /// The number of threads that have been spawned by the scheduler. This is only used to limit
        /// the number of new threads from being spawned, it does not necessarily match the number of
        /// threads currently running.
        spawn_count: usize = 0,
        /// The number of non-owner threads that have a reference to the mutex and can lock on it.
        /// The owner cannot exit/cleanup the scheduler until this reaches 0. Also, while 'stopping',
        /// this value should never be incremented.
        mutex_borrow_count: usize = 0,
        /// The number of threads that are active. When this drops to 0 the scheduler will initiate the stop
        /// sequence so long as stop_when_zero_active_threads is true.
        active_thread_count: usize = 0,
        /// The set of threads that are currently waiting for a task. Send a null task to stop them.
        idle: ArrayList(Thread.Id),

        /// Set `stopping` to true to enter stopping mode, and signal all idle threads except
        /// the owner to stop by sending them a null task. The owner is not signalled here; it is
        /// notified by the last thread that drops its mutex borrow (see threadLoop).
        /// Must be called with the lock held and at most once (asserted).
        pub fn stop(self: *LockedData, owner: Thread.Id) void {
            std.debug.assert(!self.stopping); // already stopped
            for (self.idle.items) |tid| {
                // do not stop the owner until all other threads have stopped
                if (tid != owner) {
                    sendTask(tid, null);
                }
            }
            self.stopping = true;
        }
    };

    /// Create a scheduler whose bookkeeping lists use `allocator`.
    /// `owner` is the thread that will clean up the scheduler (see `owner`).
    pub fn init(allocator: *Allocator, owner: Thread.Id, max_spawn_count: u32) Scheduler {
        return .{
            .owner = owner,
            .max_spawn_count = max_spawn_count,
            .mutex = .{},
            .shared_locked_data = .{
                .pending_tasks = ArrayList(*Task).init(allocator),
                .idle = ArrayList(Thread.Id).init(allocator),
            },
        };
    }

    /// Free the bookkeeping lists. Only the owner may call this, and only once
    /// runOnCurrentThread has returned on the owner thread.
    pub fn deinit(self: Scheduler) void {
        self.shared_locked_data.pending_tasks.deinit();
        self.shared_locked_data.idle.deinit();
    }

    /// A held lock on the scheduler's shared data. Call unlock() when done.
    pub const Lock = struct {
        data: *LockedData,
        held: Mutex.Held,

        pub fn unlock(self: Lock) void {
            self.held.release();
        }
    };

    /// lock the scheduler to access shared data
    pub fn lock(self: *Scheduler) Lock {
        return .{ .data = &self.shared_locked_data, .held = self.mutex.acquire() };
    }

    /// Schedule a task to be executed. Ignored while the scheduler is stopping.
    /// The task goes to an idle thread if there is one, else to a newly spawned thread
    /// (up to max_spawn_count), else it is queued in pending_tasks.
    pub fn schedule(self: *Scheduler, task: *Task) !void {
        const locked = self.lock();
        defer locked.unlock();
        if (locked.data.stopping) return; // ignore new tasks when stopping
        if (locked.data.idle.popOrNull()) |tid| {
            sendTask(tid, task);
            locked.data.active_thread_count += 1; // indicates work is still being done
            return;
        }
        if (locked.data.spawn_count < self.max_spawn_count) {
            spawn(&threadLoop, self, task);
            locked.data.spawn_count += 1;
            // this new thread cannot be the owner because it was just created
            locked.data.mutex_borrow_count += 1;
            locked.data.active_thread_count += 1; // indicates work is still being done
        } else {
            // save the task to be executed when a thread becomes available
            try locked.data.pending_tasks.append(task);
        }
    }

    /// Service the scheduler with the current thread. If this thread is the owner then it will
    /// not return until all other threads have exited or at least dropped their reference to
    /// the mutex. This means if this thread is the owner, it is safe to cleanup the scheduler
    /// memory once this function returns.
    pub fn runOnCurrentThread(self: *Scheduler, enable_stop_when_zero_active_threads: bool, initial_task: ?*Task) error{OutOfMemory}!void {
        var stopping: bool = false;
        // mutex_borrow_count as seen under the lock; tells the owner whether any other
        // thread is left that will notify it when it drops its borrow.
        var borrow_count: usize = 0;
        {
            const locked = self.lock();
            defer locked.unlock();
            stopping = locked.data.stopping;
            if (!stopping) {
                if (enable_stop_when_zero_active_threads) {
                    locked.data.stop_when_zero_active_threads = true;
                    // Only stop right away if there is nothing left to do at all; pending tasks
                    // are still picked up by this thread (see threadLoopInner).
                    if (initial_task == null and
                        locked.data.active_thread_count == 0 and
                        locked.data.pending_tasks.items.len == 0)
                    {
                        locked.data.stop(self.owner);
                        stopping = true;
                    }
                }
                if (!stopping) {
                    if (Thread.getCurrentId() != self.owner)
                        locked.data.mutex_borrow_count += 1; // this thread can reference the mutex
                    if (initial_task != null)
                        locked.data.active_thread_count += 1;
                }
            }
            borrow_count = locked.data.mutex_borrow_count;
        }
        if (stopping) {
            // The owner must not return while other threads still borrow the mutex; the last one
            // to drop its borrow sends the owner a null task. With no borrowers left nobody would
            // ever send it, so waiting unconditionally could block forever (same rule as the owner
            // path at the end of threadLoop).
            if (Thread.getCurrentId() == self.owner and borrow_count > 0) {
                const received = receiveTask(Task);
                std.debug.assert(received == null);
            }
        } else {
            threadLoop(self, initial_task);
        }
    }

    /// Run `task`, then every task it returns, until a task returns null.
    pub fn runTasks(self: *Scheduler, task: *Task) !void {
        var optional_next_task: ?*Task = task;
        while (optional_next_task) |next_task| {
            optional_next_task = try next_task.run(self);
        }
    }

    /// Body of a scheduler thread: run the initial task (if any), then keep draining
    /// pending_tasks; when none are left, park this thread in `idle` and block in
    /// receiveTask until it is handed a task or a null "done" signal.
    /// `active` tracks whether this thread currently counts towards active_thread_count.
    fn threadLoopInner(scheduler: *Scheduler, optional_initial_task: ?*Task, active: *bool) anyerror!void {
        if (optional_initial_task) |initial_task|
            try scheduler.runTasks(initial_task);
        while (true) {
            // handle all pending_tasks first
            while (true) {
                const next_task = init: {
                    const locked = scheduler.lock();
                    defer locked.unlock();
                    if (locked.data.stopping) return;
                    if (locked.data.pending_tasks.popOrNull()) |popped| {
                        // A thread that entered without an initial task is not counted yet;
                        // it becomes active now that it has work.
                        if (!active.*) {
                            locked.data.active_thread_count += 1;
                            active.* = true;
                        }
                        break :init popped;
                    }
                    // No pending work: give back the active slot, but only if we hold one.
                    if (active.*) {
                        std.debug.assert(locked.data.active_thread_count > 0);
                        locked.data.active_thread_count -= 1;
                        active.* = false;
                    }
                    if (locked.data.active_thread_count == 0 and locked.data.stop_when_zero_active_threads) {
                        locked.data.stop(scheduler.owner);
                        return;
                    }
                    try locked.data.idle.append(Thread.getCurrentId());
                    // leaves the inner `while` (not just this block) to go wait in receiveTask
                    break;
                };
                try scheduler.runTasks(next_task);
            }
            if (receiveTask(Task)) |task| {
                // in this case, the active_thread_count will have been incremented for us (by schedule)
                active.* = true;
                try scheduler.runTasks(task);
            } else break;
        }
    }

    // NOTE: The mutex_borrow_count will have been incremented for every thread that enters this function,
    // except the owner thread. It is this function's responsibility to decrement the mutex_borrow_count
    // on exit and notify the owner thread if it reaches 0.
    // Also, if the initial_task is non-null, then the active_thread_count will also have been incremented;
    // threadLoopInner may also make this thread active when it picks up a pending task. Whatever `active`
    // says on exit, this function gives back that active slot.
    fn threadLoop(scheduler: *Scheduler, optional_initial_task: ?*Task) void {
        // indicates this thread is contributing to the active_thread_count
        var active = optional_initial_task != null;
        threadLoopInner(scheduler, optional_initial_task, &active) catch |err| {
            const locked = scheduler.lock();
            defer locked.unlock();
            if (locked.data.uncaught_error == null)
                locked.data.uncaught_error = err;
        };
        const current_mutex_borrow_count = init: {
            const locked = scheduler.lock();
            defer locked.unlock();
            if (Thread.getCurrentId() != scheduler.owner)
                locked.data.mutex_borrow_count -= 1;
            const save_mutex_borrow_count = locked.data.mutex_borrow_count;
            if (active) {
                std.debug.assert(locked.data.active_thread_count > 0);
                locked.data.active_thread_count -= 1;
                if (locked.data.active_thread_count == 0 and !locked.data.stopping) {
                    // we are going to stop whether or not stop_when_zero_active_threads is
                    // enabled because this would constitute an error code path
                    locked.data.stop(scheduler.owner);
                }
            }
            break :init save_mutex_borrow_count;
        };
        if (Thread.getCurrentId() == scheduler.owner) {
            if (current_mutex_borrow_count > 0) {
                // Wait for the null notification from the last borrowing thread (see the else
                // branch below); any non-null tasks received in the meantime are discarded.
                while (receiveTask(Task)) |_| {}
            }
        } else {
            if (current_mutex_borrow_count == 0)
                sendTask(scheduler.owner, null);
        }
    }
};
test "scheduler" {
    // prints an empty line first (presumably to separate our output from the test runner's)
    log("", .{});

    const max_threads = 10;
    var scheduler = Scheduler.init(std.testing.allocator, Thread.getCurrentId(), max_threads);
    defer scheduler.deinit();

    var tasks = [_]Task{
        .{ .placeholder = 0 },
        .{ .placeholder = 1 },
        .{ .placeholder = 2 },
    };

    // hand every task but the first to the scheduler...
    var index: usize = 1;
    while (index < tasks.len) : (index += 1) {
        try scheduler.schedule(&tasks[index]);
    }
    // ...and service the scheduler on this (owner) thread, starting with the first one
    try scheduler.runOnCurrentThread(false, &tasks[0]);

    // surface the first error raised on any scheduler thread
    const uncaught = blk: {
        const locked = scheduler.lock();
        defer locked.unlock();
        break :blk locked.data.uncaught_error;
    };
    if (uncaught) |err| return err;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment