/*
# CriticalSection

"A stream-flavoured critical section implementation for isolating access to a stream subgraph"

An event entering a stream graph fenced off by a critical section causes all other entry points to switch into
blocking mode. Events received at a blocked entry point are retained in that entry point's internal buffer, and
the entry point is enqueued for subsequent processing. Each time an event flows through an exit point, the next
entry point in the queue is dequeued and its next buffered event is emitted into the inner stream graph. When no
data is buffered at any entry point, the critical section unblocks, allowing direct dispatch of the next data item
to arrive, at which point it switches back into blocking mode. Errors ignore critical section fences by default;
this behaviour can be disabled by passing `true` as the second parameter of CriticalSection.enter().

Example:

  function isolate<A, B>(...streams: Stream<A>[]): Stream<B> {
    const cs = new CriticalSection();
    const result$ = combineArray<B>(doWork<A, B>, streams.map(s => cs.enter(s)));
    return cs.exit(result$);
  }

My personal use case:

I have a graph of actor-like nodes, each of which has a dynamically-reconfigurable internal pipeline triggered
through any one of at least five different input streams, each addressing a different concern. When an inbound
event is received on a given stream, the pipeline must be allowed to run to completion before the next input can
be handled safely. There are multiple possible exit conditions; not only can they be triggered asynchronously, but
the internal pipeline graph can itself be reconfigured dynamically when new configuration arrives via one of the
input streams. Additionally, the processing of one pipeline will often need to be suspended pending the outcome of
a descendant node. A new stream could hypothetically be built as needed, depending on input data, and then managed
via `mergeConcurrently(1)`, but that would introduce an extra layer of complex logic and redundancy that would
degrade the performance characteristics required of the overall process.

Risks:

A dynamic inner graph could give rise to instances of the combine problem, or to unexpected filtering that
prevents an exit node from ever being dispatched through. These problems can be mitigated by assuming they will
occur and setting up the static portion of the fenced-off stream graph so that inbound stream segments (received
through higher-order streams) that could cause a problem are monitored and dealt with if they misbehave. If an
inbound event is received that is known to be possible only when the inner stream graph is idle, or whose
characteristics should cause the fenced-off section to unblock, it could be routed through the exit() method of
the active CriticalSection instance as a backup (see the sketch below). Another approach would be to have the
fenced-off stream graph expose a status stream that can be monitored by an external node.
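
For instance (`cs` and `status$` are hypothetical names), a known-idle signal could be routed through exit() so
that its arrival releases the fence:

  const unblock$ = cs.exit(status$.filter(s => s === 'idle'));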

Potential Improvements:

The CriticalSection class could be changed to behave more like a counting semaphore as a way to manage flow
control: inbound events would increment a counter, exit points would decrement it, and when a configurable
threshold is reached, entry points would switch to blocking mode until the counter drops (a rough sketch follows).
Another configuration option could control buffering characteristics, such as how many items to retain in a
buffer, and whether excess events should be discarded or unprocessed events dropped from the front of the buffer.
A further option could be a time window after which a blocked section automatically unblocks.
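
A minimal sketch of the semaphore-style dispatcher (illustrative only; the class name and threshold parameter are
hypothetical, and the buffering and time-window options are omitted):

  class SemaphoreDispatcher {
    private count = 0;
    private queue: EntrySink<any>[] = [];
    constructor(private threshold = 1) {}
    enter(entry: EntrySink<any>): boolean {
      if(this.count < this.threshold) {
        this.count++; // capacity available; dispatch directly
        return true;
      }
      this.queue.push(entry); // at capacity; entry must buffer
      return false;
    }
    exit(): void {
      const next = this.queue.shift();
      if(next !== void 0) next.dispatch(); // transfer the slot to the next waiter
      else if(this.count > 0) this.count--;
    }
  }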
*/
import {Source, Sink, Scheduler, Disposable} from 'most';

interface StreamType<T> {
  source: Source<T>;
}

export class CriticalSection {
  private _dispatcher = new Dispatcher();

  // Wraps a stream so that its events are fenced by this critical section's dispatcher
  enter<T extends StreamType<any>>(stream: T, blockErrors = false): T {
    const source = new EntrySource<any>(stream.source, this._dispatcher, blockErrors);
    return new (<any>stream.constructor)(source);
  }

  // Wraps a stream so that events flowing through it release the fence
  exit<T extends StreamType<any>>(stream: T): T {
    const source = new ExitSource<any>(stream.source, this._dispatcher);
    return new (<any>stream.constructor)(source);
  }
}

// Coordinates entry points: at most one entry point may dispatch into the inner graph at a time;
// the rest queue up and are released one buffered event at a time as events flow through an exit point.
class Dispatcher {
  private current: EntrySink<any>|undefined = void 0;
  private loaded = false;
  private queue: EntrySink<any>[] = [];

  // Returns true if the entry point may dispatch immediately; otherwise the entry is queued
  enter(entry: EntrySink<any>): boolean {
    if(this.current === void 0) {
      this.current = entry;
      return true;
    }
    this.queue.push(entry);
    return false;
  }

  cancel(entry: EntrySink<any>): void {
    if(this.current === entry) {
      if(this.reload()) {
        this.trigger();
      }
    }
  }

  // Selects the next non-disposed queued entry; returns true if the caller should trigger it
  reload(): boolean {
    if(this.current === void 0) {
      return false;
    }
    if(this.loaded) {
      return false; // only the call site that performed the reload may trigger the next emission
    }
    let entry: EntrySink<any>;
    do {
      if(this.queue.length === 0) {
        this.current = void 0;
        return false;
      }
      entry = <EntrySink<any>>this.queue.shift();
    } while(entry.disposed);
    this.current = entry;
    this.loaded = true;
    return true;
  }

  trigger(): void {
    if(this.loaded) {
      this.loaded = false;
      (<EntrySink<any>>this.current).dispatch();
    }
  }
}

class EntrySource<T> implements Source<T> {
  constructor(
    private source: Source<T>,
    private dispatcher: Dispatcher,
    private blockErrors: boolean
  ) {}

  run(sink: Sink<T>, scheduler: Scheduler): Disposable<T> {
    const entrySink = new EntrySink<T>(sink, this.dispatcher, this.blockErrors);
    const disposable = this.source.run(entrySink, scheduler);
    // dispose deregisters the sink from the dispatcher, then disposes the upstream subscription
    return { dispose: () => { entrySink.dispose(); return disposable.dispose(); } };
  }
}

interface QueuedEvent {
  type: 'event'|'end'|'error';
  t: number;
  x: any;
}

class EntrySink<T> implements Sink<T> {
  public disposed = false;
  private queue: QueuedEvent[] = [];

  constructor(
    private sink: Sink<T>,
    private dispatcher: Dispatcher,
    private blockErrors: boolean
  ) {}

  // Emits the oldest buffered event into the inner stream graph
  dispatch(): void {
    const event = <QueuedEvent>this.queue.shift();
    switch(event.type) {
      case 'event':
        this.sink.event(event.t, event.x);
        break;
      case 'end':
        this.sink.end(event.t, event.x);
        break;
      case 'error':
        this.sink.error(event.t, event.x);
        break;
    }
  }

  event(t: number, x: any): void {
    if(this.disposed) return;
    if(this.dispatcher.enter(this)) {
      this.sink.event(t, x);
    }
    else {
      this.queue.push({type: 'event', t, x});
    }
  }

  end(t: number, x: any): void {
    if(this.disposed) return;
    if(this.dispatcher.enter(this)) {
      this.sink.end(t, x);
    }
    else {
      this.queue.push({type: 'end', t, x});
    }
  }

  error(t: number, e: Error): void {
    if(this.disposed) return;
    // errors bypass the fence unless blockErrors was requested at the entry point
    if(this.blockErrors && !this.dispatcher.enter(this)) {
      this.queue.push({type: 'error', t, x: e});
    }
    else {
      this.sink.error(t, e);
    }
  }

  dispose() {
    this.disposed = true;
    this.dispatcher.cancel(this);
  }
}

class ExitSource<T> implements Source<T> {
  constructor(private source: Source<T>, private dispatcher: Dispatcher) {}

  run(sink: Sink<T>, scheduler: Scheduler): Disposable<T> {
    const exitSink = new ExitSink<T>(sink, this.dispatcher);
    const disposable = this.source.run(exitSink, scheduler);
    return { dispose: () => { exitSink.dispose(); return disposable.dispose(); } };
  }
}

class ExitSink<T> implements Sink<T> {
  public disposed = false;

  constructor(
    private sink: Sink<T>,
    private dispatcher: Dispatcher
  ) {}

  event(t: number, x: any): void {
    if(this.disposed) return;
    const reloaded = this.dispatcher.reload();
    this.sink.event(t, x);
    if(reloaded) this.dispatcher.trigger();
  }

  end(t: number, x: any): void {
    if(this.disposed) return;
    const reloaded = this.dispatcher.reload();
    this.sink.end(t, x);
    if(reloaded) this.dispatcher.trigger();
  }

  error(t: number, e: Error): void {
    if(this.disposed) return;
    const reloaded = this.dispatcher.reload();
    this.sink.error(t, e);
    if(reloaded) this.dispatcher.trigger();
  }

  dispose() {
    this.disposed = true;
  }
}
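
/*
Usage sketch (illustrative only; periodic and merge are from most's 1.x API, and the stream values are
hypothetical):

  import {periodic, merge} from 'most';

  const cs = new CriticalSection();
  const a$ = cs.enter(periodic(100, 'a'));
  const b$ = cs.enter(periodic(250, 'b'));
  const out$ = cs.exit(merge(a$, b$).map(x => x.toUpperCase()));

Each event admitted at a$ or b$ must pass through out$ before the next buffered event is released; an event
arriving at b$ while one from a$ is still in flight between enter() and exit() is buffered until then.
*/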