Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
HLL and documented prototype of .hyper/.race support
use MONKEY;
class ConcurrentChain does Iterable { # perhaps this should be HyperSeq ?
has $!source; # the source iterator
has int $!batch; # batch size
has int $!degree; # degree size (number of workers)
has str $!method; # name of method: "hyper" or "race"
has @!actions; # Pairs of actions: Method => Capture
# Need to special case "map", to handle "for {}", which is code-genned
# as a .map, but with a :item named parameter specified
my $map-method := List.^find_method("map");
method map(ConcurrentChain:D: :$item, |c) {
if defined($item) {
Seq.new(self.iterator).map(:$item, |c)
}
elsif c.AT-POS(0).has-phaser("LAST") {
Seq.new(self.iterator).map(|c) # must use serial version
}
else { # concurrent, save as action to do
@!actions.push(Pair.new($map-method,c));
self
}
}
# Need to special case "grep" in case the block has a LAST phaser.
my $grep-method := List.^find_method("grep");
method grep(ConcurrentChain:D: |c) {
if c.AT-POS(0).has-phaser("LAST") {
Seq.new(self.iterator).grep(|c) # must use serial version
}
else { # concurrent, save as action to do
@!actions.push(Pair.new($grep-method,c));
self
}
}
# concurrent methods that should just save action for later
# BEGIN for <grep> {
# my $call-method := List.^find_method($_);
# my $chain-method := method (ConcurrentChain:D: |c) {
# @!actions.push(Pair.new($call-method,c));
# self
# }
# $chain-method.set_name($_);
# ::?CLASS.^add_method($_,$chain-method)
# }
# serial methods
BEGIN for <gist keys kv perl repeated squish Str unique values> {
my $call-method := List.^find_method($_);
my $chain-method := method (ConcurrentChain:D: |c) {
self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
$call-method(
nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer)
)
}
$chain-method.set_name($_);
::?CLASS.^add_method($_,$chain-method)
}
method !SET-SELF(\source, \batch, \degree, $method) {
$!source := source;
$!batch = batch;
$!degree = degree;
$!method = $method;
self
}
method new(\source, \batch, \degree, $method) {
batch <= 0
?? X::Invalid::Value.new(
:$method, :name<batch>, :value(batch)).throw
!! degree <= 0
?? X::Invalid::Value.new(
:$method,:name<degree>,:value(degree)).throw
!! self.CREATE!SET-SELF(source, batch, degree, $method)
}
# The serial iterator producing the results
method iterator() {
@!actions # we can run actions
?? $!method eq 'hyper'
?? Rakudo::Iterator.HyperActions( # we want "hyper"
$!source,@!actions,$!batch,$!degree)
!! Rakudo::Iterator.RaceActions( # we want "race"
$!source,@!actions,$!batch,$!degree)
!! $!source # sadly, nothing to do
}
# Make sure we will always run the iterator
method sink() { self.iterator.sink-all }
}
# Since we cannot augment a role, specifically the Iterable role, we do
# an augment on Seq. For now, we must therefore do a .Seq on any iterable
# before we can hyper / race it.
augment class Seq {
# Cannot call it hyper
method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
ConcurrentChain.new(self.iterator,$batch,$degree,"hyper")
}
# Cannot call it race
method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
ConcurrentChain.new(self.iterator,$batch,$degree,"race")
}
}
augment class Rakudo::Iterator {
my $empty := IterationBuffer.CREATE;
# Returns a sort of hyper-iterator that can be called from different
# threads at the same, and which produces Pairs with an ordinal number
# and an IterationBuffer filled with as many elements that could be
# fetched from the given source iterator and the given batch size.
# If the source iterator is exhausted, it will keep returning
# IterationEnd no matter how many times .pull-buffer is called.
method ConcurrentBatcher(\iterator, Int:D $batch) {
class {
has $!iterator; # the source iterator
has $!lock; # making sure one worker runs this code
has int $!ordinal; # the key of the Pair we produce
has int $!batch; # 0 indicates we're exhausted
method !SET-SELF(\iterator, \batch) {
$!iterator := iterator;
$!lock := Lock.new;
$!ordinal = -1;
$!batch = batch;
self
}
method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }
method pull-buffer() {
nqp::if(
$!batch,
nqp::stmts( # we're still in business
(my $buffer := IterationBuffer.CREATE),
(my $iterator := $!iterator),
(my int $batch = $!batch),
(my int $found = -1),
$!lock.protect({
nqp::stmts( # we haz the source iterator
nqp::until(
nqp::iseq_i(
($found = nqp::add_i($found,1)),
$batch
) || nqp::eqaddr(
(my $pulled := $iterator.pull-one),
IterationEnd
),
nqp::bindpos($buffer,$found,$pulled)
),
nqp::if( # source iterator exhausted
nqp::islt_i($found,$batch),
nqp::stmts( # only produce what we found
nqp::splice(
$buffer,
$empty,
$found,
nqp::sub_i($batch,$found)
),
($!batch = 0) # tell the world we're done
)
),
nqp::if(
$found,
Pair.new( # produce a batch
($!ordinal = nqp::add_i($!ordinal,1)),
$buffer
),
IterationEnd # we're done here
)
)
})
),
IterationEnd # we were already done
)
}
}.new(iterator,$batch)
}
# This role provides the basis for concurrent processing of an
# iterator. Its .new method expects a source iterator, a list of
# actions to be performed (consisting of method => capture pairs),
# on each value obtained from the source iterator, the maximum size
# of the buffer to be processed inside a single worker, and the
# degree (aka number of workers). Classes must provide at least a
# pull-one and a !TWEAK method (which is expected to return self).
#
# Please note that although all sorts of parallel processing happens
# inside the classes that do this role, the classes are expose
# themselves as ordinary Iterator classes to the world.
role ConcurrentActionator does Iterator {
has $!queue;
has @!promises;
has Int $!alive; # alas, cannot be a native int :-(
# We use a concurrent blocking queue (the work horse of a
# Channel, but without any frills here). It is save to
# nqp::push() on it from any thread, and nqp::shift() will
# block until a value becomes available. Since we know the
# number of workers, and each worker will send an IterationEnd
# at the end, by counting the number of IterationEnd's seen,
# we know when we should stop nqp::shift()ing from the queue.
my class Queue is repr('ConcBlockingQueue') { }
method !TWEAK { ... }
method !SET-SELF(\source, \actions, \batch, \degree) {
# Create two lists: one with methods to call, and one
# with captures to apply, so we can easily index into
# what we need to do.
my int $todo = actions.elems;
my $methods := nqp::setelems(nqp::list,$todo);
my $captures := nqp::setelems(nqp::list,$todo);
for actions.kv -> $w, $action {
my $m := $action.key;
nqp::istype($m,Method)
?? nqp::bindpos($methods,$w,$m)
!! nqp::bindpos($methods,$w,List.^find_method($m.Str));
nqp::bindpos($captures,$w,$action.value);
}
# Set up the batcher and the queue.
$!alive = degree;
$!queue := Queue.CREATE;
my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);
# Set up the promises of the workers. Not sure we would
# actually ever need that, but maybe we can do something
# with reducing operations in the future, where we would
# need the result. Or we can have the worker just return
# the number of elements processed, or other statistics.
for ^degree -> $w {
@!promises.BIND-POS($w,
start {
CATCH { .say } # not sure what to do about exceptions
# Ensure we *always* queue an IterationEnd for this
# worker when we're done, normally or exceptionally.
LEAVE nqp::push($!queue,IterationEnd);
# A dummy HLL list to be passed as a "self", with
# its $!reified changed for each call.
my $List := List.CREATE;
# While we have batches
nqp::until(
nqp::eqaddr(
(my $enroute := $batcher.pull-buffer),
IterationEnd
),
# For all actions to perform
nqp::stmts(
(my int $i = -1),
nqp::while(
nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
nqp::stmts(
# Transplant batch into List
nqp::bindattr($List,List,'$!reified',
nqp::getattr($enroute,Pair,'$!value')
),
# Run the iterator on the List for this action
nqp::atpos($methods,$i)(
$List,|nqp::atpos($captures,$i)
).iterator.push-all(
nqp::bindattr($enroute,Pair,'$!value',
IterationBuffer.CREATE
)
)
)
)
),
# Queue the result of all actions
nqp::push($!queue,$enroute)
)
}
)
}
# The instantiated object
self
}
method new(\source, \actions, \batch, \degree) {
self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
}
method sink-all(--> IterationEnd) is raw {
# Just eat the queue, we don't care about order in any way
nqp::while(
$!alive,
nqp::stmts(
nqp::until(
nqp::eqaddr(nqp::shift($!queue),IterationEnd),
nqp::null
),
nqp::unless(
--$!alive,
self!cleanup,
)
)
)
}
# Handle the promises, indicate we're done
method !cleanup(--> IterationEnd) {
.result for @!promises
}
}
# The "hyper" case of the ConcurrentActionator role.
method HyperActions(\source,\actions,\batch,\degree) {
class :: does ConcurrentActionator {
has $!slipped; # current list of values to produce
has $!processed; # list of processed chunks
has int $!offset; # ordinal number of chunk at index 0
method !TWEAK() {
$!slipped := $empty;
$!processed := nqp::list;
self
}
method pull-one() is raw {
nqp::if(
nqp::elems($!slipped),
nqp::shift($!slipped), # produce from the chunk
nqp::if(
$!alive,
nqp::stmts(
nqp::if( # no chunk to produce from
nqp::existspos($!processed,0),
nqp::stmts( # next chunk is available
($!offset = nqp::add_i($!offset,1)),
($!slipped := nqp::shift($!processed))
),
nqp::if( # next chunk not there
nqp::eqaddr(
(my $chunk := nqp::shift($!queue)),
IterationEnd
),
nqp::if( # a worker has expired
--$!alive,
self.pull-one, # others not, try again
self!cleanup, # mohican time, bye bye
),
nqp::if( # a fresh chunk
nqp::iseq_i(
$!offset,
(my int $ordinal = $chunk.key)
),
nqp::stmts( # in sequence chunk
($!offset = nqp::add_i($!offset,1)),
nqp::if( # lose placeholder if any
nqp::elems($!processed),
nqp::shift($!processed)
),
($!slipped := $chunk.value)
),
nqp::stmts( # out of sequence
nqp::bindpos( # store for later usage
$!processed,
nqp::sub_i($ordinal,$!offset),
$chunk.value
),
)
)
)
),
self.pull-one # rinse and repeat
),
IterationEnd
)
)
}
method push-all($target --> IterationEnd) {
nqp::stmts(
nqp::while( # produce from available
nqp::elems($!slipped),
$target.push(nqp::shift($!slipped))
),
nqp::while( # do the other chunks
$!alive,
nqp::if(
nqp::existspos($!processed,0),
nqp::stmts( # next chunk is available
($!offset = nqp::add_i($!offset,1)),
(my $slipped := nqp::shift($!processed)),
nqp::while(
nqp::elems($slipped),
$target.push(nqp::shift($slipped))
)
),
nqp::if( # next chunk not there
nqp::eqaddr(
(my $chunk := nqp::shift($!queue)),
IterationEnd
),
nqp::unless( # worker expired
--$!alive,
self!cleanup # mohican time, bye bye
),
nqp::bindpos( # out of sequence
$!processed, # store for later usage
nqp::sub_i($chunk.key,$!offset),
$chunk.value
)
)
)
)
)
}
}.new(source, actions, batch, degree)
}
# The "race" case of the ConcurrentActionator role.
method RaceActions(\source,\actions,\batch,\degree) {
class :: does ConcurrentActionator {
has $!slipped;
method !TWEAK() {
$!slipped := $empty;
self
}
method pull-one() is raw {
nqp::if(
nqp::elems($!slipped),
nqp::shift($!slipped), # produce from available
nqp::if( # no values available
nqp::eqaddr(
(my $chunk := nqp::shift($!queue)),
IterationEnd
),
nqp::if( # worker exhausted
--$!alive,
self.pull-one, # but still others
self!cleanup, # mohican time, bye bye
),
nqp::stmts(
($!slipped := $chunk.value), # could be empty
self.pull-one # so try again
)
)
)
}
method push-all($target --> IterationEnd) {
nqp::stmts(
nqp::while( # produce from available
nqp::elems($!slipped),
$target.push(nqp::shift($!slipped))
),
nqp::while( # do the other chunks
$!alive,
nqp::until( # still in business
nqp::eqaddr(
(my $chunk := nqp::shift($!queue)),
IterationEnd
),
nqp::stmts( # we have a chunk
(my $slipped := $chunk.value),
nqp::while( # push the chunk
nqp::elems($slipped),
$target.push(nqp::shift($slipped))
)
)
),
nqp::unless(
--$!alive,
self!cleanup, # mohican time, bye bye
)
)
)
}
}.new(source, actions, batch, degree)
}
}
my @a = ^106;
my $now = now;
dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_++ }).grep({ $_ %% 2 });
say "parallel processed in {now - $now}";
$now = now;
dd @a.map( { sleep rand / 1000; $_++ } ).grep: { $_ %% 2 };
say "serial processed in {now - $now}";
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment