This document describes the eventual intended semantics of await, lays out
some syntactic relief for working with supplies, considers the status of
channels, and proposes the end of the syntax formerly known as earliest,
winner, etc., which I've never really liked. One further area to be covered in
a similar document to this is cancellation.
Feedback welcome! -- jnthn
The await function is used to efficiently wait for one or more asynchronous
operations to complete. A common use is with a Promise:
my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
await $p1, $p2;
say "P1: $p1.result()";
say "P2: $p2.result()";
Since await also unpacks results, you can write:
my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
my ($r1, $r2) = await $p1, $p2;
say "P1: $r1";
say "P2: $r2";
If any Promise is broken, then the exception it was broken with is thrown by
await. It's also possible to use await with a Supply. In this case, await
results in the final value emitted by the Supply, Nil if it was done without
emitting a value, or throws any exception that it quit with. Note that await
will tap the Supply, and so trigger execution if it's an on-demand supply.
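Here is a small sketch of both behaviours. It runs on current Rakudo and uses only the built-ins start, Supply.from-list and await. The values and the error message are made up.

```raku
# await on a Supply taps it and yields the last value emitted before done.
my $last = await Supply.from-list(1, 2, 3);
say $last;    # 3

# await on a broken Promise throws the exception it was broken with.
my $broken = start { die "computation failed" };
my $error  = do { try await $broken; $! };
say "await threw: ", $error.message;
```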
The await function takes one or more objects that do the Awaitable role, which
both Promise and Supply do. It then looks up the current awaiter in dynamic
scope, through the $*AWAITER dynamic variable, and passes the list of
Awaitables to that awaiter's await method. Put another way, the await function
is something like:
sub await(*@awaitables) {
    for @awaitables.grep(* !~~ Awaitable) {
        die "Cannot await a $_.^name()";
    }
    $*AWAITER.await(@awaitables);
}
The Awaitable role requires provision of a subscribe-awaiter method, which
takes two closures: one invoked with a result if the awaited operation is
successful, and another invoked with an Exception object if the awaited
operation failed.
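The sketch below models the proposed protocol in plain Raku to make it concrete. DemoAwaitable, DemoPromise and BlockingAwaiter are hypothetical names invented for illustration; they are not Rakudo's actual Awaitable machinery, which differs. The awaiter parks its thread on a Lock condition variable until every subscribed operation has reported back. That is roughly what a blocking default awaiter has to do.

```raku
# Hypothetical model of the proposed design; not Rakudo's real Awaitable API.
role DemoAwaitable {
    # Call &on-success with the result, or &on-failure with an Exception.
    method subscribe-awaiter(&on-success, &on-failure) { ... }
}

# Adapts an ordinary Promise to the proposed protocol.
class DemoPromise does DemoAwaitable {
    has Promise $.promise is required;
    method subscribe-awaiter(&on-success, &on-failure) {
        $!promise.then: -> $p {
            $p.status ~~ Kept ?? on-success($p.result) !! on-failure($p.cause);
        };
    }
}

# A blocking awaiter: it holds up the calling thread until all are done.
class BlockingAwaiter {
    method await(@awaitables) {
        my $lock      = Lock.new;
        my $cond      = $lock.condition;
        my $remaining = @awaitables.elems;
        my @results;
        my $error;
        for @awaitables.kv -> $i, $awaitable {
            $awaitable.subscribe-awaiter(
                -> $value { $lock.protect: { @results[$i] = $value; $remaining--; $cond.signal } },
                -> $ex    { $lock.protect: { $error //= $ex; $remaining--; $cond.signal } },
            );
        }
        # Park this thread until every awaitable has reported back.
        $lock.protect: { $cond.wait while $remaining > 0 };
        $error.rethrow if $error;
        @results;
    }
}

my ($first, $second) = BlockingAwaiter.new.await([
    DemoPromise.new(promise => start { 6 * 7 }),
    DemoPromise.new(promise => Promise.in(0.1).then({ 'later' })),
]);
say "$first $second";    # 42 later
```

A non-blocking awaiter, like the thread pool one, would pass closures that resume a continuation instead of signalling a condition variable. The protocol itself stays the same.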
The default awaiter, installed in PROCESS, blocks the current thread until the
awaited operation has completed. However, thread pool threads, managed by
ThreadPoolScheduler, have a rather different awaiter. Theirs will take a
continuation rooted at the scheduler's work loop, and pass closures to the
subscribe-awaiter method that schedule the resumption of that continuation on
the thread pool when the operation completes. This means that writing sleep
sort this way will end up with a huge number of kernel threads, and possibly
exhaust the thread pool if you sort enough values:
await do for @values -> $value {
    start {
        sleep $value;
        say $value;
    }
}
Instead, you can potentially get by with only a single thread pool thread by writing it as:
await do for @values -> $value {
    start {
        await Promise.in($value);
        say $value;
    }
}
Note that here the outermost await, presuming it's in the main program body
and so hits the default awaiter, will actually block the main program thread.
But the await inside of the start is being run on a thread pool thread, and so
will quickly return control to the thread pool until the amount of time has
elapsed, and then be resumed.
A serial supply promises you that it will never make concurrent emit, done, or
quit calls. Every supply knows whether it can promise this, and if it does,
then things downstream can depend on it. That is to say, on a serial supply,
.tap is equivalent to .act. Whether a supply is serial or not can be
introspected with the serial method.
Serial supplies are strongly preferred, since those tapping the supply can be
confident they will not encounter concurrency issues in their projection
functions. All of the built-in Supply factory methods will ensure that any
supplies they produce are serial. This includes supply combinators such as
merge and zip, which tap multiple supplies. The serialize method takes a
non-serial supply and produces a serial one.
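Rakudo ships serial, serialize and act on Supply, so this introspection can be tried directly. Here is a minimal sketch. It assumes, as in current Rakudo, that the live Supply handed out by a Supplier makes no serial promise, because emit may be called on the Supplier from any thread.

```raku
# A Supplier's Supply may be emitted into from several threads at once.
my $supplier = Supplier.new;
my $live     = $supplier.Supply;
say "live supply serial?       ", $live.serial;

# serialize produces a supply that does make the serial promise.
my $serialized = $live.serialize;
say "serialized supply serial? ", $serialized.serial;

# Built-in factories and combinators such as merge produce serial supplies,
# on which act (one call at a time) and tap behave the same.
my $merged = Supply.from-list(1, 2).merge(Supply.from-list(3));
say "merged supply serial?     ", $merged.serial;
$merged.act: -> $v { say "got $v" };
```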
XXX method forms of hyper/race TBD (see further down for how sugar looks, though)
A supply block is a convenient way to create an on-demand supply. When the
supply is tapped, the supply block is run. Within it, emit can be used to emit
values to the tapper, and done can be used to convey that there will be no more
values.
my $s = supply {
    emit 'a value!';
    emit 'another!';
    done;
}
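Because such a supply is on-demand, every tap runs the block afresh. The sketch below makes that visible on current Rakudo. The $runs counter exists only for the demo, and Supply.list taps the supply and collects what it emits once it is done.

```raku
my $runs = 0;
my $s = supply {
    $runs++;              # counts how many times the block has been run
    emit 'a value!';
    emit 'another!';
    done;
};

say $s.list;    # (a value! another!)
say $s.list;    # tapping again runs the block again
say $runs;      # 2
```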
Calls to emit and done can be made in nested scopes, and follow the same rules
as gather and take. An unhandled exception thrown inside of a supply block
will be passed onwards using quit.
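The sketch below shows both rules on current Rakudo. emit-evens is a hypothetical helper sub that emits from outside the supply block's lexical scope. That works because emit, like take, is found dynamically. The die that follows ends the supply with a quit. A Promise makes the example wait for that quit regardless of how the tap is scheduled.

```raku
# Hypothetical helper: emit works here because, like take, it is dynamic.
sub emit-evens(Int $n) {
    for 1..$n -> $i {
        emit $i if $i %% 2;
    }
}

my $evens = supply {
    emit-evens(6);
    die "no more evens today";    # unhandled, so it is passed on as a quit
};

my @seen;
my $reason;
my $quitted = Promise.new;
$evens.tap(
    -> $v { @seen.push($v) },
    quit => -> $ex { $reason = $ex.message; $quitted.keep },
);
await $quitted;
say @seen;     # [2 4 6]
say $reason;   # no more evens today
```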
For consuming supplies, there is an asynchronous looping construct, known as
whenever. A whenever placed lexically inside of a supply block (it need not be
directly inside it) will be associated with that Supply. Therefore, closing
the tap on the supply will also close any tap opened by a whenever. As a
simple example, consider writing something to filter negative values out
(granted, you could have done this with Supply.grep):
sub remove-negatives($value-supply) {
    return supply {
        whenever $value-supply -> $value {
            emit $value if $value >= 0;
        }
    }
}
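On current Rakudo you can observe how a supply block is tied to the taps its whenevers open. In this sketch, Supply.on-close notes the moment the inner tap is closed. The variable names are made up for the demo.

```raku
my $supplier = Supplier.new;
my $closed   = False;
# on-close runs its block when a tap on the returned supply is closed.
my $source   = $supplier.Supply.on-close({ $closed = True });

my $non-negative = supply {
    whenever $source -> $value {
        emit $value if $value >= 0;
    }
};

my @got;
my $tap = $non-negative.tap(-> $v { @got.push($v) });
$supplier.emit($_) for 5, -2, 7;
$tap.close;     # closing the outer tap also closes the whenever's tap
say @got;       # [5 7]
say $closed;
```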
Since whenever promises you will only ever be inside of its loop body in one
thread at a time, you can also do stateful things:
sub add-sequence-number($supply-in) {
    return supply {
        whenever $supply-in -> $value {
            state $seq-number = 1;
            emit [$seq-number++, $value];
        }
    }
}
Since whenever is asynchronous, and falling out of a supply block does not
close the supply (an explicit done is needed for that), you can set up many
whenevers in a supply block. The "in one loop body at a time" rule is extended
to cover all the whenever loop bodies, so you can safely keep state visible
between them. Thus you can do things like:
sub rate-limit($supply-in, $max-per-second) {
    return supply {
        my $in-last-second = 0;
        # Emit at most $max-per-second values per second.
        whenever $supply-in -> $value {
            if $in-last-second++ < $max-per-second {
                emit $value;
            }
        }
        # Reset the count once per second.
        whenever Supply.interval(1) {
            $in-last-second = 0;
        }
    }
}
Just be sure to keep the state inside of the supply block itself, so that each
tap of the supply gets its own copy.
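Here is a quick way to see the per-tap behaviour on current Rakudo. The counter is declared inside the supply block, so each call to .list taps the supply and starts counting from zero again.

```raku
my $numbered = supply {
    my $count = 0;    # fresh for every tap of $numbered
    whenever Supply.from-list(<a b c>) {
        emit ++$count;
    }
};
say $numbered.list;   # (1 2 3)
say $numbered.list;   # (1 2 3) again, not (4 5 6)
```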
Since the whenever does not need to be directly inside of a supply block, it's
also possible to set up combinators that will tap any number of supplies:
sub merge(*@supplies) {
    return supply {
        for @supplies -> $mergee {
            whenever $mergee -> $value {
                emit $value;
            }
        }
    }
}
The usual set of control exceptions can be used inside of the construct. That
is, redo just re-runs the block with the current value, next skips the rest of
the code in the loop, and last un-taps the supply we're reacting to (this is
the in-loop way to close the tap). So we can write something that starts to
emit values when one matching a certain condition comes its way, and then
shuts down the supply when a value matching the off condition comes its way:
sub on-off($supply-in, &on, &off) {
    return supply {
        whenever $supply-in -> $value {
            state $on = False;
            unless $on {
                next unless on($value);
                $on = True;
            }
            last if off($value);
            emit $value;
        }
    }
}
If all whenever blocks in a supply are done, then the supply itself will also
signal done. Therefore, our merge routine above handles finite supplies
correctly "automatically". By contrast, our rate limiting example is
problematic, as the per-second interval we're using is eternal. Thankfully, we
can use loop phasers to spot the last event in the supply we're filtering, and
mark the surrounding supply as done:
sub rate-limit($supply-in, $max-per-second) {
    return supply {
        my $in-last-second = 0;
        # Emit at most $max-per-second values per second.
        whenever $supply-in -> $value {
            if $in-last-second++ < $max-per-second {
                emit $value;
            }
            # Runs once $supply-in is done; ends the whole supply.
            LAST done;
        }
        # Reset the count once per second.
        whenever Supply.interval(1) {
            $in-last-second = 0;
        }
    }
}
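The done-propagation rule can be checked in isolation on current Rakudo. In this sketch both whenevers tap finite supplies. Once both are done, the surrounding supply signals done, which is what allows .list to return. The result is sorted only because cross-source ordering is not the point here.

```raku
my $both = supply {
    whenever Supply.from-list(1, 2) -> $v { emit $v }
    whenever Supply.from-list(10)   -> $v { emit $v }
};
# .list only returns once $both is done, i.e. after both whenevers are done.
say $both.list.sort;   # (1 2 10)
```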
If any of the whenevers in a supply block should quit, or if the code in a
whenever block throws an exception, then this will be sent onwards and all
taps on other whenever blocks will be closed. More control over errors can be
obtained using the QUIT phaser. A QUIT phaser applies to all whenever blocks in
the exact scope that it exists in. Like CATCH, it expects you to smartmatch on
the exception, and failure to match it will cause the quit to carry onwards.
We could write an infinite retry as:
sub retry-forever($supply-in) {
    return supply {
        sub try-it() {
            whenever $supply-in -> $value {
                emit $value;
            }
            QUIT {
                default {
                    try-it();
                }
            }
        }
        try-it();
    }
}
That particular example should already be available as a .retry(*) method on
any Supply, though. Also note that, due to the asynchronous nature of whenever
and QUIT, this is not actually an infinite recursion!
A slightly more interesting example would be implementing a fallback:
sub fallback($supply-a, $supply-b) {
    return supply {
        whenever $supply-a -> $value {
            emit $value;
        }
        QUIT {
            default {
                whenever $supply-b -> $value {
                    emit $value;
                }
            }
        }
    }
}
Here, the QUIT only applies to the first whenever, and the second is not
protected. This one should also be available simply as a .catch(...) method on
a Supply, though. You'll most likely use QUIT in other, more complex supplies.
For a supply block with many whenevers, one QUIT can be used to deal with any
of them quitting. To get finer control (if you wanted to do different things
for different whenevers), putting one or more whenevers in a bare block along
with a QUIT will suffice.
If you nest one whenever inside of another, then it will be associated with
the enclosing supply block. This can be useful for writing things over
supplies of supplies. For example, we may have a supply of news topics.
Whenever we start to be interested in a new topic, it's emitted, and the object
it sends has a supply of posts. We want to then also tap those posts, and emit
strings that have the topic name along with the post title. We can write this
as:
class Topic {
    has Str $.name;
    has Supply $.posts;
    ...
}
class Post {
    has Str $.title;
    ...
}
sub active-topic-titles($topic-supply) {
    return supply {
        whenever $topic-supply -> $topic {
            whenever $topic.posts -> $post {
                emit "[$topic.name()]: $post.title()";
            }
        }
    }
}
Alternatively, we might only be interested in one topic at a time, and want to
close the tap on the previous topic's posts when a new topic arrives. Since a
whenever evaluates to a Tap, we can just write:
sub latest-topic-titles($topic-supply) {
    return supply {
        my $cur-posts-tap;
        whenever $topic-supply -> $topic {
            $cur-posts-tap.?close();
            $cur-posts-tap = whenever $topic.posts -> $post {
                emit "[$topic.name()]: $post.title()";
            }
        }
    }
}
XXX further semantics of nesting?
It's also possible to use whenever on a Promise. This will actually coerce it
into a Supply first.
sub timeout-until-first-value($supply-in, $timeout) {
    return supply {
        my $saw-value = False;
        whenever $supply-in -> $value {
            emit $value;
            $saw-value = True;
        }
        whenever Promise.in($timeout) {
            die "Timed out" unless $saw-value;
        }
    }
}
Note that the die here causes a quit on the supply overall, and that in turn
closes the tap of $supply-in, which may invoke some cancellation logic (for
example, cancelling an in-flight web request).
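Two of the points above can be seen in a small sketch on current Rakudo. The first is that a Promise is coerced to a Supply. The second is that a die inside a whenever becomes a quit, which await then rethrows. The 0.1 second timers and the messages are arbitrary.

```raku
# A Promise's Supply emits the kept result and is then done.
say await Promise.in(0.1).then({ 'ready' }).Supply;    # ready

# A die inside a whenever quits the whole supply; await rethrows it.
my $deadline = supply {
    whenever Promise.in(0.1) {
        die "Timed out";
    }
};
my $error = do { try await $deadline; $! };
say "quit with: ", $error.message;
```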
One may wonder what to do at the top level of an application, where it's
mostly just interesting to "sink" other supplies and do side-effects rather
than emit values. Even if values won't be emitted, it still makes sense to
nest this in a supply block for the sake of error handling. Here's a simple
echo server, to demonstrate:
await supply {
    whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
        say "Incoming connection";
        whenever $conn.bytes_supply -> $buf {
            try say $buf.decode;
            await $conn.write($buf);
            LAST $conn.close;
        }
    }
}
The use of await inside a whenever, as in this example, will prevent that
whenever loop from seeing any more values until after the await has completed.
It has no effect on any outer whenever blocks, however. This means we will not
echo output back out of order, but equally we will not block processing of
other incoming connections.
The "one thread at a time" semantics are good for safety, but sometimes you are
dealing with a supply where you'd rather process the incoming events over a
number of threads. For example, there's no reason we can't process incoming
requests to our echo server over multiple threads. We do this using race:
await supply {
    race whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
        say "Incoming connection";
        whenever $conn.bytes_supply -> $buf {
            try say $buf.decode;
            await $conn.write($buf);
            LAST $conn.close;
        }
    }
}
Note that we do not apply race to the inner whenever, since we do care about
ordering there. The race prefix will cause the whenever loop body to be
scheduled on the thread pool, meaning that even if the supply we are tapping
is serial, we will behave as if it were not.
All supplies produced by a supply block are serial, even in the presence of
race. That is to say:
my $results = supply {
    race whenever $things-to-compute -> $value {
        emit do-long-computation($value);
    }
}
my $total = 0;
$results.tap: {
    $total += $_;
    say "$_ ($total)";
}
There will never be a data race on $total. However, with race the order of the
emitted results may not match the input order. To get that, use hyper:
my $results = supply {
    hyper whenever $things-to-compute -> $value {
        emit do-long-computation($value);
    }
}
Our current channels are really just a blocking concurrent queue. This can be
useful, but since it implies threads blocked waiting for values, it's not the
best of solutions for most user-space problems. Better is to use await with a
Promise, or whenever over a Supply. Also, the name channel is evocative of the
concept in Go, but our channels aren't like that. They don't block senders
until the receiver is ready, for example. While a blocking queue is a very
useful thing to have in a language, and we should keep it, we may want to stop
calling it Channel, to avoid confusion (and keep the name free for something
that has the Go-ish semantics). Finally, the error propagation could now go
away (completion propagation, on the other hand, can still be very useful). In
general, this type becomes a little more "low level", since its good uses are
usually in implementing infrastructure such as thread pools.
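For reference, here is a small sketch of the blocking-queue behaviour of today's Channel. send only enqueues, so it does not wait for a receiver the way a Go channel would. receive blocks the calling thread until a value arrives. close propagates completion to the consumer.

```raku
my $queue = Channel.new;

# Producer on the thread pool: send never waits for a receiver.
start {
    $queue.send($_) for 1..3;
    $queue.close;                 # completion propagates to consumers
};

# Consumer: receive blocks this thread until a value is available.
my $first = $queue.receive;
say $first;                       # 1

# Iterating .list ends once the channel is closed and drained.
my @rest;
@rest.push($_) for $queue.list;
say @rest;                        # [2 3]
```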
One of the reasons we've had trouble with earliest is that it tried to support
both promises and channels, but the former are non-blocking and the latter are
blocking. Its origins date back to before supplies existed. Now that we have
supplies, along with a proposal for some nice syntactic relief for them that
also handles promises nicely, it's probably time to say goodbye to earliest
and friends.
And what of consuming a blocking queue (currently Channel)? There's still the
.list coercer that produces a lazy list of things coming out of it, and we can
(keep/add) a coercer to Supply. In fact, that would even mean that we get
whenever support for channels "for free", and people can simply move to the
new syntax.
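Current Rakudo already provides such a coercer, Channel.Supply, so the migration described above can be sketched today. The job names are made up.

```raku
my $jobs = Channel.new;
start {
    $jobs.send($_) for <a b c>;
    $jobs.close;                  # the derived Supply is done once closed
};

my @handled;
# await on the supply block returns once every whenever in it is done.
await supply {
    whenever $jobs.Supply -> $job {
        @handled.push($job);
    }
};
say @handled;                     # [a b c]
```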
It would also seem reasonable for 'await' to return any thrown-then-resumed Exceptions in the corresponding slot in the unpacked array.
The QUIT example claiming it is "not an infinite recursion" strikes me as confusing. The QUIT block appears to be inside the dynamic scope of the sub. It does not matter that the sub returns immediately; it is still in that scope (persisted as a continuation). Would sub a { my $_a = 1; whenever ...; QUIT { $_a.say } } find the $_a?
At first it looks like the parent and child nested whenevers still fall under the same "only in one loop body at a time" rule, and that this is why the one-topic-at-a-time example doesn't just prematurely close topics before viewing all the posts in the topic. Then the echo server example states that awaiting inside the child whenever does not block processing of new connections. This is causing me some cognitive dissonance.