Created
May 21, 2019 21:07
-
-
Save lizmat/cecd280112920719f87cb5c9a8fe0ee7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use nqp; | |
# Process a feed: pull values from the given source and run them through the
# given list of Callables, one stage per Callable. Returns a Seq to be stored
# in the pointy end of the feed.
sub PROCESS-FEED($source, @stages --> Seq:D) {
    # Every Callable in @stages is wrapped in a Process iterator that reads
    # from the previous iterator (starting with $source.iterator).  The last
    # Process in the chain is handed to the returned Seq.

    # Queue type used both for buffered results and for refill signals.
    # The code below relies on nqp::shift waiting until a value is available.
    my class Pipeline is repr('ConcBlockingQueue') {}

    # Process a stage with given code for given iterator
    my class Process does Iterator {
        has &.code;                  # Callable applied to each pulled value
        has $.iterator;              # upstream iterator feeding this stage
        has $.is-lazy;               # laziness as reported by the source
        has int $.low-mark  = 25;    # ask for a refill at or below this count
        has int $.high-mark = 100;   # a refill stops at this buffered count
        has Pipeline $!buffered;     # results waiting to be pulled
        has Pipeline $!feeder;       # refill requests for the worker
        has int $!done;              # 1 once IterationEnd has been handed out
        has $!died;                  # exception that ended the worker, if any

        # Return the next buffered result, or IterationEnd when this stage is
        # exhausted.  Rethrows an exception that occurred in the background
        # worker once all results produced before it have been delivered.
        method pull-one() {
            # Pulling past the end must not wait on a queue that will never
            # be filled again.
            return IterationEnd if $!done;

            nqp::push($!feeder,$!high-mark)  # feed if not enough available
              if nqp::elems($!buffered) <= $!low-mark;

            # Waits for the worker if the buffer is currently empty.
            my \value := nqp::shift($!buffered);
            if nqp::eqaddr(value,IterationEnd) {
                $!done = 1;
                $!died.rethrow if $!died.defined;
            }
            value
        }

        # Fill buffer up to high-mark, return whether exhausted.  If a Slip
        # is encountered, then add *all* values from Slip to buffer, for
        # simplicity.
        method !fill() {
            nqp::until(
              nqp::isge_i(nqp::elems($!buffered),$!high-mark)  # enough?
                || nqp::eqaddr(                                 # or exhausted?
                     (my \pulled := $!iterator.pull-one),
                     IterationEnd
                   ),
              nqp::if(
                nqp::istype((my \result := &!code(pulled)),Slip),
                nqp::stmts(                   # add all values from slip
                  (my \slipper := result.iterator),
                  nqp::until(
                    nqp::eqaddr((my \slip := slipper.pull-one),IterationEnd),
                    nqp::push($!buffered,slip)
                  )
                ),
                nqp::push($!buffered,result)  # add just the one value
              )
            );

            # Yields 0 while the source still has values.  On exhaustion the
            # end marker is queued for the consumer and the result of that
            # push is what callers test for truth.
            nqp::if(
              nqp::eqaddr(pulled,IterationEnd),
              nqp::push($!buffered,IterationEnd)
            )
        }

        method TWEAK(--> Nil) {
            $!buffered := nqp::create(Pipeline);
            $!feeder   := nqp::create(Pipeline);

            # Set up async buffer filling unless there are not enough values
            # to fill up the initial high-mark.  This sets up a nice initial
            # buffer to read from, and if there are not enough values, we
            # don't bother to set up all of the async stuff.
            unless self!fill {
                start {                        # did not exhaust source
                    loop {
                        nqp::shift($!feeder);  # wait for the signal!
                        my $then := nqp::time_n();
                        last if self!fill;     # stop when exhausted now

                        # Fine-tune high mark from the time the refill took.
                        # Skip the adjustment when the clock did not advance,
                        # as the division would fail.
                        my $elapsed = nqp::time_n() - $then;
                        $!high-mark =
                          (($!high-mark + (10 / $elapsed).Int) div 2) + 1
                          if $elapsed > 0;
                    }

                    # An exception here would otherwise vanish with the
                    # unobserved Promise and leave the consumer waiting
                    # forever.  Record it and wake the consumer with an end
                    # marker, so that pull-one can rethrow it.
                    CATCH {
                        default {
                            $!died = $_;
                            nqp::push($!buffered,IterationEnd);
                        }
                    }
                }
            }
        }
    }

    # set up the stages
    my $iterator = $source.iterator;
    my $is-lazy  = $iterator.is-lazy;
    for @stages -> &code {
        $iterator = Process.new(:&code, :$iterator, :$is-lazy);
    }
    Seq.new($iterator)
}
# Demo: feed 10_000 values through ten stages.  Each stage spins through an
# empty 10_000-iteration loop and then passes its value on unchanged.
my @a = PROCESS-FEED(
    ^10_000,
    ({ Nil for ^10_000; $_ } xx 10)
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment