Last active
April 30, 2019 18:09
-
-
Save krisselden/61a77d5140152f964ce7f3d8f12cb87b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { EventEmitter } = require("events");
// Imagine you're reading a stream of events from a socket
// and a single data chunk event contains more than one event.
// How do you adapt that one event into multiple events
// in a way that will prevent race conditions in async/await
// code?
//
// For example, say you are waiting for a frame loaded event, then waiting
// for the loaded event of a child of that frame, conditionally based on the
// prior event, and both were parsed during the same data event.
// Each entry maps a test name to a scheduling strategy: a function that is
// handed a callback and decides when that callback gets invoked.
runTests({
  // definitely does not work
  synchronous: emit => emit(),
  // all next ticks are flushed
  nextTick: process.nextTick,
  // does not work because when flushing the next tick queue
  // new nextTicks are appended to the end of the current flush
  nextTickChained: chained(process.nextTick),
  // works in node 11 because only one task is pulled
  // per "event frame" from the immediate queue
  setImmediate,
  // works in node 8-11 because during the flush of the immediate queue
  // calls to setImmediate enqueue in a new queue and not
  // the currently flushing queue (node 11 flushes each immediate
  // like a top level event).
  setImmediateChained: chained(setImmediate),
}).catch(err => {
  // Do not leave the returned promise floating: report an unexpected
  // failure and mark the process as failed.
  console.error(err);
  process.exitCode = 1;
});
/**
 * Wraps a scheduling primitive so that queued callbacks are handed to it one
 * at a time: at most one `enqueue` call is outstanding, and the next one is
 * only issued while the previous callback is being dispatched.
 *
 * @param {(callback: () => void) => void} enqueue - Scheduling primitive used
 *   to defer each dispatch.
 * @returns {(emit: () => void) => void} Function that queues `emit` to be run
 *   in FIFO order, one per `enqueue` invocation.
 */
function chained(enqueue) {
  const queue = [];
  let scheduled = false;

  function scheduleNext() {
    // Nothing to do if a dispatch is already pending or the queue is drained.
    if (scheduled || queue.length === 0) {
      return;
    }
    scheduled = true;
    enqueue(next);
  }

  function next() {
    scheduled = false;
    const task = queue.shift();
    // Request the following dispatch before running the task, so the chain
    // keeps going even if the task throws.
    scheduleNext();
    task();
  }

  return emit => {
    queue.push(emit);
    scheduleNext();
  };
}
/**
 * Runs every test case sequentially, logging when each one begins and ends.
 *
 * @param {Object<string, Function>} testCases - Map of test name to the
 *   scheduling function passed on to `runTest`.
 * @returns {Promise<void>} Resolves once the last test case has finished.
 */
async function runTests(testCases) {
  for (const [testName, scheduleEmit] of Object.entries(testCases)) {
    console.log(`${testName}: begin test`);
    // Awaited inside the loop on purpose: the tests must not overlap.
    await runTest(testName, scheduleEmit);
    console.log(`${testName}: end test`);
  }
}
/**
 * Runs a single scenario: enqueues the emission of events "A" and "B" through
 * `scheduleEmit`, then awaits each event in turn. Any error raised while
 * doing so (including the timeout) is logged rather than rethrown.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {Function} scheduleEmit - Scheduling strategy handed to `emit`.
 * @returns {Promise<void>} Promise returned by `runWithTimeout`.
 */
function runTest(testName, scheduleEmit) {
  const timeoutMs = 1000;
  return runWithTimeout(async raceTimeout => {
    try {
      const evented = new EventEmitter();
      const events = ["A", "B"];
      // Enqueue every emission first; the listeners are only attached
      // afterwards, one await at a time.
      for (const event of events) {
        emit(testName, scheduleEmit, evented, event);
      }
      for (const event of events) {
        await until(testName, evented, event, raceTimeout);
      }
    } catch (err) {
      console.log(`${testName}: ${err}`);
    }
  }, timeoutMs);
}
/**
 * Logs that `event` is being enqueued, then hands `schedule` a callback that
 * logs and emits `event` on `evented` whenever `schedule` decides to run it.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {(callback: () => void) => void} schedule - Decides when the
 *   emission callback is invoked.
 * @param {{ emit: (event: string) => void }} evented - Emitter to fire on.
 * @param {string} event - Name of the event to emit.
 * @returns {void}
 */
function emit(testName, schedule, evented, event) {
  console.log(`${testName}: enqueue event ${event}`);
  schedule(() => {
    console.log(`${testName}: emit event ${event}`);
    evented.emit(event);
  });
}
/**
 * Waits for a single occurrence of `event` on `evented`. The subscription is
 * created inside the subtask handed to `raceTimeout`, so the listener is
 * attached only when `raceTimeout` invokes that subtask.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {EventEmitter} evented - Emitter to subscribe to via `once`.
 * @param {string} event - Name of the event to wait for.
 * @param {(subtask: () => Promise<void>) => Promise<void>} raceTimeout -
 *   Runs the subtask; its returned promise is what this function awaits.
 * @returns {Promise<void>} Settles with whatever `raceTimeout` settles with
 *   (resolved value discarded).
 */
async function until(testName, evented, event, raceTimeout) {
  console.log(`${testName}: await Promise of event ${event}`);
  await raceTimeout(
    () =>
      new Promise(resolve => {
        console.log(`${testName}: Promise subscribing to event ${event}`);
        evented.once(event, () => {
          console.log(`${testName}: resolving Promise of event ${event}`);
          resolve();
        });
      }),
  );
}
/**
 * Runs `task` with a helper that races subtasks against a single shared
 * deadline. The timer is started lazily on the first raced subtask and is
 * cleared once `task` settles.
 *
 * @param {(raceTimeout: (subtask: () => Promise<any>) => Promise<any>) => any} task -
 *   Receives `raceTimeout`; calling it runs `subtask()` and rejects with
 *   `Error("timed out")` if the shared deadline elapses first.
 * @param {number} ms - Deadline in milliseconds, measured from the first
 *   `raceTimeout` call.
 * @returns {Promise<void>} Resolves when `task` completes; rejects with
 *   whatever `task` throws. The task's result is not returned.
 */
async function runWithTimeout(task, ms) {
  let timerId;
  let deadline;

  // Creates the deadline promise on first use and reuses it afterwards, so
  // all subtasks share the same time budget.
  const startDeadline = () => {
    if (deadline === undefined) {
      deadline = new Promise((_resolve, reject) => {
        timerId = setTimeout(() => reject(new Error("timed out")), ms);
      });
    }
    return deadline;
  };

  try {
    await task(subtask => Promise.race([subtask(), startDeadline()]));
  } finally {
    // Harmless when no timer was ever started (timerId is undefined).
    clearTimeout(timerId);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment