Skip to content

Instantly share code, notes, and snippets.

@FCO
Last active May 6, 2022 11:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FCO/c999698b08a54e87b18ebd7ed00ac5ed to your computer and use it in GitHub Desktop.
Save FCO/c999698b08a54e87b18ebd7ed00ac5ed to your computer and use it in GitHub Desktop.
Testing processes
use lib ".";
use Process;
# Scheduler that owns a set of Process objects and repeatedly gives each of
# them a short time slice (see the `loop` method).
unit class Execution;
# Processes currently managed by this scheduler; ended ones are pruned in `loop`.
has Process @.processes;
# NOTE(review): never assigned anywhere in the visible code.
has Process $.current-process;
# Promise kept by `spawn`; `loop` awaits it when there is nothing left to run.
has Promise $.spawn-process .= new;
# Time budget handed to Process.run-by for each turn. It is compared against
# `now - now` differences there, i.e. it is expressed in seconds.
has $.time-limit = 0.00005;
# Guards modifications of @!processes (and the replacement of $!spawn-process).
has Lock::Async $!lock .= new;
# Creates a new Process running &block, registers it with the scheduler and
# wakes up `loop` if it is idle.
#
# &block - the code the new process will execute.
# Returns the newly created Process.
method spawn(&block) {
    my $proc = Process.new: &block;
    $!lock.protect: {
        @!processes.push: $proc;
        # Check-and-keep happens under the same lock `loop` uses to swap
        # $!spawn-process, so two concurrent spawns cannot both try to keep
        # the same promise (the second keep would throw).
        $!spawn-process.keep if $!spawn-process.status ~~ Planned;
    }
    $proc
}
# Runs the scheduler forever.
#
# Each pass gives every live process one time slice of $!time-limit, with
# $*PROCESS bound to that process and $*EXCEPTION bound to its pending
# exception (if any). An exception escaping a slice marks the process as died
# and is reported on STDERR. After the pass, ended processes are dropped; when
# none remain the scheduler sleeps until `spawn` registers a new one.
method loop {
    loop {
        for @!processes -> Process $proc {
            my $*PROCESS = $proc;
            my $*EXCEPTION = $_ with $proc.exception;
            next if $proc.ended;
            {
                CATCH {
                    default {
                        $proc.died: $_;
                        note "Process { $proc.gist }: ", .message
                    }
                }
                $proc.run-by: $!time-limit
            }
        }
        # Prune, test for emptiness and arm the wake-up promise in ONE
        # critical section. Doing these steps separately lets a `spawn` slip
        # in between them and keep the old promise, after which the scheduler
        # would await a fresh promise that nobody keeps (lost wakeup).
        my $idle;
        $!lock.protect: {
            @!processes = @!processes.grep: { !.ended };
            unless @!processes {
                $!spawn-process = Promise.new;
                $idle = $!spawn-process;
            }
        }
        # Await outside the lock so that `spawn` can acquire it.
        await $idle if $idle.defined;
    }
}
use lib ".";
use Execution;
# Demo driver: two cooperative processes sharing one scheduler.
# NOTE: under `use TakeAfterEachLine` every statement becomes a yield point,
# so the exact statement layout of the blocks below affects the interleaving.
my $ex = Execution.new;
# First process: prints its own gist ($*PROCESS is set by Execution.loop)
# followed by the numbers 0..9.
my $p1 = $ex.spawn: {
use TakeAfterEachLine;
for ^10 {
say $*PROCESS, ": $_"
}
}
# Second process: prints the same way, and from iteration 3 onwards asks the
# first process to kill itself.
$ex.spawn: {
use TakeAfterEachLine;
for ^10 {
say $*PROCESS, ": $_";
$p1.suicide if $_ >= 3
}
}
# Hand control to the scheduler; `loop` never returns.
$ex.loop
use lib ".";
unit class Process;
# Exception describing a `receive` call that ran into its timeout.
class X::Receive::Timeout is Exception {
    # Pid of the process whose receive timed out.
    has UInt $.pid;

    # Human-readable description that includes the pid.
    method message {
        'Receive timeout for pid ' ~ $!pid
    }
}
# Process identifier. The default comes from an anonymous state counter;
# presumably this yields a distinct id per instance — TODO confirm.
has UInt $.pid = $++;
# NOTE(review): not read or written anywhere in the visible code.
has Bool $.my-turn = False;
has $.node; # TODO: = $*NODE;
# Pending exception for this process; set by died/linked-died/suicide/receive
# and read by Execution.loop to populate $*EXCEPTION.
has Exception $.exception is rw;
# True once the process has finished (iterator exhausted) or died.
has Bool $.ended = False;
# Iterator over the gather built in `start`; each pull-one advances the block.
has Iterator $!iterator;
# Wall-clock time taken by the most recent `next` call.
has $.last-duration;
# NOTE(review): not used in the visible code.
has $.block-duration;
# Sum of all $!last-duration values accumulated by `next`.
has $.total-duration;
# The code this process runs.
has &.block is required;
# Mailbox of the process; `send` is delegated to it.
has Channel $!channel handles <send> .= new;
# Convenience constructor: accepts the process code positionally and forwards
# it to the default constructor as the named `block` argument.
multi method new(&block) {
    self.new(block => &block)
}
# Post-construction hook: prints the new process (via its gist) and prepares
# its iterator by calling `start`.
method TWEAK(|) {
    say(self);
    self.start
}
# Short printable form of the process: "Process<PID>".
method gist {
    'Process<' ~ $!pid ~ '>'
}
# Marks the process as ended because of $e. An exception that was already
# recorded is kept; $e is stored only when none is set yet.
method died(Exception $e) {
    $!exception = $e without $!exception;
    $!ended = True
}
# Records $e as this process's pending exception, overwriting any previous
# one. The process is not marked as ended here.
method linked-died(Exception $e) {
    $!exception = $e
}
# Requests termination of this process by storing a Kill exception as its
# pending exception. The process is not marked as ended here.
method suicide {
    # Exception type used to signal the kill request.
    class Kill is Exception {
        method message { "Killed" }
    }
    $!exception = Kill.new
}
# Waits for one message on this process's channel and hands it to &block.
#
# &block    - callback for the received message. It is called with the message
#             when it accepts a positional argument, otherwise with no
#             arguments (as before).
# :%timeout - optional; the first key is used as the number of seconds to
#             wait. When the hash is empty there is no timeout.
#
# On timeout an X::Receive::Timeout carrying this process's pid is stored as
# the pending exception.
method receive(&block, :%timeout) {
    react {
        whenever $!channel -> $message {
            &block.count ?? block($message) !! block();
            # One message per receive: stop here so the timeout below cannot
            # fire afterwards and flag a receive that actually succeeded.
            done
        }
        # Only arm the timer when a timeout was requested; an empty hash
        # would otherwise produce Promise.in(Nil).
        if %timeout {
            whenever Promise.in(%timeout.keys.first) {
                # The default constructor only takes named arguments.
                $!exception = X::Receive::Timeout.new: pid => $!pid;
                done
            }
        }
    }
}
# Wraps the process code in a lazy gather and stores its iterator, so that
# `next` can advance the code one `take` at a time.
method start {
    my $steps = gather { &!block() };
    $!iterator = $steps.iterator;
}
# Advances the process by one step (one pull from its iterator), flags the
# process as ended when the iterator is exhausted, and records how long the
# step took in $!last-duration and $!total-duration.
method next {
    my $began = now;
    my $pulled := $!iterator.pull-one;
    $!ended = True if $pulled.<> =:= IterationEnd;
    $!last-duration = now - $began;
    $!total-duration += $!last-duration;
}
# Keeps stepping the process until it has ended or the accumulated step time
# exceeds $time.
#
# $time - time budget for this turn, compared against the summed durations
#         reported by `next`.
method run-by($time) {
    my $elapsed = 0;
    until $!ended || $elapsed > $time {
        self.next;
        $elapsed += $!last-duration
    }
}
use lib ".";
use nqp;
use QAST:from<NQP>;
# Exception that carries another exception inside it.
class X::Exception::Wrapper is Exception {
    # The exception being wrapped.
    has Exception $.wrapped;

    # A header line followed by the wrapped message indented by four spaces.
    method message {
        "Wrapped exception:\n" ~ $!wrapped.message.indent(4)
    }
}
# Look up the symbol 'Pair' through $*W, restricted to the setting; the result
# is used in the slang below for nqp::istype checks.
# NOTE(review): $*W is presumably the compiler's World object — confirm.
my $Pair := $*W.find_single_symbol('Pair', :setting-only);
# Yield point injected before statements by the TakeAfterEachLine slang.
# When $*EXCEPTION holds a pending exception it is thrown wrapped in an
# X::Exception::Wrapper; otherwise an empty string is taken, returning
# control to whoever is pulling from the enclosing gather.
sub take-nothing is hidden-from-backtrace {
    with $*EXCEPTION -> $pending {
        X::Exception::Wrapper.new(wrapped => $pending).throw
    }
    take ""
}
# Installs the TakeAfterEachLine slang in the scope that `use`s this module:
# the MAIN actions get a `statement` override that puts a call to
# `take-nothing` in front of each statement. No symbols are exported (the sub
# returns an empty hash).
sub EXPORT(|) {
role TakeAfterEachLine {
# Action-method override for `statement`.
method statement(Mu $/) {
# Let the original action run first; its result is treated as the
# statement's QAST node.
my $inner := callsame;
# Statements that are a QAST::Op typed as Pair, or named as the `=>`
# infix, are passed through unchanged.
# NOTE(review): presumably so pairs / named arguments are not broken by
# the injected call — confirm.
if nqp::istype($inner, QAST::Op)
&& (nqp::istype($inner.returns, $Pair) || $inner.name eq '&infix:«=>»') {
$/.make: $inner;
return;
}
# Everything else becomes: call take-nothing, then the original statement.
my $ast := QAST::Stmts.new(
QAST::Op.new( :op('call'), QAST::WVal.new( :value( &take-nothing ) ) ),
$inner
);
# Carry the original node's nosink setting over to the wrapper node; the
# wrapper is marked sunk unless the original was nosink.
$ast.sunk(1) unless $inner.nosink;
$ast.nosink( $inner.nosink );
$/.make: $ast;
}
}
# Re-register the MAIN slang with the same grammar and the actions class with
# the role above mixed in.
$*LANG.define_slang: 'MAIN', $*LANG.slang_grammar('MAIN'), $*LANG.actions.^mixin(TakeAfterEachLine);
# Empty hash: nothing is exported by name.
{}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment