-
-
Save jnthn/754292747161f95093d80b207d077d74 to your computer and use it in GitHub Desktop.
New hyper/race implementation exploration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use Test; | |
use nqp; | |
# Configuration for a hyper or race operation.
#
#   degree - how many "ready" tokens the pipeline sends to set off the first
#            batches, and how many workers it starts for the processors.
#   batch  - the batch size passed to the batcher's produce-batch.
#
# Both values must be at least 1. A degree of zero would mean no batch is
# ever requested, leaving whoever iterates the results waiting forever, so
# bad values are rejected here at construction time.
my class HyperConfiguration {
    has Int $.degree is required;
    has Int $.batch is required;

    # Reject undefined or non-positive settings up front.
    submethod TWEAK(--> Nil) {
        die "hyper/race degree must be an Int of at least 1, got {$!degree.gist}"
            unless $!degree.defined && $!degree >= 1;
        die "hyper/race batch must be an Int of at least 1, got {$!batch.gist}"
            unless $!batch.defined && $!batch >= 1;
    }
}
# A unit of work handed to a worker in a hyper or race operation. The batch
# is itself Iterable and iterates over its items, so that ordinary (non-hyper)
# Iterable implementations can be applied to it directly.
class Rakudo::Internals::HyperWorkBatch does Iterable {
    # The values held by this batch.
    has IterationBuffer $.items;

    # Position of this batch in the stream of batches; the first one is zero.
    has int $.sequence-number;

    # Whether this is the first batch produced at the last fork point, and
    # whether it is the last batch that fork point will produce.
    has Bool $.first;
    has Bool $.last;

    # Walks the items of a batch by index; the items are read, not removed.
    my class HyperWorkBatchIterator does Iterator {
        has $!items;
        has int $!i;
        has int $!n;

        submethod BUILD(:$items --> Nil) {
            $!items := nqp::decont($items);
            $!n = nqp::elems($!items);
            $!i = 0;
        }

        # Next item, or IterationEnd once every index has been visited.
        method pull-one() {
            return IterationEnd unless $!i < $!n;
            nqp::atpos($!items, $!i++)
        }
    }

    # A fresh iterator over the current items.
    method iterator(--> Iterator) {
        HyperWorkBatchIterator.new(items => $!items)
    }

    # Drain the given Iterable into a new buffer and make that buffer the
    # items of this batch.
    method replace-with-results(Iterable \i --> Nil) {
        my $fresh := IterationBuffer.new;
        my $incoming := i.iterator;
        $incoming.push-all($fresh);
        $!items := $fresh;
    }
}
# A work stage is one step of a hyper/race pipeline. Stages are chained into a
# linked list through the source attribute, which refers to the stage that
# comes before this one. The roles for the different kinds of stage follow.
role Rakudo::Internals::HyperWorkStage {
    has Rakudo::Internals::HyperWorkStage $.source;
}
# A HyperBatcher stage is where batches of work come from. Typically it is
# built around an Iterable of some kind and splits the work up into batches of
# the requested size. It always sits at the very start of a piece of parallel
# processing pipeline.
role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
    has $!sequence = 0;

    # Hands out sequence numbers: 0 on the first call, then 1, 2, ...
    method next-sequence-number() {
        my $issued = $!sequence;
        $!sequence = $issued + 1;
        $issued
    }

    # Implemented by the composing class: produce the next batch of work.
    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
}
# A HyperProcessor carries out some operation on a work batch and updates the
# batch in place so that it holds the results of that operation.
role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
    # Implemented by the composing class.
    method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}
# A HyperRebatcher receives batches and may hand back zero or more batches
# for each one; whatever it hands back goes on to the following pipeline
# stages. It is meant only for steps that have to look across several batches
# yet can still work in a "streaming" fashion, rather than being a complete
# bottleneck in the pipeline. Overall it should emit one output batch per
# input batch, but not necessarily one per call: it may, for example, return
# nothing on one call and two batches on the next.
role Rakudo::Internals::HyperRebatcher does Rakudo::Internals::HyperWorkStage {
    # Implemented by the composing class.
    method rebatch(Rakudo::Internals::HyperWorkBatch $batch --> List) { ... }
}
# A joiner sits at the end of a pipeline, or at a stage of a multi-stage
# pipeline where all results are needed. Call batch-used every time a batch
# that was given to consume-batch has been used up. That is the backpressure
# mechanism: a sequential iterator at the end of a parallel pipeline may hold
# off calling batch-used until the downstream iterator really has eaten every
# value in the batch.
role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
    # Channel on which "batch used" notifications are sent; installed with
    # SET-BATCH-USED-CHANNEL.
    has $!batch-used-channel;

    # Implemented by the composing class: take delivery of a finished batch.
    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }

    # Implemented by the composing class: take delivery of an error.
    method consume-error(Exception \e) { ... }

    # Announce that one batch has been used by sending True on the channel.
    method batch-used(--> Nil) {
        $!batch-used-channel.send: True;
    }

    # Store the channel that batch-used sends on.
    method SET-BATCH-USED-CHANNEL($!batch-used-channel) { }
}
# Takes a linked list of pipeline stages and assembles them into a running
# pipeline. Since a pipeline must end with a HyperJoiner, that is what start
# expects to be given; the remaining stages are found by following .source.
class Rakudo::Internals::HyperPipeline {
    # Wire up and start the workers for the pipeline ending in $stage.
    #
    #   $stage  - the joiner at the end of the pipeline.
    #   $config - supplies the degree (worker count and number of initial
    #             batches) and the batch size.
    #
    # Dies if a stage is of an unrecognized kind, if a HyperBatcher is found
    # anywhere but at the start, or if there is no HyperBatcher at all.
    method start(Rakudo::Internals::HyperJoiner $stage, HyperConfiguration $config) {
        # Create channel that the last non-join operation in the pipeline will
        # put its results into, and start a worker to handle the channel.
        my $cur-dest-channel = Channel.new;
        self!join-worker($stage, $cur-dest-channel);

        # Create a channel that will signal we're ready for more batches,
        # and set join stage to send on it when batch-used is called.
        my $ready-channel = Channel.new;
        $stage.SET-BATCH-USED-CHANNEL($ready-channel);

        # Go through the rest of the stages, walking from the joiner back
        # towards the batcher.
        my $cur-stage = $stage.source;
        my @processors;
        my $batcher-seen = False;
        while $cur-stage {
            my $next-stage = $cur-stage.source;
            given $cur-stage {
                when Rakudo::Internals::HyperProcessor {
                    # Unshift them so a sequence will be in application order.
                    unshift @processors, $_;
                }
                when Rakudo::Internals::HyperBatcher {
                    if $next-stage {
                        die "A HyperBatcher may only be at the pipeline start";
                    }
                    # The processors collected so far get their own workers;
                    # the batcher then feeds whichever channel leads to them
                    # (or straight to the joiner if there are none).
                    $cur-dest-channel = self!maybe-processor-workers:
                        [@processors], $cur-dest-channel, $config.degree;
                    @processors = ();
                    self!batch-worker($cur-stage, $cur-dest-channel, $ready-channel,
                        $config.batch);
                    $batcher-seen = True;
                }
                default {
                    die "Unrecognized hyper pipeline stage " ~ .^name();
                }
            }
            $cur-stage = $next-stage;
        }

        # Without a batcher nothing would ever produce a batch, and whoever
        # reads from the joiner would wait forever; fail loudly instead.
        die "A hyper pipeline must start with a HyperBatcher"
            unless $batcher-seen;

        # Set off $degree batches.
        $ready-channel.send(True) for ^$config.degree;
    }

    # Start the worker that produces batches. Each time a value arrives on
    # $ready-channel it asks $stage for one batch of $size items and sends it
    # to $dest-channel, stopping after the batch flagged as last.
    method !batch-worker(Rakudo::Internals::HyperBatcher $stage, Channel $dest-channel,
                         Channel $ready-channel, int $size) {
        start {
            loop {
                $*AWAITER.await($ready-channel);
                my $batch := $stage.produce-batch($size);
                $dest-channel.send($batch);
                last if $batch.last;
            }
            # The handler lives outside the loop on purpose: a CATCH inside
            # the loop body would only end the current iteration, so the
            # worker would carry on producing batches for a channel that it
            # has already failed. Here, an error ends the worker.
            CATCH {
                default {
                    .note;
                    $dest-channel.fail($_);
                }
            }
        }
    }

    # If there are processors, start $degree workers that each take batches
    # from a new channel, run every processor over them in order, and send
    # them on to $dest-channel. Returns the channel that the upstream stage
    # should send to: the new channel, or $dest-channel itself when there are
    # no processors.
    method !maybe-processor-workers(@processors, Channel $dest-channel, Int $degree) {
        return $dest-channel unless @processors;
        my $source-channel := Channel.new;
        for ^$degree {
            start {
                loop {
                    my $batch := $*AWAITER.await($source-channel);
                    for @processors {
                        .process-batch($batch);
                    }
                    $dest-channel.send($batch);
                }
                CATCH {
                    # NOTE(review): nothing in this class closes the source
                    # channel, so this branch looks unreachable for now and
                    # the workers stay parked on await after the last batch.
                    when X::Channel::ReceiveOnClosed {
                        $dest-channel.close;
                    }
                    default {
                        .note;
                        $dest-channel.fail($_);
                    }
                }
            }
        }
        return $source-channel;
    }

    # Start the worker that passes every batch arriving on $source to the
    # joiner, and reports any error to the joiner's consume-error.
    method !join-worker(Rakudo::Internals::HyperJoiner $stage, Channel $source) {
        start {
            loop {
                $stage.consume-batch($*AWAITER.await($source));
            }
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    # We got everything; quietly exit the start block.
                }
                default {
                    $stage.consume-error($_);
                    CATCH {
                        default {
                            # Error handling code blew up; let the scheduler's
                            # error handler do it, which will typically bring
                            # the program down. Should never get here unless
                            # we've some bug in a joiner implementation.
                            $*SCHEDULER.handle_uncaught($_);
                        }
                    }
                }
            }
        }
    }
}
# Produces work batches out of the values pulled from an iterator. One value
# is read ahead after each full batch so that the batch can be flagged as the
# last one when the iterator has nothing more to give.
role Rakudo::Internals::HyperIteratorBatcher does Rakudo::Internals::HyperBatcher {
    # Sentinel meaning "no value has been read ahead yet".
    my constant NO_LOOKAHEAD = Mu.new;

    has Iterator $!iterator;
    has $!lookahead;

    submethod BUILD(Iterator :$iterator!) {
        $!iterator := $iterator;
        $!lookahead := NO_LOOKAHEAD;
    }

    # Build the next batch of up to $batch-size items.
    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) {
        my IterationBuffer $items .= new;
        my Bool $last;

        # The very first call is the only one made without a lookahead value.
        my Bool $first = ($!lookahead =:= NO_LOOKAHEAD);

        # On later calls the value read ahead last time opens the batch, and
        # one item fewer is needed from the iterator.
        my $wanted = $batch-size;
        unless $first {
            $items.push($!lookahead);
            $wanted = $batch-size - 1;
        }

        # Either the iterator ran dry while filling the batch, or the batch
        # filled up and the read-ahead shows whether anything is left.
        my $exhausted = $!iterator.push-exactly($items, $wanted) =:= IterationEnd;
        unless $exhausted {
            $!lookahead := $!iterator.pull-one;
            $exhausted = $!lookahead =:= IterationEnd;
        }
        $last = True if $exhausted;

        my $sequence-number = self.next-sequence-number();
        return Rakudo::Internals::HyperWorkBatch.new(
            :$sequence-number, :$items, :$first, :$last);
    }
}
# Joiner that exposes the results of a race pipeline as an Iterator. Batches
# are passed on in whatever order they arrive at consume-batch.
class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Batches waiting to be picked up by pull-one.
    has Channel $.batches .= new;

    # Sequence number of the batch flagged as last; -1 until it has turned up.
    has int $!last-target = -1;

    # How many batches have been consumed so far.
    has int $!batches-seen = 0;

    # Items of the batch currently being handed out by pull-one.
    has IterationBuffer $!current-items = BEGIN IterationBuffer.new;

    # Queue the batch for pull-one, and close the queue once the last batch
    # is known and as many batches have been seen as its sequence number
    # implies.
    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        $!batches.send($batch);
        $!batches-seen++;
        $!last-target = $batch.sequence-number if $batch.last;
        my $complete = $!last-target >= 0 && $!batches-seen == $!last-target + 1;
        $!batches.close if $complete;
    }

    # Print the error and fail the queue with it.
    method consume-error(Exception $e --> Nil) {
        note $e;
        $!batches.fail($e);
    }

    # Next value, or IterationEnd once the queue has been closed and drained.
    method pull-one() {
        # A loop rather than a single receive, so that empty batches are
        # skipped over.
        while nqp::elems(nqp::decont($!current-items)) == 0 {
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                # Throw other errors onwards
            }
            my $batch = $!batches.receive;
            # Signalled as soon as the batch has been received.
            self.batch-used();
            $!current-items = $batch.items;
        }
        nqp::shift(nqp::decont($!current-items))
    }
}
# Joiner that exposes the results of a hyper pipeline as an Iterator. Batches
# are passed on strictly by ascending sequence number; a batch arriving early
# is held back until all of its predecessors have been passed on.
class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Batches, in order, waiting to be picked up by pull-one.
    has Channel $.batches .= new;

    # Sequence number of the batch flagged as last; -1 until it has turned up.
    has int $!last-target = -1;

    # Sequence number of the batch that must be passed on next.
    has int $!next-to-send = 0;

    # Batches that arrived ahead of their turn.
    has @!held-back;

    # Items of the batch currently being handed out by pull-one.
    has IterationBuffer $!current-items = BEGIN IterationBuffer.new;

    # Take delivery of a batch, and close the queue once everything up to and
    # including the last batch has been passed on.
    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        $!last-target = $batch.sequence-number if $batch.last;
        self!handle-batch($batch);
        my $all-sent = $!last-target >= 0 && $!next-to-send > $!last-target;
        $!batches.close if $all-sent;
    }

    # Pass the batch on if it is the one awaited, followed by any held-back
    # batches that are now in sequence; otherwise hold it back.
    method !handle-batch($batch) {
        my int $seq = $batch.sequence-number;
        if $seq != $!next-to-send {
            @!held-back.push($batch);
            return;
        }

        $!batches.send($batch);
        $!next-to-send++;
        return unless @!held-back;

        @!held-back.=sort(*.sequence-number);
        while @!held-back && @!held-back[0].sequence-number == $!next-to-send {
            $!batches.send(@!held-back.shift);
            $!next-to-send++;
        }
    }

    # Print the error and fail the queue with it.
    method consume-error(Exception $e --> Nil) {
        note $e;
        $!batches.fail($e);
    }

    # Next value, or IterationEnd once the queue has been closed and drained.
    method pull-one() {
        # A loop rather than a single receive, so that empty batches are
        # skipped over.
        while nqp::elems(nqp::decont($!current-items)) == 0 {
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                # Throw other errors onwards
            }
            my $batch = $!batches.receive;
            # Signalled as soon as the batch has been received.
            self.batch-used();
            $!current-items = $batch.items;
        }
        nqp::shift(nqp::decont($!current-items))
    }
}
# Implementations shared between HyperSeq and RaceSeq.
class Rakudo::Internals::HyperSharedImpl {
    # Processor stage that keeps only the items of a batch accepted by the
    # matcher.
    my class Grep does Rakudo::Internals::HyperProcessor {
        has $!matcher;

        submethod TWEAK(:$!matcher) {}

        # Replace the batch's items with those that pass the matcher.
        method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
            $batch.replace-with-results($batch.grep($!matcher))
        }
    }

    # Create a grep stage that takes its input from $source.
    #
    #   $source  - the stage to chain the new stage onto.
    #   $matcher - what each item is matched against.
    #   %options - adverbs given to grep. None are supported yet, so passing
    #              any throws X::NYI instead of quietly dropping them and
    #              returning results of the wrong shape.
    method grep($source, $matcher, %options) {
        X::NYI.new(feature => 'Adverbs on hyper/race grep').throw
            if %options;
        Grep.new(:$source, :$matcher)
    }
}
# A HyperSeq carries out batches of work in parallel while keeping the output
# values in the same relative order as the input values.
my class HyperSeq does Iterable does Sequence {
    # Degree and batch size to run with.
    has HyperConfiguration $.configuration;

    # Most recently added stage of the pipeline built so far.
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;

    submethod BUILD(:$!configuration!, :$!work-stage-head!) {}

    # Put an order-preserving joiner on the end of the stages, start the
    # pipeline, and return the joiner as the iterator.
    method iterator(HyperSeq:D: --> Iterator) {
        my $joiner := Rakudo::Internals::HyperToIterator.new(
            source => $!work-stage-head);
        Rakudo::Internals::HyperPipeline.start($joiner, $!configuration);
        $joiner
    }

    # A new HyperSeq with a grep stage added after the current stages.
    method grep(HyperSeq:D: $matcher, *%options) {
        my $grep-stage := Rakudo::Internals::HyperSharedImpl.grep(
            $!work-stage-head, $matcher, %options);
        HyperSeq.new(
            configuration   => $!configuration,
            work-stage-head => $grep-stage)
    }

    # Already a HyperSeq.
    method hyper(HyperSeq:D:) { self }
}
# A RaceSeq performs batches of work in parallel, and will deliver the results
# in the order they are produced (so potentially disordering them relative to
# the input).
#
# It does Sequence as well as Iterable, the same as HyperSeq, so that the two
# kinds of parallel sequence offer the same set of roles to their users.
my class RaceSeq does Iterable does Sequence {
    # Degree and batch size to run with.
    has HyperConfiguration $.configuration;

    # Most recently added stage of the pipeline built so far.
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;

    submethod BUILD(:$!configuration!, :$!work-stage-head!) {}

    # Put a race joiner on the end of the stages, start the pipeline, and
    # return the joiner as the iterator.
    method iterator(RaceSeq:D: --> Iterator) {
        my $joiner := Rakudo::Internals::RaceToIterator.new:
            source => $!work-stage-head;
        Rakudo::Internals::HyperPipeline.start($joiner, $!configuration);
        $joiner
    }

    # A new RaceSeq with a grep stage added after the current stages.
    method grep(RaceSeq:D: $matcher, *%options) {
        RaceSeq.new: :$!configuration,
            :work-stage-head(Rakudo::Internals::HyperSharedImpl.grep(
                $!work-stage-head, $matcher, %options))
    }

    # Already a RaceSeq.
    method race(RaceSeq:D:) { self }
}
# Build a HyperSeq over the values of source. The degree defaults to the
# number of CPU cores and the batch size to 64.
sub new-hyper(Iterable:D \source, Int() :$degree = nqp::cpucores(), Int() :$batch = 64) {
    my $configuration = HyperConfiguration.new(:$degree, :$batch);
    my $work-stage-head = Rakudo::Internals::HyperIteratorBatcher.new(
        iterator => source.iterator);
    HyperSeq.new(:$configuration, :$work-stage-head)
}
# Build a RaceSeq over the values of source. The degree defaults to the
# number of CPU cores and the batch size to 64.
sub new-race(Iterable:D \source, Int() :$degree = nqp::cpucores(), Int() :$batch = 64) {
    my $configuration = HyperConfiguration.new(:$degree, :$batch);
    my $work-stage-head = Rakudo::Internals::HyperIteratorBatcher.new(
        iterator => source.iterator);
    RaceSeq.new(:$configuration, :$work-stage-head)
}
# ---tests---

# Run op once to warm up, then once more with the clock running, and return
# how long the second run took.
sub time(&op) {
    op(); # warm-up
    my $started = now;
    op();
    my $finished = now;
    $finished - $started
}
# Primes below 20000, found sequentially, with race and with hyper. Each
# variant is timed, and the parallel results are then checked against the
# sequential ones.
my @seq;
my @race;
my @hyper;

my $seq-time = time({ @seq = (^20000).grep(*.is-prime) });
say "Sequential: " ~ $seq-time;

my $race-time = time({ @race = new-race(^20000).grep(*.is-prime) });
say "Race: " ~ $race-time;

my $hyper-time = time({ @hyper = new-hyper(^20000).grep(*.is-prime) });
say "Hyper: " ~ $hyper-time;

# Race may deliver out of order, so its values are sorted before comparing.
is @race.elems, @seq.elems, 'Correct number of elements from race';
is @race.sort, @seq, 'Correct results from race (sorted to compare)';

# Hyper must deliver in input order, so its values are compared as they are.
is @hyper.elems, @seq.elems, 'Correct number of elements from hyper';
is @hyper, @seq, 'Correct results from hyper (order preseved, no sorting)';

done-testing;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment