@jnthn
Last active Feb 25, 2016
Answers to some concurrency questions
  1. Should two different taps of a serial supply be run simultaneously?

$ perl6 -e 'my $p = Supplier.new; my $s = $p.Supply; $s.serial.say; $s.tap({ sleep 1; "42 {now}".say; }); $s.tap({ sleep 1; "43 {now}".say }); $p.emit(42); sleep 3;'
True
42 Instant:1449454902.847377
43 Instant:1449454903.854135

No. Supplies do not introduce concurrency, at least not unless you explicitly ask for it with something like the start method. So the emit here is doing no more than a loop through the taps, calling the closures passed to tap one at a time on the thread that did the emit. To be very clear, this code is single-threaded.
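One way to check this is to record the thread ID inside each tap and compare it with the thread that called emit. This is only a minimal sketch; the names `@seen` and `$emitter-thread` are made up for illustration.

```raku
my $supplier = Supplier.new;
my $supply   = $supplier.Supply;

# Each tap records the ID of the thread it was called on.
my @seen;
$supply.tap({ @seen.push: $*THREAD.id });
$supply.tap({ @seen.push: $*THREAD.id });

my $emitter-thread = $*THREAD.id;
$supplier.emit(42);

# True if every tap ran on the thread that did the emit.
say so @seen.all == $emitter-thread;
```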

  1. Should two different taps on a non-serial supply be run simultaneously?

$ perl6 -e 'my $p = Supplier.new; my $s = $p.unsanitized-supply; $s.serial.say; $s.tap({ sleep 1; "42 {now}".say; }); $s.tap({ sleep 1; "43 {now}".say }); $p.emit(42); sleep 3;'
False
42 Instant:1449454968.236965
43 Instant:1449454969.243628

No, for the same reason as above.

  1. Should the same tap of a non-serial supply run simultaneously with itself?

$ perl6 -e 'my $p = Supplier.new; my $s = $p.unsanitized-supply; $s.serial.say; $s.tap({ sleep 1; "$_ {now}".say; }); $p.emit(42); $p.emit(43); sleep 3;'
False
42 Instant:1449455041.944167
43 Instant:1449455042.947257

Again, no. Control does not come back to the sender until emit returns, and since no concurrency is introduced, the second emit won't run until the first one has finished blocking on the sleep.

More interesting is to consider:

my $p = Supplier.new;
my $s = $p.unsanitized-supply;
$s.serial.say;
$s.tap({ sleep 1; "$_ {now}".say; });
start $p.emit(42);
start $p.emit(43);
sleep 3;

Here, because the supply is unsanitized, you'll get two runs of the tap block at the same time:

False
42 Instant:1456431413.332803
43 Instant:1456431413.332803

But if we just call Supply:

my $p = Supplier.new;
my $s = $p.Supply;
$s.serial.say;
$s.tap({ sleep 1; "$_ {now}".say; });
start $p.emit(42);
start $p.emit(43);
sleep 3;

Then we get concurrency control:

True
42 Instant:1456431508.621904
43 Instant:1456431509.626031
  1. Should Supply.list reach IterationEnd when the supply is quit, or throw?

$ perl6 -e 'my $r = Supplier.new; my $s = $r.Supply; start { for $s.list { $_.say }; 42.say; CATCH { "OW $_, $!".say } }; sleep 0.5; for 1..4 { $r.emit($_); sleep 0.5 }; $r.quit(X::AdHoc.new(:message)); sleep 1; '
1
2
3
4
Use of Nil in string context in block at -e:1
OW Unexplained error,

Throw, simply out of the duality between the observer and iterator patterns. Note that:

for gather { die 'foo' } { }

Will also throw. Throwing is a perfectly acceptable thing for an iterator to do, and a quit is just an asynchronous throw.

  1. Should Supply.wait throw when the supply is quit?

$ perl6 -e 'my $r = Supplier.new; my $s = $r.Supply; start { $s.wait; 42.say; CATCH { "OW $_, $!".say }}; sleep 0.5; for 1..4 { $r.emit($_); sleep 0.5 }; $r.quit(X::AdHoc.new(:message)); sleep 1;'
Use of Nil in string context in block at -e:1
OW Unexplained error,

Yes. Note that wait is actually doing a tap and just ignoring all of the emits. It only cares for done and quit. A wait is very much like a sink on a Seq, where you evaluate it only for its side-effects. You'll of course receive any exceptions thrown in that sinking.
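As a rough illustration of both outcomes, the sketch below uses on-demand `supply` blocks rather than a Supplier, so it needs no extra thread or sleeps. It assumes that a `die` inside a supply block is delivered to the tapper as a quit; the variable names are hypothetical.

```raku
# A supply that emits twice and then completes normally.
my $finishes = supply { emit 1; emit 2; done };
$finishes.wait;                 # the emits are ignored; returns on done
say 'wait returned after done';

# A supply that fails part-way through; the die becomes a quit.
my $quits = supply { emit 1; die 'boom' };
my $caught;
{
    $quits.wait;                # rethrows the exception carried by the quit
    CATCH { default { $caught = .message } }
}
say "wait threw: $caught";
```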

  1. I'm guessing this one is clearly a bug, but then after that's dealt with, should this just IterationEnd?

$ perl6 -e 'my $r = Supplier.new; my $s = $r.Supply; start { for $s.Channel { $_.say }; 42.say; }; sleep 0.5; for 1..4 { $r.emit($_); sleep 0.5 }; $r.quit(X::AdHoc.new(:message)); sleep 1;'
1
2
3
4
Method 'quit' not found for invocant of class 'Channel' in block at -e:1

It's indeed a bug. It should really be calling .fail on the Channel, which should then result in the exception being thrown in the for iteration.
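To show what .fail does on a Channel by itself, here is a small sketch that sends a few values, marks the channel as failed, and then drains it. It uses explicit `receive` calls instead of a `for` loop because that is the behaviour I'm most sure of: once the queued values are gone, `receive` throws the exception passed to `fail`. The payload text and variable names are invented for the example.

```raku
my $channel = Channel.new;
$channel.send($_) for 1..3;
$channel.fail(X::AdHoc.new(payload => 'producer gave up'));

my @received;
my $error;
{
    # Queued values come out first; after that, receive throws.
    loop { @received.push: $channel.receive }
    CATCH { default { $error = .message } }
}

say "received: @received[]";
say "error: $error";
```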

  1. Behavior of "quit" again, for Supply.Promise:

$ perl6 -e 'my $r = Supplier.new; my $s = $r.Supply; start { my $p = $s.Promise; $p.result.say; 42.say; $p.perl.say; CATCH { "OW $_, $!".say } }; sleep 0.5; for 1..4 { $r.emit($_); sleep 0.5 }; $r.quit(X::AdHoc.new(:message)); sleep 1;'
Use of Nil in string context in block at -e:1
OW Unexplained error, $

  1. Is Supply.Promise supposed to let the last emitted value escape out to the recipient of the Promise? The Supply owner may not want that and if they do, could hand out Supply.last() instead.

$ perl6 -e 'my $r = Supplier.new; my $s = $r.Supply; start { my $p = $s.Promise; $p.result.say; 42.say; $p.perl.say }; sleep 0.5; for 3..5 { $r.emit($_); sleep 0.5 }; $r.done(); sleep 1;'
5
42
Promise.new(scheduler => ThreadPoolScheduler.new(initial_threads => 0, max_threads => 16, uncaught_handler => Callable), status => PromiseStatus::Kept)

These are both intended. Coercing a Supply to a Promise taps the Supply, keeps the Promise with the last emit or Nil upon receiving a done, or breaks the Promise on a quit. Generally, this is useful when you have a Supply that will only produce one interesting value.
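Both outcomes can be seen without any threads or sleeps, since the Promise taps the supply as soon as it is created. A minimal sketch, with made-up variable names and payload:

```raku
# done: the Promise is kept with the last emitted value.
my $ok   = Supplier.new;
my $kept = $ok.Supply.Promise;
$ok.emit($_) for 3..5;
$ok.done;
say $kept.result;

# quit: the Promise is broken and carries the exception as its cause.
my $failing = Supplier.new;
my $broken  = $failing.Supply.Promise;
$failing.emit(1);
$failing.quit(X::AdHoc.new(payload => 'boom'));
say $broken.status;
say $broken.cause.message;
```

Calling `$broken.result` or `await $broken` would rethrow the exception, which is what the CATCH block in the first one-liner picks up.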

In general, it's worth noting that Supply, Channel, Iterable, and HyperIterable (which is still rather under-implemented) represent four different paradigms for processing streams of values. They all have a concept of completion and of failure, and when we go between these paradigms we take care to map these.

The earlier questions seem to be angling towards using Supply as a tool for introducing concurrency. This will inevitably feel awkward, because supplies are primarily geared towards taming concurrency. They are best understood as an implementation of the observer pattern with built-in concurrency control, and an emit synchronously notifies all its observers of the emitted value.

A Channel, by contrast, is something where a send will not block the sender, and any number of receivers can take and process values received. If there is only one worker receiving, then order is still promised.
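For comparison with the supply examples, here is a sketch of two workers draining one Channel. The worker count and the idea of just counting handled values are arbitrary choices for the example. The sends return immediately, each value goes to exactly one worker, and closing the channel ends both loops.

```raku
my $channel = Channel.new;

# Two workers on pool threads, each counting the values it handled.
my @workers = (1..2).map: {
    start {
        my $handled = 0;
        for $channel.list { $handled++ }
        $handled
    }
};

# Sending does not wait for a receiver.
$channel.send($_) for 1..10;
$channel.close;

my @counts = await @workers;
say @counts.sum;    # total across both workers
```

How the ten values split between the two workers will vary from run to run; only the total is fixed.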

And another alternative is HyperIterable, which is not ready for prime-time yet, but will come to be the preferred way for "I have a bunch of values to process and I want to do that processing over multiple threads, and get all the results".
