Skip to content

Instantly share code, notes, and snippets.

@zenparsing
Last active February 9, 2016 18:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zenparsing/ee0675e378eff5899fbf to your computer and use it in GitHub Desktop.
Save zenparsing/ee0675e378eff5899fbf to your computer and use it in GitHub Desktop.
Promise-Observable Combinators with Implicit Cancel Tokens
/**
 * Forwards the receiver's values until `control` delivers a value.
 *
 * Meant to be invoked with an observable as `this`. The returned observable
 * finishes as soon as one of the following happens:
 *   - the receiver completes (its completion value is passed through),
 *   - the receiver or `control` fails (the output fails with that error),
 *   - `control` delivers a value (that value becomes the completion value).
 * A `control` that completes without delivering a value has no effect.
 *
 * @param {Observable} control - observable whose first value ends the output.
 * @returns {Observable} observable that relays the receiver's values.
 */
function takeUntil(control) {
  const source = this;

  return new Observable(function (emit) {
    return new Promise(function (done, fail) {
      // Subscribe to the receiver first, then to the control observable.
      const sourceFinished = source.forEach(emit);
      sourceFinished.then(done, fail);

      // The first control value settles the output; later ones are no-ops.
      const controlFinished = control.forEach(done);
      controlFinished.catch(fail);
    });
  });
}
/**
 * Flattens an observable of observables: each inner observable delivered by
 * the receiver is subscribed with `next`, and before a new one is subscribed
 * `requestCancel()` is called on the Context created for the previous one.
 *
 * Meant to be invoked with an observable (of observables) as `this`. The
 * output completes with the receiver's completion value once the most
 * recently recorded inner subscription promise has settled; errors from the
 * receiver or from an inner observable reject the output.
 *
 * NOTE(review): `switch` is a reserved word in JavaScript, so this
 * declaration does not parse as written — presumably a sketch of a method
 * to be installed on an observable; confirm the intended name.
 * NOTE(review): `Context` and `requestCancel` are not defined in this view;
 * presumably they scope the implicit cancel token — confirm their semantics
 * (including whether the Context callback runs synchronously).
 * NOTE(review): `innerPromise` is declared without a value and is only
 * assigned inside the Context callback; if the receiver completes before any
 * assignment happened, `innerPromise.then` would throw — verify.
 *
 * @returns {Observable} observable relaying values of the inner observables.
 */
function switch() {
return new Observable(next => new Promise((resolve, reject) => {
// Context and subscription promise belonging to the latest inner observable.
let innerContext, innerPromise;
this.forEach(inner => {
// A newer inner observable arrived: request cancellation of the previous context.
if (innerContext)
innerContext.requestCancel();
// Subscribe to the new inner observable from within a fresh Context;
// an inner failure rejects the output promise.
innerContext = new Context(_=> {
innerPromise = inner.forEach(next).catch(reject);
});
}).then(x => innerPromise.then(_=> resolve(x)), reject);
}));
}
/**
 * Exposes events dispatched on the receiver as an observable.
 *
 * Meant to be invoked with `this` bound to an object that provides
 * `addEventListener` and `removeEventListener`. The observer callback `next`
 * is registered directly as the listener, so it receives whatever arguments
 * the event target passes to its listeners.
 *
 * NOTE(review): `function.cancelToken` is not standard JavaScript syntax;
 * presumably it denotes the implicit cancel token of the enclosing function
 * in the proposal this sketch explores — confirm before running this code.
 *
 * @param eventName - event name handed unchanged to addEventListener and
 *   removeEventListener.
 * @returns {Observable} observable whose subscription promise is derived
 *   from the cancel token's promise.
 */
function listen(eventName) {
return new Observable(next => {
this.addEventListener(eventName, next);
// When the cancel token's promise fulfills, detach the same listener again.
return function.cancelToken.promise.then(_=> this.removeEventListener(eventName, next));
});
}
/**
 * Flattens an observable of observables by subscribing to every inner
 * observable as soon as it arrives and relaying all of their values.
 *
 * Meant to be invoked with an observable (of observables) as `this`. The
 * output completes with the receiver's completion value once the receiver
 * and every inner observable have finished. A failure of the receiver or of
 * any inner observable rejects the output.
 *
 * Only unfinished inner subscriptions are tracked, so a long-lived receiver
 * that keeps producing short-lived inner observables does not accumulate
 * settled promises.
 *
 * @returns {Observable} observable relaying the values of all inner observables.
 */
function merge() {
  return new Observable(next => new Promise((resolve, reject) => {
    // Inner subscriptions that have not settled yet.
    const pending = new Set();

    this.forEach(inner => {
      // Promise reactions always run asynchronously, so `tracked` is
      // initialized (and added to the set) before either handler fires.
      const tracked = inner.forEach(next).then(
        () => {
          pending.delete(tracked);
        },
        err => {
          pending.delete(tracked);
          reject(err);
        }
      );
      pending.add(tracked);
    }).then(x => Promise.all(pending).then(() => resolve(x)), reject);
  }));
}
/**
 * Flattens an observable of observables one inner observable at a time:
 * an inner observable is subscribed only after the previously queued one
 * has finished, and its values are relayed to the observer.
 *
 * Meant to be invoked with an observable (of observables) as `this`. The
 * output completes with the receiver's completion value after the queue has
 * drained. A failure of the receiver or of an inner observable rejects the
 * output.
 *
 * @returns {Observable} observable relaying inner values in arrival order.
 */
function concat() {
  const outer = this;

  return new Observable((emit) => {
    return new Promise((done, fail) => {
      // Tail of the chain of inner subscriptions; each new inner is
      // appended so that it starts only after its predecessor settled.
      let queueTail = Promise.resolve();

      const enqueue = (inner) => {
        const runInner = () => inner.forEach(emit).catch(fail);
        queueTail = queueTail.then(runInner);
      };

      const finish = (completion) => queueTail.then(() => done(completion));

      outer.forEach(enqueue).then(finish, fail);
    });
  });
}
/**
 * Flattens an observable of observables, ignoring any inner observable that
 * arrives while a previously accepted one is still running.
 *
 * Meant to be invoked with an observable (of observables) as `this`. The
 * output completes with the receiver's completion value once the receiver
 * has completed and the inner observable running at that moment (if any)
 * has finished. A failure of the receiver or of an accepted inner
 * observable rejects the output.
 *
 * @returns {Observable} observable relaying values of the accepted inner observables.
 */
function exhaust() {
  return new Observable(next => new Promise((resolve, reject) => {
    // Subscription promise of the inner observable currently running, or
    // null when idle. It must start out as null: a truthy initial value
    // would cause every inner observable to be skipped.
    let activeInner = null;

    this.forEach(inner => {
      if (activeInner) {
        return;
      }
      activeInner = inner.forEach(next).then(() => {
        activeInner = null;
      }, reject);
    }).then(
      // `activeInner` may legitimately be null here, so normalize it with
      // Promise.resolve instead of calling `.then` on it directly.
      x => Promise.resolve(activeInner).then(() => resolve(x)),
      reject
    );
  }));
}
/**
 * Emits each value of the receiver together with its predecessor as a
 * two-element array `[previous, current]`.
 *
 * Meant to be invoked with an observable as `this`. Nothing is emitted for
 * the very first value because it has no predecessor. The receiver's
 * subscription promise is returned unchanged, so completion and failure
 * pass straight through.
 *
 * @returns {Observable} observable of `[previous, current]` pairs.
 */
function pairwise() {
  const source = this;

  return new Observable((emit) => {
    let hasPrevious = false;
    let previous;

    return source.forEach((current) => {
      if (hasPrevious) {
        emit([previous, current]);
      }
      // Remember the value only after a successful emit, then mark that a
      // predecessor exists from now on.
      previous = current;
      hasPrevious = true;
    });
  });
}
/**
 * Searches the receiver for the first value accepted by `fn`.
 *
 * Meant to be invoked with an observable as `this`. The returned observable
 * never emits values; its subscription promise fulfills with the first value
 * for which `fn` returns a truthy result, or with `undefined` when the
 * receiver completes without a match. A failure of the receiver rejects it.
 *
 * Once a match has been found the predicate is not invoked again, even if
 * the receiver keeps producing values.
 *
 * @param {function(*): *} fn - predicate called with each value.
 * @returns {Observable} observable completing with the match or `undefined`.
 */
function find(fn) {
  return new Observable(() => new Promise((resolve, reject) => {
    let found = false;

    this.forEach(x => {
      if (found) {
        // The result is already settled; skip further predicate calls.
        return;
      }
      if (fn(x)) {
        found = true;
        resolve(x);
      }
    }).then(() => resolve(), reject);
  }));
}
/**
 * Searches the receiver for the position of the first value accepted by `fn`.
 *
 * Meant to be invoked with an observable as `this`. The returned observable
 * never emits values; its subscription promise fulfills with the zero-based
 * position of the first value for which `fn` returns a truthy result, or
 * with `-1` when the receiver completes without a match. A failure of the
 * receiver rejects it.
 *
 * Once a match has been found the predicate is not invoked again, even if
 * the receiver keeps producing values.
 *
 * @param {function(*): *} fn - predicate called with each value.
 * @returns {Observable} observable completing with the index or `-1`.
 */
function findIndex(fn) {
  return new Observable(() => new Promise((resolve, reject) => {
    let index = -1;
    let found = false;

    this.forEach(x => {
      if (found) {
        // The result is already settled; skip further predicate calls.
        return;
      }
      index++;
      if (fn(x)) {
        found = true;
        resolve(index);
      }
    }).then(() => resolve(-1), reject);
  }));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment