Skip to content

Instantly share code, notes, and snippets.

@pstjean
Last active December 19, 2018 21:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pstjean/f43eced4467d56fa1b1d78229e0b472b to your computer and use it in GitHub Desktop.
Save pstjean/f43eced4467d56fa1b1d78229e0b472b to your computer and use it in GitHub Desktop.
RxJS from scratch
// From André Staltz's talk at ng-europe 2016
// https://www.youtube.com/watch?v=uQ1zhJHclvs
/**
 * Operator: builds a new observable that emits `transformFn(x)` for every
 * value `x` emitted by the observable this function is called on.
 *
 * Must be invoked as a method of an object created by `createObservable`
 * (`this` is the input observable).
 *
 * If `transformFn` throws, the exception is delivered to the downstream
 * observer's `error` callback instead of escaping into the producer that
 * emitted the value, and nothing further is forwarded for that subscription.
 *
 * @param {function(*): *} transformFn - Maps each input value to an output value.
 * @returns {object} The observable produced by `createObservable`.
 */
function map(transformFn) {
  const inputObservable = this; // Object created by createObservable
  return createObservable(function subscribe(outputObserver) {
    // Set once transformFn has thrown; after that the subscription is dead.
    let hasFailed = false;
    inputObservable.subscribe({ // Give me data from the inputObservable
      next(x) {
        if (hasFailed) {
          return;
        }
        let y;
        try {
          y = transformFn(x);
        } catch (err) {
          hasFailed = true;
          outputObserver.error(err);
          return;
        }
        // Kept outside the try so errors thrown by the downstream observer
        // itself are not mistaken for transformFn failures.
        outputObserver.next(y);
      },
      error(e) {
        if (!hasFailed) {
          outputObserver.error(e);
        }
      },
      complete() {
        if (!hasFailed) {
          outputObserver.complete();
        }
      },
    });
  });
}
/**
 * Operator: builds a new observable that forwards only the values for which
 * `conditionFn(x)` is truthy.
 *
 * Must be invoked as a method of an object created by `createObservable`
 * (`this` is the input observable).
 *
 * If `conditionFn` throws, the exception is delivered to the downstream
 * observer's `error` callback instead of escaping into the producer that
 * emitted the value, and nothing further is forwarded for that subscription.
 *
 * @param {function(*): *} conditionFn - Predicate; truthy results let the value through.
 * @returns {object} The observable produced by `createObservable`.
 */
function filter(conditionFn) {
  const inputObservable = this;
  return createObservable(function subscribe(outputObserver) {
    // Set once conditionFn has thrown; after that the subscription is dead.
    let hasFailed = false;
    inputObservable.subscribe({
      next(x) {
        if (hasFailed) {
          return;
        }
        let shouldEmit;
        try {
          shouldEmit = conditionFn(x);
        } catch (err) {
          hasFailed = true;
          outputObserver.error(err);
          return;
        }
        // Kept outside the try so errors thrown by the downstream observer
        // itself are not mistaken for conditionFn failures.
        if (shouldEmit) {
          outputObserver.next(x);
        }
      },
      error(e) {
        if (!hasFailed) {
          outputObserver.error(e);
        }
      },
      complete() {
        if (!hasFailed) {
          outputObserver.complete();
        }
      },
    });
  });
}
/**
 * Operator: builds a new observable that re-emits every value of the input
 * observable `period` milliseconds after it was received.
 *
 * Must be invoked as a method of an object created by `createObservable`
 * (`this` is the input observable).
 *
 * Completion is delayed by the same period, so a source that completes right
 * after its last value cannot report `complete` before the values still
 * waiting in timers. Errors are forwarded immediately.
 *
 * @param {number} period - Delay passed to `setTimeout`, in milliseconds.
 * @returns {object} The observable produced by `createObservable`.
 */
function delay(period) {
  const inputObservable = this;
  return createObservable(function subscribe(outputObserver) {
    inputObservable.subscribe({
      next(x) {
        setTimeout(() => {
          outputObserver.next(x);
        }, period);
      },
      error: (e) => outputObserver.error(e),
      complete() {
        // Same delay as the values: relies on timers with an equal delay
        // firing in the order they were scheduled, so this runs after every
        // value timer created before it.
        setTimeout(() => {
          outputObserver.complete();
        }, period);
      },
    });
  });
}
/**
 * Wraps a `subscribe` function in an observable object and attaches the
 * `map`, `filter` and `delay` operators so calls can be chained.
 *
 * @param {function(object): *} subscribe - Called with an observer when someone subscribes.
 * @returns {{subscribe: function, map: function, filter: function, delay: function}}
 */
function createObservable(subscribe) {
  const observable = { subscribe };
  // Operators are shared functions; they find their input observable via `this`.
  observable.map = map;
  observable.filter = filter;
  observable.delay = delay;
  return observable;
}
// Observable of document-level click events: each subscription registers a
// 'click' listener on `document` and forwards every event to the observer.
// It never calls `error` or `complete`.
const clickObservable = createObservable(function subscribe(ob) {
  // Call `next` through the observer rather than handing the bare method to
  // addEventListener, which would invoke it with `this` set to the event
  // target and break observers whose `next` relies on `this`.
  document.addEventListener('click', (event) => {
    ob.next(event);
  });
});
// Observable that synchronously emits 10, 20 and 30 to each subscriber and
// then completes.
const arrayObservable = createObservable(function subscribe(ob) {
  // Pass exactly one argument per emission. Handing `ob.next` straight to
  // forEach would also pass the index and the array, and would call `next`
  // detached from its observer.
  for (const value of [10, 20, 30]) {
    ob.next(value);
  }
  ob.complete();
});
// Demo observer: prints each value with console.log, reports errors with
// console.error, and prints 'done' on completion.
const observer = {
  next: (data) => {
    console.log(data);
  },
  error: (err) => {
    console.error(err);
  },
  complete: () => {
    console.log('done');
  },
};
// Demo pipeline: take each click's clientX, keep only values below 200,
// delay them by 2000 ms, and hand them to `observer`.
const clickXPositions = clickObservable.map((ev) => ev.clientX);
const leftSideXPositions = clickXPositions.filter((x) => x < 200);
const delayedXPositions = leftSideXPositions.delay(2000);
delayedXPositions.subscribe(observer);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment