Skip to content

Instantly share code, notes, and snippets.

@mariusGundersen
Created December 27, 2017 13:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mariusGundersen/f96b5360978ce1fafa8a73307043d5eb to your computer and use it in GitHub Desktop.
Save mariusGundersen/f96b5360978ce1fafa8a73307043d5eb to your computer and use it in GitHub Desktop.
Async iterator stream
async function run(){
console.log('start');
const result = await source()
::map(t => t*2)
::take(30)
::buffer(10)
::map(t => Promise.delay(Math.random()*10, t))
::filter(t => t%4 == 0)
::reduce((a, b) => a+b, 0);
console.log('done', result);
}
run()
Promise.defer = function(){
let resolve, reject;
return {
promise: new Promise((res, rej) => {resolve = res; reject = rej;}),
resolve,
reject
}
}
Promise.delay = function(time, value){
return new Promise(res => setTimeout(() => res(value), time));
}
async function* source(){
let n = 1;
while(true){
yield n++;
}
}
async function* map(func){
for await(const chunk of this){
yield await func(chunk);
}
}
async function* flatMap(func){
for await(const chunk of this){
yield* await func(chunk);
}
}
async function* take(count){
for await(const chunk of this){
if(count-- <= 0) break;
yield chunk;
}
}
async function* filter(func){
for await(const chunk of this){
if(await func(chunk)) yield chunk;
}
}
async function reduce(func, initial){
for await(const chunk of this){
initial = func(initial, chunk);
}
return initial;
}
async function* buffer(size=100){
let producerSleep = Promise.defer();
let consumerSleep = Promise.defer();
const buffer = [];
async function consumer(iterator){
for await(const chunk of iterator){
buffer.push(chunk);
if(buffer.length >= size){
producerSleep = Promise.defer();
await producerSleep.promise;
}else if(buffer.length === 1){
consumerSleep.resolve(false);
}
}
return true;
}
const done = consumer(this);
while(true){
if(buffer.length === 0){
consumerSleep = Promise.defer();
if(await Promise.race([consumerSleep.promise, done])) break;
}
const value = buffer.shift();
if(buffer.length === size-1){
producerSleep.resolve();
}
yield value;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment