Skip to content

Instantly share code, notes, and snippets.

@thenikso
Last active August 20, 2019 18:55
Show Gist options
  • Save thenikso/1f154dd76d771940f86f6e4a0ac4e1ac to your computer and use it in GitHub Desktop.
Save thenikso/1f154dd76d771940f86f6e4a0ac4e1ac to your computer and use it in GitHub Desktop.
This is how you could implement RxJS-style observables using async generators and for-await-of.
/**
 * Creates a minimal subject: an async generator function that also exposes
 * `send`, `error` and `complete` methods for pushing events into it.
 *
 * Calling the returned function (`subject()`) produces an async iterator that
 * yields every value passed to `subject.send`, finishes when
 * `subject.complete()` is called, and throws the reason passed to
 * `subject.error(reason)`.
 *
 * Events are buffered in a chain of promises, so values sent back-to-back
 * (or before a consumer starts iterating) are delivered in order instead of
 * being dropped. After `complete()` or `error()` the subject is closed and
 * further calls to `send`, `error` or `complete` are ignored.
 *
 * @returns {Function} The subject: an async generator function carrying
 *   `send(value)`, `error(reason)` and `complete()` methods.
 */
function makeSubject() {
  // A node is a pending promise plus the function that fulfils it. Each
  // fulfilled node carries one event and a link to the node for the next one.
  const createNode = () => {
    let settle;
    const promise = new Promise((resolve) => {
      settle = resolve;
    });
    return { promise, settle };
  };

  let tail = createNode(); // node the next emitted event will fulfil
  let head = tail; // node a newly started consumer begins reading from
  let isClosed = false;

  const emit = (type, payload) => {
    if (isClosed) {
      return;
    }
    const isTerminal = type !== 'value';
    const next = isTerminal ? null : createNode();
    // Errors are delivered as fulfilled events (and thrown by the consumer)
    // so that an error emitted with no consumer waiting does not surface as
    // an unhandled promise rejection.
    tail.settle({ type, payload, next });
    if (isTerminal) {
      isClosed = true;
    } else {
      tail = next;
    }
  };

  const subject = async function* () {
    // Each consumer walks the chain with its own cursor.
    let node = head;
    while (true) {
      const { type, payload, next } = await node.promise;
      if (type === 'complete') {
        return;
      }
      if (type === 'error') {
        throw payload;
      }
      // Move the shared starting point forward once a value has been
      // consumed, so consumed nodes are no longer referenced from here.
      if (head === node) {
        head = next;
      }
      node = next;
      yield payload;
    }
  };

  subject.send = (value) => emit('value', value);
  subject.error = (reason) => emit('error', reason);
  subject.complete = () => emit('complete');
  return subject;
}
/**
 * Builds an operator that transforms every item of a source.
 *
 * @param {Function} f - Called with each item; its result is yielded.
 * @returns {Function} An async generator function that takes a source
 *   (anything usable with `for await`) and yields `f(item)` for each item.
 */
function map(f) {
  return async function* (source) {
    for await (const item of source) {
      const transformed = f(item);
      yield transformed;
    }
  };
}
/**
 * Builds a terminal operator that runs a callback for every item of a source.
 *
 * @param {Function} f - Called with each item; its return value is ignored.
 * @returns {Function} An async function that takes a source (anything usable
 *   with `for await`), consumes it fully, and returns a promise that settles
 *   when the source finishes.
 */
function forEach(f) {
  return async (source) => {
    for await (const item of source) {
      f(item);
    }
  };
}
/**
 * Threads a starting value through a sequence of operators.
 *
 * The first argument is the initial value; every following argument is a
 * function that receives the result so far and returns the next result.
 *
 * @param {...*} sources - The initial value followed by operator functions.
 * @returns {*} The result of the last operator, or the initial value when no
 *   operators are given (`undefined` when called with no arguments).
 */
function pipe(...sources) {
  const [initial, ...operators] = sources;
  return operators.reduce((current, operator) => operator(current), initial);
}
// Try it out on any modern browser.
// Builds a pipeline that doubles every value sent to the subject and logs it.
const subject = makeSubject();
pipe(
  subject(),
  map((x) => x * 2),
  forEach((x) => console.log(x)),
  // forEach returns a promise; handle rejections so a failure in the
  // pipeline is reported instead of becoming an unhandled rejection.
).catch((err) => console.error(err));
subject.send(3); // logs 6
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment