Skip to content

Instantly share code, notes, and snippets.

@zenparsing
Last active July 7, 2022 12:43
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zenparsing/4e65592f184dafc82125 to your computer and use it in GitHub Desktop.
Save zenparsing/4e65592f184dafc82125 to your computer and use it in GitHub Desktop.
Observable to Async Iterator
/**
 * Adapts this Observable to an async iterator so it can be consumed with
 * `for await (const value of observable)`.
 *
 * The observable is subscribed to when iteration starts and unsubscribed from
 * when iteration ends for any reason (completion, error, or early exit by the
 * consumer).
 *
 * No buffering is performed: a value delivered while the consumer is not
 * waiting for one is dropped.
 *
 * @returns {AsyncGenerator} Yields each value passed to `next`, returns the
 *   value passed to `complete`, and throws the value passed to `error`.
 */
Observable.prototype[Symbol.asyncIterator] = async function*() {
  // Creates a promise together with the functions that settle it.
  function promiseCapability() {
    const capability = {};
    capability.promise = new Promise((resolve, reject) => {
      capability.resolve = resolve;
      capability.reject = reject;
    });
    // An error may be delivered while the consumer is busy. Registering a
    // no-op handler keeps that from being reported as an unhandled rejection;
    // the consumer still observes the rejection when it awaits the promise.
    capability.promise.catch(() => {});
    return capability;
  }

  // This observer will drop values if the consumer is still busy. This is the
  // simplest default; other observable combinators can add more specific
  // buffering strategies.
  let pending = promiseCapability();

  const observer = {
    next(v) {
      // Each notification carries its own `done` marker, so a completion that
      // arrives right after a value cannot be mistaken for that value.
      pending.resolve({ value: v, done: false });
      pending = promiseCapability();
    },
    error(e) {
      pending.reject(e);
    },
    complete(x) {
      pending.resolve({ value: x, done: true });
    },
  };

  const subscription = this.subscribe(observer);

  try {
    while (true) {
      const { value, done } = await pending.promise;
      if (done) {
        return value;
      }
      yield value;
    }
  } finally {
    subscription.unsubscribe();
  }
};
/**
 * Adapts this Observable to an async iterator with a bounded buffer.
 *
 * The buffer holds at most `maxSize` entries, one of which is always the
 * still-pending entry that the next notification will settle. Up to
 * `maxSize - 1` delivered values can therefore wait for the consumer; when a
 * further value arrives, the oldest waiting value is dropped. With
 * `maxSize === 1` a value is only received if the consumer is already waiting.
 *
 * The observable is subscribed to when iteration starts and unsubscribed from
 * when iteration ends for any reason.
 *
 * @param {number} maxSize - Buffer capacity; coerced to an unsigned 32-bit
 *   integer.
 * @returns {AsyncGenerator} Yields each value passed to `next`, returns the
 *   value passed to `complete`, and throws the value passed to `error`.
 * @throws {RangeError} On the first `next()` call if the coerced `maxSize` is
 *   less than 1.
 */
Observable.prototype.toAsyncIterator = async function*(maxSize) {
  const capacity = maxSize >>> 0;
  if (capacity < 1) {
    throw new RangeError("Invalid buffer size");
  }

  // Creates a promise together with the functions that settle it.
  function promiseCapability() {
    const capability = {};
    capability.promise = new Promise((resolve, reject) => {
      capability.resolve = resolve;
      capability.reject = reject;
    });
    // An error may be delivered while the consumer is still working through
    // earlier entries. Registering a no-op handler keeps that from being
    // reported as an unhandled rejection; the consumer still observes the
    // rejection when it awaits the promise.
    capability.promise.catch(() => {});
    return capability;
  }

  // The last entry is always the pending capability that the next
  // notification settles; entries before it are settled and waiting to be
  // consumed.
  const buffer = [promiseCapability()];

  const observer = {
    next(v) {
      // Each notification carries its own `done` marker so buffered values
      // are still yielded after the source has completed.
      buffer[buffer.length - 1].resolve({ value: v, done: false });
      buffer.push(promiseCapability());
      if (buffer.length > capacity) {
        // Over capacity: drop the oldest entry.
        buffer.shift();
      }
    },
    error(e) {
      buffer[buffer.length - 1].reject(e);
    },
    complete(x) {
      buffer[buffer.length - 1].resolve({ value: x, done: true });
    },
  };

  const subscription = this.subscribe(observer);

  try {
    while (true) {
      const head = buffer[0];
      const { value, done } = await head.promise;
      if (done) {
        return value;
      }
      // The observer may already have evicted `head` while we were waiting;
      // only remove it if it is still at the front.
      if (buffer[0] === head) {
        buffer.shift();
      }
      yield value;
    }
  } finally {
    subscription.unsubscribe();
  }
};
// Default async iteration: delegate to toAsyncIterator with a buffer size of 1.
Observable.prototype[Symbol.asyncIterator] = function() {
  const bufferSize = 1;
  const iterator = this.toAsyncIterator(bufferSize);
  return iterator;
};
@mgtitimoli
Copy link

Hi @zenparsing,

I was wondering whether Observable.prototype[Symbol.asyncIterator] is going to be added to the proposal. Is that the case?

Thanks!

@mgtitimoli
Copy link

In case you haven't noticed, there is a missing variable declaration here:
https://gist.github.com/zenparsing/4e65592f184dafc82125#file-observable-to-async-iterator-js-L4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment