Skip to content

Instantly share code, notes, and snippets.

@hermanbanken
Last active October 4, 2016 06:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hermanbanken/a00898288a223cadb787cef9f9bc3d81 to your computer and use it in GitHub Desktop.
Save hermanbanken/a00898288a223cadb787cef9f9bc3d81 to your computer and use it in GitHub Desktop.
Cycle.JS (RxJS 4.1) combining component collection properties (see https://github.com/cyclejs/cyclejs/issues/312)
import Rx from 'rx';
var Observable = Rx.Observable,
observableProto = Observable.prototype;
export default function(){
observableProto.collection = function(config) {
let root = this.share(), merge = [], latest = [];
if(config.merge) {
merge = Object.keys(config.merge).map(field => {
let mapping = config.merge[field]
if(typeof mapping == 'function') {
return [field, root
.flatMap(item => mapping(item))]
} else if(typeof mapping == 'string') {
return [field, root.flatMap(item => item[mapping])]
} else {
console.warn("unknown collection operation for merge-field", field)
}
})
}
if(config.latest) {
latest = Object.keys(config.latest).map(field => {
let mapping = config.latest[field]
let combined = root
.map((item, index) => {
if(typeof mapping == 'function') {
return [index, mapping(item)]
} else if(typeof mapping == 'string') {
return [index, item[mapping]]
} else {
console.warn("unknown collection operation for latest-field", field)
}
})
.flatMap(([index, obs]) => obs.publish(o => o
.filter(_ => false)
.concat(Observable.of([index, undefined]))
.startWith([index, o]))
)
.scan((memory, [index, observable]) => {
// TODO remove undesired shareReplay:
// the publish above causes the original source not to replay,
// since the subscription is kept open.
memory[index] = observable && observable.shareReplay(1);
return memory;
}, [])
.map(list => list.filter(v => typeof v !== 'undefined'))
.flatMapLatest((list, c) =>
Observable.combineLatest(list, (...args) => args)
)
return [field, combined]
})
}
return merge.concat(latest)
.reduce((obj, [field, obs]) => {
obj[field] = obs;
return obj;
}, {});
}
};
import Rx, { ReactiveTest, TestScheduler } from 'rx'
import { collectionAssert } from './utils'
import addOperator from './rx-collection'
addOperator()
var onNext = ReactiveTest.onNext,
onCompleted = ReactiveTest.onCompleted,
subscribe = ReactiveTest.subscribe
const never = Rx.Observable.never()
describe('component collections', function () {
var scheduler = new TestScheduler();
var components = [{
DOM: Rx.Observable.of("A1", "A2").merge(never),
clicks$: never
},{
DOM: Rx.Observable.of("B1", "B2", "B3").merge(never),
clicks$: Rx.Observable.of(1).delay(2, scheduler).merge(never)
},{
DOM: Rx.Observable.of("C1").merge(never),
clicks$: Rx.Observable.of(4).delay(5, scheduler).merge(never)
}];
const output = () => Rx.Observable
.fromArray(components)
.merge(never)
.collection({
latest: {
DOM: (item) => item.DOM.shareReplay(1),
},
merge: {
clicks$: (item) => item.clicks$.map(item)
}
});
it('should combine latest', function(){
var results = scheduler.startScheduler(
function () {
return output().DOM.debounce(10, scheduler)
}, { created: 0, subscribed: 0, disposed: 400 }
);
collectionAssert.assertEqual(results.messages, [
onNext(11, ["A2", "B3", "C1"])
])
})
it('should merge streams', function(){
var results = scheduler.startScheduler(
function () {
return output().clicks$
}, { created: 500, subscribed: 500, disposed: 800 }
);
collectionAssert.assertEqual(results.messages, [
onNext(502, components[1]),
onNext(505, components[2])
])
})
it('should close', function(){
var scheduler = new TestScheduler();
const nots = Rx.Observable
.merge(
Rx.Observable.of([1,2,3]),
Rx.Observable.of([1,2]).delay(100, scheduler)
)
.merge(never)
const comps = nots.publish(nots => nots
.flatMap(n => n)
.groupByUntil(
d => d,
v => v,
d => nots
.map(list => !list.some(ad => ad === d.key))
.filter(b => b)
)
)
.map(group => ({ DOM: group }))
var output = comps
.collection({
latest: {
DOM: (item) => item.DOM.distinctUntilChanged(),
}
});
var results = scheduler.startScheduler(() => output.DOM, { created: 0, subscribed: 0, disposed: 1000 });
collectionAssert.assertEqual(results.messages, [
onNext(1, [1]),
onNext(1, [1, 2]),
onNext(1, [1, 2, 3]),
onNext(101, [1, 2])
])
})
});
import Rx from 'rx'
import { ok, fail } from 'assert'
function createMessage(expected, actual) {
return 'Expected: [' + expected.toString() + ']\r\nActual: [' + actual.toString() + ']';
}
module.exports = {
collectionAssert: {
assertEqual: function (actual, expected) {
var comparer = Rx.internals.isEqual, isOk = true;
if (expected.length !== actual.length) {
fail('Not equal length. Expected: ' + expected.length + ' Actual: ' + actual.length);
return;
}
for(var i = 0, len = expected.length; i < len; i++) {
isOk = comparer(expected[i], actual[i]);
if (!isOk) {
break;
}
}
ok(isOk, createMessage(expected, actual));
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment