@lizmat
Last active March 22, 2017 23:13
Prototype Rakudo::Iterator.(Hyper|Race)Actions
use MONKEY;

class ConcurrentChain does Iterable {  # perhaps this should be HyperSeq ?
    has $!source;      # the source iterator
    has int $!batch;   # batch size
    has int $!degree;  # degree (number of workers)
    has str $!method;  # name of method: "hyper" or "race"
    has @!actions;     # Pairs of actions: Method => Capture

    # Need to special case "map", to handle "for {}", which is code-genned
    # as a .map, but with a :item named parameter specified
    my $map-method := List.^find_method("map");
    method map(ConcurrentChain:D: :$item, |c) {
        if defined($item) {
            Seq.new(self.iterator).map(:$item, |c)
        }
        elsif c.AT-POS(0).has-phaser("LAST") {
            Seq.new(self.iterator).map(|c)   # must use serial version
        }
        else {                               # concurrent, save as action to do
            @!actions.push(Pair.new($map-method,c));
            self
        }
    }

    # Need to special case "grep" in case the block has a LAST phaser.
    my $grep-method := List.^find_method("grep");
    method grep(ConcurrentChain:D: |c) {
        if c.AT-POS(0).has-phaser("LAST") {
            Seq.new(self.iterator).grep(|c)  # must use serial version
        }
        else {                               # concurrent, save as action to do
            @!actions.push(Pair.new($grep-method,c));
            self
        }
    }

    # concurrent methods that should just save action for later
    # BEGIN for <grep> {
    #     my $call-method := List.^find_method($_);
    #     my $chain-method := method (ConcurrentChain:D: |c) {
    #         @!actions.push(Pair.new($call-method,c));
    #         self
    #     }
    #     $chain-method.set_name($_);
    #     ::?CLASS.^add_method($_,$chain-method)
    # }

    # serial methods: collect all results, then call the List method,
    # passing on any arguments (e.g. :as / :with for unique and squish)
    BEGIN for <gist keys kv perl repeated squish Str unique values> {
        my $call-method := List.^find_method($_);
        my $chain-method := method (ConcurrentChain:D: |c) {
            self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
            $call-method(
              nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer),
              |c
            )
        }
        $chain-method.set_name($_);
        ::?CLASS.^add_method($_,$chain-method)
    }

    method !SET-SELF(\source, \batch, \degree, $method) {
        $!source := source;
        $!batch   = batch;
        $!degree  = degree;
        $!method  = $method;
        self
    }
    method new(\source, \batch, \degree, $method) {
        batch <= 0
          ?? X::Invalid::Value.new(
               :$method, :name<batch>, :value(batch)).throw
          !! degree <= 0
            ?? X::Invalid::Value.new(
                 :$method, :name<degree>, :value(degree)).throw
            !! self.CREATE!SET-SELF(source, batch, degree, $method)
    }

    # The serial iterator producing the results
    method iterator() {
        @!actions                               # we can run actions
          ?? $!method eq 'hyper'
            ?? Rakudo::Iterator.HyperActions(   # we want "hyper"
                 $!source,@!actions,$!batch,$!degree)
            !! Rakudo::Iterator.RaceActions(    # we want "race"
                 $!source,@!actions,$!batch,$!degree)
          !! $!source                           # sadly, nothing to do
    }

    # Make sure we will always run the iterator
    method sink() { self.iterator.sink-all }
}
# Since we cannot augment a role, specifically the Iterable role, we do
# an augment on Seq. For now, we must therefore do a .Seq on any iterable
# before we can hyper / race it.
augment class Seq {
    # Cannot call it hyper
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new(self.iterator,$batch,$degree,"hyper")
    }
    # Cannot call it race
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new(self.iterator,$batch,$degree,"race")
    }
}
augment class Rakudo::Iterator {
    my $empty := IterationBuffer.CREATE;

    # Returns a sort of hyper-iterator that can be called from different
    # threads at the same time, and which produces Pairs with an ordinal
    # number and an IterationBuffer filled with as many elements as could
    # be fetched from the given source iterator, up to the given batch size.
    # If the source iterator is exhausted, it will keep returning
    # IterationEnd no matter how many times .pull-buffer is called.
    method ConcurrentBatcher(\iterator, Int:D $batch) {
        class {
            has $!iterator;     # the source iterator
            has $!lock;         # making sure only one worker at a time runs this code
            has int $!ordinal;  # the key of the Pair we produce
            has int $!batch;    # 0 indicates we're exhausted

            method !SET-SELF(\iterator, \batch) {
                $!iterator := iterator;
                $!lock     := Lock.new;
                $!ordinal   = -1;
                $!batch     = batch;
                self
            }
            method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }

            method pull-buffer() {
                nqp::if(
                  $!batch,
                  nqp::stmts(                        # we're still in business
                    (my $buffer := IterationBuffer.CREATE),
                    (my $iterator := $!iterator),
                    (my int $batch = $!batch),
                    (my int $found = -1),
                    $!lock.protect({
                      nqp::stmts(                    # we haz the source iterator
                        nqp::until(
                          nqp::iseq_i(
                            ($found = nqp::add_i($found,1)),
                            $batch
                          ) || nqp::eqaddr(
                            (my $pulled := $iterator.pull-one),
                            IterationEnd
                          ),
                          nqp::bindpos($buffer,$found,$pulled)
                        ),
                        nqp::if(                     # source iterator exhausted
                          nqp::islt_i($found,$batch),
                          nqp::stmts(                # only produce what we found
                            nqp::splice(
                              $buffer,
                              $empty,
                              $found,
                              nqp::sub_i($batch,$found)
                            ),
                            ($!batch = 0)            # tell the world we're done
                          )
                        ),
                        nqp::if(
                          $found,
                          Pair.new(                  # produce a batch
                            ($!ordinal = nqp::add_i($!ordinal,1)),
                            $buffer
                          ),
                          IterationEnd               # we're done here
                        )
                      )
                    })
                  ),
                  IterationEnd                       # we were already done
                )
            }
        }.new(iterator,$batch)
    }
    # This role provides the basis for concurrent processing of an
    # iterator.  Its .new method expects a source iterator, a list of
    # actions (consisting of method => capture pairs) to be performed
    # on each value obtained from the source iterator, the maximum size
    # of the buffer to be processed inside a single worker, and the
    # degree (aka number of workers).  Classes must provide at least a
    # pull-one and a !TWEAK method (which is expected to return self).
    #
    # Please note that although all sorts of parallel processing happens
    # inside the classes that do this role, the classes expose
    # themselves as ordinary Iterator classes to the world.
    role ConcurrentActionator does Iterator {
        has $!queue;
        has @!promises;
        has Int $!alive;   # alas, cannot be a native int :-(

        # We use a concurrent blocking queue (the work horse of a
        # Channel, but without any frills here).  It is safe to
        # nqp::push() on it from any thread, and nqp::shift() will
        # block until a value becomes available.  Since we know the
        # number of workers, and each worker will send an IterationEnd
        # at the end, by counting the number of IterationEnds seen,
        # we know when we should stop nqp::shift()ing from the queue.
        my class Queue is repr('ConcBlockingQueue') { }

        method !TWEAK { ... }

        method !SET-SELF(\source, \actions, \batch, \degree) {
            # Create two lists: one with methods to call, and one
            # with captures to apply, so we can easily index into
            # what we need to do.
            my int $todo = actions.elems;
            my $methods  := nqp::setelems(nqp::list,$todo);
            my $captures := nqp::setelems(nqp::list,$todo);
            for actions.kv -> $w, $action {
                my $m := $action.key;
                nqp::istype($m,Method)
                  ?? nqp::bindpos($methods,$w,$m)
                  !! nqp::bindpos($methods,$w,List.^find_method($m.Str));
                nqp::bindpos($captures,$w,$action.value);
            }

            # Set up the batcher and the queue.
            $!alive = degree;
            $!queue := Queue.CREATE;
            my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);

            # Set up the promises of the workers.  Not sure we would
            # actually ever need that, but maybe we can do something
            # with reducing operations in the future, where we would
            # need the result.  Or we can have the worker just return
            # the number of elements processed, or other statistics.
            for ^degree -> $w {
                @!promises.BIND-POS($w,
                  start {
                      CATCH { .say }  # not sure what to do about exceptions
                      # Ensure we *always* queue an IterationEnd for this
                      # worker when we're done, normally or exceptionally.
                      LEAVE nqp::push($!queue,IterationEnd);
                      # A dummy HLL list to be passed as a "self", with
                      # its $!reified changed for each call.
                      my $List := List.CREATE;
                      # While we have batches
                      nqp::until(
                        nqp::eqaddr(
                          (my $enroute := $batcher.pull-buffer),
                          IterationEnd
                        ),
                        # For all actions to perform
                        nqp::stmts(
                          (my int $i = -1),
                          nqp::while(
                            nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                            nqp::stmts(
                              # Transplant batch into List
                              nqp::bindattr($List,List,'$!reified',
                                nqp::getattr($enroute,Pair,'$!value')
                              ),
                              # Run the iterator on the List for this action
                              nqp::atpos($methods,$i)(
                                $List,|nqp::atpos($captures,$i)
                              ).iterator.push-all(
                                nqp::bindattr($enroute,Pair,'$!value',
                                  IterationBuffer.CREATE
                                )
                              )
                            )
                          )
                        ),
                        # Queue the result of all actions
                        nqp::push($!queue,$enroute)
                      )
                  }
                )
            }

            # The instantiated object
            self
        }

        method new(\source, \actions, \batch, \degree) {
            self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
        }

        method sink-all(--> IterationEnd) is raw {
            # Just eat the queue, we don't care about order in any way
            nqp::while(
              $!alive,
              nqp::stmts(
                nqp::until(
                  nqp::eqaddr(nqp::shift($!queue),IterationEnd),
                  nqp::null
                ),
                nqp::unless(
                  --$!alive,
                  self!cleanup,
                )
              )
            )
        }

        # Handle the promises, indicate we're done
        method !cleanup(--> IterationEnd) {
            .result for @!promises
        }
    }
    # The "hyper" case of the ConcurrentActionator role.
    method HyperActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;     # current list of values to produce
            has $!processed;   # list of processed chunks
            has int $!offset;  # ordinal number of chunk at index 0

            method !TWEAK() {
                $!slipped   := $empty;
                $!processed := nqp::list;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),             # produce from the chunk
                  nqp::if(
                    $!alive,
                    nqp::stmts(
                      nqp::if(                       # no chunk to produce from
                        nqp::existspos($!processed,0),
                        nqp::stmts(                  # next chunk is available
                          ($!offset = nqp::add_i($!offset,1)),
                          ($!slipped := nqp::shift($!processed))
                        ),
                        nqp::if(                     # next chunk not there
                          nqp::eqaddr(
                            (my $chunk := nqp::shift($!queue)),
                            IterationEnd
                          ),
                          nqp::unless(   # a worker has expired; if others
                            --$!alive,   # remain, the self.pull-one below
                            self!cleanup # tries again, else mohican time
                          ),
                          nqp::if(                   # a fresh chunk
                            nqp::iseq_i(
                              $!offset,
                              (my int $ordinal = $chunk.key)
                            ),
                            nqp::stmts(              # in sequence chunk
                              ($!offset = nqp::add_i($!offset,1)),
                              nqp::if(               # lose placeholder if any
                                nqp::elems($!processed),
                                nqp::shift($!processed)
                              ),
                              ($!slipped := $chunk.value)
                            ),
                            nqp::stmts(              # out of sequence
                              nqp::bindpos(          # store for later usage
                                $!processed,
                                nqp::sub_i($ordinal,$!offset),
                                $chunk.value
                              ),
                            )
                          )
                        )
                      ),
                      self.pull-one                  # rinse and repeat
                    ),
                    IterationEnd
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                        # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                        # do the other chunks
                    $!alive,
                    nqp::if(
                      nqp::existspos($!processed,0),
                      nqp::stmts(                    # next chunk is available
                        ($!offset = nqp::add_i($!offset,1)),
                        (my $slipped := nqp::shift($!processed)),
                        nqp::while(
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      ),
                      nqp::if(                       # next chunk not there
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::unless(                 # worker expired
                          --$!alive,
                          self!cleanup               # mohican time, bye bye
                        ),
                        nqp::bindpos(                # out of sequence
                          $!processed,               # store for later usage
                          nqp::sub_i($chunk.key,$!offset),
                          $chunk.value
                        )
                      )
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }
    # The "race" case of the ConcurrentActionator role.
    method RaceActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;

            method !TWEAK() {
                $!slipped := $empty;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),             # produce from available
                  nqp::if(                           # no values available
                    nqp::eqaddr(
                      (my $chunk := nqp::shift($!queue)),
                      IterationEnd
                    ),
                    nqp::if(                         # worker exhausted
                      --$!alive,
                      self.pull-one,                 # but still others
                      self!cleanup,                  # mohican time, bye bye
                    ),
                    nqp::stmts(
                      ($!slipped := $chunk.value),   # could be empty
                      self.pull-one                  # so try again
                    )
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                        # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                        # do the other chunks
                    $!alive,
                    nqp::until(                      # still in business
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::stmts(                    # we have a chunk
                        (my $slipped := $chunk.value),
                        nqp::while(                  # push the chunk
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      )
                    ),
                    nqp::unless(
                      --$!alive,
                      self!cleanup,                  # mohican time, bye bye
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }
}
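# Use "$_ + 1" rather than "$_++": the latter tries to increment the
# elements of @a in place and returns the old value, so the parallel
# and serial runs could not be compared.
my @a = ^106;
my $now = now;
dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_ + 1 }).grep({ $_ %% 2 });
say "parallel processed in {now - $now}";
$now = now;
dd @a.map( { sleep rand / 1000; $_ + 1 } ).grep: { $_ %% 2 };
say "serial processed in {now - $now}";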
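For readers who want the protocol without the nqp internals, here is a minimal high-level sketch of what ConcurrentBatcher and ConcurrentActionator do in race mode: a lock-protected source iterator, batches tagged with an ordinal, and one "done" marker per worker that the consumer counts. It assumes a plain Channel in place of the ConcBlockingQueue and a hypothetical WorkerDone type object in place of IterationEnd; race-map is an illustrative name, not part of Rakudo.

```raku
# Hypothetical sketch, not Rakudo's API: a Channel stands in for the
# ConcBlockingQueue and the WorkerDone type object for IterationEnd.
my class WorkerDone { }

sub race-map(@source, &action, Int :$batch = 10, Int :$degree = 4) {
    my $iterator  = @source.iterator;  # shared source, guarded by $lock
    my $lock      = Lock.new;
    my $exhausted = False;             # plays the role of $!batch = 0
    my $ordinal   = -1;                # key of the last batch handed out
    my $queue     = Channel.new;       # workers -> consumer

    # Under the lock: pull up to $batch values and tag them with an
    # ordinal; Nil means the source has been exhausted.
    my sub pull-batch() {
        $lock.protect: {
            my @values;
            until $exhausted || @values == $batch {
                my \value = $iterator.pull-one;
                if value =:= IterationEnd { $exhausted = True }
                else                      { @values.push: value }
            }
            @values ?? (++$ordinal => @values) !! Nil
        }
    }

    my @workers = (^$degree).map: {
        start {
            LEAVE $queue.send(WorkerDone);  # always signal, even on error
            loop {
                my $pair = pull-batch();
                last without $pair;
                $queue.send($pair.key => $pair.value.map(&action).List);
            }
        }
    }

    # Race semantics: take results as they arrive, and stop once every
    # worker has reported that it is done.
    my @results;
    my $alive = $degree;
    while $alive {
        given $queue.receive {
            when WorkerDone { $alive-- }
            default         { @results.append: .value }
        }
    }
    await @workers;
    @results
}

my @raced = race-map(^106, * * 2, :10batch);
say @raced.elems;         # 106
say @raced.sort.head(3);  # (0 2 4)
```

Because the consumer only stops after it has seen $degree markers, sending the marker from a LEAVE phaser matters: a worker that died without sending it would leave the consumer blocked in .receive forever.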
@lizmat
Author

lizmat commented Mar 22, 2017

[23:01:11] lizmat: Hm, in the initial prototyping I did I had the thread that we invoke the .hyper/.race on play coordinator; it would then .push-exactly elements into the batch buffers and then hand them off to workers. That approach avoided the need to lock on the source iterator, which seemed a bit simpler
[23:08:08] lizmat: Will have to dig further through your gist tomorrow when I can think clearer.
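The coordinator approach described in that quote can also be sketched in high-level Raku. In this hypothetical coordinated-map (an illustrative name, not Rakudo's API), only the calling thread touches the source iterator: it fills each batch with Iterator.push-exactly and hands it to the workers over a Channel, so no lock is needed around pull-one. Collecting all results and sorting them by ordinal at the end is a simplification chosen for brevity; it is not how HyperActions streams its output.

```raku
# Hypothetical sketch of the coordinator variant: the calling thread owns
# the source iterator, so no lock is needed around pull-one.
sub coordinated-map(@source, &action, Int :$batch = 10, Int :$degree = 4) {
    my $work    = Channel.new;   # coordinator -> workers
    my $results = Channel.new;   # workers -> coordinator

    my @workers = (^$degree).map: {
        start {
            # Each batch is received by exactly one worker; the loop ends
            # once $work has been closed and drained.
            for $work.list -> $pair {
                $results.send($pair.key => $pair.value.map(&action).List);
            }
        }
    }

    # Coordinator: fill batch buffers with push-exactly and hand them off.
    my $iterator = @source.iterator;
    my $ordinal  = 0;
    loop {
        my @buffer;
        my \status = $iterator.push-exactly(@buffer, $batch);
        $work.send($ordinal++ => @buffer) if @buffer;
        last if status =:= IterationEnd;  # fewer than $batch were left
    }
    $work.close;
    await @workers;

    # Hyper-style result: restore source order using the ordinals.
    $results.close;
    $results.list.sort(*.key).map(*.value.Slip).List
}

my @coordinated = coordinated-map(^106, * + 1, :10batch);
say @coordinated.head(5);   # (1 2 3 4 5)
```

Here the coordinator decides the batch boundaries up front, whereas in the gist each worker takes its next batch itself, under the lock, as soon as it is free.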

Perhaps I should explain my thoughts:

  1. What I basically developed here is an iterator that takes an iterator, parallelizes the processing of it, does something to each element of that iterator, and then produces the results: in the case of race, as they come in. In the case of hyper, it collects results until the next result set in sequence arrives, and then starts producing those. So, from the outside, it looks like a normal iterator, while inside it parallelizes.

  2. The only communication from unthreaded to threaded is through the lock-protected use of the source iterator. The reason for this approach is that it is simple, and that a faster worker will do more work without the need for a coordinator. You could argue that the lock's protect handler is the coordinator, but at a very low level. The disadvantage of an HLL coordinator is that a worker needs to inform the coordinator that it's done, which would involve another set of locks / semaphores, no?

  3. The only communication from threaded to unthreaded is through a Queue (repr('ConcBlockingQueue')). Each worker must put an IterationEnd value in the queue when it is done. Since the unthreaded end knows how many workers there are, it knows how many IterationEnds it must see before it can be sure nothing more will be coming. So looking for new values is as simple as doing an nqp::shift on the Queue: it will block until a value is available. Again, use the low-level blocking feature and keep it away from HLL as far as possible. The only proviso is that a worker must always generate an IterationEnd when it's done, which is why that is done in a LEAVE phaser.

  4. Providing back pressure could be as simple as a "shaped" Queue, if you will: a Queue of a fixed size on which an nqp::push blocks once it has reached its maximum length.

  5. Tagging the result of each worker with an ordinal number allows for easy reconstruction at the unthreaded end. Since the result of each worker is already in order, we just need to process the received results in ordinal number order, and then each result in its internal order.
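The reordering in point 5 can be tried in isolation. The hypothetical reorder helper below receives ordinal => values chunks in whatever order they arrive and produces the values in ordinal order, holding back chunks that arrive early, much as $!processed and $!offset do in HyperActions. It uses a Hash for the pending chunks where the gist uses a shifting nqp::list, purely for readability.

```raku
# Hypothetical helper: accepts ordinal => values chunks in arrival order and
# returns the values in ordinal order, holding back chunks that came early
# (%pending here plays the part of $!processed in HyperActions).
sub reorder(@arrivals) {
    my %pending;   # ordinal => values, for chunks that arrived early
    my $next = 0;  # the only ordinal we may produce right now
    my @out;
    for @arrivals -> $chunk {
        %pending{$chunk.key} = $chunk.value;
        # Flush every chunk that is now in sequence.
        while %pending{$next}:exists {
            my $values = %pending{$next}:delete;
            @out.append: $values.list;
            $next++;
        }
    }
    @out
}

my @arrivals = 2 => <e f>, 0 => <a b>, 3 => <g>, 1 => <c d>;
say reorder(@arrivals);   # [a b c d e f g]
```

Only out-of-sequence chunks have to wait: as soon as chunk $next is present, it and any directly following chunks can be produced.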
