Created
December 27, 2017 13:59
-
-
Save mariusGundersen/f96b5360978ce1fafa8a73307043d5eb to your computer and use it in GitHub Desktop.
Async iterator stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Demo pipeline: doubles the natural numbers, keeps the first 30, prefetches
 * them through a 10-slot buffer, delays each one by a random 0-10 ms, keeps
 * the multiples of four and logs their sum.
 *
 * The stream helpers read their upstream from `this`, so each stage is wired
 * up with `Function.prototype.call` (standard JavaScript) instead of the
 * non-standard `::` bind operator.
 *
 * @returns {Promise<void>} Settles once the sum has been logged.
 */
async function run() {
  console.log('start');
  const doubled = map.call(source(), (t) => t * 2);
  const firstThirty = take.call(doubled, 30);
  const buffered = buffer.call(firstThirty, 10);
  const delayed = map.call(buffered, (t) => Promise.delay(Math.random() * 10, t));
  const multiplesOfFour = filter.call(delayed, (t) => t % 4 === 0);
  const result = await reduce.call(multiplesOfFour, (a, b) => a + b, 0);
  console.log('done', result);
}

// Surface pipeline failures instead of leaving the promise floating.
run().catch((err) => {
  console.error(err);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Creates a "deferred": a pending promise bundled with the functions that
 * settle it, so it can be resolved or rejected from outside the executor.
 *
 * @returns {{promise: Promise<*>, resolve: function(*): void, reject: function(*): void}}
 */
Promise.defer = function () {
  let resolve;
  let reject;
  // The executor runs synchronously, so both settlers are assigned before the
  // object below is built.
  const promise = new Promise((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
};
/**
 * Returns a promise that fulfils with `value` after roughly `time` milliseconds.
 *
 * @param {number} time - Delay in milliseconds, handed to `setTimeout`.
 * @param {*} [value] - Value the promise fulfils with.
 * @returns {Promise<*>}
 */
Promise.delay = function (time, value) {
  // setTimeout forwards extra arguments to the callback, so no wrapper closure is needed.
  return new Promise((resolve) => setTimeout(resolve, time, value));
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Endless async stream of the integers 1, 2, 3, ...
 * It never completes on its own; consumers must stop pulling (e.g. via `take`).
 *
 * @yields {number}
 */
async function* source() {
  let n = 1;
  while (true) {
    yield n++;
  }
}
/**
 * Transforms every element of the upstream async iterable bound as `this`.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {function(*): *} func - Mapper; may return a promise, which is awaited.
 * @yields {*} The (awaited) result of `func` for each upstream element, in order.
 */
async function* map(func) {
  for await (const chunk of this) {
    yield await func(chunk);
  }
}
/**
 * Maps every upstream element to a sub-sequence and yields the elements of
 * those sub-sequences one after another.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {function(*): *} func - Returns (or resolves to) a sync or async
 *   iterable for each upstream element.
 * @yields {*} Every element of every returned iterable, in order.
 */
async function* flatMap(func) {
  for await (const chunk of this) {
    // `yield*` in an async generator accepts both sync and async iterables.
    yield* await func(chunk);
  }
}
/**
 * Yields at most `count` elements from the upstream async iterable bound as
 * `this`, then stops.
 *
 * Exactly `count` elements are pulled from upstream: the stream ends right
 * after the last wanted element instead of fetching (and discarding) one more.
 * For `count <= 0` upstream is not touched at all.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {number} count - Maximum number of elements to yield.
 * @yields {*} The first `count` upstream elements.
 */
async function* take(count) {
  if (count <= 0) return;
  let remaining = count;
  for await (const chunk of this) {
    yield chunk;
    remaining -= 1;
    // Returning from inside the loop closes the upstream iterator without
    // requesting another element from it.
    if (remaining <= 0) return;
  }
}
/**
 * Passes through only those upstream elements for which `func` is truthy.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {function(*): *} func - Predicate; may return a promise, which is awaited.
 * @yields {*} The upstream elements accepted by the predicate, in order.
 */
async function* filter(func) {
  for await (const chunk of this) {
    if (await func(chunk)) {
      yield chunk;
    }
  }
}
/**
 * Folds the upstream async iterable bound as `this` into a single value.
 *
 * The reducer's result is awaited, so asynchronous reducers work the same way
 * asynchronous callbacks do in `map` and `filter`.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {function(*, *): *} func - Called as `func(accumulator, element)`;
 *   may return a promise.
 * @param {*} initial - Starting accumulator; returned as-is for an empty stream.
 * @returns {Promise<*>} The final accumulator.
 */
async function reduce(func, initial) {
  let accumulator = initial;
  for await (const chunk of this) {
    accumulator = await func(accumulator, chunk);
  }
  return accumulator;
}
/**
 * Decouples a slow consumer from its upstream by prefetching up to `size`
 * elements in the background.
 *
 * A background task pulls from the upstream iterable bound as `this` and parks
 * once `size` elements are queued; it resumes when the consumer takes one.
 * Elements are delivered in upstream order. If upstream throws, the elements
 * already queued are delivered first and the error is rethrown afterwards.
 * When the consumer stops early, the background task is released and upstream
 * is closed.
 *
 * @this {AsyncIterable<*>|Iterable<*>} Upstream sequence.
 * @param {number} [size=100] - Maximum number of queued elements. Values below
 *   1 (and NaN) are treated as 1 so the stream can always make progress.
 * @yields {*} The upstream elements, in order.
 */
async function* buffer(size = 100) {
  const capacity = size >= 1 ? size : 1;
  const upstream = this;
  const queue = [];
  let wakeProducer = null; // resolver, set while the producer waits for room
  let wakeConsumer = null; // resolver, set while the consumer waits for data
  let finished = false; // upstream is exhausted or has failed
  let failure = null; // { error } once upstream has thrown
  let cancelled = false; // the consumer has stopped reading

  const releaseProducer = () => {
    if (wakeProducer !== null) {
      const wake = wakeProducer;
      wakeProducer = null;
      wake();
    }
  };
  const signalConsumer = () => {
    if (wakeConsumer !== null) {
      const wake = wakeConsumer;
      wakeConsumer = null;
      wake();
    }
  };

  // Background task that fills the queue. It never rejects: upstream errors
  // are recorded in `failure` and rethrown by the consumer loop below.
  async function fill() {
    try {
      for await (const chunk of upstream) {
        if (cancelled) break;
        queue.push(chunk);
        signalConsumer();
        while (queue.length >= capacity && !cancelled) {
          await new Promise((resolve) => {
            wakeProducer = resolve;
          });
        }
        // Leaving the loop here makes `for await` close the upstream iterator.
        if (cancelled) break;
      }
    } catch (err) {
      failure = { error: err };
    } finally {
      finished = true;
      signalConsumer();
    }
  }

  fill();
  try {
    while (true) {
      while (queue.length === 0 && !finished) {
        await new Promise((resolve) => {
          wakeConsumer = resolve;
        });
      }
      if (queue.length === 0) break; // upstream is done and the queue is drained
      const value = queue.shift();
      releaseProducer();
      yield value;
    }
    if (failure !== null) throw failure.error;
  } finally {
    // Runs on normal completion, on error and on early return(): make sure
    // the producer is not left parked forever.
    cancelled = true;
    releaseProducer();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment