Skip to content

Instantly share code, notes, and snippets.

@jnthn
Created October 12, 2017 16:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jnthn/754292747161f95093d80b207d077d74 to your computer and use it in GitHub Desktop.
Save jnthn/754292747161f95093d80b207d077d74 to your computer and use it in GitHub Desktop.
New hyper/race implementation exploration
use Test;
use nqp;
# Configuration shared by every stage of a hyper or race operation.
#   degree - how many batches may be in flight at once; also how many
#            processor workers are started when the pipeline has processors.
#   batch  - the maximum number of items placed into each work batch.
# Both must be at least 1. A degree below 1 means no batch is ever requested,
# so whoever iterates the results blocks forever; a batch size below 1 cannot
# produce meaningful batches.
my class HyperConfiguration {
    has Int $.degree is required;
    has Int $.batch is required;

    # Reject values that would stall or degrade the pipeline at construction
    # time, rather than letting the consumer hang later.
    submethod TWEAK(--> Nil) {
        die "Hyper/race degree must be at least 1, got $!degree"
            if $!degree < 1;
        die "Hyper/race batch size must be at least 1, got $!batch"
            if $!batch < 1;
    }
}
# One unit of work handed to a worker in a hyper or race operation. Because it
# does Iterable (yielding the items it holds), ordinary sequential Iterable
# code such as .grep or .map can be run directly over a batch.
class Rakudo::Internals::HyperWorkBatch does Iterable {
    # Buffer holding the items of this batch.
    has IterationBuffer $.items;

    # Position of this batch in production order; the first batch is 0.
    has int $.sequence-number;

    # Set on the first batch produced at the most recent fork point, and on
    # the final batch that fork point will ever produce, respectively.
    has Bool $.first;
    has Bool $.last;

    # Walks the item buffer of a batch from front to back.
    my class HyperWorkBatchIterator does Iterator {
        has $!buffer;
        has int $!pos;
        has int $!count;

        submethod BUILD(:$items --> Nil) {
            $!buffer := nqp::decont($items);
            $!count = nqp::elems($!buffer);
            $!pos = 0;
        }

        method pull-one() {
            return IterationEnd unless $!pos < $!count;
            nqp::atpos($!buffer, $!pos++)
        }
    }

    method iterator(--> Iterator) {
        HyperWorkBatchIterator.new(items => $!items)
    }

    # Drains the given Iterable into a fresh buffer and makes that buffer the
    # new contents of this batch.
    method replace-with-results(Iterable \i --> Nil) {
        my \collected = IterationBuffer.new;
        i.iterator.push-all(collected);
        $!items := collected;
    }
}
# A work stage is one step of a hyper/race pipeline. Stages form a singly
# linked list through their source attribute, each one pointing back at the
# stage that feeds it. The roles that follow describe the kinds of stage.
role Rakudo::Internals::HyperWorkStage {
    # The stage feeding this one; left undefined when there is none.
    has Rakudo::Internals::HyperWorkStage $.source;
}
# A HyperBatcher stage creates the batches of work. It usually wraps an
# Iterable of some kind and slices it into batches of the requested size.
# Batchers always sit at the very start of a parallel processing pipeline.
role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
    # Counter used to number produced batches; starts at zero.
    has $!sequence = 0;

    # Returns 0, 1, 2, ... on successive calls.
    method next-sequence-number() { $!sequence++ }

    # Implementations return the next batch of at most $batch-size items.
    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
}
# A HyperProcessor applies some operation to a work batch, updating the batch
# so that it holds the results of that operation.
role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
    method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}
# A HyperRebatcher receives batches and may emit zero or more batches for each
# one it is given; whatever it emits is passed on to the following pipeline
# stages. It is meant only for steps that must look across several batches
# while still working in a streaming fashion, rather than acting as a full
# bottleneck in the pipeline. Overall it should emit one output batch per
# input batch, although it may, for example, emit none on one call and two on
# the next.
# NOTE(review): HyperPipeline.start does not yet accept this kind of stage.
role Rakudo::Internals::HyperRebatcher does Rakudo::Internals::HyperWorkStage {
    method rebatch(Rakudo::Internals::HyperWorkBatch $batch --> List) { ... }
}
# A HyperJoiner sits at the end of a pipeline, or ends one stage of a
# multi-stage pipeline (a point at which all results are needed). batch-used
# should be called each time a batch passed to consume-batch has been used.
# This provides backpressure: a sequential iterator at the end of a parallel
# pipeline can hold off calling batch-used until the downstream consumer has
# actually taken every value in the batch.
role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
    # Channel that batch-used sends on; installed by the pipeline.
    has $!batch-used-channel;

    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
    method consume-error(Exception \e) { ... }

    # Signals that another batch may be produced.
    method batch-used(--> Nil) {
        $!batch-used-channel.send(True)
    }

    # Installs the channel that batch-used sends on.
    method SET-BATCH-USED-CHANNEL($!batch-used-channel) {}
}
# Assembles a linked list of pipeline stages into a running pipeline. Every
# pipeline must end with a HyperJoiner, so start is handed that final stage and
# walks back from it through the source links.
class Rakudo::Internals::HyperPipeline {
    # Starts workers for every stage reachable from $stage (the joiner) and
    # kicks off the first $config.degree batches. Dies, before starting any
    # worker, if the earliest stage in the chain is not a HyperBatcher:
    # nothing would ever produce a batch, so the consumer would block forever.
    # Also dies on an unrecognized stage kind or a HyperBatcher that is not
    # at the pipeline start.
    method start(Rakudo::Internals::HyperJoiner $stage, HyperConfiguration $config) {
        my $head = $stage;
        $head = $head.source while $head.source;
        die "A hyper pipeline must start with a HyperBatcher"
            unless $head ~~ Rakudo::Internals::HyperBatcher;

        # Create channel that the last non-join operation in the pipeline will
        # put its results into, and start a worker to handle the channel.
        my $cur-dest-channel = Channel.new;
        self!join-worker($stage, $cur-dest-channel);

        # Create a channel that will signal we're ready for more batches,
        # and set join stage to send on it when batch-used is called.
        my $ready-channel = Channel.new;
        $stage.SET-BATCH-USED-CHANNEL($ready-channel);

        # Go through the rest of the stages, from the end back to the start.
        my $cur-stage = $stage.source;
        my @processors;
        while $cur-stage {
            my $next-stage = $cur-stage.source;
            given $cur-stage {
                when Rakudo::Internals::HyperProcessor {
                    # Unshift them so a sequence will be in application order.
                    unshift @processors, $_;
                }
                when Rakudo::Internals::HyperBatcher {
                    if $next-stage {
                        die "A HyperBatcher may only be at the pipeline start";
                    }
                    $cur-dest-channel = self!maybe-processor-workers:
                        [@processors], $cur-dest-channel, $config.degree;
                    @processors = ();
                    self!batch-worker($cur-stage, $cur-dest-channel, $ready-channel,
                        $config.batch);
                }
                default {
                    die "Unrecognized hyper pipeline stage " ~ .^name();
                }
            }
            $cur-stage = $next-stage;
        }

        # Set off $degree batches.
        $ready-channel.send(True) for ^$config.degree;
    }

    # Starts the worker that asks the batcher $stage for a batch of $size
    # items each time a value arrives on $ready-channel, and sends each batch
    # to $dest-channel. It stops after sending the batch marked last, or at
    # the first error, which is noted and used to fail $dest-channel.
    method !batch-worker(Rakudo::Internals::HyperBatcher $stage, Channel $dest-channel,
            Channel $ready-channel, int $size) {
        start {
            loop {
                $*AWAITER.await($ready-channel);
                my $batch := $stage.produce-batch($size);
                $dest-channel.send($batch);
                last if $batch.last;
            }
            # Errors are handled for the whole worker, as the other workers
            # do. A CATCH inside the loop body would only end the current
            # iteration, so the loop would keep producing batches into a
            # channel that had already been failed.
            CATCH {
                default {
                    .note;
                    $dest-channel.fail($_);
                }
            }
        }
    }

    # Returns $dest-channel unchanged when there are no processors. Otherwise
    # starts $degree workers. Each takes batches from a new source channel,
    # applies every processor in order, and sends the batch on to
    # $dest-channel. The new source channel is returned for the upstream
    # stage to send into.
    method !maybe-processor-workers(@processors, Channel $dest-channel, Int $degree) {
        return $dest-channel unless @processors;
        my $source-channel := Channel.new;
        for ^$degree {
            start {
                loop {
                    my $batch := $*AWAITER.await($source-channel);
                    for @processors {
                        .process-batch($batch);
                    }
                    $dest-channel.send($batch);
                }
                CATCH {
                    when X::Channel::ReceiveOnClosed {
                        # NOTE(review): each of the $degree workers closes
                        # $dest-channel here, so the first to finish could
                        # close it while another still holds a batch. Nothing
                        # in this file closes $source-channel yet, so this
                        # path is currently not reached.
                        $dest-channel.close;
                    }
                    default {
                        .note;
                        $dest-channel.fail($_);
                    }
                }
            }
        }
        return $source-channel;
    }

    # Starts the worker that passes every batch arriving on $source to the
    # joiner's consume-batch. It exits quietly once $source is closed; any
    # other error is handed to the joiner's consume-error.
    method !join-worker(Rakudo::Internals::HyperJoiner $stage, Channel $source) {
        start {
            loop {
                $stage.consume-batch($*AWAITER.await($source));
            }
            CATCH {
                when X::Channel::ReceiveOnClosed {
                    # We got everything; quietly exit the start block.
                }
                default {
                    $stage.consume-error($_);
                    CATCH {
                        default {
                            # Error handling code blew up; let the scheduler's
                            # error handler do it, which will typically bring
                            # the program down. Should never get here unless
                            # we've some bug in a joiner implementation.
                            $*SCHEDULER.handle_uncaught($_);
                        }
                    }
                }
            }
        }
    }
}
# Produces work batches from the values of an iterator, one batch per call.
role Rakudo::Internals::HyperIteratorBatcher does Rakudo::Internals::HyperBatcher {
    # Sentinel stored in $!lookahead until a value has first been read ahead.
    my constant NO_LOOKAHEAD = Mu.new;

    has Iterator $!iterator;
    # One value read past the end of the previous batch. It tells whether that
    # batch was the last one without waiting for the next call.
    has $!lookahead;

    submethod BUILD(Iterator :$iterator!) {
        $!iterator := $iterator;
        $!lookahead := NO_LOOKAHEAD;
    }

    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) {
        my IterationBuffer $items .= new;
        my Bool $first = $!lookahead =:= NO_LOOKAHEAD;
        my Bool $last;

        # Every batch after the first starts with the value read ahead last
        # time, so one fewer value is needed from the iterator.
        my int $wanted = $batch-size;
        unless $first {
            $items.push($!lookahead);
            $wanted = $batch-size - 1;
        }

        # Either the iterator runs dry while filling this batch, or peek at
        # one more value to learn whether anything follows it.
        if $!iterator.push-exactly($items, $wanted) =:= IterationEnd {
            $last = True;
        }
        else {
            $!lookahead := $!iterator.pull-one;
            $last = True if $!lookahead =:= IterationEnd;
        }

        Rakudo::Internals::HyperWorkBatch.new(
            sequence-number => self.next-sequence-number(),
            :$items, :$first, :$last)
    }
}
# Joiner for race. It passes batches to the consuming iterator in whatever
# order they arrive, and is itself that iterator.
class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Arrived batches waiting to be iterated.
    has Channel $.batches .= new;
    # Sequence number of the batch marked last; -1 until it has arrived.
    has int $!last-target = -1;
    # How many batches have arrived so far.
    has int $!batches-seen = 0;

    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        $!batches.send($batch);
        ++$!batches-seen;
        $!last-target = $batch.sequence-number if $batch.last;
        # Sequence numbers start at 0, so once the last batch is known, all
        # work is in when last-target + 1 batches have been seen.
        $!batches.close
            if $!last-target >= 0 && $!batches-seen == $!last-target + 1;
    }

    method consume-error(Exception $e --> Nil) {
        note $e;
        $!batches.fail($e);
    }

    # Items of the batch currently being iterated.
    has IterationBuffer $!current-items = BEGIN IterationBuffer.new;

    method pull-one() {
        # Keep receiving while the current buffer is empty, so empty batches
        # are skipped.
        while nqp::elems(nqp::decont($!current-items)) == 0 {
            my $batch = $!batches.receive;
            self.batch-used();
            $!current-items = $batch.items;
            CATCH {
                # The channel is closed once every batch has been delivered.
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                # Any other error is not handled here and propagates.
            }
        }
        nqp::shift(nqp::decont($!current-items))
    }
}
# Joiner for hyper. It passes batches to the consuming iterator in
# sequence-number order, so results keep the order of the input, and it is
# itself that iterator.
class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Batches ready to be iterated, already in sequence order.
    has Channel $.batches .= new;
    # Sequence number of the batch marked last; -1 until it has arrived.
    has int $!last-target = -1;
    # Sequence number of the next batch that may be passed on.
    has int $!next-to-send = 0;
    # Batches that arrived ahead of their turn, keyed by sequence number. The
    # hash lets each newly unblocked batch be looked up directly, instead of
    # re-sorting a list of held-back batches whenever an in-order batch
    # arrives.
    has %!held-back;

    method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        if $batch.last {
            $!last-target = $batch.sequence-number;
        }
        self!handle-batch($batch);
        # Every batch up to and including the last one has been passed on.
        if $!last-target >= 0 && $!next-to-send > $!last-target {
            $!batches.close;
        }
    }

    # If $batch is the next one due, passes it on, then passes on any
    # held-back batches that directly follow it. Otherwise holds it back.
    method !handle-batch($batch) {
        my int $seq = $batch.sequence-number;
        if $seq == $!next-to-send {
            $!batches.send($batch);
            $!next-to-send++;
            while %!held-back{$!next-to-send}:exists {
                $!batches.send(%!held-back{$!next-to-send}:delete);
                $!next-to-send++;
            }
        }
        else {
            %!held-back{$seq} = $batch;
        }
    }

    method consume-error(Exception $e --> Nil) {
        note $e;
        $!batches.fail($e);
    }

    # Items of the batch currently being iterated.
    has IterationBuffer $!current-items = BEGIN IterationBuffer.new;

    method pull-one() {
        until nqp::elems(nqp::decont($!current-items)) { # Handles empty batches
            my $batch = $!batches.receive;
            self.batch-used();
            $!current-items = $batch.items;
            CATCH {
                # The channel is closed once every batch has been delivered.
                when X::Channel::ReceiveOnClosed {
                    return IterationEnd;
                }
                # Throw other errors onwards
            }
        }
        nqp::shift(nqp::decont($!current-items))
    }
}
# Stage implementations that HyperSeq and RaceSeq have in common.
class Rakudo::Internals::HyperSharedImpl {
    # Processor stage that replaces a batch's items with those that pass
    # .grep($matcher).
    my class Grep does Rakudo::Internals::HyperProcessor {
        has $!matcher;
        submethod TWEAK(:$!matcher) {}
        method process-batch(Rakudo::Internals::HyperWorkBatch $batch) {
            my \kept = $batch.grep($!matcher);
            $batch.replace-with-results(kept)
        }
    }

    # Builds a Grep stage fed by $source. %options is accepted but not used.
    method grep($source, $matcher, %options) {
        Grep.new(source => $source, matcher => $matcher)
    }
}
# A HyperSeq processes batches of work in parallel while keeping output values
# in the same relative order as the inputs they came from.
my class HyperSeq does Iterable does Sequence {
    has HyperConfiguration $.configuration;
    # Most recently added stage of the pipeline built so far.
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;

    submethod BUILD(:$!configuration!, :$!work-stage-head!) {}

    # Starts the pipeline with an order-preserving joiner at its end and
    # returns that joiner, which iterates the results.
    method iterator(HyperSeq:D: --> Iterator) {
        my $joiner := Rakudo::Internals::HyperToIterator.new(source => $!work-stage-head);
        Rakudo::Internals::HyperPipeline.start($joiner, $!configuration);
        $joiner
    }

    # Returns a new HyperSeq with a parallel grep stage appended.
    method grep(HyperSeq:D: $matcher, *%options) {
        my $head := Rakudo::Internals::HyperSharedImpl.grep(
            $!work-stage-head, $matcher, %options);
        HyperSeq.new(configuration => $!configuration, work-stage-head => $head)
    }

    # Already a HyperSeq, so nothing changes.
    method hyper(HyperSeq:D:) { self }
}
# A RaceSeq processes batches of work in parallel and delivers results in the
# order batches finish, which may differ from the order of the input.
my class RaceSeq does Iterable {
    has HyperConfiguration $.configuration;
    # Most recently added stage of the pipeline built so far.
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;

    submethod BUILD(:$!configuration!, :$!work-stage-head!) {}

    # Starts the pipeline with an arrival-order joiner at its end and returns
    # that joiner, which iterates the results.
    method iterator(RaceSeq:D: --> Iterator) {
        my $joiner := Rakudo::Internals::RaceToIterator.new(source => $!work-stage-head);
        Rakudo::Internals::HyperPipeline.start($joiner, $!configuration);
        $joiner
    }

    # Returns a new RaceSeq with a parallel grep stage appended.
    method grep(RaceSeq:D: $matcher, *%options) {
        my $head := Rakudo::Internals::HyperSharedImpl.grep(
            $!work-stage-head, $matcher, %options);
        RaceSeq.new(configuration => $!configuration, work-stage-head => $head)
    }

    # Already a RaceSeq, so nothing changes.
    method race(RaceSeq:D:) { self }
}
# Wraps source in an order-preserving HyperSeq. $degree defaults to the
# number of CPU cores reported by nqp::cpucores, and $batch to 64 items.
sub new-hyper(Iterable:D \source, Int() :$degree = nqp::cpucores(), Int() :$batch = 64) {
    my $configuration = HyperConfiguration.new(degree => $degree, batch => $batch);
    my $batcher = Rakudo::Internals::HyperIteratorBatcher.new(iterator => source.iterator);
    HyperSeq.new(:$configuration, work-stage-head => $batcher)
}
# Wraps source in a RaceSeq, whose results may come out of order. $degree
# defaults to the number of CPU cores reported by nqp::cpucores, and $batch
# to 64 items.
sub new-race(Iterable:D \source, Int() :$degree = nqp::cpucores(), Int() :$batch = 64) {
    my $configuration = HyperConfiguration.new(degree => $degree, batch => $batch);
    my $batcher = Rakudo::Internals::HyperIteratorBatcher.new(iterator => source.iterator);
    RaceSeq.new(:$configuration, work-stage-head => $batcher)
}
# ---tests---
# Runs &op once untimed as a warm-up, then once more, and returns how long
# the second run took.
sub time(&op) {
    op();
    my $started = now;
    op();
    my $elapsed = now - $started;
    $elapsed
}
# Time sequential, race and hyper versions of the same prime search, then
# check that race and hyper agree with the sequential result.
my (@seq, @race, @hyper);
say "Sequential: " ~ time({ @seq = (^20000).grep(*.is-prime) });
say "Race: " ~ time({ @race = new-race(^20000).grep(*.is-prime) });
say "Hyper: " ~ time({ @hyper = new-hyper(^20000).grep(*.is-prime) });
is @race.elems, @seq.elems, 'Correct number of elements from race';
# race may reorder results, so compare after sorting.
is @race.sort, @seq, 'Correct results from race (sorted to compare)';
is @hyper.elems, @seq.elems, 'Correct number of elements from hyper';
# hyper must keep input order, so compare directly.
is @hyper, @seq, 'Correct results from hyper (order preserved, no sorting)';

# Many small batches with few workers exercise hyper's reordering of batches
# that finish early.
my @small = new-hyper(^100, :batch(7), :degree(3)).grep(* %% 3);
is @small, (^100).grep(* %% 3), 'Hyper keeps order across many small batches';
# Empty input produces a single empty final batch.
my @none = new-race((), :batch(4)).grep(*.so);
is @none.elems, 0, 'Race over empty input yields nothing';
done-testing;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment