Created
March 22, 2017 16:30
-
-
Save lizmat/6256f19136990b071ecc6695e7f5b5f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use MONKEY; | |
# Records a chain of List method calls on a source so that they can be
# applied later by the concurrent iterators that Rakudo::Iterator.HyperActions
# and Rakudo::Iterator.RaceActions produce.
class ConcurrentChain {  # perhaps this should be HyperSeq ?
    has     $!source;   # handed to the concurrent iterator as its source
    has int $!batch;    # batch size handed to the concurrent iterator
    has int $!workers;  # worker count handed to the concurrent iterator
    has str $!method;   # 'hyper' selects HyperActions, anything else RaceActions
    has     $!actions;  # IterationBuffer of (method => capture) Pairs

    # Class-wide cache of the List methods that FALLBACK has resolved by
    # name.  It is only touched while holding the lock.
    my $method-lock := Lock.new;
    my $method-hash := nqp::hash;

    # List-like methods: calling one only records the List method together
    # with the arguments given, and hands back the chain for more chaining.
    BEGIN for <grep map squish repeated unique> -> $name {
        my $list-method := List.^find_method($name);
        my $recorder := method (ConcurrentChain:D: |c) {
            nqp::push(
              nqp::getattr(self,ConcurrentChain,'$!actions'),
              Pair.new($list-method,c)
            );
            self
        }
        $recorder.set_name($name);
        ::?CLASS.^add_method($name,$recorder)
    }

    # String-like methods: run the chain by wrapping its iterator in a Seq
    # and calling the Seq method of the same name on that.
    BEGIN for <gist perl Str> -> $name {
        my $seq-method := Seq.^find_method($name);
        my $renderer := method (ConcurrentChain:D: |c) {
            $seq-method(Seq.new(self.iterator))
        }
        $renderer.set_name($name);
        ::?CLASS.^add_method($name,$renderer)
    }

    # Initialize a freshly created instance and return it.
    method !SET-SELF(\source, \batch, \workers, $method) {
        $!source  := source;
        $!batch    = batch;
        $!workers  = workers;
        $!method   = $method;
        $!actions := nqp::create(IterationBuffer);
        self
    }

    # Create a chain for the given source, batch size, number of workers
    # and name of the calling method.  Throws X::Invalid::Value when the
    # batch size or the number of workers is zero or negative.
    method new(\source, \batch, \workers, $method) {
        X::Invalid::Value.new(
          :$method, :name<batch>, :value(batch)
        ).throw if nqp::isle_i(batch,0);

        X::Invalid::Value.new(
          :$method, :name<degree>, :value(workers)
        ).throw if nqp::isle_i(workers,0);

        nqp::create(self)!SET-SELF(source, batch, workers, $method)
    }

    # Any other method name is looked up on List (once, then cached) and
    # recorded as an action.  Throws X::Method::NotFound if List does not
    # have a method by that name.
    method FALLBACK(Str:D $name, |params) {
        my $resolved := $method-lock.protect({
            nqp::if(
              nqp::existskey($method-hash,$name),
              nqp::atkey($method-hash,$name),
              nqp::if(
                (my $found := List.^find_method($name)),
                nqp::bindkey($method-hash,$name,$found),
                X::Method::NotFound.new(
                  invocant => List,
                  method   => $name,
                  typename => 'List'
                ).throw
              )
            )
        });
        nqp::push($!actions,Pair.new($resolved,params));
        self
    }

    # Build the iterator that actually performs the recorded actions.
    method iterator() {
        nqp::iseq_s($!method,'hyper')
          ?? Rakudo::Iterator.HyperActions(
               $!source,$!actions,$!batch,$!workers
             )
          !! Rakudo::Iterator.RaceActions(
               $!source,$!actions,$!batch,$!workers
             )
    }

    # Run the chain for its side effects only.
    method sink() { self.iterator.sink-all }
}
# Entry points on Seq that start a ConcurrentChain reading from this Seq.
# Both take :batch (default 64) and :degree (default 4), coerced to Int.
augment class Seq {

    # Start a chain that is executed with Rakudo::Iterator.HyperActions.
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "hyper")
    }

    # Start a chain that is executed with Rakudo::Iterator.RaceActions.
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "race")
    }
}
augment class Rakudo::Iterator { | |
my $empty := nqp::create(IterationBuffer); | |
# Returns a sort of hyper-iterator that can be called from different
# threads at the same time, and which produces Pairs with an ordinal number
# (starting at 0) and an IterationBuffer filled with as many elements as
# could be fetched from the given source iterator, up to the given batch
# size.  Once the source iterator is exhausted, it will keep returning
# IterationEnd no matter how many times .pull-buffer is called.
method ConcurrentBatcher(\iterator, Int:D $batch) {
    class {
        has     $!iterator;  # the source iterator
        has     $!lock;      # making sure one worker runs this code
        has int $!ordinal;   # the key of the Pair we produce
        has int $!batch;     # 0 indicates we're exhausted

        method !SET-SELF(\iterator, \batch) {
            nqp::stmts(
              ($!iterator := iterator),
              ($!lock     := Lock.new),
              ($!ordinal   = -1),
              ($!batch     = batch),
              self
            )
        }
        method new(\it,\ba) { nqp::create(self)!SET-SELF(it,ba) }

        # Returns the next (ordinal => IterationBuffer) Pair, or
        # IterationEnd when there is nothing left to hand out.
        method pull-buffer() {
            nqp::if(
              $!batch,                   # cheap check without the lock
              $!lock.protect({
                  nqp::if(
                    # Check again now that we hold the lock: another worker
                    # may have exhausted the source while we were waiting,
                    # and an exhausted source must not be pulled from again.
                    $!batch,
                    nqp::stmts(          # we haz the source iterator
                      (my $buffer   := nqp::create(IterationBuffer)),
                      (my $iterator := $!iterator),
                      (my int $batch = $!batch),
                      (my int $found = -1),
                      nqp::until(
                        nqp::iseq_i(
                          ($found = nqp::add_i($found,1)),
                          $batch
                        ) || nqp::eqaddr(
                          (my $pulled := $iterator.pull-one),
                          IterationEnd
                        ),
                        nqp::bindpos($buffer,$found,$pulled)
                      ),
                      # The buffer holds exactly $found values at this
                      # point, so a short batch needs no trimming.
                      nqp::if(           # source iterator exhausted
                        nqp::islt_i($found,$batch),
                        ($!batch = 0)    # tell the world we're done
                      ),
                      nqp::if(
                        $found,
                        Pair.new(        # produce a batch
                          ($!ordinal = nqp::add_i($!ordinal,1)),
                          $buffer
                        ),
                        IterationEnd     # we're done here
                      )
                    ),
                    IterationEnd         # exhausted while we waited
                  )
              }),
              IterationEnd               # we were already done
            )
        }
    }.new(iterator,$batch)
}
# This role provides the basis for concurrent processing of an iterator.
# Its .new method expects, in this order: a source iterator, the actions
# to be performed on the values obtained from it (method => capture
# Pairs), the maximum number of values to be processed in one go by a
# single worker, and the number of workers.  Classes must provide at
# least a pull-one and a !TWEAK method; whatever !TWEAK returns is what
# .new returns, so it is expected to return self.
role ConcurrentActionator does Iterator {
    has     $!queue;     # blocking queue the workers push their results on
    has     $!promises;  # low-level list with the Promise of each worker
    has Int $!alive;     # alas, cannot be a native int :-(

    my class Queue is repr('ConcBlockingQueue') { }

    method !TWEAK { ... }

    # Set up the queue, split the actions into parallel lists of methods
    # and captures, and start the workers.  Returns self.
    method !SET-SELF(\source, \actions, int $batch, int $workers) {
        $!queue    := my $queue := nqp::create(Queue);
        $!promises := nqp::setelems(nqp::list,$!alive = $workers);

        # lowlevel the actions list
        my $pairs := nqp::if(
          nqp::istype(actions,List),
          nqp::getattr(actions,List,'$!reified'),
          actions
        );

        my int $todo  = nqp::elems($pairs);
        my $methods  := nqp::setelems(nqp::list,$todo);
        my $captures := nqp::setelems(nqp::list,$todo);

        # A key that is not a Method is looked up on List by its .Str.
        my int $n = -1;
        nqp::while(
          nqp::islt_i(($n = nqp::add_i($n,1)),$todo),
          nqp::stmts(
            (my $action := nqp::atpos($pairs,$n)),
            (my $callee := nqp::getattr($action,Pair,'$!key')),
            nqp::bindpos($methods,$n,
              nqp::if(
                nqp::istype($callee,Method),
                $callee,
                List.^find_method($callee.Str)
              )
            ),
            nqp::bindpos($captures,$n,
              nqp::getattr($action,Pair,'$!value')
            )
          )
        );

        my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,$batch);

        $n = -1;
        nqp::while(
          nqp::islt_i(($n = nqp::add_i($n,1)),$workers),
          nqp::bindpos($!promises,$n,
            start {
                CATCH { .say }

                # every worker announces its end on the queue
                LEAVE nqp::push($queue,IterationEnd);

                # one List object per worker, re-pointed at each buffer
                my $list := nqp::create(List);
                nqp::until(
                  nqp::eqaddr(
                    (my $chunk := $batcher.pull-buffer),
                    IterationEnd
                  ),
                  nqp::stmts(
                    (my int $i = -1),
                    nqp::while(    # feed the chunk through each action
                      nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                      nqp::stmts(
                        nqp::bindattr($list,List,'$!reified',
                          nqp::getattr($chunk,Pair,'$!value')
                        ),
                        # the result of the action goes into a fresh
                        # buffer that becomes the value of the chunk
                        nqp::atpos($methods,$i)(
                          $list,|nqp::atpos($captures,$i)
                        ).iterator.push-all(
                          nqp::bindattr($chunk,Pair,'$!value',
                            nqp::create(IterationBuffer)
                          )
                        )
                      )
                    ),
                    nqp::push($queue,$chunk)
                  )
                )
            }
          )
        );

        self
    }

    method new(\iterator, \actions, \batch, \workers) {
        nqp::create(self)!SET-SELF(iterator,actions,batch,workers)!TWEAK
    }

    # Throw away everything the workers produce until all have ended.
    method sink-all(--> IterationEnd) is raw {
        nqp::while(
          $!alive,
          nqp::stmts(
            nqp::until(    # skip until a worker announces its end
              nqp::eqaddr(nqp::shift($!queue),IterationEnd),
              nqp::null
            ),
            nqp::unless(
              --$!alive,
              self!cleanup
            )
          )
        )
    }

    # Fetch the result of every remaining worker Promise.
    method !cleanup(--> IterationEnd) {
        nqp::while(
          nqp::elems($!promises),
          nqp::shift($!promises).result
        )
    }
}
# Returns an iterator that lets several workers apply the given actions
# to batches of values from the source iterator, and that produces the
# results in the order of the batches they came from.
#
# Arguments, in call order: source iterator, actions (method => capture
# Pairs), batch size, number of workers.  This is the order used by the
# caller in ConcurrentChain.iterator and by RaceActions; batch and workers
# must not be declared the other way around, or the values get swapped
# on their way to ConcurrentActionator.new.
method HyperActions(\iterator,\actions,\batch,\workers) {
    class :: does ConcurrentActionator {
        has     $!slipped;    # current list of values to produce
        has     $!processed;  # chunks waiting for their turn, at index
                              # (ordinal of the chunk - $!offset)
        has int $!offset;     # ordinal number of chunk at index 0

        method !TWEAK() {
            nqp::stmts(
              ($!slipped   := $empty),
              ($!processed := nqp::list),
              self
            )
        }

        # Produce the next value, or IterationEnd when all workers are
        # done.  Every branch either produces a result itself or ends in
        # exactly one recursive call, so no result is ever discarded.
        method pull-one() is raw {
            nqp::if(
              nqp::elems($!slipped),
              nqp::shift($!slipped),         # produce from the chunk
              nqp::if(                       # no chunk to produce from
                nqp::existspos($!processed,0),
                nqp::stmts(                  # next chunk is available
                  ($!offset = nqp::add_i($!offset,1)),
                  ($!slipped := nqp::shift($!processed)),
                  self.pull-one              # could be empty, so try again
                ),
                nqp::if(                     # next chunk not there
                  $!alive,
                  nqp::if(
                    nqp::eqaddr(
                      (my $chunk := nqp::shift($!queue)),
                      IterationEnd
                    ),
                    nqp::if(                 # a worker has expired
                      --$!alive,
                      self.pull-one,         # others not, try again
                      self!cleanup           # mohican time, bye bye
                    ),
                    nqp::stmts(              # a fresh chunk
                      nqp::if(
                        nqp::iseq_i(
                          $!offset,
                          (my int $ordinal = $chunk.key)
                        ),
                        nqp::stmts(          # in sequence chunk
                          ($!offset = nqp::add_i($!offset,1)),
                          nqp::if(           # lose placeholder if any
                            nqp::elems($!processed),
                            nqp::shift($!processed)
                          ),
                          ($!slipped := $chunk.value)
                        ),
                        nqp::bindpos(        # out of sequence, keep it
                          $!processed,
                          nqp::sub_i($ordinal,$!offset),
                          $chunk.value
                        )
                      ),
                      self.pull-one          # rinse and repeat
                    )
                  ),
                  # All workers have ended: do not wait on the queue
                  # again, as nothing will ever be pushed on it anymore.
                  IterationEnd
                )
              )
            )
        }

        # Push all remaining values onto the given target, in order.
        method push-all($target --> IterationEnd) {
            nqp::stmts(
              nqp::while(                    # produce from available
                nqp::elems($!slipped),
                $target.push(nqp::shift($!slipped))
              ),
              nqp::while(                    # do the other chunks
                $!alive,
                nqp::if(
                  nqp::existspos($!processed,0),
                  nqp::stmts(                # next chunk is available
                    ($!offset = nqp::add_i($!offset,1)),
                    (my $slipped := nqp::shift($!processed)),
                    nqp::while(
                      nqp::elems($slipped),
                      $target.push(nqp::shift($slipped))
                    )
                  ),
                  nqp::if(                   # next chunk not there
                    nqp::eqaddr(
                      (my $chunk := nqp::shift($!queue)),
                      IterationEnd
                    ),
                    nqp::unless(             # worker expired
                      --$!alive,
                      self!cleanup           # mohican time, bye bye
                    ),
                    nqp::bindpos(            # store for later usage
                      $!processed,
                      nqp::sub_i($chunk.key,$!offset),
                      $chunk.value
                    )
                  )
                )
              )
            )
        }
    }.new(iterator, actions, batch, workers)
}
# Returns an iterator that lets several workers apply the given actions
# to batches of values from the source iterator, and that produces the
# results of each batch as it comes off the queue.
#
# Arguments, in call order: source iterator, actions (method => capture
# Pairs), batch size, number of workers.
method RaceActions(\iterator,\actions,\batch,\workers) {
    class :: does ConcurrentActionator {
        has $!slipped;  # values of the chunk currently being produced

        method !TWEAK() {
            $!slipped := $empty;
            self
        }

        # Produce the next value, or IterationEnd when the last worker
        # has announced its end.
        method pull-one() is raw {
            nqp::elems($!slipped)
              ?? nqp::shift($!slipped)       # still something in the chunk
              !! nqp::eqaddr(
                   (my $next := nqp::shift($!queue)),
                   IterationEnd
                 )
                ?? (--$!alive                # a worker is done
                     ?? self.pull-one        # but still others
                     !! self!cleanup)        # mohican time, bye bye
                !! nqp::stmts(
                     ($!slipped := $next.value),  # could be empty
                     self.pull-one                # so try again
                   )
        }

        # Push all remaining values onto the given target.
        method push-all($target --> IterationEnd) {

            # first what is left of the current chunk
            nqp::while(
              nqp::elems($!slipped),
              $target.push(nqp::shift($!slipped))
            );

            # then all other chunks, until no worker is alive anymore
            nqp::while(
              $!alive,
              nqp::until(
                nqp::eqaddr(
                  (my $next := nqp::shift($!queue)),
                  IterationEnd
                ),
                nqp::stmts(
                  (my $values := $next.value),
                  nqp::while(
                    nqp::elems($values),
                    $target.push(nqp::shift($values))
                  )
                )
              ),
              nqp::unless(
                --$!alive,
                self!cleanup
              )
            )
        }
    }.new(iterator, actions, batch, workers)
}
} | |
# Demo: run the same map/grep chain concurrently and serially over the
# same array and report how long each took.  Note that the map block
# increments the elements of @a in place, so the second run starts from
# the values left behind by the first.
my @a = ^106;

my $started = now;
dd @a.Seq.hijper(batch => 10).map({ sleep rand / 1000; $_++ }).grep({ $_ %% 2 });
say "parallel processed in {now - $started}";

dd @a;

$started = now;
dd @a.map({ sleep rand / 1000; $_++ }).grep({ $_ %% 2 });
say "serial processed in {now - $started}";
#================= | |
(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104).Seq | |
parallel processed in 0.01360739 | |
Array @a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106] | |
(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106).Seq | |
serial processed in 0.0882528 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment