Last active
March 22, 2017 23:13
-
-
Save lizmat/1c80d2210aeb4239810d7818fca7e575 to your computer and use it in GitHub Desktop.
Prototype Rakudo::Iterator.(Hyper|Race)Actions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use MONKEY; | |
# A chain of actions (map / grep) that is recorded rather than executed,
# to be run concurrently once an iterator is requested from it.
class ConcurrentChain does Iterable {    # perhaps this should be HyperSeq ?
    has $!source;         # the source iterator
    has int $!batch;      # batch size
    has int $!degree;     # degree (number of workers)
    has str $!method;     # name of method: "hyper" or "race"
    has @!actions;        # Pairs of actions: Method => Capture

    # True if the given matcher is a Block that has a LAST phaser.  A
    # matcher that is not a Block (a type object or a literal value passed
    # to .grep, for instance) cannot have phasers and has no .has-phaser
    # method, so it must not be asked.
    my sub has-last-phaser(\matcher) {
        nqp::istype(matcher,Block) && matcher.has-phaser("LAST")
    }

    # Need to special case "map", to handle "for {}", which is code-genned
    # as a .map, but with a :item named parameter specified.  A block with
    # a LAST phaser is also run serially.
    my $map-method := List.^find_method("map");
    method map(ConcurrentChain:D: :$item, |c) {
        if defined($item) {
            Seq.new(self.iterator).map(:$item, |c)
        }
        elsif has-last-phaser(c.AT-POS(0)) {
            Seq.new(self.iterator).map(|c)      # must use serial version
        }
        else {                                   # concurrent, save as action to do
            @!actions.push(Pair.new($map-method,c));
            self
        }
    }

    # Need to special case "grep" in case the block has a LAST phaser.
    my $grep-method := List.^find_method("grep");
    method grep(ConcurrentChain:D: |c) {
        if has-last-phaser(c.AT-POS(0)) {
            Seq.new(self.iterator).grep(|c)     # must use serial version
        }
        else {                                   # concurrent, save as action to do
            @!actions.push(Pair.new($grep-method,c));
            self
        }
    }

    # concurrent methods that should just save action for later
    # BEGIN for <grep> {
    #     my $call-method := List.^find_method($_);
    #     my $chain-method := method (ConcurrentChain:D: |c) {
    #         @!actions.push(Pair.new($call-method,c));
    #         self
    #     }
    #     $chain-method.set_name($_);
    #     ::?CLASS.^add_method($_,$chain-method)
    # }

    # Serial methods: run the whole chain into a buffer, wrap that buffer
    # in a List, and call the List method of the same name on it.  Any
    # arguments given (e.g. :as / :with for unique / squish) are passed on.
    BEGIN for <gist keys kv perl repeated squish Str unique values> {
        my $call-method := List.^find_method($_);
        my $chain-method := method (ConcurrentChain:D: |c) {
            self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
            $call-method(
              nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer),
              |c
            )
        }
        $chain-method.set_name($_);
        ::?CLASS.^add_method($_,$chain-method)
    }

    method !SET-SELF(\source, \batch, \degree, $method) {
        $!source := source;
        $!batch   = batch;
        $!degree  = degree;
        $!method  = $method;
        self
    }

    # Throws X::Invalid::Value if batch or degree is not positive.
    method new(\source, \batch, \degree, $method) {
        batch <= 0
          ?? X::Invalid::Value.new(
               :$method, :name<batch>, :value(batch)).throw
          !! degree <= 0
            ?? X::Invalid::Value.new(
                 :$method,:name<degree>,:value(degree)).throw
            !! self.CREATE!SET-SELF(source, batch, degree, $method)
    }

    # The serial iterator producing the results
    method iterator() {
        @!actions                              # we can run actions
          ?? $!method eq 'hyper'
            ?? Rakudo::Iterator.HyperActions(  # we want "hyper"
                 $!source,@!actions,$!batch,$!degree)
            !! Rakudo::Iterator.RaceActions(   # we want "race"
                 $!source,@!actions,$!batch,$!degree)
          !! $!source                          # sadly, nothing to do
    }

    # Make sure we will always run the iterator
    method sink() { self.iterator.sink-all }
}
# Since we cannot augment a role, specifically the Iterable role, we do
# an augment on Seq.  For now, we must therefore do a .Seq on any iterable
# before we can hyper / race it.
augment class Seq {

    # Ordered variant: results are produced in the order of the source.
    # Cannot call it hyper (presumably Seq already has a method of that
    # name).
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new: self.iterator, $batch, $degree, "hyper"
    }

    # Unordered variant: results are produced as soon as they are ready.
    # Cannot call it race, for the same reason.
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new: self.iterator, $batch, $degree, "race"
    }
}
augment class Rakudo::Iterator {

    # Shared, always-empty buffer used as the initial value of $!slipped
    # in the actionators below.  Nothing is ever pushed onto it.
    my $empty := IterationBuffer.CREATE;

    # Returns a sort of hyper-iterator that can be called from different
    # threads at the same time, and which produces Pairs with an ordinal
    # number and an IterationBuffer filled with as many elements as could
    # be fetched from the given source iterator, up to the given batch
    # size.  Once the source iterator is exhausted, it will keep returning
    # IterationEnd no matter how many times .pull-buffer is called, and it
    # will not call .pull-one on the source iterator again.
    method ConcurrentBatcher(\iterator, Int:D $batch) {
        class {
            has $!iterator;      # the source iterator
            has $!lock;          # making sure one worker runs this code
            has int $!ordinal;   # the key of the Pair we produce
            has int $!batch;     # 0 indicates we're exhausted

            method !SET-SELF(\iterator, \batch) {
                $!iterator := iterator;
                $!lock     := Lock.new;
                $!ordinal   = -1;
                $!batch     = batch;
                self
            }
            method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }

            method pull-buffer() {
                nqp::if(
                  $!batch,
                  $!lock.protect({
                      # Check again now that we hold the lock: another
                      # worker may have exhausted the source iterator
                      # while we were waiting for the lock.
                      nqp::if(
                        $!batch,
                        nqp::stmts(            # we haz the source iterator
                          (my $buffer   := IterationBuffer.CREATE),
                          (my $iterator := $!iterator),
                          (my int $batch = $!batch),
                          (my int $found = -1),
                          nqp::until(
                            nqp::iseq_i(
                              ($found = nqp::add_i($found,1)),
                              $batch
                            ) || nqp::eqaddr(
                              (my $pulled := $iterator.pull-one),
                              IterationEnd
                            ),
                            nqp::bindpos($buffer,$found,$pulled)
                          ),
                          # The buffer now holds exactly $found elements,
                          # so it never needs trimming.
                          nqp::if(             # source iterator exhausted
                            nqp::islt_i($found,$batch),
                            ($!batch = 0)      # tell the world we're done
                          ),
                          nqp::if(
                            $found,
                            Pair.new(          # produce a batch
                              ($!ordinal = nqp::add_i($!ordinal,1)),
                              $buffer
                            ),
                            IterationEnd       # we're done here
                          )
                        ),
                        IterationEnd           # exhausted while we waited
                      )
                  }),
                  IterationEnd                 # we were already done
                )
            }
        }.new(iterator,$batch)
    }

    # This role provides the basis for concurrent processing of an
    # iterator.  Its .new method expects a source iterator, a list of
    # actions to be performed (consisting of method => capture pairs)
    # on each value obtained from the source iterator, the maximum size
    # of the buffer to be processed inside a single worker, and the
    # degree (aka number of workers).  Classes must provide at least a
    # pull-one and a !TWEAK method (which is expected to return self).
    #
    # Please note that although all sorts of parallel processing happens
    # inside the classes that do this role, the classes expose themselves
    # as ordinary Iterator classes to the world.
    role ConcurrentActionator does Iterator {
        has $!queue;
        has @!promises;
        has Int $!alive;    # alas, cannot be a native int :-(

        # We use a concurrent blocking queue (the work horse of a
        # Channel, but without any frills here).  It is safe to
        # nqp::push() on it from any thread, and nqp::shift() will
        # block until a value becomes available.  Since we know the
        # number of workers, and each worker will send an IterationEnd
        # at the end, by counting the number of IterationEnds seen,
        # we know when we should stop nqp::shift()ing from the queue.
        my class Queue is repr('ConcBlockingQueue') { }

        method !TWEAK { ... }

        method !SET-SELF(\source, \actions, \batch, \degree) {

            # Create two lists: one with methods to call, and one
            # with captures to apply, so we can easily index into
            # what we need to do.  An action key that is not a Method
            # is taken as the name of a List method.
            my int $todo = actions.elems;
            my $methods  := nqp::setelems(nqp::list,$todo);
            my $captures := nqp::setelems(nqp::list,$todo);
            for actions.kv -> $w, $action {
                my $m := $action.key;
                nqp::istype($m,Method)
                  ?? nqp::bindpos($methods,$w,$m)
                  !! nqp::bindpos($methods,$w,List.^find_method($m.Str));
                nqp::bindpos($captures,$w,$action.value);
            }

            # Set up the batcher and the queue.
            $!alive = degree;
            $!queue := Queue.CREATE;
            my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);

            # Set up the promises of the workers.  Not sure we would
            # actually ever need that, but maybe we can do something
            # with reducing operations in the future, where we would
            # need the result.  Or we can have the worker just return
            # the number of elements processed, or other statistics.
            for ^degree -> $w {
                @!promises.BIND-POS($w,
                  start {
                      # NOTE(review): the exception is only printed; the
                      # batch being processed is lost, so the results can
                      # be incomplete without any error being raised.
                      CATCH { .say }   # not sure what to do about exceptions

                      # Ensure we *always* queue an IterationEnd for this
                      # worker when we're done, normally or exceptionally.
                      LEAVE nqp::push($!queue,IterationEnd);

                      # A dummy HLL list to be passed as a "self", with
                      # its $!reified changed for each call.
                      my $List := List.CREATE;

                      # While we have batches
                      nqp::until(
                        nqp::eqaddr(
                          (my $enroute := $batcher.pull-buffer),
                          IterationEnd
                        ),

                        # For all actions to perform
                        nqp::stmts(
                          (my int $i = -1),
                          nqp::while(
                            nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                            nqp::stmts(

                              # Transplant batch into List
                              nqp::bindattr($List,List,'$!reified',
                                nqp::getattr($enroute,Pair,'$!value')
                              ),

                              # Run the iterator on the List for this
                              # action, collecting into a fresh buffer
                              # that becomes the Pair's new value
                              nqp::atpos($methods,$i)(
                                $List,|nqp::atpos($captures,$i)
                              ).iterator.push-all(
                                nqp::bindattr($enroute,Pair,'$!value',
                                  IterationBuffer.CREATE
                                )
                              )
                            )
                          )
                        ),

                        # Queue the result of all actions (the third
                        # operand of nqp::until runs after each pass)
                        nqp::push($!queue,$enroute)
                      )
                  }
                )
            }

            # The instantiated object
            self
        }

        method new(\source, \actions, \batch, \degree) {
            self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
        }

        method sink-all(--> IterationEnd) is raw {
            # Just eat the queue, we don't care about order in any way
            nqp::while(
              $!alive,
              nqp::stmts(
                nqp::until(
                  nqp::eqaddr(nqp::shift($!queue),IterationEnd),
                  nqp::null
                ),
                nqp::unless(
                  --$!alive,
                  self!cleanup,
                )
              )
            )
        }

        # Handle the promises, indicate we're done
        method !cleanup(--> IterationEnd) {
            .result for @!promises
        }
    }

    # The "hyper" case of the ConcurrentActionator role.  Chunks may
    # arrive out of order; an early chunk is stored in $!processed at
    # index (ordinal - $!offset) until its turn comes.
    method HyperActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;       # current list of values to produce
            has $!processed;     # list of processed chunks
            has int $!offset;    # ordinal number of chunk at index 0

            method !TWEAK() {
                $!slipped   := $empty;
                $!processed := nqp::list;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),             # produce from the chunk
                  nqp::if(
                    $!alive,
                    nqp::stmts(
                      nqp::if(                       # no chunk to produce from
                        nqp::existspos($!processed,0),
                        nqp::stmts(                  # next chunk is available
                          ($!offset = nqp::add_i($!offset,1)),
                          ($!slipped := nqp::shift($!processed))
                        ),
                        nqp::if(                     # next chunk not there
                          nqp::eqaddr(
                            (my $chunk := nqp::shift($!queue)),
                            IterationEnd
                          ),
                          # A worker has expired.  Don't recurse here: the
                          # value of this statement is discarded, and the
                          # pull-one below tries again anyway.
                          nqp::unless(
                            --$!alive,
                            self!cleanup             # mohican time, bye bye
                          ),
                          nqp::if(                   # a fresh chunk
                            nqp::iseq_i(
                              $!offset,
                              (my int $ordinal = $chunk.key)
                            ),
                            nqp::stmts(              # in sequence chunk
                              ($!offset = nqp::add_i($!offset,1)),
                              nqp::if(               # lose placeholder if any
                                nqp::elems($!processed),
                                nqp::shift($!processed)
                              ),
                              ($!slipped := $chunk.value)
                            ),
                            nqp::stmts(              # out of sequence
                              nqp::bindpos(          # store for later usage
                                $!processed,
                                nqp::sub_i($ordinal,$!offset),
                                $chunk.value
                              ),
                            )
                          )
                        )
                      ),
                      self.pull-one                  # rinse and repeat
                    ),
                    IterationEnd
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                        # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                        # do the other chunks
                    $!alive,
                    nqp::if(
                      nqp::existspos($!processed,0),
                      nqp::stmts(                    # next chunk is available
                        ($!offset = nqp::add_i($!offset,1)),
                        (my $slipped := nqp::shift($!processed)),
                        nqp::while(
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      ),
                      nqp::if(                       # next chunk not there
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::unless(                 # worker expired
                          --$!alive,
                          self!cleanup               # mohican time, bye bye
                        ),
                        nqp::bindpos(                # out of sequence
                          $!processed,               # store for later usage
                          nqp::sub_i($chunk.key,$!offset),
                          $chunk.value
                        )
                      )
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }

    # The "race" case of the ConcurrentActionator role.  Chunks are
    # produced in whatever order they arrive.
    method RaceActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;

            method !TWEAK() {
                $!slipped := $empty;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),             # produce from available
                  nqp::if(
                    $!alive,                         # workers left to hear from
                    nqp::if(                         # no values available
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::if(                       # worker exhausted
                        --$!alive,
                        self.pull-one,               # but still others
                        self!cleanup,                # mohican time, bye bye
                      ),
                      nqp::stmts(
                        ($!slipped := $chunk.value), # could be empty
                        self.pull-one                # so try again
                      )
                    ),
                    IterationEnd                     # all workers are done; don't
                                                     # block on an empty queue
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                        # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                        # do the other chunks
                    $!alive,
                    nqp::until(                      # still in business
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::stmts(                    # we have a chunk
                        (my $slipped := $chunk.value),
                        nqp::while(                  # push the chunk
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      )
                    ),
                    nqp::unless(
                      --$!alive,
                      self!cleanup,                  # mohican time, bye bye
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }
}
# Compare the concurrent (hijper) and serial versions of the same
# map / grep pipeline.
#
# The blocks compute "$_ + 1" rather than "$_++": "$_++" increments the
# elements of @a in place (in the parallel run, from worker threads), so
# the serial run would then process different values than the parallel
# run did.
my @a = ^106;
my $now = now;
dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_ + 1 }).grep({ $_ %% 2 });
say "parallel processed in {now - $now}";
$now = now;
dd @a.map( { sleep rand / 1000; $_ + 1 } ).grep: { $_ %% 2 };
say "serial processed in {now - $now}";
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
[23:01:11] lizmat: Hm, in the initial prototyping I did I had the thread that we invoke the .hyper/.race on play coordinator; it would then .push-exactly elements into the batch buffers and then hand them off to workers. That approach avoided the need to lock on the source iterator, which seemed a bit simpler
[23:08:08] lizmat: Will have to dig further through your gist tomorrow when I can think clearer.
Perhaps I should explain my thoughts:
What I basically developed here is an iterator that takes an iterator, parallelizes it, does something to each element of that iterator, and then produces a result. In the case of race, results are produced as they come in. In the case of hyper, results are collected until the next result set in sequence arrives, and then those are produced. So, from the outside, it looks like a normal iterator, while internally it parallelizes.
The only communication from the unthreaded to the threaded code is through the lock-protected use of the source iterator. The reason for this approach is that it is simple, and that a faster worker will do more work without the need for a coordinator. You could argue that the lock's protect handler is the coordinator, but at a very low level. The disadvantage of an HLL coordinator is that a worker needs to inform the coordinator that it's done, which would involve another set of locks / semaphores, no?
The only communication from the threaded to the unthreaded code is through a Queue (repr('ConcBlockingQueue')). Each worker must put an IterationEnd value in the queue when it is done. Since the unthreaded end knows how many workers there are, it knows how many IterationEnds it should see before it can be sure that nothing more will be coming. So looking for new values is as simple as doing an nqp::shift on the Queue: it will block until a value is available. Again, this uses the low-level blocking feature and keeps it away from HLL code as much as possible. The only proviso is that a worker must always generate an IterationEnd when it's done, which is why that is done in a LEAVE phaser.
Providing back pressure could be as simple as a "shaped" Queue, if you will: that is, a Queue of a fixed size on which an nqp::push blocks once the Queue has reached its maximum length.
Tagging the result of each worker with an ordinal number allows for easy reconstruction at the unthreaded end. Since the result of each worker is already in order, we just need to process the received results in ordinal-number order, and then in their internal order.