Last active
July 16, 2016 07:57
-
-
Save axefrog/03db5387c4487d1b4cae3cd02f0c07e4 to your computer and use it in GitHub Desktop.
A selective, key-based multicast operator for most.js. Some code borrowed/adapted from mostjs/multicast.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Calls `dispose()` on the given disposable and returns its result.
 * Handy as a callback, e.g. `promise.then(dispose)`.
 *
 * @param {{dispose: Function}} disposable - object exposing a `dispose` method
 * @returns {*} whatever `disposable.dispose()` returns
 */
export const dispose = disposable => disposable.dispose();
/**
 * A disposable whose `dispose()` does nothing. Usable as a placeholder
 * wherever a disposable is required but there is nothing to release.
 */
export const emptyDisposable = {
  dispose() {}
};
/**
 * Disposable handed to a subscriber of a dispatching source. Disposing it
 * unregisters the (key, sink) pair from the source and, when the source
 * reports that nothing is left, disposes the source itself.
 */
export class DispatchDisposable {
  /**
   * @param {Object} source - object exposing `remove(key, sink)` and `_dispose()`
   * @param {Object} sink - the sink that was registered with the source
   * @param {*} key - the key the sink was registered under
   */
  constructor(source, sink, key) {
    this.source = source;
    this.sink = sink;
    this.key = key;
    this.disposed = false;
  }

  /**
   * Unregisters the sink. Only the first call has any effect.
   *
   * @returns {*} `undefined` if already disposed; `false` if the source
   *   reported anything other than `0` remaining; otherwise the result of
   *   `source._dispose()`
   */
  dispose() {
    if (this.disposed) {
      return;
    }
    this.disposed = true;
    // The source is only torn down when `remove` returns exactly 0, so
    // `remove` must return the number of remaining sinks for this to fire.
    const remaining = this.source.remove(this.key, this.sink);
    return remaining === 0 && this.source._dispose();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import DispatchSource from './source'; | |
/**
 * Builds a stream operator that wraps a stream in a `DispatchSource`.
 *
 * The returned stream is created with the input stream's own constructor and
 * gains a `select(key)` method that delegates to the new source's `select`.
 *
 * @param {Function} f - passed to `DispatchSource` as its second argument; also
 *   compared by identity to detect a stream that is already dispatched by `f`
 * @param {Object} [store] - passed through to `DispatchSource` unchanged
 * @returns {Function} function taking a stream and returning the wrapped stream
 */
export default function dispatch(f, store) {
  return function(stream) {
    // Already wrapped with the very same function: reuse instead of re-wrapping.
    if (stream.source instanceof DispatchSource && stream.source.f === f) {
      return stream;
    }
    const source = new DispatchSource(stream, f, store);
    const newStream = new stream.constructor(source);
    newStream.select = key => source.select(key);
    return newStream;
  };
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Pipe from 'most/lib/sink/Pipe'; | |
import {DispatchDisposable, emptyDisposable, dispose} from './dispose'; | |
import {BasicStore} from './stores'; | |
import {tryEvent, tryEnd} from './try'; | |
// Private key under which sinks that subscribe via `DispatchSource#run`
// (rather than via a selected key) are registered in the store.
const defaultKey = Symbol('DispatchSource');
/**
 * Source that subscribes to an upstream stream and fans its events out to
 * sinks registered in a keyed store. It acts as the sink for the upstream
 * stream (`event` / `end` / `error`) and as the source for subscribers
 * (`run` / `add` / `remove`).
 */
export default class DispatchSource {
  /**
   * @param {Object} stream - upstream stream; its `source.run` is called when
   *   the store signals that upstream should be started, and its constructor
   *   is used to build streams returned by `select`
   * @param {Function} f - maps an event value to the key it is dispatched under
   * @param {Object} [store] - keyed sink store; a `BasicStore` is created when omitted
   */
  constructor(stream, f, store) {
    this.stream = stream;
    this.f = f;
    this._store = store || new BasicStore();
    this._disposable = emptyDisposable;
    this._boundSelect = key => this.select(key);
  }

  /**
   * Disposes the current upstream subscription. The held disposable is
   * swapped for `emptyDisposable` first, so calling this again does not
   * dispose the same subscription twice.
   *
   * @returns {Promise} resolves with the result of disposing the subscription
   */
  _dispose() {
    const disposable = this._disposable;
    this._disposable = emptyDisposable;
    return Promise.resolve(disposable).then(dispose);
  }

  /**
   * Subscribes a sink under the private default key.
   *
   * @returns {DispatchDisposable} disposable that unregisters the sink
   */
  run(sink, scheduler) {
    return this.add(sink, scheduler, defaultKey);
  }

  /**
   * Receives an upstream event. Sinks under the default key get the pair
   * `[x, select]`; sinks under the key `f(x)` get `x` itself.
   */
  event(t, x) {
    const key = this.f(x);
    event(this._store.get(defaultKey), t, wrap(x, this._boundSelect));
    event(this._store.get(key), t, x);
  }

  /** Forwards the upstream end signal to every sink the store iterates over. */
  end(t, x) {
    end(this._store[Symbol.iterator](), t, x);
  }

  /** Forwards an upstream error to every sink the store iterates over. */
  error(t, e) {
    error(this._store[Symbol.iterator](), t, e);
  }

  /**
   * Builds a stream, using the upstream stream's constructor, whose
   * subscribers are registered under `key`.
   */
  select(key) {
    const source = new TargetSource(this, key);
    return new this.stream.constructor(source);
  }

  /**
   * Registers `sink` under `key`. When the store's `add` returns a truthy
   * value, the upstream stream is started with this object as its sink.
   * NOTE(review): presumably the store returns truthy only for the first
   * registered sink - confirm against the store implementation.
   *
   * @returns {DispatchDisposable} disposable that unregisters the sink
   */
  add(sink, scheduler, key) {
    if (this._store.add(key, sink)) {
      this._disposable = this.stream.source.run(this, scheduler);
    }
    return new DispatchDisposable(this, sink, key);
  }

  /**
   * Unregisters `sink` from `key`.
   *
   * @returns {*} whatever the store's `remove` returns. DispatchDisposable
   *   compares this to `0` to decide whether to dispose upstream, so the
   *   value must be passed through rather than dropped.
   *   NOTE(review): assumes the store reports the number of remaining
   *   sinks - confirm against the store implementation.
   */
  remove(key, sink) {
    return this._store.remove(key, sink);
  }
}
/**
 * Pairs an event value with a `select` function as a two-element array.
 *
 * @param {*} event - the event value
 * @param {Function} select - the select function to hand out with the event
 * @returns {Array} `[event, select]`
 */
function wrap(event, select) {
  return [event, select];
}
/**
 * Delivers an event to every sink produced by `it`, via `tryEvent`.
 *
 * @param {Iterable} [it] - sinks to notify
 * @param {*} t - event time
 * @param {*} x - event value
 */
function event(it, t, x) {
  // NOTE(review): a store may return nothing for a key with no registered
  // sinks; treat that as "no sinks" instead of letting for...of throw.
  if (it == null) {
    return;
  }
  for (const sink of it) {
    tryEvent(t, x, sink);
  }
}
/**
 * Signals end-of-stream to every sink produced by `it`, via `tryEnd`.
 *
 * @param {Iterable} it - sinks to notify
 * @param {*} t - end time
 * @param {*} x - end value
 */
function end(it, t, x) {
  for (const sink of it) {
    tryEnd(t, x, sink);
  }
}
/**
 * Propagates an error to every sink produced by `it` by calling
 * `sink.error(t, e)` directly. A sink that throws from its `error` handler
 * stops the loop, and the exception propagates to the caller.
 *
 * @param {Iterable} it - sinks to notify
 * @param {*} t - error time
 * @param {*} e - the error
 */
function error(it, t, e) {
  for (const sink of it) {
    sink.error(t, e);
  }
}
/**
 * Source for a stream bound to a single key of a dispatching source.
 * Running it registers the subscriber under that key.
 */
class TargetSource {
  /**
   * @param {Object} source - object exposing `add(sink, scheduler, key)`
   * @param {*} key - key that subscribers of this source are registered under
   */
  constructor(source, key) {
    this.key = key;
    this.source = source;
  }

  /**
   * Wraps `sink` in a new `Pipe` and registers it with the underlying
   * source under this instance's key.
   *
   * @returns {*} whatever the underlying source's `add` returns
   */
  run(sink, scheduler) {
    return this.source.add(new Pipe(sink), scheduler, this.key);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Pipe from 'most/lib/sink/Pipe'; | |
import {DispatchDisposable, emptyDisposable, dispose} from './dispose'; | |
import {BasicStore} from './stores'; | |
import {tryEvent, tryEnd} from './try'; | |
// Private key under which sinks that subscribe via `DispatchSource#run`
// (rather than via a selected key) are registered in the store.
const defaultKey = Symbol('DispatchSource');
/**
 * Source that subscribes to an upstream stream and fans its events out to
 * sinks registered in a keyed store. It acts as the sink for the upstream
 * stream (`event` / `end` / `error`) and as the source for subscribers
 * (`run` / `add` / `remove`).
 */
export default class DispatchSource {
  /**
   * @param {Object} stream - upstream stream; its `source.run` is called when
   *   the store signals that upstream should be started, and its constructor
   *   is used to build streams returned by `select`
   * @param {Function} f - maps an event value to the key it is dispatched under
   * @param {Object} [store] - keyed sink store; a `BasicStore` is created when omitted
   */
  constructor(stream, f, store) {
    this.stream = stream;
    this.f = f;
    this._store = store || new BasicStore();
    this._disposable = emptyDisposable;
    this._boundSelect = key => this.select(key);
  }

  /**
   * Disposes the current upstream subscription. The held disposable is
   * swapped for `emptyDisposable` first, so calling this again does not
   * dispose the same subscription twice.
   *
   * @returns {Promise} resolves with the result of disposing the subscription
   */
  _dispose() {
    const disposable = this._disposable;
    this._disposable = emptyDisposable;
    return Promise.resolve(disposable).then(dispose);
  }

  /**
   * Subscribes a sink under the private default key.
   *
   * @returns {DispatchDisposable} disposable that unregisters the sink
   */
  run(sink, scheduler) {
    return this.add(sink, scheduler, defaultKey);
  }

  /**
   * Receives an upstream event. Sinks under the default key get the pair
   * `[x, select]`; sinks under the key `f(x)` get `x` itself.
   */
  event(t, x) {
    const key = this.f(x);
    event(this._store.get(defaultKey), t, wrap(x, this._boundSelect));
    event(this._store.get(key), t, x);
  }

  /** Forwards the upstream end signal to every sink the store iterates over. */
  end(t, x) {
    end(this._store[Symbol.iterator](), t, x);
  }

  /** Forwards an upstream error to every sink the store iterates over. */
  error(t, e) {
    error(this._store[Symbol.iterator](), t, e);
  }

  /**
   * Builds a stream, using the upstream stream's constructor, whose
   * subscribers are registered under `key`.
   */
  select(key) {
    const source = new TargetSource(this, key);
    return new this.stream.constructor(source);
  }

  /**
   * Registers `sink` under `key`. When the store's `add` returns a truthy
   * value, the upstream stream is started with this object as its sink.
   * NOTE(review): presumably the store returns truthy only for the first
   * registered sink - confirm against the store implementation.
   *
   * @returns {DispatchDisposable} disposable that unregisters the sink
   */
  add(sink, scheduler, key) {
    if (this._store.add(key, sink)) {
      this._disposable = this.stream.source.run(this, scheduler);
    }
    return new DispatchDisposable(this, sink, key);
  }

  /**
   * Unregisters `sink` from `key`.
   *
   * @returns {*} whatever the store's `remove` returns. DispatchDisposable
   *   compares this to `0` to decide whether to dispose upstream, so the
   *   value must be passed through rather than dropped.
   *   NOTE(review): assumes the store reports the number of remaining
   *   sinks - confirm against the store implementation.
   */
  remove(key, sink) {
    return this._store.remove(key, sink);
  }
}
/**
 * Pairs an event value with a `select` function as a two-element array.
 *
 * @param {*} event - the event value
 * @param {Function} select - the select function to hand out with the event
 * @returns {Array} `[event, select]`
 */
function wrap(event, select) {
  return [event, select];
}
/**
 * Delivers an event to every sink produced by `it`, via `tryEvent`.
 *
 * @param {Iterable} [it] - sinks to notify
 * @param {*} t - event time
 * @param {*} x - event value
 */
function event(it, t, x) {
  // NOTE(review): a store may return nothing for a key with no registered
  // sinks; treat that as "no sinks" instead of letting for...of throw.
  if (it == null) {
    return;
  }
  for (const sink of it) {
    tryEvent(t, x, sink);
  }
}
/**
 * Signals end-of-stream to every sink produced by `it`, via `tryEnd`.
 *
 * @param {Iterable} it - sinks to notify
 * @param {*} t - end time
 * @param {*} x - end value
 */
function end(it, t, x) {
  for (const sink of it) {
    tryEnd(t, x, sink);
  }
}
/**
 * Propagates an error to every sink produced by `it` by calling
 * `sink.error(t, e)` directly. A sink that throws from its `error` handler
 * stops the loop, and the exception propagates to the caller.
 *
 * @param {Iterable} it - sinks to notify
 * @param {*} t - error time
 * @param {*} e - the error
 */
function error(it, t, e) {
  for (const sink of it) {
    sink.error(t, e);
  }
}
/**
 * Source for a stream bound to a single key of a dispatching source.
 * Running it registers the subscriber under that key.
 */
class TargetSource {
  /**
   * @param {Object} source - object exposing `add(sink, scheduler, key)`
   * @param {*} key - key that subscribers of this source are registered under
   */
  constructor(source, key) {
    this.key = key;
    this.source = source;
  }

  /**
   * Wraps `sink` in a new `Pipe` and registers it with the underlying
   * source under this instance's key.
   *
   * @returns {*} whatever the underlying source's `add` returns
   */
  run(sink, scheduler) {
    return this.source.add(new Pipe(sink), scheduler, this.key);
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Delivers an event to a sink. If the sink's `event` handler throws, the
 * exception is routed to that same sink's `error(t, e)` instead of
 * propagating to the caller.
 *
 * @param {*} t - event time
 * @param {*} x - event value
 * @param {Object} sink - object exposing `event(t, x)` and `error(t, e)`
 */
export function tryEvent(t, x, sink) {
  try {
    sink.event(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}
/**
 * Signals end-of-stream to a sink. If the sink's `end` handler throws, the
 * exception is routed to that same sink's `error(t, e)` instead of
 * propagating to the caller.
 *
 * @param {*} t - end time
 * @param {*} x - end value
 * @param {Object} sink - object exposing `end(t, x)` and `error(t, e)`
 */
export function tryEnd(t, x, sink) {
  try {
    sink.end(t, x);
  } catch (e) {
    sink.error(t, e);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment