Skip to content

Instantly share code, notes, and snippets.

@lizmat
Created March 22, 2017 16:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lizmat/6256f19136990b071ecc6695e7f5b5f2 to your computer and use it in GitHub Desktop.
Save lizmat/6256f19136990b071ecc6695e7f5b5f2 to your computer and use it in GitHub Desktop.
use MONKEY;
class ConcurrentChain { # perhaps this should be HyperSeq ?
    # A deferred chain of List methods to be run concurrently over the
    # values of a source iterator.  Chained method calls only record
    # (method => capture) pairs in $!actions; the actual work starts when
    # .iterator is called (directly, or via .sink / .gist / .perl / .Str).
    has $!source;       # the source iterator
    has int $!batch;    # maximum number of values handed to a worker at once
    has int $!workers;  # number of concurrent workers
    has str $!method;   # "hyper" or "race"; also used in error reports
    has $!actions;      # IterationBuffer of method => capture pairs

    # Cache of List methods resolved by FALLBACK, shared by all chains and
    # only touched while holding $method-lock.
    my $method-lock := Lock.new;
    my $method-hash := nqp::hash;

    # Listy methods: each one appends (List method => arguments) to the
    # chain's actions and returns the invocant, so calls can be chained.
    BEGIN for <grep map squish repeated unique> -> $name {
        my $list-method := List.^find_method($name);
        my $recorder    := method (ConcurrentChain:D: |c) {
            nqp::push(
              nqp::getattr(self,ConcurrentChain,'$!actions'),
              Pair.new($list-method,c)
            );
            self
        }
        $recorder.set_name($name);
        ::?CLASS.^add_method($name,$recorder)
    }

    # Stringy methods: run the chain and render its results the way the
    # same-named Seq method would (any arguments are ignored).
    BEGIN for <gist perl Str> -> $name {
        my $seq-method := Seq.^find_method($name);
        my $renderer   := method (ConcurrentChain:D: |c) {
            $seq-method(Seq.new(self.iterator))
        }
        $renderer.set_name($name);
        ::?CLASS.^add_method($name,$renderer)
    }

    method !SET-SELF(\source, \batch, \workers, $method) {
        $!source  := source;
        $!batch    = batch;
        $!workers  = workers;
        $!method   = $method;
        $!actions := nqp::create(IterationBuffer);
        self
    }

    # Creates a chain after checking that both the batch size and the
    # number of workers are positive; otherwise throws X::Invalid::Value
    # (reporting the worker count under the name "degree").
    method new(\source, \batch, \workers, $method) {
        X::Invalid::Value.new(
          :$method, :name<batch>, :value(batch)
        ).throw if nqp::isle_i(batch,0);
        X::Invalid::Value.new(
          :$method, :name<degree>, :value(workers)
        ).throw if nqp::isle_i(workers,0);
        nqp::create(self)!SET-SELF(source, batch, workers, $method)
    }

    # Any other method name is looked up on List (once, then served from
    # the shared cache) and recorded as an action together with its
    # arguments.  Names List does not know throw X::Method::NotFound.
    method FALLBACK(Str:D $name, |params) {
        my $list-method := $method-lock.protect({
            if nqp::existskey($method-hash,$name) {
                nqp::atkey($method-hash,$name)
            }
            else {
                my $found := List.^find_method($name);
                X::Method::NotFound.new(
                  invocant => List,
                  method   => $name,
                  typename => 'List'
                ).throw unless $found;
                nqp::bindkey($method-hash,$name,$found)
            }
        });
        nqp::push($!actions, Pair.new($list-method,params));
        self
    }

    # Starts processing: "hyper" mode uses HyperActions, anything else
    # uses RaceActions.
    method iterator() {
        $!method eq 'hyper'
          ?? Rakudo::Iterator.HyperActions(
               $!source,$!actions,$!batch,$!workers)
          !! Rakudo::Iterator.RaceActions(
               $!source,$!actions,$!batch,$!workers)
    }

    # Runs the whole chain for its side effects only.
    method sink() { self.iterator.sink-all }
}
augment class Seq {
    # Entry points for concurrent processing of a Seq.  Both wrap this
    # Seq's iterator in a ConcurrentChain, passing the batch size
    # (default 64) and the number of workers (:degree, default 4); they
    # differ only in the processing mode they request.
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "hyper")
    }
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        my $source := self.iterator;
        ConcurrentChain.new($source, $batch, $degree, "race")
    }
}
augment class Rakudo::Iterator {
    # A shared, empty IterationBuffer.  It is only used as the initial
    # (already drained) "current chunk" of the iterators below; nothing
    # is ever pushed into it.
    my $empty := nqp::create(IterationBuffer);

    # Returns a sort of hyper-iterator that can be called from different
    # threads at the same time, and which produces Pairs with an ordinal
    # number (0, 1, 2, ...) as key and an IterationBuffer as value, filled
    # with as many elements as could be fetched from the given source
    # iterator, up to the given batch size.
    # Once the source iterator is exhausted, .pull-buffer keeps returning
    # IterationEnd no matter how many times it is called, and the source
    # iterator itself is never pulled again.
    method ConcurrentBatcher(\iterator, Int:D $batch) {
        class {
            has $!iterator;     # the source iterator
            has $!lock;         # making sure one worker runs this code
            has int $!ordinal;  # the key of the Pair we produce
            has int $!batch;    # 0 indicates we're exhausted

            method !SET-SELF(\iterator, \batch) {
                nqp::stmts(
                  ($!iterator := iterator),
                  ($!lock := Lock.new),
                  ($!ordinal = -1),
                  ($!batch = batch),
                  self
                )
            }
            method new(\it,\ba) { nqp::create(self)!SET-SELF(it,ba) }

            # The unlocked check of $!batch is only a fast path.  Another
            # worker may exhaust the source while we wait for the lock, so
            # $!batch is checked again once we hold it.  Without that
            # re-check, a waiting worker would pull the source iterator
            # again after it had already returned IterationEnd.
            method pull-buffer() {
                nqp::if(
                  $!batch,
                  $!lock.protect({
                      nqp::if(
                        $!batch,
                        nqp::stmts(        # we haz the source iterator
                          (my $buffer := nqp::create(IterationBuffer)),
                          (my $iterator := $!iterator),
                          (my int $batch = $!batch),
                          (my int $found = -1),
                          nqp::until(
                            nqp::iseq_i(
                              ($found = nqp::add_i($found,1)),
                              $batch
                            ) || nqp::eqaddr(
                              (my $pulled := $iterator.pull-one),
                              IterationEnd
                            ),
                            nqp::bindpos($buffer,$found,$pulled)
                          ),
                          # $buffer now holds exactly $found elements, so it
                          # needs no trimming; fewer than $batch means the
                          # source iterator is exhausted.
                          nqp::if(
                            nqp::islt_i($found,$batch),
                            ($!batch = 0)  # tell the world we're done
                          ),
                          nqp::if(
                            $found,
                            Pair.new(      # produce a batch
                              ($!ordinal = nqp::add_i($!ordinal,1)),
                              $buffer
                            ),
                            IterationEnd   # we're done here
                          )
                        ),
                        IterationEnd       # exhausted while we waited
                      )
                  }),
                  IterationEnd             # we were already done
                )
            }
        }.new(iterator,$batch)
    }

    # This role provides the basis for concurrent processing of an
    # iterator.  Its .new method expects, in this order:
    # - a source iterator;
    # - a list of actions to be performed on each value obtained from the
    #   source iterator, consisting of method => capture pairs, where the
    #   key is either a Method or the name of a List method;
    # - the maximum number of values processed inside a single worker at
    #   a time (the batch size);
    # - the number of workers.
    # Classes must provide at least a pull-one and a !TWEAK method.
    # !TWEAK is expected to return self, because .new returns whatever
    # !TWEAK returns.
    #
    # Each worker pushes every processed batch (a Pair of ordinal =>
    # IterationBuffer) onto $!queue, followed by one IterationEnd when it
    # stops.  $!alive counts the workers whose IterationEnd has not been
    # taken off the queue yet.
    role ConcurrentActionator does Iterator {
        has $!queue;     # ConcBlockingQueue of processed batches
        has $!promises;  # one Promise per worker
        has Int $!alive; # alas, cannot be a native int :-(

        my class Queue is repr('ConcBlockingQueue') { }

        method !TWEAK { ... }

        method !SET-SELF(\source, \actions, int $batch, int $workers) {
            nqp::stmts(
              ($!queue := my $queue := nqp::create(Queue)),
              ($!promises := nqp::setelems(nqp::list,$!alive = $workers)),
              (my $actions := nqp::if( # lowlevel the actions list
                nqp::istype(actions,List),
                nqp::getattr(actions,List,'$!reified'),
                actions)
              ),
              # split the actions into parallel lists of methods and captures,
              # resolving method names on List
              (my int $todo = nqp::elems($actions)),
              (my $methods := nqp::setelems(nqp::list,$todo)),
              (my $captures := nqp::setelems(nqp::list,$todo)),
              (my int $w = -1),
              nqp::while(
                nqp::islt_i(($w = nqp::add_i($w,1)),$todo),
                nqp::stmts(
                  (my $m := nqp::getattr(nqp::atpos($actions,$w),Pair,'$!key')),
                  nqp::if(
                    nqp::istype($m,Method),
                    nqp::bindpos($methods,$w,$m),
                    nqp::bindpos($methods,$w,List.^find_method($m.Str))
                  ),
                  nqp::bindpos($captures,$w,
                    nqp::getattr(nqp::atpos($actions,$w),Pair,'$!value')
                  )
                )
              ),
              (my $batcher :=
                Rakudo::Iterator.ConcurrentBatcher(source,$batch)),
              ($w = -1),
              nqp::while(
                nqp::islt_i(($w = nqp::add_i($w,1)),$workers),
                nqp::bindpos($!promises,$w,
                  start {
                      # NOTE(review): this CATCH has no when/default, so the
                      # exception is printed and then presumably rethrown,
                      # breaking the Promise so that !cleanup's .result
                      # reports it -- confirm.
                      CATCH { .say }
                      # Always tell consumers this worker is done, even on
                      # error, so nobody waits on the queue forever.
                      LEAVE nqp::push($queue,IterationEnd);
                      nqp::stmts(
                        (my $List := nqp::create(List)),
                        nqp::until(
                          nqp::eqaddr(
                            (my $enroute := $batcher.pull-buffer),
                            IterationEnd
                          ),
                          nqp::stmts(
                            (my int $i = -1),
                            nqp::while(
                              nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                              nqp::stmts(
                                # run action $i over the current buffer and
                                # make its output the Pair's new buffer, the
                                # input for the next action
                                nqp::bindattr($List,List,'$!reified',
                                  nqp::getattr($enroute,Pair,'$!value')
                                ),
                                nqp::atpos($methods,$i)(
                                  $List,|nqp::atpos($captures,$i)
                                ).iterator.push-all(
                                  nqp::bindattr($enroute,Pair,'$!value',
                                    nqp::create(IterationBuffer)
                                  )
                                )
                              )
                            )
                          ),
                          nqp::push($queue,$enroute)
                        )
                      )
                  }
                )
              ),
              self
            )
        }

        method new(\iterator, \actions, \batch, \workers) {
            nqp::create(self)!SET-SELF(iterator,actions,batch,workers)!TWEAK
        }

        # Discards all processed values and returns once every worker has
        # signalled it is done.
        method sink-all(--> IterationEnd) is raw {
            nqp::while(
              $!alive,
              nqp::stmts(
                nqp::until(
                  nqp::eqaddr(nqp::shift($!queue),IterationEnd),
                  nqp::null
                ),
                nqp::unless(
                  --$!alive,
                  self!cleanup,
                )
              )
            )
        }

        # Waits for every worker's Promise; .result rethrows the error of
        # a worker that died.
        method !cleanup(--> IterationEnd) {
            nqp::while(
              nqp::elems($!promises),
              nqp::shift($!promises).result
            )
        }
    }

    # Returns an iterator producing the processed values in the order of
    # the source iterator.  Batches that arrive out of sequence are parked
    # in $!processed at index (ordinal - $!offset) until their turn comes.
    # Arguments: source iterator, actions, batch size, number of workers
    # (the same order as RaceActions).
    method HyperActions(\iterator,\actions,\batch,\workers) {
        class :: does ConcurrentActionator {
            has $!slipped;    # current list of values to produce
            has $!processed;  # list of processed chunks
            has int $!offset; # ordinal number of chunk at index 0

            method !TWEAK() {
                nqp::stmts(
                  ($!slipped := $empty),
                  ($!processed := nqp::list),
                  self
                )
            }

            # Every branch either returns a value or returns the result of
            # exactly one recursive call, so no value is ever discarded.
            # Once all workers are gone it keeps returning IterationEnd
            # instead of blocking on the queue.
            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),            # produce from the chunk
                  nqp::if(                          # no chunk to produce from
                    nqp::existspos($!processed,0),
                    nqp::stmts(                     # next chunk is available
                      ($!offset = nqp::add_i($!offset,1)),
                      ($!slipped := nqp::shift($!processed)),
                      self.pull-one                 # produce from it
                    ),
                    nqp::if(                        # next chunk not there
                      $!alive,
                      nqp::if(
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::if(                    # a worker has expired
                          --$!alive,
                          self.pull-one,            # others not, try again
                          self!cleanup              # mohican time, bye bye
                        ),
                        nqp::stmts(                 # a fresh chunk
                          nqp::if(
                            nqp::iseq_i(
                              $!offset,
                              (my int $ordinal = $chunk.key)
                            ),
                            nqp::stmts(             # in sequence chunk
                              ($!offset = nqp::add_i($!offset,1)),
                              nqp::if(              # lose placeholder if any
                                nqp::elems($!processed),
                                nqp::shift($!processed)
                              ),
                              ($!slipped := $chunk.value)
                            ),
                            nqp::bindpos(           # out of sequence chunk,
                              $!processed,          # store for later usage
                              nqp::sub_i($ordinal,$!offset),
                              $chunk.value
                            )
                          ),
                          self.pull-one             # rinse and repeat
                        )
                      ),
                      IterationEnd                  # all workers already gone
                    )
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                       # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                       # do the other chunks
                    $!alive,
                    nqp::if(
                      nqp::existspos($!processed,0),
                      nqp::stmts(                   # next chunk is available
                        ($!offset = nqp::add_i($!offset,1)),
                        (my $slipped := nqp::shift($!processed)),
                        nqp::while(
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      ),
                      nqp::if(                      # next chunk not there
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::unless(                # worker expired
                          --$!alive,
                          self!cleanup              # mohican time, bye bye
                        ),
                        nqp::bindpos(               # store for later usage
                          $!processed,
                          nqp::sub_i($chunk.key,$!offset),
                          $chunk.value
                        )
                      )
                    )
                  )
                )
            }
        }.new(iterator, actions, batch, workers)
    }

    # Returns an iterator producing the processed values in whatever order
    # the batches complete.
    # Arguments: source iterator, actions, batch size, number of workers.
    method RaceActions(\iterator,\actions,\batch,\workers) {
        class :: does ConcurrentActionator {
            has $!slipped;    # current list of values to produce

            method !TWEAK() {
                nqp::stmts(
                  ($!slipped := $empty),
                  self
                )
            }

            # Once all workers are gone it keeps returning IterationEnd
            # instead of blocking on the queue.
            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),            # produce from available
                  nqp::if(                          # no values available
                    $!alive,
                    nqp::if(
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::if(                      # worker exhausted
                        --$!alive,
                        self.pull-one,              # but still others
                        self!cleanup                # mohican time, bye bye
                      ),
                      nqp::stmts(
                        ($!slipped := $chunk.value),  # could be empty
                        self.pull-one                 # so try again
                      )
                    ),
                    IterationEnd                    # all workers already gone
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                       # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                       # do the other chunks
                    $!alive,
                    nqp::stmts(
                      nqp::until(
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::stmts(
                          (my $slipped := $chunk.value),
                          nqp::while(
                            nqp::elems($slipped),
                            $target.push(nqp::shift($slipped))
                          )
                        )
                      ),
                      nqp::unless(
                        --$!alive,
                        self!cleanup,
                      )
                    )
                  )
                )
            }
        }.new(iterator, actions, batch, workers)
    }
}
# Demo: run the same slow map + grep once through the concurrent chain and
# once serially, timing both.  The mapping block's $_++ increments the
# elements of @a in place, so the serial run starts from the values 1..106.
my @a = ^106;
my $started = now;
my &slow-bump = { sleep rand / 1000; $_++ };
my &even      = { $_ %% 2 };
dd @a.Seq.hijper(:10batch).map(&slow-bump).grep(&even);
say "parallel processed in {now - $started}";
dd @a;
$started = now;
dd @a.map(&slow-bump).grep: &even;
say "serial processed in {now - $started}";
#=================
(0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104).Seq
parallel processed in 0.01360739
Array @a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106]
(2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106).Seq
serial processed in 0.0882528
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment