Skip to content

Instantly share code, notes, and snippets.

@lizmat
Created May 21, 2019 16:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lizmat/8652e4a403fffe64e84b392b31cc6ad4 to your computer and use it in GitHub Desktop.
Save lizmat/8652e4a403fffe64e84b392b31cc6ad4 to your computer and use it in GitHub Desktop.
parallel feed implementation
use nqp;
# Run the values of $source through a chain of stages and return the values
# produced by the last stage as a Seq.
#
#   $source  anything with an .iterator method; supplies the input values
#   @stages  callables applied in the given order; each one receives a
#            single value and returns the value handed to the next stage
#
# Every stage is wrapped in a Process iterator that runs the stage's code
# inside its own `start` block, so stages can overlap in time.  With an
# empty @stages the returned Seq simply wraps the iterator of $source.
# An exception thrown by a stage (or by the source) is rethrown to whoever
# is iterating the returned Seq.
sub PROCESS-FEED($source, @stages --> Seq:D) {

    # blocking queue used for all communication between consumer and worker
    my class Pipeline is repr('ConcBlockingQueue') {}

    # private envelope for an exception that ended a worker; being a class
    # of our own it cannot be confused with a value produced by stage code
    my class Broken { has $.exception }

    # process a stage with given code for given iterator
    my class Process does Iterator {
        has &.code;                 # the stage's code, called once per value
        has $.iterator;             # upstream iterator the worker pulls from
        has $.is-lazy;              # laziness as reported by the original source
        has int $.low-mark  = 0;    # ask for more when this few (or fewer) are buffered
        has int $.high-mark = 64;   # a request tops the buffer up to this many
        has Pipeline $!buffered;    # worker -> consumer: produced values
        has Pipeline $!feeder;      # consumer -> worker: "produce N more" requests
        has int $!done;             # set once IterationEnd or a failure was delivered

        # Spelled out rather than relying on the generated accessor, so the
        # stored value is reported regardless of the is-lazy method that the
        # Iterator role brings along.
        method is-lazy() { $!is-lazy }

        # Return the next produced value, IterationEnd when the upstream
        # iterator is exhausted, or rethrow the exception that ended the
        # worker.  Blocks until the worker has delivered something.
        method pull-one() {

            # the worker is gone after the end / a failure; blocking on the
            # queue again would never return
            return IterationEnd if $!done;

            # Read the fill level once, so that the test and the size of the
            # request agree even while the worker is pushing concurrently.
            # NOTE(review): a request is issued on every call that finds the
            # buffer at or below low-mark, also when an earlier request is
            # still being worked on, so the worker can get further ahead
            # than high-mark.
            my int $queued = nqp::elems($!buffered);
            nqp::push($!feeder, $!high-mark - $queued)
              if $queued <= $!low-mark;

            my \value := nqp::shift($!buffered);
            if nqp::istype(value, Broken) {
                $!done = 1;
                value.exception.rethrow;
            }
            $!done = 1 if nqp::eqaddr(value, IterationEnd);
            value
        }

        # Create the two queues and start the worker that serves requests.
        method TWEAK(--> Nil) {
            $!buffered := nqp::create(Pipeline);
            $!feeder   := nqp::create(Pipeline);

            start {
                loop {
                    # blocks until the consumer asks for more
                    my int $todo = nqp::shift($!feeder);
                    my $pulled;

                    # Post-decrement with an explicit "> 0": a request for N
                    # yields exactly N values, and a request that is zero or
                    # negative yields none instead of running unbounded.
                    nqp::while(
                      $todo-- > 0
                        && nqp::not_i(nqp::eqaddr(
                             ($pulled := $!iterator.pull-one),
                             IterationEnd
                           )),
                      nqp::push($!buffered, &!code($pulled))
                    );

                    # upstream exhausted: tell the consumer and stop working
                    if nqp::eqaddr($pulled, IterationEnd) {
                        nqp::push($!buffered, IterationEnd);
                        last;
                    }
                }

                # Hand any exception to the consumer, which would otherwise
                # wait forever on a queue nobody fills anymore.
                CATCH {
                    default {
                        nqp::push($!buffered, Broken.new(exception => $_));
                    }
                }
            }
        }
    }

    # set up the stages: each Process consumes the iterator of the previous
    # one, the first consumes the source itself
    my $iterator = $source.iterator;
    my $is-lazy  = $iterator.is-lazy;
    for @stages -> &code {
        $iterator = Process.new(:&code, :$iterator, :$is-lazy);
    }

    Seq.new($iterator)
}
# Demo 1 (disabled): two chatty stages over ten values, showing the order in
# which the stages and the consuming loop get to see each value.
#for PROCESS-FEED( ^10, ( { say "foo: $_"; 2 * $_ }, { say "bar: $_"; 3 * $_ })) {
# say "baz: $_"
#}

# Demo 2: a thousand values through two stages that each sleep for a
# millisecond per value before multiplying it.
my @a = PROCESS-FEED(
    ^1000,
    (
        { sleep .001; 2 * $_ },
        { sleep .001; 3 * $_ }
    )
);

# uncomment to dump the result
#dd @a;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment