Skip to content

Instantly share code, notes, and snippets.

@axefrog
Last active July 16, 2016 07:57
Show Gist options
  • Save axefrog/03db5387c4487d1b4cae3cd02f0c07e4 to your computer and use it in GitHub Desktop.
Save axefrog/03db5387c4487d1b4cae3cd02f0c07e4 to your computer and use it in GitHub Desktop.
A selective, key-based multicast operator for most.js. Some code borrowed/adapted from mostjs/multicast.
export const dispose = disposable => disposable.dispose();
export const emptyDisposable = {
dispose() {}
};
export class DispatchDisposable
{
constructor(source, sink, key) {
this.source = source;
this.sink = sink;
this.key = key;
this.disposed = false;
}
dispose() {
if(this.disposed) {
return;
}
this.disposed = true;
const remaining = this.source.remove(this.key, this.sink);
return remaining === 0 && this.source._dispose();
}
}
import DispatchSource from './source';
export default function dispatch(f, store) {
return function(stream) {
if(stream.source instanceof DispatchSource && stream.source.f === f) {
return stream;
}
const source = new DispatchSource(stream, f, store);
const newStream = new stream.constructor(source);
newStream.select = key => source.select(key);
return newStream;
};
}
import Pipe from 'most/lib/sink/Pipe';
import {DispatchDisposable, emptyDisposable, dispose} from './dispose';
import {BasicStore} from './stores';
import {tryEvent, tryEnd} from './try';
const defaultKey = Symbol('DispatchSource');
export default class DispatchSource
{
constructor(stream, f, store) {
this.stream = stream;
this.f = f;
this._store = store || new BasicStore();
this._disposable = emptyDisposable;
this._boundSelect = key => this.select(key);
}
_dispose () {
const disposable = this._disposable;
this._disposable = emptyDisposable;
return Promise.resolve(disposable).then(dispose);
}
run(sink, scheduler) {
return this.add(sink, scheduler, defaultKey);
}
event(t, x) {
const key = this.f(x);
event(this._store.get(defaultKey), t, wrap(x, this._boundSelect));
event(this._store.get(key), t, x);
}
end(t, x) {
end(this._store[Symbol.iterator](), t, x);
}
error(t, e) {
error(this._store[Symbol.iterator](), t, e);
}
select(key) {
const source = new TargetSource(this, key);
return new this.stream.constructor(source);
}
add(sink, scheduler, key) {
if(this._store.add(key, sink)) {
this._disposable = this.stream.source.run(this, scheduler);
}
return new DispatchDisposable(this, sink, key);
}
remove(key, sink) {
this._store.remove(key, sink);
}
}
function wrap(event, select) {
return [event, select];
}
function event(it, t, x) {
let i = 0;
for(let sink of it) {
i++;
tryEvent(t, x, sink);
}
}
function end(it, t, x) {
let i = 0;
for(let sink of it) {
i++;
tryEnd(t, x, sink);
}
}
function error(it, t, e) {
for(let sink of it) {
sink.error(t, e);
}
}
class TargetSource
{
constructor(source, key) {
this.key = key;
this.source = source;
}
run(sink, scheduler) {
return this.source.add(new Pipe(sink), scheduler, this.key);
}
}
import Pipe from 'most/lib/sink/Pipe';
import {DispatchDisposable, emptyDisposable, dispose} from './dispose';
import {BasicStore} from './stores';
import {tryEvent, tryEnd} from './try';
const defaultKey = Symbol('DispatchSource');
export default class DispatchSource
{
constructor(stream, f, store) {
this.stream = stream;
this.f = f;
this._store = store || new BasicStore();
this._disposable = emptyDisposable;
this._boundSelect = key => this.select(key);
}
_dispose () {
const disposable = this._disposable;
this._disposable = emptyDisposable;
return Promise.resolve(disposable).then(dispose);
}
run(sink, scheduler) {
return this.add(sink, scheduler, defaultKey);
}
event(t, x) {
const key = this.f(x);
event(this._store.get(defaultKey), t, wrap(x, this._boundSelect));
event(this._store.get(key), t, x);
}
end(t, x) {
end(this._store[Symbol.iterator](), t, x);
}
error(t, e) {
error(this._store[Symbol.iterator](), t, e);
}
select(key) {
const source = new TargetSource(this, key);
return new this.stream.constructor(source);
}
add(sink, scheduler, key) {
if(this._store.add(key, sink)) {
this._disposable = this.stream.source.run(this, scheduler);
}
return new DispatchDisposable(this, sink, key);
}
remove(key, sink) {
this._store.remove(key, sink);
}
}
function wrap(event, select) {
return [event, select];
}
function event(it, t, x) {
let i = 0;
for(let sink of it) {
i++;
tryEvent(t, x, sink);
}
}
function end(it, t, x) {
let i = 0;
for(let sink of it) {
i++;
tryEnd(t, x, sink);
}
}
function error(it, t, e) {
for(let sink of it) {
sink.error(t, e);
}
}
class TargetSource
{
constructor(source, key) {
this.key = key;
this.source = source;
}
run(sink, scheduler) {
return this.source.add(new Pipe(sink), scheduler, this.key);
}
}
export function tryEvent (t, x, sink) {
try {
sink.event(t, x);
}
catch (e) {
sink.error(t, e);
}
}
export function tryEnd (t, x, sink) {
try {
sink.end(t, x);
}
catch (e) {
sink.error(t, e);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment