Skip to content

Instantly share code, notes, and snippets.

@axefrog
Last active June 16, 2017 02:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save axefrog/83f22161d0ffb39b112f837a0f13d56e to your computer and use it in GitHub Desktop.
Save axefrog/83f22161d0ffb39b112f837a0f13d56e to your computer and use it in GitHub Desktop.
Hot-swappable pipelines for Most.js streams
import {Stream, Sink, Scheduler, Disposable} from '@most/types';
import {disposeOnce, disposeAll} from '@most/disposable';
/**
 * An extension that is being switched on: a function that receives the stream
 * flowing into its pipeline segment and returns the stream that should flow out.
 */
export type ActivateExtension<A = any> = (source: Stream<A>) => Stream<A>;
/** The "switch off" action: `null` unmounts whatever extension the segment currently has. */
export type DeactivateExtension = null;
/** One step in an extension's life: either a new transform to mount, or `null` to unmount. */
export type ExtensionAction<A = any> = ActivateExtension<A>|DeactivateExtension;
/**
 * The full life of one pipeline segment, as a stream of actions. Each action
 * replaces the previously mounted transform; when the stream ends, the segment
 * is disposed and removed from the pipeline.
 */
export type ExtensionLifecycle<A = any> = Stream<ExtensionAction<A>>;
/**
 * Makes `source` extensible at runtime.
 *
 * @param source     The stream whose events flow through the pipeline.
 * @param extensions A stream of extension lifecycles; every lifecycle it emits
 *                   is given its own segment in the pipeline.
 * @returns An `ExtensiblePipeline` wrapping both streams.
 */
export function extensible<A>(source: Stream<A>, extensions: Stream<ExtensionLifecycle<A>>): Stream<A> {
  const pipeline: Stream<A> = new ExtensiblePipeline<A>(extensions, source);
  return pipeline;
}
/**
 * A stream that routes `source` through a mutable chain of segments, where the
 * segments are added on demand by the `extensions` stream.
 */
export class ExtensiblePipeline<A> implements Stream<A> {
  public readonly extensions: Stream<ExtensionLifecycle<A>>;
  public readonly source: Stream<A>;

  constructor(extensions: Stream<ExtensionLifecycle<A>>, source: Stream<A>) {
    this.extensions = extensions;
    this.source = source;
  }

  /**
   * Builds an empty chain (head -> tail -> sink), then subscribes to the
   * extension stream and to the source, in that order.
   *
   * @returns A disposable, wrapped in `disposeOnce`, covering the extender and
   *          both subscriptions.
   */
  run(sink: Sink<A>, scheduler: Scheduler): Disposable {
    const head = new Boundary<A>();
    const tail = new Boundary<A>();

    // Link the two boundaries to each other.
    head.next = tail;
    tail.previous = head;

    // Initial data path: source -> head -> tail -> downstream sink.
    head.sink = tail;
    tail.sink = sink;

    const extender = new PipelineExtender(head, tail, scheduler);
    const extensionsSubscription = this.extensions.run(extender, scheduler);
    const sourceSubscription = this.source.run(head, scheduler);

    return disposeOnce(disposeAll([extender, extensionsSubscription, sourceSubscription]));
  }
}
/**
 * Sink for the stream of extension lifecycles. Every lifecycle it receives gets
 * a new `LoadableSegment` spliced in front of the tail boundary.
 */
class PipelineExtender<A> implements Sink<ExtensionLifecycle<A>> {
  constructor(
    private _head: Boundary<A>,
    private _tail: Boundary<A>,
    private _scheduler: Scheduler
  ) {}

  /** Creates a segment for the incoming lifecycle and starts running it. */
  event(time: number, stream: ExtensionLifecycle<A>): void {
    new LoadableSegment(this._tail).run(stream, this._scheduler);
  }

  /** The end of the extension stream is deliberately ignored. */
  end(time: number): void {}

  /** Errors from the extension stream are reported through the tail boundary. */
  error(time: number, error: Error): void {
    const tail = this._tail;
    tail.error(time, error);
  }

  /** Points the head at the tail's sink, then disposes the tail. */
  dispose() {
    const head = this._head;
    const tail = this._tail;
    head.sink = tail.sink;
    tail.dispose();
  }
}
/**
 * A node in the doubly linked chain that makes up the pipeline. Every node is
 * itself a sink and forwards what it receives to `sink`.
 */
interface PipelineSegment<A> extends Sink<A> {
// Neighbouring node on the head (source) side of the chain, if any.
previous?: PipelineSegment<A>;
// Neighbouring node on the tail (output) side of the chain, if any.
next?: PipelineSegment<A>;
// Destination this node forwards events, end signals and errors to.
sink: Sink<A>;
// Whether the node currently takes part in the data path; inactive nodes are
// skipped when a LoadableSegment searches backwards for its predecessor.
active: boolean;
}
/**
 * Fixed end point of the segment chain (used for both head and tail). It is
 * always active and simply relays everything to its current `sink`.
 */
class Boundary<A> implements PipelineSegment<A> {
  public previous?: PipelineSegment<A>;
  public next?: PipelineSegment<A> = undefined;
  public sink: Sink<A>;
  public readonly active = true;

  /** Relays a value to the current sink. */
  event(time: number, value: A): void {
    this.sink.event(time, value);
  }

  /** Relays the end signal to the current sink. */
  end(time: number): void {
    this.sink.end(time);
  }

  /** Relays an error to the current sink. */
  error(time: number, error: Error): void {
    this.sink.error(time, error);
  }

  /**
   * Walks backwards from this boundary and disposes every `LoadableSegment`
   * encountered, stopping at the first node that is not one.
   */
  dispose() {
    // Each node is disposed before its `previous` link is read, as the walk relies
    // on a disposed segment still pointing at its former predecessor.
    for (let node = this.previous; node instanceof LoadableSegment; node = node.previous) {
      node.dispose();
    }
  }
}
/**
 * A removable node in the pipeline chain that hosts at most one mounted
 * extension at a time.
 *
 * On construction the segment links itself in directly before `tail`. While an
 * extension is loaded, the nearest active predecessor feeds the extension's
 * input sink, and the extension's output comes back through this segment's
 * `event`/`end`/`error`, which forward to the sink that predecessor used to
 * feed.
 */
class LoadableSegment<A> implements PipelineSegment<A> {
  // Both stay undefined until run() has subscribed to the lifecycle stream.
  private _loader?: ExtensionLoader<A> = void 0;
  private _disposable?: Disposable = void 0;
  private _disposed = false;
  public previous: PipelineSegment<A>;
  public next: PipelineSegment<A>;
  public sink: Sink<A>;
  public active = false;

  constructor(public tail: Boundary<A>) {
    this.previous = tail.previous!;
    this.previous.next = this;
    tail.previous = this;
    this.next = tail;
    this.sink = tail;
  }

  /**
   * Subscribes an `ExtensionLoader` for this segment to the lifecycle stream.
   * Does nothing if the segment has already been disposed.
   */
  run(stream: ExtensionLifecycle<A>, scheduler: Scheduler): void {
    if (this._disposed) return;
    const loader = new ExtensionLoader<A>(this, scheduler);
    this._loader = loader;
    const disposable = stream.run(loader, scheduler);
    if (this._disposed) {
      // The lifecycle ended or failed synchronously inside stream.run(), so
      // dispose() ran before the subscription existed. Release it here.
      disposable.dispose();
      return;
    }
    this._disposable = disposable;
  }

  /** Finds the closest node towards the head whose `active` flag is set. */
  private _getPreviousActive(): PipelineSegment<A> {
    let candidate = this.previous;
    while (!candidate.active) {
      candidate = candidate.previous!;
    }
    return candidate;
  }

  /**
   * Splices an extension's input sink into the data path.
   *
   * Expects the segment to be inactive: a second call without an intervening
   * `unload()` would overwrite the saved downstream sink. Ignored once the
   * segment has been disposed.
   *
   * @param sink The sink the mounted extension wants upstream events sent to.
   */
  load(sink: Sink<A>): void {
    if (this._disposed) return;
    this.active = true;
    const previous = this._getPreviousActive();
    this.sink = previous.sink;
    previous.sink = sink;
  }

  /** Removes the extension from the data path; a no-op when nothing is loaded. */
  unload(): void {
    if (!this.active) return;
    this.active = false;
    const previous = this._getPreviousActive();
    previous.sink = this.sink;
  }

  /** Forwards a value produced by the mounted extension. */
  event(time: number, value: A): void {
    if (this._disposed) return;
    this.sink.event(time, value);
  }

  /** Forwards the end signal, then disposes the segment. */
  end(time: number): void {
    if (this._disposed) return;
    this.sink.end(time);
    this.dispose();
  }

  /** Forwards an error, then disposes the segment. */
  error(time: number, error: Error): void {
    if (this._disposed) return;
    this.sink.error(time, error);
    this.dispose();
  }

  /**
   * Unloads the extension, releases the lifecycle subscription and the loader,
   * and unlinks the segment from the chain. Safe to call repeatedly, and safe
   * to call before `run()`.
   */
  dispose() {
    if (this._disposed) return;
    this._disposed = true;
    this.unload();
    if (this._disposable !== void 0) {
      this._disposable.dispose();
    }
    if (this._loader !== void 0) {
      this._loader.dispose();
    }
    // Own links are left intact so a caller walking the chain can still step
    // past this segment after disposing it.
    const previous = this.previous;
    const next = this.next;
    previous.next = next;
    next.previous = previous;
  }
}
/**
 * Sink for one extension lifecycle. Each non-null action replaces the currently
 * mounted extension, `null` unmounts it, and the end of the lifecycle disposes
 * the whole segment.
 */
class ExtensionLoader<A> implements Sink<ExtensionAction<A>>, Disposable {
  private _current?: Disposable = void 0;
  private _disposed = false;
  constructor(
    private _segment: LoadableSegment<A>,
    private _scheduler: Scheduler
  ) {}

  /** Disposes the mounted extension, if any, exactly once. */
  private _disposeCurrent(): void {
    const current = this._current;
    if (current !== void 0) {
      // Clear the reference first so later events or dispose() cannot dispose
      // the same extension a second time.
      this._current = void 0;
      current.dispose();
    }
  }

  /**
   * Applies `extend` to a source backed by the segment and runs the result
   * into a sink that feeds the segment.
   */
  private _mount(extend: ActivateExtension<A>): void {
    const source = new DisconnectableSource(this._segment);
    const stream = extend(source);
    const sink = new DisconnectableSink(this._segment);
    const mounted = disposeAll([source, sink, stream.run(sink, this._scheduler)]);
    if (this._disposed) {
      // Running the extension disposed this loader synchronously, so nothing
      // else will ever release what was just created.
      mounted.dispose();
      return;
    }
    this._current = mounted;
  }

  /** Swaps the mounted extension; ignored once the loader has been disposed. */
  event(time: number, upgrade: ExtensionAction<A>): void {
    if (this._disposed) return;
    this._disposeCurrent();
    if (upgrade !== null) {
      this._mount(upgrade);
    }
  }

  end(time: number): void {
    // end signal signifies that this pipeline segment is no longer required
    this._segment.dispose();
  }

  /** Lifecycle errors are reported through the segment. */
  error(time: number, error: Error): void {
    this._segment.error(time, error);
  }

  /** Releases the mounted extension; repeated calls are no-ops. */
  dispose(): void {
    if (!this._disposed) {
      this._disposed = true;
      this._disposeCurrent();
    }
  }
}
/**
 * The stream handed to an extension as its input. Subscribing to it loads the
 * subscriber into the segment; disposing it unloads the segment again.
 */
class DisconnectableSource<A> implements Stream<A>, Disposable {
  private _segment: LoadableSegment<A>;

  constructor(segment: LoadableSegment<A>) {
    this._segment = segment;
  }

  /** Loads `sink` into the segment and returns this source wrapped in `disposeOnce`. */
  run(sink: Sink<A>, scheduler: Scheduler): Disposable {
    const segment = this._segment;
    segment.load(sink);
    return disposeOnce(this);
  }

  /** Unloads the segment. */
  dispose() {
    const segment = this._segment;
    segment.unload();
  }
}
/**
 * The sink an extension's output runs into. It relays everything to the
 * segment until it is disposed, after which all signals are dropped.
 */
class DisconnectableSink<A> implements Sink<A>, Disposable {
  private _disposed = false;

  constructor(private _segment: LoadableSegment<A>) {}

  /** Relays a value unless disposed. */
  event(time: number, value: A): void {
    if (!this._disposed) {
      this._segment.event(time, value);
    }
  }

  /** Relays the end signal unless disposed. */
  end(time: number): void {
    if (!this._disposed) {
      this._segment.end(time);
    }
  }

  /** Relays an error unless disposed. */
  error(time: number, error: Error): void {
    if (!this._disposed) {
      this._segment.error(time, error);
    }
  }

  /** Stops relaying from now on. */
  dispose() {
    this._disposed = true;
  }
}
import {runEffects, propagateEventTask, propagateEndTask, scan, skip, map, periodic, take, tap} from '@most/core';
import {newDefaultScheduler} from '@most/scheduler';
/**
 * Builds a counter stream on top of `periodic(interval)`: the first tick is
 * skipped and the remaining ticks are folded with `n => n + 1`, seeded with 0.
 */
const periodicCount = (interval: number) => {
  const ticks = periodic(interval);
  const ticksAfterFirst = skip(1, ticks);
  return scan(n => n + 1, 0, ticksAfterFirst);
};
/** The demo's source stream: the first 50 values of a 150ms counter. */
const count$ = take(50, periodicCount(150));
/**
 * Returns a thunk that prints `msg` followed by the current time in ISO-8601
 * form. The timestamp is taken when the thunk is called, not when it is created.
 */
const logNow = (msg: string) => (): void => {
  const timestamp = new Date().toISOString();
  console.log(`${msg}: ${timestamp}`);
};
// Demo transforms. Each one prints a banner at the moment it is applied to a
// stream and then returns that stream mapped through a simple numeric function.

/** Announces itself, then multiplies every value by 100. */
const times100 = (stream: Stream<number>) => {
  console.log('[MULTIPLY x 100]');
  return map((n: number) => n * 100, stream);
};
/** Announces itself, then multiplies every value by 1337. */
const times1337 = (stream: Stream<number>) => {
  console.log('[MULTIPLY x 1337]');
  return map((n: number) => n * 1337, stream);
};
/** Announces itself, then takes the square root of every value. */
const sqrt = (stream: Stream<number>) => {
  console.log('[SQUARE ROOT]');
  return map((n: number) => Math.sqrt(n), stream);
};
/** Announces itself, then subtracts 1 from every value. */
const subtract1 = (stream: Stream<number>) => {
  console.log('[SUBTRACT 1]');
  return map((n: number) => n - 1, stream);
};
/**
 * Creates a single-use test stream that emits fixed values at fixed delays.
 *
 * @param lifespan  Delay, relative to subscription, after which the end signal
 *                  is scheduled.
 * @param emissions `[delay, value]` pairs; each value is scheduled `delay`
 *                  after subscription. The pairs need not be sorted.
 * @returns A stream that may be run exactly once.
 * @throws Error if the stream is run a second time.
 */
function makeEmitter<A>(lifespan: number, emissions: [number, A][]) {
  let started = false;
  const stream: Stream<A> = {
    run(sink: Sink<A>, scheduler: Scheduler): Disposable {
      if (started) throw new Error('makeEmitter: this stream can only be run once');
      started = true;
      const scheduled = emissions.map(([delay, value]) => scheduler.delay(delay, propagateEventTask(value, sink)));
      scheduled.push(scheduler.delay(lifespan, propagateEndTask(sink)));
      return {
        // Cancel whatever is still pending so a disposed subscription stops
        // delivering events to its sink.
        dispose() {
          scheduled.forEach(task => task.dispose());
        }
      };
    }
  };
  return stream;
}
// Named activation actions used by the demo lifecycles below.
function applyTimes100(event$: Stream<number>) {
  return times100(event$);
}
function applyTimes1337(event$: Stream<number>) {
  return times1337(event$);
}
function applySubtract1(event$: Stream<number>) {
  return subtract1(event$);
}
function applySqrt(event$: Stream<number>) {
  return sqrt(event$);
}

// Demo lifecycles. All delays are relative to the moment the lifecycle stream
// is run; `null` unmounts the current transform.

// a$: x100 at 750, unmounted at 1750, sqrt at 2500, finished at 3500.
const a$ = makeEmitter<ExtensionAction<number>>(
  3500,
  [
    [750, applyTimes100],
    [1750, null],
    [2500, applySqrt]
  ]
);

// b$: -1 at 250, unmounted at 750, finished at 1250.
const b$ = makeEmitter<ExtensionAction<number>>(
  1250,
  [
    [250, applySubtract1],
    [750, null]
  ]
);

// c$: x1337 at 3000, finished at 3750.
const c$ = makeEmitter<ExtensionAction<number>>(
  3750,
  [
    [3000, applyTimes1337]
  ]
);

// d$: sqrt at 500, finished at 1000.
const d$ = makeEmitter<ExtensionAction<number>>(
  1000,
  [
    [500, applySqrt]
  ]
);
// Stream of lifecycles fed to the pipeline: c$ arrives at 1ms, a$ and b$ at
// 750ms, d$ at 5000ms. The lifespan has to be later than the last emission so
// that the end signal is the final thing this emitter delivers; with the
// previous value of 2000, d$ was emitted after the stream had already ended.
const extensions$ = makeEmitter<ExtensionLifecycle<number>>(6000, [
  [750, a$],
  [750, b$],
  [1, c$],
  [5000, d$]
]);

// The counter stream, routed through whatever extensions are mounted at the time.
const number$ = extensible(count$, extensions$);

// Print every value that comes out, bracketed by start/end timestamps.
logNow('start')();
runEffects(tap(n => console.log(n), number$), newDefaultScheduler())
  .then(logNow('end'))
  .catch(e => console.error(e));
start: 2017-06-16T02:00:23.937Z
0
1
2
3
4
5
6
[SUBTRACT 1]
6
7
8
[MULTIPLY x 100]
1000
1100
1200
1300
1400
1500
1600
17
18
19
20
[MULTIPLY x 1337]
28077
[SQUARE ROOT]
171.50510196492698
175.3596304740632
179.13123680698462
182.8250529878222
5.0990195135927845
5.196152422706632
5.291502622129181
29
30
31
32
33
34
35
36
[SQUARE ROOT]
6.082762530298219
6.164414002968976
6.244997998398398
6.324555320336759
41
42
43
44
45
46
47
48
49
end: 2017-06-16T02:00:31.291Z
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment