Planning hyper/race implementation

Existing work

Public Types

All these names are provisional.

Seqqy

A role is extracted containing several of the methods in Seq. Working name: Seqqy (after Setty, Baggy), though I hope we'll end up with something better. For now, the structure is more important than the naming. The following methods currently in Seq will move to this role:

  • Array
  • List
  • Slip
  • Str
  • Stringy
  • AT-POS
  • EXISTS-POS
  • eager
  • fmt
  • gist
  • invert (why is this in Seq, not Any?)
  • new-consumed

This role will also do PositionalBindFailover, which provides:

  • cache
  • list
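A minimal sketch of the intended shape, with Seqqy still a placeholder name:

role Seqqy does PositionalBindFailover {
    # Array, List, Slip, Str, Stringy, AT-POS, EXISTS-POS, eager, fmt,
    # gist, invert and new-consumed move here from Seq with their
    # existing bodies; PositionalBindFailover supplies cache and list.
}

Seq, HyperSeq and RaceSeq would then all do this role.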

HyperSeq

Type representing an operation in a hyper pipeline. Does Seqqy. Used to build up a chain of operations that should operate under the hyper paradigm.

RaceSeq

Type representing an operation in a race pipeline. Does Seqqy. Used to build up a chain of operations that should operate under the race paradigm.
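For orientation, this is the kind of user-level code these types sit behind (illustrative only; the adverbs mirror the configuration described later):

my @squares = (1..100_000).hyper(:4degree, :512batch).map(* ** 2);  # order preserved
my @hits    = @lines.race.grep(*.contains('ERROR'));                # order not preserved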

Pipeline construction and operation

Generalities

Like Seq, both HyperSeq and RaceSeq should be immutable types (so we want a chain-style construction rather than the mutable @!actions approach that has been suggested). As with Seq, no work happens until something either obtains a sequential iterator or sinks the chain (we may implement the latter in terms of .iterator.sink-all anyway).

Back-pressure

For hyper/race pipelines terminated with a sequential iteration, the rate of this iteration is used as a back-pressure mechanism. The sequential iteration determines when we start new batches of work, and we will never send off new batches for parallel processing when there are more than $degree outstanding completed batches.

For example, if the degree is 4, then we immediately start producing 4 batches and start workers to process them. The consumer starts eating the first, so we schedule another batch to be produced and worked on. The other batches complete in the meantime, and then the 5th does too. If we have a slow consumer, it may still be processing the results of the first batch; there are now 4 batches that it has not even started eating, so we don't start any more parallel work. When the consumer then starts to eat the second batch, we are down to 4 outstanding completed batches and currently have no work ongoing, so we schedule 4 more batches to be worked on. These produce results. If the consumer were still on the second batch after this, we'd now be up to 6 outstanding batches, and would not schedule any more work until the consumer had eaten 2 more, bringing us back down to 4 outstanding batches. This means that, memory-wise, we have a limit of 2 * $degree batches in memory at a time as a result of the parallel pipeline.
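A rough sketch of the bookkeeping this implies; every name here is hypothetical, and real code would need the counter updates properly synchronized:

my int $degree      = 4;    # configured parallelism
my int $outstanding = 0;    # completed batches the consumer has not yet eaten

sub start-next-batch(--> Nil) {
    # Placeholder: produce a batch and hand it to a worker.
}

sub batch-completed(--> Nil) {
    # Called when a worker finishes processing a batch.
    $outstanding++;
}

sub batch-consumed(--> Nil) {
    # Called via the batch's consumed-callback when the consumer eats it.
    $outstanding--;
    # Never send off new work while more than $degree completed batches
    # await the consumer; with up to $degree further batches in flight,
    # this bounds memory at roughly 2 * $degree batches.
    start-next-batch() if $outstanding <= $degree;
}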

Work Batches

A work batch represents a batch of items that will be processed by a worker. For now we keep this as a Rakudo internal type.

class Rakudo::Internals::HyperWorkBatch {
    # The items in the batch.
    has IterationBuffer $.items;

    # Sequence number of the batch, starting from zero.
    has int $.sequence-number;

    # Is this the first batch that was produced at the last fork point or the last batch that the
    # fork point will produce?
    has Bool $.first;
    has Bool $.last;

    # Any extra state that an operation wishes to convey as a result of processing the batch.
    has Mu $.extra;

    # A callback set by the coordinator so it can take action if needed when the batch has
    # been consumed.
    has &.consumed-callback is rw;

    method mark-consumed(--> Nil) {
        .() with &!consumed-callback;
    }
}

Work Stages

There are a number of repeating patterns that show up across the operations we want to write parallel implementations of. These will be captured by roles, which will be implemented concretely by operations. Note that these are intended to be lower level than the higher-level operations users perform. Some user-level operations will produce multiple work stages. Work stages form a fork/join graph, which in the simplest cases will be a single fork, some work on batches, and a join, but in more complex cases may have a number of join/fork points. Examples will be given after each of the types of work stage has been introduced.

Work stages form a linked list, with each stage pointing to the one before it. This is contained in a role.

role Rakudo::Internals::HyperWorkStage {
    has Rakudo::Internals::HyperWorkStage $.source;
}

HyperProcessor

This is a stage that wants to work at an individual batch level. It works in-place on the Rakudo::Internals::HyperWorkBatch instance that it is passed.

role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
    method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}

There are no operation-agnostic implementations of this role.
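To give a feel for an operation-specific one, a map over single items might look roughly like this (HyperMapper is a hypothetical name, and rewriting the buffer in place via AT-POS/BIND-POS is an assumption about IterationBuffer's interface):

class Rakudo::Internals::HyperMapper does Rakudo::Internals::HyperProcessor {
    has &.mapper is required;

    method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        # Replace each item in the batch, in place, with its mapped value.
        my $items := $batch.items;
        my int $n = $items.elems;
        loop (my int $i = 0; $i < $n; $i++) {
            $items.BIND-POS($i, &!mapper($items.AT-POS($i)));
        }
    }
}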

HyperBatcher

This is a stage that takes a sequence of things and breaks them up into batches. A pipeline must start with one of these.

role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
    has $!sequence = 0;

    method next-sequence-number() {
        $!sequence++
    }

    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
}

Many pipelines will start with a Rakudo::Internals::HyperBatcher::Iterator, which makes batches out of an iterator. It would look something like:

class Rakudo::Internals::HyperBatcher::Iterator does Rakudo::Internals::HyperBatcher {
    my constant NO_LOOKAHEAD = Mu.new;
    has Iterator $.iterator is required;
    has $!lookahead = NO_LOOKAHEAD;
    
    method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) {
        my IterationBuffer $items .= new;
        my Bool $first = False;
        my Bool $last = False;
        if $!lookahead === NO_LOOKAHEAD {
            $first = True;
            if $!iterator.push-exactly($batch-size, $items) === IterationEnd {
                $last = True;
            }
            else {
                $!lookahead = $!iterator.pull-one;
                $last = True if $!lookahead === IterationEnd;
            }
        }
        else {
            $first = False;
            $items.push($!lookahead);
            if $!iterator.push-exactly($batch-size - 1, $items) === IterationEnd {
                $last = True;
            }
            else {
                $!lookahead = $!iterator.pull-one;
                $last = True if $!lookahead === IterationEnd;
            }
        }
        my $sequence-number = self.next-sequence-number();
        return Rakudo::Internals::HyperWorkBatch.new(:$sequence-number, :$items, :$first, :$last);
    }
}

A hyper-batcher that has not yet produced its last batch, but that isn't ready to produce another one, can return a Rakudo::Internals::HyperWorkBatch type object.

HyperJoiner

This is a stage that takes a sequence of batches and joins them together. When no more batches will be delivered, batches-completed will be called. (Note that this is distinct from the last property on an individual batch; the worker processing the batch marked last may complete ahead of other workers, and so that batch may well not be the last one that accept-batch is called with.)

role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
    method accept-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
    method batches-completed(--> Nil) { ... }
}

A common implementation is the one placed at the end of a race pipeline that will be consumed by a sequential iteration. For that reason, it also implements the Iterator API.

class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
    # Note we can use a ConcurrentQueue REPR object rather than a Channel, for optimization.
    has Channel $.batches = Channel.new;

    method accept-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        $!batches.send($batch);
    }

    method batches-completed(--> Nil) { $!batches.send(Mu) }

    has IterationBuffer $!current-items;

    method pull-one() {
        until $!current-items.defined && $!current-items.elems { # Handles empty batches
            my $batch = $!batches.receive // return IterationEnd;
            $batch.mark-consumed();
            $!current-items = $batch.items;
        }
        $!current-items.shift
    }
}

SequencedHyperJoiner

Just like HyperJoiner, except accept-batch will be called with the batches in their sequence order.
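One generic way to provide this would be a reordering layer over HyperJoiner that holds back batches arriving ahead of their turn; a sketch, where accept-sequenced-batch is a hypothetical method for implementations to fill in:

role Rakudo::Internals::SequencedHyperJoiner does Rakudo::Internals::HyperJoiner {
    has %!held;          # early batches, keyed by sequence number
    has int $!next = 0;  # next sequence number to deliver

    method accept-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
        %!held{$batch.sequence-number} = $batch;
        while %!held{$!next}:exists {
            self.accept-sequenced-batch(%!held{$!next}:delete);
            $!next++;
        }
    }

    # Implementations see batches in sequence order through this method.
    method accept-sequenced-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
}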

Relationship with HyperSeq/RaceSeq

A HyperSeq and a RaceSeq contain the current head of the work stage chain, together with a configuration object that is carried over from the previous one in the chain.

my class HyperConfiguration {
    has int $.batch;
    has Int $.degree;
}

class HyperSeq {
    has HyperConfiguration $.configuration;
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;
    method !work-stage-head() { $!work-stage-head }

    ...
}

class RaceSeq {
    has HyperConfiguration $.configuration;
    has Rakudo::Internals::HyperWorkStage $!work-stage-head;
    method !work-stage-head() { $!work-stage-head }

    ...
}

Composition of operations using work stages

A pipeline is set in motion by passing its tail end to Rakudo::Internals::HyperCoordinator.start. This starts processing; the coordinator is in charge of the overall operation, including launching workers.

class RaceSeq {
    method iterator() {
        my $iter = Rakudo::Internals::RaceToIterator.new(source => $!work-stage-head);
        Rakudo::Internals::HyperCoordinator.start($iter);
        $iter
    }
}

Operations would be composed by implementing various stages. For example:

  • Both hyper and race map on single items would be implemented as a HyperProcessor
  • A sort with a projection would be implemented by a HyperProcessor that does the projection of each item in the batch, putting it into the extra slot, together with something doing both HyperJoiner and HyperBatcher. This would only be able to start producing batches once it has received all input batches.
  • Hyper unique would be implemented by a HyperProcessor that does the unique on each batch, followed by a SequencedHyperJoiner that does a final unique pass over the items, maintaining the state between them. For race unique, a plain HyperJoiner suffices. (There may be a way to share out the central "already seen" view to the workers also.)
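As a concrete sketch of the first point, a hyper map might chain a processor stage onto the pipeline along these lines (HyperMapper is the hypothetical processor sketched earlier, and the construction details are glossed over):

class HyperSeq {
    method map(&mapper) {
        self.new:
            configuration   => $!configuration,
            work-stage-head => Rakudo::Internals::HyperMapper.new(
                source => $!work-stage-head,
                :&mapper)
    }
}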

The coordinator manages the overall operation. It is responsible for:

  • Producing batches when we need them, and making sure we're only ever producing one at a time from a given HyperBatcher.
  • Making sure that we call accept-batch on a given joiner with only one batch at a time, and only call batches-completed when we won't call accept-batch again.
  • Making sure that a sequence of HyperProcessors is executed in a single worker for a given batch, for better locality of reference.
  • When an object is both a HyperJoiner and a HyperBatcher, making sure that we give it a chance to produce a batch each time after we've fed it a batch.
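For the locality point, the worker servicing a batch would simply run every consecutive HyperProcessor stage over it before handing it onward; a minimal sketch:

# Run all consecutive processor stages over one batch in a single worker,
# so the batch stays hot in that worker's caches.
sub process-batch-through(@processors, Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
    .process-batch($batch) for @processors;
}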

moritz commented Mar 23, 2017

Sounds sensible to me.

Proposed name for Seqqy: Sequential, to mirror Positional and Associative.


jnthn commented Mar 23, 2017

That's certainly attractive in the "real world" sense. I do worry a bit about having parallel constructs doing a role called Sequential, however...


lizmat commented Mar 23, 2017

We also need a way to stop the work of other workers (or at least not start any new jobs), e.g. when an exception occurs, when a map/grep does a "last", or when a .first finds something. Did I miss that in the above description?


lizmat commented Mar 24, 2017

Finally read the whole thing while resting on a cycling trip: and I have a few comments :-)

  1. class Rakudo::Internals::HyperBatcher does class Rakudo::Internals::HyperBatcher # this seems wrong :-)

  2. Many HyperProcessors won't care whether they process a first or a last batch. So it feels to me that class Rakudo::Internals::HyperBatcher needs to be split into one that does lookahead, and one that doesn't. Alternatively, perhaps have a Smart and a Dumb HyperWorkBatch, with the dumb one having 'method first(--> Nil) { }' / 'method last(--> Nil) { }'.

  3. I would recommend that a HyperBatcher that isn't finished, but which cannot currently produce new values, return Nil rather than a type object, as different implementations might return different type objects. And Nil is generally accepted as indicating the absence of a value where there should be one.

  4. Shouldn't HyperJoiner be called RaceJoiner, and SequencedHyperJoiner be HyperJoiner?

  5. re HyperJoiner for .unique: the seen hash could be passed as the first element of the HyperWorkBatch once it's done. Simple, no additional mechanics needed. Just a HyperJoiner that understands this.

  6. "Making sure that a sequence of BatchProcessors are executed in a single worker for a given batch, for better locality of reference." How is this important in a world where a thread can be interrupted while waiting for something, and have it continue on another thread? Aka, we already cannot depend on locality of reference. Or am I missing something?

Ok, that's it, I think. For now :-)


donaldh commented Mar 28, 2017

What about Sequency?


zostay commented Mar 31, 2017

Some name brainstorming.

  • Flowy - Seq follows a flow and so does a HyperSeq, even if they aren't both serial/sequential.
  • Succy - A succession? That name succs, though.
  • Chainy - A chain is a little more discrete than a flow, which makes more sense in a way, but a chain also gives a stronger suggestion of Sequential.
  • Flux - This feels too loaded and inspecific for the task to me, but I call dibs on Flux::Capacitor if it sticks.
  • Cascade - I can think of a Seq as being a kind of Cascade. This might be my favorite from the thesaurus so far.
