-
-
Save japhb/e146da8b3cfbc34ce7c555a7e1f14ada to your computer and use it in GitHub Desktop.
PoC: TaskWatcher utility module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ABSTRACT: Proof of concept of mass task spawning/watching with controlled parallelism
use Terminal::ANSIColor; | |
use Text::MiscUtils::Layout; | |
# Kinds of message a SubTask can send to the TaskWatcher:
# process started, process exited, stdout output, stderr output.
enum MessageType <
    TaskStart
    TaskExit
    TaskOut
    TaskErr
>;
#| Messages communicated from a SubTask to the TaskWatcher
class TaskMessage {
    # Which kind of event this message reports
    has MessageType:D $.type    is required;

    # ID of the SubTask that sent the message
    has Cool:D        $.task-id is required;

    # Payload accompanying the event; untyped because it differs per
    # message type (SubTask sends the PID, the result of the process's
    # start promise, a stdout line, or a stderr chunk)
    has               $.content is required;
}
#| A subtask and its react loop
#|
#| Wraps one external command run through Proc::Async.  Everything the
#| process does (start, stdout lines, stderr chunks, exit) is reported as
#| a TaskMessage emitted on $.dropbox, tagged with this task's $.id.
class SubTask {
    has Cool:D      $.id      is required;  # ID stamped on every emitted message
    has Supplier:D  $.dropbox is required;  # Where TaskMessages are emitted
    has Str:D       $.command is required;  # Program to run
    has             @.args;                 # Arguments passed to the program
    has Str         $.input;                # Optional text to write to the child's stdin
    has Real:D      $.timeout = 0;          # Seconds before a forced quit; 0 disables it
    has UInt        $.pid is built(False);  # Filled in once the process reports ready

    has Supplier    $!control .= new;       # Carries quit requests into the react loop
    has Supply      $!quit     = $!control.Supply;
    has Proc::Async $!proc;

    #| Ask the react loop (if one is running) to terminate the process
    method force-quit() {
        $!control.emit('quit');
    }

    #| Spawn the process and start the react loop that supervises it.
    #| Dies if this task has already been started.
    method start() {
        die "Task $.id already started!" if $!proc;

        # Proc::Async refuses writes to the child's stdin unless the process
        # was created with :w, so request a writable stdin exactly when there
        # is input to send; otherwise the child's stdin is left as before.
        $!proc = Proc::Async.new: :w($.input.defined), $.command, |@.args;

        # Wrap a payload in a TaskMessage for this task and post it to the dropbox
        my sub emit-msg($type, $content) {
            my $msg = TaskMessage.new(:$type, :$content, task-id => $.id);
            $.dropbox.emit($msg);
        }

        start react {
            my sub force-quit() {
                # Signal SIGHUP immediately, and SIGKILL if still not exited
                $!proc.kill;
                whenever Promise.in(2) {
                    $!proc.kill: SIGKILL;
                }
            }

            # Output supplies must be tapped before the process is started
            whenever $!proc.stdout.lines { emit-msg(TaskOut, $_) }
            whenever $!proc.stderr       { emit-msg(TaskErr, $_) }
            whenever $!proc.ready        { emit-msg(TaskStart, $!pid = $_) }

            # Process exit ends the react loop, cancelling any pending timers
            whenever $!proc.start        { emit-msg(TaskExit, $_); done }
            whenever $!quit              { force-quit }

            # Send the input in one go, then close stdin so the child sees EOF
            if $.input.defined {
                whenever $!proc.print($.input) { $!proc.close-stdin }
            }

            if $.timeout {
                whenever Promise.in($.timeout) { force-quit }
            }
        }
    }
}
#| A task spawner/watcher with its own queue of work and message dispatch loop
#|
#| Tasks are queued with new-task and started as long as fewer than
#| $.max-tasks are active.  Messages from running tasks are handled by the
#| dispatch loop started with .start, while new-task and wait are called
#| from the caller's thread; the queue and the active-task table are
#| therefore only touched while holding $!lock.
class TaskWatcher {
    has UInt:D                 $.max-tasks is required;  # Parallelism limit
    has Supplier::Preserving:D $!dropbox .= new;         # Tasks post messages here
    has Supply:D               $!inbox      = $!dropbox.Supply;
    has Str:D                  $!status     = '';        # Status line currently on screen
    has UInt:D                 $!task-count = 0;         # Tasks ever created; also the last ID
    has UInt:D                 $!exited     = 0;         # Tasks that have finished
    has SubTask:D              @!queued;                 # Created but not yet started
    has SubTask:D              %!tasks;                  # Started and not yet exited, by ID
    has Lock:D                 $!lock .= new;            # Guards the bookkeeping above

    #| Queue a new task running $command with @args and start it right away
    #| if the parallelism limit allows.  :$timeout (seconds, 0 = none) and
    #| the optional :$input text are handed to the SubTask unchanged.
    method new-task(Str:D $command, *@args, Real:D :$timeout = 0, Str :$input) {
        $!lock.protect: {
            my $id = ++$!task-count;
            @!queued.push: SubTask.new(:$id, :$command, :@args, :$input,
                                       :$!dropbox, :$timeout);
        }
        self.try-start-task;
    }

    #| Start the next queued task, if there is one and a slot is free
    method try-start-task() {
        $!lock.protect: {
            # Check and update under one lock so two threads cannot both
            # claim the same free slot or the same queued task
            if @!queued.elems && %!tasks.elems < $.max-tasks {
                my $task = @!queued.shift;
                %!tasks{$task.id} = $task;
                $task.start;
            }
        }
    }

    #| Number of tasks waiting to be started
    method queued-tasks() {
        $!lock.protect: { @!queued.elems }
    }

    #| Number of tasks started and not yet exited
    method active-tasks() {
        $!lock.protect: -> { %!tasks.elems }
    }

    #| Handler for TaskStart: log that the task is running
    method task-started($task) {
        self.log("==> Started task $task.id(): $task.command()");
    }

    #| Handler for TaskExit: retire the task, log its exit code and signal,
    #| and use the freed slot for the next queued task
    method task-exited($task, $exit-info) {
        my $code   = $exit-info.exitcode;
        my $signal = $exit-info.signal;
        $!lock.protect: {
            %!tasks{$task.id}:delete;
            $!exited++;
        }
        self.log("<== Exited task $task.id() with exit code $code, signal $signal");
        self.try-start-task;
    }

    #| Handler for TaskOut: log the output prefixed with the task ID
    method task-out($task, $content) {
        self.log($task.id.fmt('%4d: ') ~ $content);
    }

    #| Handler for TaskErr: log the chomped stderr content in bold red,
    #| prefixed with the task ID
    method task-err($task, $content) {
        self.log(colored($task.id.fmt('%4d! ') ~ chomp($content), 'bold red'));
    }

    #| Print one log entry, temporarily removing the status line so the
    #| entry is not mixed into it
    method log($content) {
        self.hide-status;
        put $content;
        self.show-status;
    }

    #| Erase the status line last printed by show-status
    method hide-status() {
        # Back up over the status, overwrite it with spaces, then back up again
        my $width = duospace-width($!status);
        print "\b" x $width ~ ' ' x $width ~ "\b" x $width;
    }

    #| Print a status line (without a newline): a colored key, the
    #| exited / active / queued counts, and a proportional bar
    method show-status() {
        constant %color = exited => 'blue',
                          active => 'yellow',
                          queued => 'red';

        my $key = join '/',
                       colored('Exited', %color<exited>),
                       colored('Active', %color<active>),
                       colored('Queued', %color<queued>);
        my $counts = "$!exited / $.active-tasks / $.queued-tasks";

        my $bar = '';
        if $!task-count {
            # Exited and active shares are rounded down; the queued
            # segment takes whatever remains of the fixed width
            my $width        = 40;
            my $exited-width = floor($width * $!exited      / $!task-count);
            my $active-width = floor($width * $.active-tasks / $!task-count);
            my $queued-width = $width - $exited-width - $active-width;

            $bar = join '',
                        colored(' ' x $exited-width, 'on_' ~ %color<exited>),
                        colored(' ' x $active-width, 'on_' ~ %color<active>),
                        colored(' ' x $queued-width, 'on_' ~ %color<queued>);
        }

        # Remember what was printed so hide-status knows how much to erase
        $!status = "$key: $counts $bar";
        print $!status;
    }

    #| Route one message to the handler for its type.
    #| Dies if the sending task is not active or the type is unknown.
    method dispatch(TaskMessage $message) {
        my $id   = $message.task-id;
        my $task = $!lock.protect(-> { %!tasks{$id} }) or die "Unknown task ID $id";

        given $message.type {
            when TaskStart { self.task-started: $task }
            when TaskExit  { self.task-exited:  $task, $message.content }
            when TaskOut   { self.task-out:     $task, $message.content }
            when TaskErr   { self.task-err:     $task, $message.content }
            default        { die "Unknown MessageType $_" }
        }
    }

    #| Start the message dispatch loop asynchronously and return its Promise
    method start() {
        start react {
            whenever $!inbox { self.dispatch($_) }
        }
    }

    #| Block, polling every 0.1 seconds, until no task is queued or active;
    #| then erase the status line
    method wait() {
        sleep .1 while $.queued-tasks || $.active-tasks;
        self.hide-status;
    }
}
#| Demo: Create and watch a number of trivial tasks, with parallelism $max-tasks
sub MAIN(UInt:D :$max-tasks = Kernel.cpu-cores) {
    my $watcher = TaskWatcher.new(:$max-tasks);
    $watcher.start;

    # Twenty one-liners numbered from 1: tasks 1-10 greet on stdout,
    # tasks 11-20 complain on stderr; each then sleeps for under a second
    for 1..20 -> $n {
        my $script = $n > 10
            ?? qq[note "ERROR from task $n"; sleep rand]
            !! qq[say "Hello from task $n"; sleep rand];
        $watcher.new-task('raku', '-e', $script);
    }

    $watcher.wait;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment