Skip to content

Instantly share code, notes, and snippets.

@japhb
Created April 5, 2023 02:34
Show Gist options
  • Save japhb/e146da8b3cfbc34ce7c555a7e1f14ada to your computer and use it in GitHub Desktop.
Save japhb/e146da8b3cfbc34ce7c555a7e1f14ada to your computer and use it in GitHub Desktop.
PoC: TaskWatcher utility module
# ABSTRACT: Proof of concept of mass task spawning/watching with controlled parallelism
use Terminal::ANSIColor;
use Text::MiscUtils::Layout;
enum MessageType < TaskStart TaskExit TaskOut TaskErr >;
#| Messages communicated from a SubTask to the TaskWatcher
class TaskMessage {
has MessageType:D $.type is required;
has Cool:D $.task-id is required;
has $.content is required;
}
#| A subtask and its react loop
class SubTask {
has Cool:D $.id is required;
has Supplier:D $.dropbox is required;
has Str:D $.command is required;
has @.args;
has Str $.input;
has Real:D $.timeout = 0;
has UInt $.pid is built(False);
has Supplier $!control .= new;
has Supply $!quit = $!control.Supply;
has Proc::Async $!proc;
method force-quit() {
$!control.emit('quit');
}
method start() {
die "Task $.id already started!" if $!proc;
$!proc = Proc::Async.new: $.command, |@.args;
my sub emit-msg($type, $content) {
my $msg = TaskMessage.new(:$type, :$content, task-id => $.id);
$.dropbox.emit($msg);
}
start react {
my sub force-quit() {
# Signal SIGHUP immediately, and SIGKILL if still not exited
$!proc.kill;
whenever Promise.in(2) {
$!proc.kill: SIGKILL;
}
}
whenever $!proc.stdout.lines { emit-msg(TaskOut, $_) }
whenever $!proc.stderr { emit-msg(TaskErr, $_) }
whenever $!proc.ready { emit-msg(TaskStart, $!pid = $_) }
whenever $!proc.start { emit-msg(TaskExit, $_); done }
whenever $!quit { force-quit }
if $.input.defined {
whenever $!proc.print($.input) { $!proc.close-stdin }
}
if $.timeout {
whenever Promise.in($.timeout) { force-quit }
}
}
}
}
#| A task spawner/watcher with its own queue of work and message dispatch loop
class TaskWatcher {
has UInt:D $.max-tasks is required;
has Supplier::Preserving:D $!dropbox .= new;
has Supply:D $!inbox = $!dropbox.Supply;
has Str:D $!status = '';
has UInt:D $!task-count = 0;
has UInt:D $!exited = 0;
has SubTask:D @!queued;
has SubTask:D %!tasks;
method new-task(Str:D $command, *@args, Real:D :$timeout = 0) {
my $id = ++$!task-count;
@!queued.push: SubTask.new(:$id, :$command, :@args,
:$!dropbox, :$timeout);
self.try-start-task;
}
method try-start-task() {
return unless $.active-tasks < $.max-tasks;
with @!queued.shift {
%!tasks{.id} = $_;
.start;
}
}
method queued-tasks() {
@!queued.elems
}
method active-tasks() {
%!tasks.elems
}
method task-started($task) {
self.log("==> Started task $task.id(): $task.command()");
}
method task-exited($task, $exit-info) {
my $code = $exit-info.exitcode;
my $signal = $exit-info.signal;
%!tasks{$task.id}:delete;
$!exited++;
self.log("<== Exited task $task.id() with exit code $code, signal $signal");
self.try-start-task;
}
method task-out($task, $content) {
self.log($task.id.fmt('%4d: ') ~ $content);
}
method task-err($task, $content) {
self.log(colored($task.id.fmt('%4d! ') ~ chomp($content), 'bold red'));
}
method log($content) {
self.hide-status;
put $content;
self.show-status;
}
method hide-status() {
my $width = duospace-width($!status);
print "\b" x $width ~ ' ' x $width ~ "\b" x $width;
}
method show-status() {
constant %color = exited => 'blue',
active => 'yellow',
queued => 'red';
my $key = join '/',
colored('Exited', %color<exited>),
colored('Active', %color<active>),
colored('Queued', %color<queued>);
my $counts = "$!exited / $.active-tasks / $.queued-tasks";
my $bar = '';
if $!task-count {
my $width = 40;
my $exited-width = floor($width * $!exited / $!task-count);
my $active-width = floor($width * $.active-tasks / $!task-count);
my $queued-width = $width - $exited-width - $active-width;
$bar = join '',
colored(' ' x $exited-width, 'on_' ~ %color<exited>),
colored(' ' x $active-width, 'on_' ~ %color<active>),
colored(' ' x $queued-width, 'on_' ~ %color<queued>);
}
$!status = "$key: $counts $bar";
print $!status;
}
method dispatch(TaskMessage $message) {
my $id = $message.task-id;
my $task = %!tasks{$id} or die "Unknown task ID $id";
given $message.type {
when TaskStart { self.task-started: $task }
when TaskExit { self.task-exited: $task, $message.content }
when TaskOut { self.task-out: $task, $message.content }
when TaskErr { self.task-err: $task, $message.content }
default { die "Unknown MessageType $_" }
}
}
method start() {
start react {
whenever $!inbox { self.dispatch($_) }
}
}
method wait() {
sleep .1 while $.queued-tasks || $.active-tasks;
self.hide-status;
}
}
#| Demo: Create and watch a number of trivial tasks, with parallelism $max-tasks
sub MAIN(UInt:D :$max-tasks = Kernel.cpu-cores) {
my $watcher = TaskWatcher.new(:$max-tasks);
$watcher.start;
for ^20 {
my $script = $_ >= 10 ?? 'note "ERROR from task ' ~ ($_ + 1) ~ '"; sleep rand'
!! 'say "Hello from task ' ~ ($_ + 1) ~ '"; sleep rand';
$watcher.new-task('raku', '-e', $script);
}
$watcher.wait;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment