Skip to content

Instantly share code, notes, and snippets.

@jnthn
Last active October 13, 2015 02:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jnthn/a56fd4a22e7c43080078 to your computer and use it in GitHub Desktop.
Save jnthn/a56fd4a22e7c43080078 to your computer and use it in GitHub Desktop.
Overview of planned some S17 changes, for community review

Changes and syntactic relief for S17

This document describes the eventual intended semantics of await, lays out some syntactic relief for working with supplies, considers the status of channels, and proposes the end of the syntax formerly known as earliest, winner, etc. - which I've never really liked. One further area to be covered in a similar document to this is cancellation.

Feedback welcome! -- jnthn

await

The await function is used to efficiently wait for one or more asynchronous operations to have completed. A common use is with a Promise:

my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
await $p1, $p2;
say "P1: $p1.result()";
say "P2: $p2.result()";

Since await also unpacks results, you can also write:

my $p1 = start sub-long-computation();
my $p2 = start another-long-computation();
my ($r1, $r2) = await $p1, $p2;
say "P1: $r1";
say "P2: $r2";

If any Promise is broken, then the exception it was broken with is thrown by await. It's also possible to use await with a Supply. In this csae, await results in the final value emitted by a Supply, Nil if it was done without a value, or throws any exception that it quit with. Note that await will tap the Supply, and so trigger execution if it's an on-demand supply.

The await function takes one or more objects that do the Awaitable role. This is done by both Promise and Supply. It then looks up the current awaiter in dynamic scope, through the $*AWAITER dynamic variable, and passes the list of Awaitables to its await method. Put another way, the await function is something like:

sub await(*@awaitables) {
    for @awaitables.grep(* !~~ Awaitable) {
        die "Cannot await a $_.^name()";
    }
    $*AWAITER.await(@awaitables);
}

The Awaitable role requires provision of a subscribe-awaiter method, which takes two closures: one invoked with a result if the awaited operation is successful, and another invoked with an Exception object if the awaited operation failed.

The default awaiter, installed in PROCESS, blocks the current thread until the awaited operation has completed. However, thread pool threads, managed by ThreadPoolScheduler, have a rather different awaiter. Theirs will take a continuation rooted at the scheduler's work loop, and pass closures to the subscribe-awaiter method to schedule the resumption of that closure on the thread pool when the operation completes. This means that while writing sleep sort this way will end up with a huge number of kernel threads, and possibly exhaust the thread pool if you sort enough values:

await do for @values -> $value {
    start {
        sleep $value;
        say $value;
    }
}

You can do it potentially with only a single thread pool thread being needed by writing it as:

await do for @values -> $value {
    start {
        await Promise.in($value);
        say $value;
    }
}

Note that here the outermost await, presuming it's in the main program body and so hits the default awaiter, will actually block the main program thread. But the await inside of the start is being run on a thread pool thread, and so will quickly return control to the thread pool until the amount of time has elapsed, and then be resumed.

Serial supplies

A serial supply promises you that it will never do concurrent emit, done, or quit calls. Every supply knows whether it can promise this, and if it promises it then things downstream can depend on it. That is to say, on a serial supply, .tap is equivalent to .act. Whether a supply is serial or not can be introspected with the serial method.

Serial supplies are strongly preferred, since those tapping the supply can be confident they will not encounter concurrency issues in their projection functions. All of the built-in Supply factory methods will ensure that any supplies they produce are serial. This includes supply combinators such as merge and zip, which tap multiple supplies. The serialize method takes a non-serial supply and produces a serial one.

XXX method forms of hyper/race TBD (see further down for how sugar looks, though)

Supply syntax constructs

A supply block is a convenient way to create an on-demand supply. When the supply is tapped, the supply block is run. Within it, emit can be used to emit values to the tapper, and done can be used to convey that there will be no more values.

my $s = supply {
    emit 'a value!';
    emit 'another!';
    done;
}

The emit and done can be done in nested scopes, and follow the same rules as gather and take. An unhandled exception thrown inside of a supply block will be passed onwards using quit.

For consuming supplies, there is an asynchronous looping construct, known as whenever. A whenever placed lexically inside of a supply (it need not be directly) will be associated with that Supply. Therefore, a close of the supply will also close a tap opened by a whenever. As a simple example, consider writing something to filter negative values out (granted, you could have done this with Supply.grep):

sub remove-negatives($value-supply) {
    return supply {
        whenever $value-supply -> $value {
            emit $value if $value >= 0;
        }
    }
}

Since whenever promises you will only ever be inside of its loop body in one thread at a time, you can also do stateful things:

sub add-sequence-number($supply-in) {
    return supply {
        whenever $supply-in -> $value {
            state $seq-number = 1;
            emit [$seq-number++, $value];
        }
    }
}

Since whenever is asynchronous, and falling out of a supply block does not close the supply (an explicit done is needed for that), you can set up many whenevers in a supply block. The "in one loop body at a time" rule is extended to cover all the whenever loop bodies, so you can safely keep state visible between them. Thus you can do things like:

sub rate-limit($supply-in, $max-per-second) {
    return supply {
        my $in-last-second = 0;
        
        # Emit up to the limit values per second.
        whenever $supply-in -> $value {
            if $in-last-second++ < $max-per-second {
                emit $value;
            }
        }
        
        # Clear the limit once per second.
        whenever Supply.interval(1) {
            $in-last-second = 0;
        }
    }
}

Just be sure to keep the state inside of the supply block itself, so you have it per time the supply is tapped.

Since the whenever does not need to be directly inside of a supply block, it's also possible to set up combinators that will tap any number of supplies.

sub merge(*@supplies) {
    return supply {
        for @supplies -> $mergee {
            whenever $mergee -> $value {
                emit $value;
            }
        }
    }
}

The usual set of control exceptions can be used inside of the construct. That is redo just re-runs the block with the current value, next skips the rest of the code in the loop, and last un-taps the supply we're reacting to (this is the in-loop way to close the tap). So we can write something that starts to emit values when one matching a certain condition comes its way, and then shut down the supply when a value matching the off condition comes its way:

sub on-off($supply-in, &on, &off) {
    return supply {
        whenever $supply-in -> $value {
            state $on = False;
            unless $on {
                next unless on($value);
                $on = True;
            }
            last if off($value);
            emit $value;
        }
    }
}

If all whenever blocks in a supply are done, then the supply itself will also signal done. Therefore, our merge routine above handles finite supplies correctly "automatically". By contrast, our rate limiting example is problematic as the per-second interval we're using is eternal. Thankfully, we can use loop phasers to spot the last event in the supply we're filtering, and mark the surrounding supply as done:

sub rate-limit($supply-in, $max-per-second) {
    return supply {
        my $in-last-second = 0;
        
        # Emit up to the limit values per second.
        whenever $supply-in -> $value {
            if $in-last-second++ < $max-per-second {
                emit $value;
                LAST done;
            }
        }
        
        # Clear the limit once per second.
        whenever Supply.interal(1) {
            $in-last-second = 0;
        }
    }
}

If any of the whenevers in a supply block should quit, or if the code in a whenever block throws an exception, then this will be sent onwards and all taps on other whenever blocks will be closed. More control over errors can be obtained using the QUIT phaser. A QUIT phaser applies to all whenever blocks in the exact scope that it exists in. Like CATCH, it expects you to smartmatch on the exception, and failure to match it will cause the quit to carry onwards.

We could write an infinite retry as:

sub retry-forever($supply-in) {
    return supply {
        sub try-it() {
            whenever $supply-in -> $value {
                emit $value;
            }
            QUIT {
                default {
                    try-it();
                }
            }
        }
        try-it();
    }
}

Though that particular example should already be available as a .retry(*) method on any Supply. Also note that, due to the asynchronous nature of whenever and QUIT, that is not actually an infinite recursion!

A slightly more intresting example would be implementing a fallback:

sub fallback($supply-a, $supply-b) {
    return supply {
        whenever $supply-a -> $value {
            emit $value;
        }
        QUIT {
            default {
                whenever $supply-b -> $value {
                    emit $value;
                }
            }
        }
    }
}

Here, the QUIT only applies to the first whenever, and the second is not protected. This one should also be available simply as a .catch(...) method on a Supply, though. You'll most likely use QUIT in other more complex supplies.

For a supply block with many whenevers, one QUIT can be used to deal with any of them quitting. To get finer control (if you wanted to do different things for different whenevers), putting one or more whenevers in a bare block along with a QUIT will suffice.

If you nest one whenever inside of another, then it will be associated with any enclosing supply. This can be useful for writing things over supplies of supplies. For example, we may have a supply of news topics. Whenever we start to be interested in a new topic, it's emitted, and the object it sends has a supply of posts. We want to then also tap those posts, and emit strings that have the topic name along with the post title. We can write this as:

class Topic {
    has Str $.name;
    has Supply $.posts;
    ...
}
class Post {
    has Str $.title;
    ...
}
sub active-topic-titles($topic-supply) {
    return supply {
        whenever $topic-supply -> $topic {
            whenever $topic.posts -> $post {
                emit "[$topic.name()]: $post.title()";
            }
        }
    }
}

Alternatively, we might only we interested in one topic at a time, and want to close the tap on the last topic. Since a whenever evaluates to a Tap, we can just write:

sub latest-topic-titles($topic-supply) {
    return supply {
        my $cur-posts-tap;
        whenever $topic-supply -> $topic {
            $cur-posts-tap.?close();
            $cur-posts-tap = whenever $topic.posts -> $post {
                emit "[$topic.name()]: $post.title()";
            }
        }
    }
}

XXX further semantics of nesting?

It's also possible to use whenever on a Promise. This will actually coerce it into a Supply first.

sub timeout-until-first-value($supply-in, $timeout) {
    return supply {
        my $saw-value = False;
        whenever $supply-in -> $value {
            emit $value;
            $saw-value = True;
        }
        whenever Promise.in($timeout) {
            die "Timed out" unless $saw-value;
        }
    }
}

Note that the die here causes a quit on the supply overall, and that in turn closes the tap of $supply-in, which may invoke some cancellation logic (for example, cancelling an in-flight web request).

One may wonder what to do at the top level of an application, where it's mostly just interesting to "sink" other supplies and do side-effects rather than emit values. Even if values won't be emitted, it still makes sense to nest this in a supply block for the sake of error handling. Here's a simple echo server, to demonstrate:

await supply {
    whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
        say "Incoming connection";
        whenever $conn.bytes_supply -> $buf {
            try say $buf.decode;
            await $conn.write($buf);
            LAST $conn.close;
        }
    }
}

The use of await inside a whenever, as in this example, will prevent that whenever loop from seeing any more values until after the await has been completed. It has no effect on any outer whenever blocks, however. This means we will not echo output back out-of-order, but equally we will not block processing of other incoming connections.

The "one thread at a time" semantics are good for safety, but sometimes you are dealing with a supply where you'd rather process the incoming events over a number of threads. For example, there's no reason we can't process incoming requests to our echo server over multiple threads. We do this using race:

await supply {
    race whenever IO::Socket::Async.listen('localhost', 3333) -> $conn {
        say "Incoming connection";
        whenever $conn.bytes_supply -> $buf {
            try say $buf.decode;
            await $conn.write($buf);
            LAST $conn.close;
        }
    }
}

Note that we do not apply race to the inner whenever, since we do care about ordering there. The race prefix will cause the whenever loop body to be scheduled on the thread pool, meaning that even if the supply we are tapping is serial, we will behave as if it were not.

All supplies produced by supply are serial, even in the presence of race. That is to say:

my $results = supply {
    race whenever $things-to-compute -> $value {
        emit do-long-computation($value);
    }
}
my $total = 0;
$results.tap: {
    $total += $_;
    say "$_ ($total)";
}

Will never be a data race on $total. However, with race the order of the emitted results may not match the input order. To get that, use hyper:

my $results = supply {
    hyper whenever $things-to-compute -> $value {
        emit do-long-computation($value);
    }
}

Channels

Our current channels are really just a blocking concurrent queue. This can be useful, but since it implies blocked threads waiting for values then it's not the best of solutions for most user-space problems. Better is to use await with a Promise, or whenever over a Supply. Also, the name channel is evocative of the concept in Go, but our channels aren't like that. They don't block senders until the receiver is ready, for example. While a blocking queue is a very useful thing to have in a language, and we should keep it, we may want to stop calling it Channel, to avoid confusion (and keep the name free for something that has the Go-ish semantics). Finally, the error propagation could now go away (completion propagation, on the other hand, can still be very useful). In general, this type becomes a little more "low level", since the good uses of it are usually in implementing things like thread pools - that is, infrastructure.

The fate of earliest/more/wait/done

One of the reasons we've had trouble with this construct is that it tried to support both promises and channels, but the former are non-blocking and the latter are blocking. Its origins date back to before supplies. Now we have supplies, and with a proposal for some nice syntactic relief for them, which also handles promises nicely, it's probably time to say goodbye to earliest and friends.

And what of consuming a blocking queue (currently Channel)? There's still the .list coercer that produces a lazy list of things coming out of it, and we can (keep/add) a coercer to Supply. In fact, that would even mean that we get whenever support for channels "for free" and people can simply move to the new syntax.

@skids
Copy link

skids commented Jul 19, 2015

It would also seem reasonable for 'await' to return any thrown-then-resumed Exceptions in the corresponding slot in the unpacked array.

The QUIT example claiming it is "not an infinite recursion" strikes me as confusing. The QUIT block appears to be inside the dynamic scope of the sub. It does not matter that the sub returns, immediately it is still in that scope (persisted as a continuation). Would sub a { my $_a = 1; whenever ...; QUIT { $_a.say }} find the $*a?

At first it looks like the parent and child nested whenevers still are in the same "only in one loop body at a time rule" and that is why the one-topic-at-a-time example doesn't just prematurely close topics before viewing all the posts in the topic. Then the echo server example states that awaiting inside the child whenever does not block processing new connections. This is causing me some cognitive dissonance.

@jido
Copy link

jido commented Jul 24, 2015

In the "stateful things" example, you have written return instead of emit:

return [$seq-number++, $value];

In the rate limiting example, LAST should be outside the if otherwise it may never be hit:

    # Emit up to the limit values per second.
    whenever $supply-in -> $value {
        if $in-last-second++ < $max-per-second {
            emit $value;
            LAST done;
        }
    }

Skids, I think that topics do get closed prematurely in the one at a time example.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment