// Created April 24, 2017 08:45
// Gist: mikimaine/fd5b9636858545cf3817ee957803188f
// Sample observable pattern from an RxJS talk.
// Note: the hosting page flagged this file as containing bidirectional Unicode text that may be
// interpreted or compiled differently than it appears. To review, open the file in an editor
// that reveals hidden Unicode characters.
/**
 * Operator: derives a new observable whose values are the values of the
 * observable this function is invoked on, each passed through `transformFn`.
 *
 * Must be called as a method (e.g. `source.map(fn)`), because the source
 * observable is taken from `this`. Errors and completion from the source are
 * handed to the downstream observer unchanged.
 *
 * @param {function(*): *} transformFn - Invoked once per source value; its
 *   return value is what the downstream observer receives.
 * @returns {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 *   The observable built by `createObservable`.
 */
function map(transformFn) {
  const source = this;

  return createObservable(function subscribe(downstream) {
    const relay = {
      next(value) {
        downstream.next(transformFn(value));
      },
      error(err) {
        return downstream.error(err);
      },
      complete() {
        return downstream.complete();
      },
    };

    source.subscribe(relay);
  });
}
/**
 * Operator: derives a new observable that only lets through the values of the
 * observable this function is invoked on for which `conditionFn` returns a
 * truthy result.
 *
 * Must be called as a method (e.g. `source.filter(fn)`), because the source
 * observable is taken from `this`. Errors and completion from the source are
 * handed to the downstream observer unchanged.
 *
 * @param {function(*): *} conditionFn - Predicate invoked once per source
 *   value; the value is forwarded only when the result is truthy.
 * @returns {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 *   The observable built by `createObservable`.
 */
function filter(conditionFn) {
  const source = this;

  return createObservable(function subscribe(downstream) {
    const gate = {
      next(value) {
        if (!conditionFn(value)) {
          return;
        }
        downstream.next(value);
      },
      error(err) {
        return downstream.error(err);
      },
      complete() {
        return downstream.complete();
      },
    };

    source.subscribe(gate);
  });
}
/**
 * Operator: derives a new observable that re-emits every value of the
 * observable this function is invoked on after `period` milliseconds.
 *
 * Must be called as a method (e.g. `source.delay(2000)`), because the source
 * observable is taken from `this`.
 *
 * - Values are forwarded to the downstream *observer* from a `setTimeout`
 *   callback (the previous code called `next` on the output *observable*,
 *   which has no such method and therefore threw a TypeError).
 * - Completion is scheduled with the same period, so it is delivered after
 *   the delayed values that preceded it instead of overtaking them.
 * - Errors are forwarded immediately, without delay.
 *
 * @param {number} period - Delay in milliseconds passed to `setTimeout`.
 * @returns {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 *   The observable built by `createObservable`.
 */
function delay(period) {
  const inputObservable = this;

  return createObservable(function subscribe(outputObserver) {
    inputObservable.subscribe({
      next(x) {
        setTimeout(() => {
          outputObserver.next(x);
        }, period);
      },
      error: (e) => outputObserver.error(e),
      complete() {
        // Same period as the values: timers with equal delays fire in the
        // order they were scheduled, so completion stays last.
        setTimeout(() => {
          outputObserver.complete();
        }, period);
      },
    });
  });
}
/**
 * Operator: derives a new observable that folds all values of the observable
 * this function is invoked on into a single value and emits that value once
 * the source completes.
 *
 * Must be called as a method (e.g. `source.reduce(fn)`), because the source
 * observable is taken from `this`.
 *
 * The first source value seeds the accumulator (like `Array.prototype.reduce`
 * without an initial value); `reduceFn` is called for every later value. If
 * the source completes without emitting anything, no value is emitted and the
 * downstream observer is only completed. Errors are forwarded immediately and
 * the partial accumulation is not emitted.
 *
 * @param {function(*, *): *} reduceFn - Called as
 *   `reduceFn(accumulator, value)`; its return value becomes the new
 *   accumulator.
 * @returns {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 *   The observable built by `createObservable`.
 */
function reduce(reduceFn) {
  const inputObservable = this;

  return createObservable(function subscribe(outputObserver) {
    // State lives inside `subscribe` so every subscription folds independently.
    let hasValue = false;
    let accumulator;

    inputObservable.subscribe({
      next(x) {
        if (hasValue) {
          accumulator = reduceFn(accumulator, x);
        } else {
          accumulator = x;
          hasValue = true;
        }
      },
      error: (e) => outputObserver.error(e),
      complete() {
        if (hasValue) {
          outputObserver.next(accumulator);
        }
        outputObserver.complete();
      },
    });
  });
}
/**
 * Factory for observables: wraps a `subscribe` function in an object that also
 * carries the chainable operators defined in this file.
 *
 * The operators are attached as plain properties, so they receive the
 * observable as `this` when invoked as methods (e.g. `obs.map(fn)`).
 *
 * @param {function(Object): *} subscribe - Function that receives an observer
 *   and starts delivering to it. In this file observers expose `next`,
 *   `error` and `complete` callbacks.
 * @returns {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 *   The observable object.
 */
function createObservable(subscribe) {
  // Each key is listed exactly once; the original repeated every shorthand
  // property, which is redundant and flagged by linters as duplicate keys.
  return {
    subscribe,
    map,
    filter,
    delay,
    reduce,
  };
}
/**
 * Observable of document-level `click` events.
 *
 * Subscribing registers a `click` listener on `document` that forwards each
 * event to the observer's `next` callback. The listener is never removed and
 * this observable never calls `error` or `complete`.
 *
 * @type {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 */
const clickObservable = createObservable(function subscribe(ob) {
  // Call `next` through the observer instead of passing the bare method, so
  // observers whose `next` relies on `this` keep working.
  document.addEventListener('click', (event) => ob.next(event));
});
/**
 * Observable that synchronously emits 10, 20 and 30 to each subscriber and
 * then completes.
 *
 * @type {{subscribe: function, map: function, filter: function, delay: function, reduce: function}}
 */
const arrayObservable = createObservable(function subscribe(ob) {
  // Wrap `next` in an arrow function: passing `ob.next` directly to `forEach`
  // would detach it from the observer and also hand it the element index and
  // the array as extra arguments.
  [10, 20, 30].forEach((value) => ob.next(value));
  ob.complete();
});
/**
 * Console-backed observer used by the examples below.
 *
 * - `next` logs each received value with `console.log`.
 * - `error` reports the received error with `console.error`.
 * - `complete` logs the literal string `done!`.
 *
 * @type {{next: function(*): void, error: function(*): void, complete: function(): void}}
 */
const observer = {
  next: function nextCallback(value) {
    console.log(value);
  },

  error: function errorCallback(reason) {
    console.error(reason);
  },

  complete: function completeCallback() {
    console.log('done!');
  },
};
/**
 * Example: run `arrayObservable` through the `map` and `filter` operators and
 * print the outcome with `observer`.
 *
 * Each value is divided by 10 and the value 2 is dropped before the rest
 * reaches `observer`.
 */
{
  const divideByTen = (value) => value / 10;
  const isNotTwo = (value) => value !== 2;

  arrayObservable.map(divideByTen).filter(isNotTwo).subscribe(observer);
}
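/**
 * This example shows how a pipeline that uses `reduce` looks.
 * NOTE: `valueObservable` is not defined in this file; substitute an existing
 * observable (for example `arrayObservable`) before enabling it.
 */
// valueObservable
//   .reduce((x, z) => x + z)
//   .subscribe(observer);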
/**
 * This example is an observable pipeline for click events.
 */
// clickObservable
//   .map(ev => ev.clientX)
//   .filter(x => x < 200)
//   .delay(2000)
//   .subscribe(observer);
// Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.