Created
March 5, 2018 17:02
-
-
Save awto/75c1f59047e248f2f461b5d0462e23dd to your computer and use it in GitHub Desktop.
leaking async generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function subject(...args) { | |
let callback | |
const queue = [] | |
const iter = thread() | |
let running = true | |
return { | |
send(event) { | |
if (callback) | |
callback() | |
queue.push(event) | |
}, | |
return() { | |
running = false | |
if (callback) | |
callback() | |
return iter.return() | |
}, | |
next() { return iter.next() }, | |
[Symbol.asyncIterator]() { return this } | |
} | |
async function* thread() { | |
try { | |
for(;;) { | |
while(queue.length) | |
yield queue.shift() | |
if (!running) | |
break | |
await new Promise(i => callback = i) | |
callback = null | |
} | |
} finally { | |
console.log("exit thread") | |
} | |
} | |
} | |
async function* combine(...args) { | |
const threads = args.map(i => i[Symbol.asyncIterator]()) | |
const sparks = new Set(threads.map(i => ({thread:i,step:i.next()}))) | |
try { | |
while(sparks.size) { | |
const v = await Promise.race([...sparks] | |
.map(i => i.step.then(({done,value}) => ({done,value,spark:i})))) | |
sparks.delete(v.spark) | |
if (!v.done) { | |
sparks.add({...v.spark,step:v.spark.thread.next()}) | |
yield v.value | |
} | |
} | |
} finally { | |
await Promise.all([...threads].map((i) => i.return())) | |
} | |
} | |
async function* copy1(input) { | |
for await(const i of input) | |
yield i | |
} | |
const subj1 = subject() | |
const subj2 = subject() | |
const subj3 = subject() | |
const subj4 = subject() | |
async function test(src1, src2) { | |
let cnt = 0 | |
for await(const i of combine(src1,src2)) { | |
console.log("value:",i) | |
if (cnt++) | |
break | |
} | |
} | |
async function main() { | |
console.log("-- no leaks") | |
await test(subj1,subj2) | |
console.log("-- leaks") | |
await test(subj3,copy1(subj4)) | |
// never shown - awaiting return in combine | |
console.log("done") | |
} | |
main() | |
subj1.send(1) | |
subj2.send(2) | |
subj3.send(3) | |
subj4.send(4) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment