Last active
May 21, 2019 21:19
-
-
Save lizmat/787a6ddb31d51cc67d0c00f3403de78c to your computer and use it in GitHub Desktop.
fast as you can parallel feed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use nqp;
# Process a given set of feed operations from a given source and a list of
# Callables to execute.  Returns a Seq to be stored in the pointy end of
# the feeds.
sub PROCESS-FEED($source, @stages --> Seq:D) {
    # Parameters:
    #   $source  object whose .iterator supplies the input values
    #   @stages  Callables, applied in order; each is called with one value
    #            and its result is handed to the next stage.  A stage that
    #            returns a Slip contributes all of the Slip's values.
    # Returns a Seq over the values produced by the last stage (or over the
    # source itself when @stages is empty).
    #
    # Each stage fills an initial batch synchronously while it is being
    # constructed.  Only if the source was not exhausted by that batch is a
    # feeder started (via `start`) that refills the stage's buffer on demand.
    # An exception raised on a feeder is rethrown to whoever pulls from the
    # stage, at the position in the stream where it happened.

    # Queue type used both for buffered values and for "please refill"
    # signals; nqp::shift on it waits until a value is available.
    my class Pipeline is repr('ConcBlockingQueue') {}

    # Marker that carries an exception from the feeder to the consumer
    # through the value buffer.
    my class Broken {
        has $.exception;
    }

    # Process a stage with given code for given iterator
    my class Process does Iterator {
        has &.code;                  # the Callable of this stage
        has $.iterator;              # upstream iterator to pull values from
        has $.is-lazy;               # laziness as reported by the source
        has int $.low-mark  = 25;    # signal the feeder at or below this
        has int $.high-mark = 100;   # fill the buffer up to this (tuned)
        has Pipeline $!buffered;     # results waiting to be pulled
        has Pipeline $!feeder;       # refill signals for the feeder

        # Return the next result of this stage, or IterationEnd.  Rethrows
        # an exception that was caught on the feeder.
        method pull-one() {
            nqp::push($!feeder,$!high-mark)   # feed if not enough available
              if nqp::elems($!buffered) <= $!low-mark;

            nqp::if(
              nqp::istype((my \value := nqp::shift($!buffered)),Broken),
              value.exception.rethrow,        # feeder failed at this point
              value                           # ordinary value or IterationEnd
            )
        }

        # Fill buffer up to high-mark, return 1 if the upstream iterator is
        # exhausted (IterationEnd has then been queued), 0 otherwise.  If a
        # Slip is encountered, then add *all* values from Slip to buffer,
        # for simplicity.
        method !fill() {
            nqp::until(
              nqp::isge_i(nqp::elems($!buffered),$!high-mark)   # enough?
                || nqp::eqaddr(                                 # or exhausted?
                     (my \pulled := $!iterator.pull-one),
                     IterationEnd
                   ),
              nqp::if(
                nqp::istype((my \result := &!code(pulled)),Slip),
                self!slip(result.iterator),   # add all values from slip
                nqp::push($!buffered,result)  # add just the one value
              )
            );

            # Explicit 1 / 0 instead of relying on what nqp::push returns.
            nqp::if(
              nqp::eqaddr(pulled,IterationEnd),
              nqp::stmts(
                nqp::push($!buffered,IterationEnd),
                1
              )
            )
        }

        # Slip all the values from an iterator into the buffer.
        method !slip(\slipper --> Nil) {
            nqp::until(
              nqp::eqaddr((my \slip := slipper.pull-one),IterationEnd),
              nqp::push($!buffered,slip)
            )
        }

        method TWEAK(--> Nil) {
            $!buffered := nqp::create(Pipeline);
            $!feeder   := nqp::create(Pipeline);

            # Set up async buffer filling unless there are not enough values
            # to fill up the initial high-mark.  This sets up a nice initial
            # buffer to read from, and if there are not enough values, we
            # don't bother to set up any of the async stuff.
            unless self!fill {
                start {                       # did not exhaust source
                    loop {
                        nqp::shift($!feeder); # wait for the signal!
                        my $then = now;
                        last if self!fill;    # stop when exhausted now

                        # A fill that had nothing to do can take (close to)
                        # no time at all.  Flooring the elapsed time at 1ms
                        # prevents a division by zero and keeps the target
                        # below at most 10_000, so the buffer stays bounded.
                        my num $elapsed = (now - $then).Num;
                        $elapsed = 0.001e0 if $elapsed < 0.001e0;

                        # Fine-tune high mark: average the current mark with
                        # 10 / elapsed seconds; the + 1 keeps it above zero.
                        $!high-mark =
                          (($!high-mark + (10 / $elapsed).Int) div 2) + 1;
                    }

                    # Do not let a failing stage leave the consumer waiting
                    # on an empty buffer: queue the exception, followed by
                    # the end marker.
                    CATCH {
                        default {
                            nqp::push($!buffered,Broken.new(exception => $_));
                            nqp::push($!buffered,IterationEnd);
                        }
                    }
                }
            }
        }
    }

    # set up the stages: each stage pulls from the one before it
    my $iterator = $source.iterator;
    my $is-lazy  = $iterator.is-lazy;
    for @stages -> &code {
        $iterator = Process.new(:&code, :$iterator, :$is-lazy);
    }

    Seq.new($iterator)
}
# Example run: push 10_000 values through ten stages.  Every stage spins
# through an empty 10_000-iteration loop and then returns its argument
# unchanged.
my @busy-stages = { Nil for ^10_000; $_ } xx 10;
my @a = PROCESS-FEED(^10_000, @busy-stages);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment