Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Observable to Async Iterator
/**
 * Adapts this observable to an async iterator (usable with `for await`).
 *
 * Subscription happens lazily, on the first `next()` call of the returned
 * generator, and the subscription is released when iteration finishes,
 * throws, or is abandoned by the consumer.
 *
 * Values that arrive while the consumer is still busy with a previous value
 * are dropped. This is the only default that makes sense here; other
 * observable combinators can add more specific buffering strategies.
 *
 * NOTE: a later assignment to this same property further down the file
 * replaces this implementation.
 *
 * @returns {AsyncGenerator} Yields each delivered value; the generator's
 *   return value is the argument passed to the observer's `complete`.
 */
Observable.prototype[Symbol.asyncIterator] = async function*() {
  // Builds a promise together with the functions that settle it.
  function promiseCapability() {
    const capability = {};
    capability.promise = new Promise((resolve, reject) => {
      capability.resolve = resolve;
      capability.reject = reject;
    });
    // An error can arrive while nobody is awaiting this promise; the no-op
    // handler keeps that from being reported as an unhandled rejection.
    // Anyone awaiting the promise later still observes the rejection.
    capability.promise.catch(() => {});
    return capability;
  }

  // The slot the consumer waits on. Each slot settles with a
  // `{ done, value }` record so that completion is tied to the slot that
  // carried it, not to a flag shared with values delivered earlier.
  let pending = promiseCapability();
  let closed = false;

  const observer = {
    next(v) {
      if (closed) {
        return;
      }
      const current = pending;
      pending = promiseCapability();
      current.resolve({ done: false, value: v });
    },
    error(e) {
      if (closed) {
        return;
      }
      closed = true;
      pending.reject(e);
    },
    complete(x) {
      if (closed) {
        return;
      }
      closed = true;
      pending.resolve({ done: true, value: x });
    },
  };

  const subscription = this.subscribe(observer);
  try {
    while (true) {
      const result = await pending.promise;
      if (result.done) {
        return result.value;
      }
      yield result.value;
    }
  } finally {
    subscription.unsubscribe();
  }
};
/**
 * Adapts this observable to an async iterator backed by a bounded buffer.
 *
 * The buffer is a queue of promise slots whose last entry is always the
 * still-unsettled slot for the next event. `maxSize` bounds the whole queue,
 * pending slot included, so at most `maxSize - 1` undelivered values are
 * retained; when the bound is exceeded the oldest slot is discarded. With
 * `maxSize` of 1, values arriving while the consumer is busy are dropped.
 *
 * Subscription happens lazily, on the first `next()` call of the returned
 * generator, and is released when iteration finishes, throws, or is
 * abandoned. Buffered values are delivered before a completion or error
 * that arrived after them.
 *
 * @param {number} maxSize - Buffer bound; coerced to an unsigned 32-bit
 *   integer.
 * @returns {AsyncGenerator} Yields delivered values; the generator's return
 *   value is the argument passed to the observer's `complete`.
 * @throws {RangeError} If the coerced `maxSize` is less than 1. Because this
 *   is an async generator, it surfaces as a rejection of the first `next()`.
 */
Observable.prototype.toAsyncIterator = async function*(maxSize) {
  const capacity = maxSize >>> 0;
  if (capacity < 1) {
    throw new RangeError("Invalid buffer size");
  }

  // Builds a promise together with the functions that settle it.
  function promiseCapability() {
    const capability = {};
    capability.promise = new Promise((resolve, reject) => {
      capability.resolve = resolve;
      capability.reject = reject;
    });
    // An error can be buffered while nobody is awaiting this slot; the no-op
    // handler keeps that from being reported as an unhandled rejection.
    // Anyone awaiting the promise later still observes the rejection.
    capability.promise.catch(() => {});
    return capability;
  }

  // Slots settle with `{ done, value }` records so that completion is tied
  // to the slot that carried it, not to a flag shared with buffered values.
  const buffer = [promiseCapability()];
  let closed = false;

  const observer = {
    next(v) {
      if (closed) {
        return;
      }
      buffer[buffer.length - 1].resolve({ done: false, value: v });
      buffer.push(promiseCapability());
      if (buffer.length > capacity) {
        buffer.shift();
      }
    },
    error(e) {
      if (closed) {
        return;
      }
      closed = true;
      buffer[buffer.length - 1].reject(e);
    },
    complete(x) {
      if (closed) {
        return;
      }
      closed = true;
      buffer[buffer.length - 1].resolve({ done: true, value: x });
    },
  };

  const subscription = this.subscribe(observer);
  try {
    while (true) {
      const head = buffer[0];
      const result = await head.promise;
      // The producer may already have evicted the awaited slot to honor the
      // bound; only remove the head if it is still the slot just consumed.
      if (buffer[0] === head) {
        buffer.shift();
      }
      if (result.done) {
        return result.value;
      }
      yield result.value;
    }
  } finally {
    subscription.unsubscribe();
  }
};
/**
 * Default async-iteration protocol for observables: hands off to
 * `toAsyncIterator` with a buffer size of 1.
 *
 * @returns {*} Whatever `this.toAsyncIterator(1)` returns.
 */
Observable.prototype[Symbol.asyncIterator] = function() {
  const defaultBufferSize = 1;
  const iterator = this.toAsyncIterator(defaultBufferSize);
  return iterator;
};
@mgtitimoli

This comment has been minimized.

Copy link

mgtitimoli commented Nov 21, 2016

Hi @zenparsing,

I was wondering whether Observable.prototype[Symbol.asyncIterator] is going to be added to the proposal. Is that the case?

Thanks!

@mgtitimoli

This comment has been minimized.

Copy link

mgtitimoli commented Nov 21, 2016

In case you haven't noticed, there is a missing variable declaration here:
https://gist.github.com/zenparsing/4e65592f184dafc82125#file-observable-to-async-iterator-js-L4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.