Skip to content

Instantly share code, notes, and snippets.

@mikimaine
Created April 24, 2017 08:45
Show Gist options
  • Save mikimaine/fd5b9636858545cf3817ee957803188f to your computer and use it in GitHub Desktop.
Save mikimaine/fd5b9636858545cf3817ee957803188f to your computer and use it in GitHub Desktop.
Sample observable pattern from an RxJS talk
/**
 * Operator: derives an observable whose values are the results of applying
 * `transformFn` to each value of the observable this function is invoked on
 * (the source is taken from `this`).
 *
 * The source is only subscribed to when the returned observable's
 * `subscribe` is called. Error and completion notifications are forwarded
 * to the downstream observer unchanged.
 *
 * @param {Function} transformFn - Called with each source value; its return
 *   value is what the downstream observer receives.
 * @returns {Object} The observable built by `createObservable`.
 */
function map(transformFn) {
  const source = this;

  return createObservable(function subscribe(downstream) {
    const relay = {
      next(value) {
        const mapped = transformFn(value);
        downstream.next(mapped);
      },
      error(err) {
        return downstream.error(err);
      },
      complete() {
        return downstream.complete();
      },
    };

    source.subscribe(relay);
  });
}
/**
 * Operator: derives an observable that only lets through the values of the
 * source observable (taken from `this`) for which `conditionFn` returns a
 * truthy result.
 *
 * The source is only subscribed to when the returned observable's
 * `subscribe` is called. Error and completion notifications are forwarded
 * to the downstream observer unchanged.
 *
 * @param {Function} conditionFn - Predicate called with each source value.
 * @returns {Object} The observable built by `createObservable`.
 */
function filter(conditionFn) {
  const source = this;

  return createObservable(function subscribe(downstream) {
    const gate = {
      next(value) {
        // Drop values that do not satisfy the predicate.
        if (!conditionFn(value)) {
          return;
        }
        downstream.next(value);
      },
      error(err) {
        return downstream.error(err);
      },
      complete() {
        return downstream.complete();
      },
    };

    source.subscribe(gate);
  });
}
/**
 * Operator: derives an observable that re-emits every value of the source
 * observable (taken from `this`) after `period` milliseconds.
 *
 * Each value is scheduled with its own `setTimeout`. Completion is scheduled
 * with the same delay so that it is delivered after the delayed values
 * rather than before them (timers with an equal delay are expected to fire
 * in the order they were scheduled). Errors are forwarded immediately.
 *
 * @param {number} period - Delay in milliseconds, passed to `setTimeout`.
 * @returns {Object} The observable built by `createObservable`.
 */
function delay(period) {
  const inputObservable = this;

  return createObservable(function subscribe(outputObserver) {
    inputObservable.subscribe({
      next(value) {
        // Deliver to the downstream *observer*; the observable object itself
        // has no `next` method.
        setTimeout(() => {
          outputObserver.next(value);
        }, period);
      },
      error: (err) => outputObserver.error(err),
      complete() {
        // Delay completion too, otherwise the observer would be told the
        // stream is done before any delayed value has arrived.
        setTimeout(() => {
          outputObserver.complete();
        }, period);
      },
    });
  });
}
/**
 * Operator: derives an observable that folds all values of the source
 * observable (taken from `this`) into a single value and emits that value
 * once, when the source completes.
 *
 * Accumulation state is created per subscription, so subscribing twice to
 * the returned observable produces two independent folds.
 *
 * - With a seed, the fold starts from the seed and the seed itself is
 *   emitted if the source completes without values.
 * - Without a seed, the first source value becomes the initial accumulator;
 *   if the source completes without values, nothing is emitted before
 *   `complete`.
 *
 * Errors are forwarded immediately and no accumulated value is emitted.
 *
 * @param {Function} reduceFn - Called as `reduceFn(accumulator, value)`;
 *   its return value becomes the new accumulator.
 * @param {...*} seedArgs - Optional initial accumulator (only the first
 *   extra argument is used). Passing `undefined` explicitly counts as a seed.
 * @returns {Object} The observable built by `createObservable`.
 */
function reduce(reduceFn, ...seedArgs) {
  const inputObservable = this;
  const hasSeed = seedArgs.length > 0;

  return createObservable(function subscribe(outputObserver) {
    let hasAccumulator = hasSeed;
    let accumulator = seedArgs[0];

    inputObservable.subscribe({
      next(value) {
        if (hasAccumulator) {
          accumulator = reduceFn(accumulator, value);
        } else {
          // No seed: the first value starts the fold.
          accumulator = value;
          hasAccumulator = true;
        }
      },
      error: (err) => outputObserver.error(err),
      complete() {
        if (hasAccumulator) {
          outputObserver.next(accumulator);
        }
        outputObserver.complete();
      },
    });
  });
}
/**
 * Wraps a subscribe function into an observable object.
 *
 * The returned object exposes the given `subscribe` function together with
 * the `map`, `filter`, `delay` and `reduce` operators, which read the
 * observable they are called on from `this` and can therefore be chained.
 *
 * @param {Function} subscribe - Function stored as the observable's
 *   `subscribe` property.
 * @returns {{subscribe: Function, map: Function, filter: Function, delay: Function, reduce: Function}}
 */
function createObservable(subscribe) {
  const observable = { subscribe };

  observable.map = map;
  observable.filter = filter;
  observable.delay = delay;
  observable.reduce = reduce;

  return observable;
}
/**
 * Observable of `click` events on `document`.
 *
 * Every subscription registers a new listener that forwards the event to the
 * observer's `next`.
 *
 * NOTE(review): the listener is never removed and the observable never
 * completes or errors; there is no unsubscribe mechanism in this file.
 *
 * @type {Object}
 */
const clickObservable = createObservable(function subscribe(ob) {
  // Call `next` as a method of the observer instead of handing the unbound
  // function to the DOM, which would invoke it with `this` set to the
  // event target.
  document.addEventListener('click', (event) => {
    ob.next(event);
  });
});
/**
 * Observable that synchronously emits 10, 20 and 30 to the observer and
 * then completes.
 *
 * @type {Object}
 */
const arrayObservable = createObservable(function subscribe(ob) {
  // Forward only the value: passing `ob.next` directly to `forEach` would
  // also hand it the index and the array, and would call it without `this`.
  [10, 20, 30].forEach((value) => {
    ob.next(value);
  });
  ob.complete();
});
/**
 * Console-backed observer used by the examples in this file.
 *
 * @type {{next: Function, error: Function, complete: Function}}
 */
const observer = {
  // Logs every value it receives.
  next: function nextCallback(value) {
    console.log(value);
  },

  // Reports an error notification on the error stream.
  error: function errorCallback(reason) {
    console.error(reason);
  },

  // Logs a fixed marker once the stream has completed.
  complete: function completeCallback() {
    console.log('done!');
  },
};
/**
 * Example: run `arrayObservable` through a pipeline that divides each value
 * by 10, drops the value 2, and reports what is left to `observer`.
 */
arrayObservable
  .map((value) => value / 10)
  .filter((scaled) => scaled !== 2)
  .subscribe(observer);
/**
 * Example (disabled): how a pipeline would look with the `reduce` operator.
 * Note that `valueObservable` is not defined in this file.
 */
//valueObservable
// .reduce((x, z) => x + z )
// .subscribe(observer);
/**
 * Example (disabled): an observable pipeline driven by document click events.
 */
// clickObservable
// .map(ev => ev.clientX)
// .filter(x => x < 200)
// .delay(2000)
// .subscribe(observer);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment