Skip to content

Instantly share code, notes, and snippets.

@lizmat
Last active May 21, 2019 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lizmat/787a6ddb31d51cc67d0c00f3403de78c to your computer and use it in GitHub Desktop.
Save lizmat/787a6ddb31d51cc67d0c00f3403de78c to your computer and use it in GitHub Desktop.
fast as you can parallel feed
use nqp;
# Process a given set of feed operations from a given source and a list of
# Callables to execute. Returns a Seq that is to be stored in the pointy
# end of the feeds.
sub PROCESS-FEED($source, @stages --> Seq:D) {
    # $source : anything with an .iterator method; supplies the initial values.
    # @stages : Callables, applied in order; each one is called with a single
    #           value and may return a Slip to produce zero or more values.
    # Returns a Seq wrapping the iterator of the last stage (or the source
    # iterator itself when @stages is empty).

    # Queue type used both for buffered results and for refill requests.
    # Shifting from it is relied upon to wait until a value is available.
    my class Pipeline is repr('ConcBlockingQueue') {}

    # Process a stage with given code for given iterator
    my class Process does Iterator {
        has &.code;                 # stage callable, applied to each pulled value
        has $.iterator;             # upstream iterator this stage pulls from
        has $.is-lazy;              # laziness as reported by the source iterator
        has int $.low-mark  = 25;   # request a refill at or below this many buffered
        has int $.high-mark = 100;  # a refill stops once this many are buffered
        has Pipeline $!buffered;    # results waiting to be pulled
        has Pipeline $!feeder;      # refill requests for the background filler
        has int $!done;             # 1 once IterationEnd was handed to the consumer
        has $!died;                 # exception that stopped the background filler

        # Return the next result of this stage, or IterationEnd when there
        # are no more. Rethrows an exception raised by the background filler
        # once all values produced before it have been delivered.
        method pull-one() {

            # After the end marker was handed out the buffer stays empty and
            # nothing will refill it, so shifting again would block forever.
            return IterationEnd if $!done;

            nqp::push($!feeder,$!high-mark)  # feed if not enough available
              if nqp::elems($!buffered) <= $!low-mark;

            # there's always a value (or one on its way)
            my \result := nqp::shift($!buffered);
            if nqp::eqaddr(result,IterationEnd) {
                $!done = 1;
                .rethrow with $!died;        # surface a failure of the filler
            }
            result
        }

        # Fill buffer up to high-mark, return whether exhausted.  If a Slip
        # is encountered, then add *all* values from Slip to buffer, for
        # simplicity (so the buffer may grow beyond high-mark).  When the
        # upstream iterator is exhausted, IterationEnd is pushed as the final
        # buffer entry and the result of that push is returned, which the
        # callers treat as true; otherwise 0 is returned.
        method !fill() {
            nqp::until(
              nqp::isge_i(nqp::elems($!buffered),$!high-mark)  # enough?
                || nqp::eqaddr(                                # or exhausted?
                     (my \pulled := $!iterator.pull-one),
                     IterationEnd
                   ),
              nqp::if(
                nqp::istype((my \result := &!code(pulled)),Slip),
                self!slip(result.iterator),   # add all values from slip
                nqp::push($!buffered,result)  # add just the one value
              )
            );

            nqp::if(
              nqp::eqaddr(pulled,IterationEnd),  # return 0 if not equal
              nqp::push($!buffered,IterationEnd)
            )
        }

        # Slip all the values from an iterator into the buffer.
        method !slip(\slipper --> Nil) {
            nqp::until(
              nqp::eqaddr((my \slip := slipper.pull-one),IterationEnd),
              nqp::push($!buffered,slip)
            )
        }

        method TWEAK(--> Nil) {
            $!buffered := nqp::create(Pipeline);
            $!feeder   := nqp::create(Pipeline);

            # Set up async buffer filling unless there are not enough values
            # to fill up the initial high-mark.  This should set up a nice
            # initial buffer to read from.  And if there are not enough
            # values, don't bother to set up all of the async stuff.
            unless self!fill {
                start {                        # did not exhaust source
                    loop {
                        nqp::shift($!feeder);  # wait for the signal!
                        my $then := nqp::time_n();
                        last if self!fill;     # stop when exhausted now

                        # Fine-tune high mark from the time the refill took.
                        # Skip the adjustment when the clock did not advance
                        # (or went backwards): dividing by zero would throw
                        # and stop the filler, a negative value would corrupt
                        # the mark.
                        my $elapsed := nqp::time_n() - $then;
                        $!high-mark = ((
                          $!high-mark + (10 / $elapsed).Int
                        ) div 2) + 1 if $elapsed > 0;
                    }

                    # Nobody awaits this promise, so an exception from a
                    # stage would vanish and leave the consumer waiting on an
                    # empty buffer.  Record it and terminate the buffer so
                    # that pull-one can rethrow it in the consumer.
                    CATCH {
                        default {
                            $!died := $_;
                            nqp::push($!buffered,IterationEnd);
                        }
                    }
                }
            }
        }
    }

    # set up the stages: each stage pulls from the one created before it
    my $iterator = $source.iterator;
    my $is-lazy  = $iterator.is-lazy;
    for @stages -> &code {
        $iterator = Process.new(:&code, :$iterator, :$is-lazy);
    }

    Seq.new($iterator)
}
# Benchmark driver: run 10_000 values through ten identical stages, each of
# which spins through 10_000 no-op iterations before passing its value on.
my &busy-stage = { Nil for ^10_000; $_ };
my @a = PROCESS-FEED( ^10_000, (&busy-stage xx 10) );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment