-
-
Save emekoi/5a96469e46ab02eee31f2dc9018847b4 to your computer and use it in GitHub Desktop.
continuation stealing vs work stealing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const time = std.os.time; | |
use @import("deque.zig"); | |
const Thread = std.os.Thread; | |
const Timer = time.Timer; | |
const c_allocator = std.heap.c_allocator; | |
const approxEq = std.math.approxEq; | |
const EPSILON = 0.000001; | |
const LIMIT = 100000000.0; | |
fn exampleTask(i: f64) f64 { | |
return @sqrt(f64, i); | |
} | |
const ContinuationStealing = struct { | |
pub const Task = promise->anyerror!void; | |
const KILL_THREAD: Task = undefined; | |
pub async fn testFn0(comptime task: fn(f64) f64, deque: *Deque(Task)) anyerror!void { | |
const worker = deque.worker(); | |
var i: f64 = 1.0; | |
while (!approxEq(f64, i, LIMIT, EPSILON)) : (i += 1.0) { | |
suspend { | |
try worker.push(@handle()); | |
std.math.forceEval(task(i)); | |
} | |
} | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
} | |
pub async fn testFn1(comptime task: fn(f64) f64, deque: *Deque(Task)) anyerror!void { | |
const worker = deque.worker(); | |
var i: f64 = 1.0; | |
while (!approxEq(f64, i, LIMIT, EPSILON)) : (i += 1.0) { | |
suspend { | |
try worker.push(@handle()); | |
std.math.forceEval(task(i)); | |
if (worker.pop()) |p| { | |
resume p; | |
} | |
} | |
} | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
} | |
pub async fn testFn2(comptime task: fn(f64) f64, deque: *Deque(Task)) anyerror!void { | |
const worker = deque.worker(); | |
var i: f64 = 1.0; | |
while (!approxEq(f64, i, LIMIT, EPSILON)) : (i += 1.0) { | |
suspend { | |
try worker.push(@handle()); | |
std.math.forceEval(task(i)); | |
} | |
if (worker.pop()) |p| { | |
resume p; | |
} | |
} | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
try worker.push(KILL_THREAD); | |
} | |
pub fn worker_fn(stealer: Stealer(Task)) void { | |
while (true) { | |
switch (stealer.steal()) { | |
Stolen(Task).Data => |t| { | |
if (t != KILL_THREAD) { | |
resume t; | |
} else break; | |
}, | |
Stolen(Task).Abort => continue, | |
Stolen(Task).Empty => continue, | |
} | |
} | |
} | |
}; | |
const WorkStealing = struct { | |
pub const Task = f64; | |
pub fn testFn(deque: *Deque(Task)) !void { | |
const worker = deque.worker(); | |
var i: f64 = 1.0; | |
while (!approxEq(f64, i, LIMIT, EPSILON)) : (i += 1.0) { | |
try worker.push(i); | |
} | |
try worker.push(std.math.nan(f64)); | |
try worker.push(std.math.nan(f64)); | |
try worker.push(std.math.nan(f64)); | |
try worker.push(std.math.nan(f64)); | |
} | |
pub fn worker_fn(stealer: Stealer(Task)) void { | |
while (true) { | |
switch (stealer.steal()) { | |
Stolen(Task).Data => |i| { | |
if (!std.math.isNan(i)) { | |
std.math.forceEval(exampleTask(i)); | |
} else break; | |
}, | |
Stolen(Task).Abort => continue, | |
Stolen(Task).Empty => continue, | |
} | |
} | |
} | |
}; | |
test " WorkStealing" { | |
var deque = try Deque(WorkStealing.Task).withCapacity(std.heap.c_allocator, 50); | |
defer deque.deinit(); | |
var threads: [4]*std.os.Thread = undefined; | |
var timer = try Timer.start(); | |
try WorkStealing.testFn(&deque); | |
for (threads) |*t| { | |
t.* = try std.os.spawnThread(deque.stealer(), WorkStealing.worker_fn); | |
} | |
for (threads) |t| { | |
t.wait(); | |
} | |
std.debug.warn("time: {} ", timer.lap()); | |
} | |
test "ContinuationStealing0" { | |
var deque = try Deque(ContinuationStealing.Task).withCapacity(std.heap.c_allocator, 50); | |
defer deque.deinit(); | |
var threads: [4]*std.os.Thread = undefined; | |
var timer = try Timer.start(); | |
var handle = try async<c_allocator> ContinuationStealing.testFn0(exampleTask, &deque); | |
defer cancel handle; | |
for (threads) |*t| { | |
t.* = try std.os.spawnThread(deque.stealer(), ContinuationStealing.worker_fn); | |
} | |
for (threads) |t| { | |
t.wait(); | |
} | |
std.debug.warn("time: {} ", timer.lap()); | |
} | |
test "ContinuationStealing1" { | |
var deque = try Deque(ContinuationStealing.Task).withCapacity(std.heap.c_allocator, 50); | |
defer deque.deinit(); | |
var threads: [4]*std.os.Thread = undefined; | |
var timer = try Timer.start(); | |
var handle = try async<c_allocator> ContinuationStealing.testFn1(exampleTask, &deque); | |
defer cancel handle; | |
for (threads) |*t| { | |
t.* = try std.os.spawnThread(deque.stealer(), ContinuationStealing.worker_fn); | |
} | |
for (threads) |t| { | |
t.wait(); | |
} | |
std.debug.warn("time: {} ", timer.lap()); | |
} | |
test "ContinuationStealing2" { | |
var deque = try Deque(ContinuationStealing.Task).withCapacity(std.heap.c_allocator, 50); | |
defer deque.deinit(); | |
var threads: [4]*std.os.Thread = undefined; | |
var timer = try Timer.start(); | |
var handle = try async<c_allocator> ContinuationStealing.testFn2(exampleTask, &deque); | |
defer cancel handle; | |
for (threads) |*t| { | |
t.* = try std.os.spawnThread(deque.stealer(), ContinuationStealing.worker_fn); | |
} | |
for (threads) |t| { | |
t.wait(); | |
} | |
std.debug.warn("time: {} ", timer.lap()); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment