Skip to content

Instantly share code, notes, and snippets.

@krisselden
Last active April 30, 2019 18:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krisselden/61a77d5140152f964ce7f3d8f12cb87b to your computer and use it in GitHub Desktop.
Save krisselden/61a77d5140152f964ce7f3d8f12cb87b to your computer and use it in GitHub Desktop.
const { EventEmitter } = require("events");
// Imagine you're reading a stream of events from a socket,
// and a single data chunk event contains more than one event.
// How do you adapt that one event into multiple events
// in a way that prevents race conditions in async/await
// code?
//
// For example, say you are waiting for a frame loaded event, then waiting
// for the loaded event of a child of that frame conditionally based on the
// prior event, and both were parsed during the same data event.
// Scheduling strategies under test, keyed by test name. Each value is a
// function that receives an `emit` callback and decides when to invoke it.
// Key order is the order in which the tests run.
const schedulingStrategies = {
  // Invokes the callback inline; definitely does not work.
  synchronous: (emit) => emit(),

  // All next ticks are flushed.
  nextTick: process.nextTick,

  // Does not work because, when flushing the next tick queue,
  // new nextTicks are appended to the end of the current flush.
  nextTickChained: chained(process.nextTick),

  // Works in node 11 because only one task is pulled
  // per "event frame" from the immediate queue.
  setImmediate: setImmediate,

  // Works in node 8-11 because, during the flush of the immediate queue,
  // calls to setImmediate enqueue in a new queue and not
  // the currently flushing queue (node 11 flushes each immediate
  // like a top level event).
  setImmediateChained: chained(setImmediate),
};

runTests(schedulingStrategies);
/**
 * Wraps a scheduler so that queued callbacks are handed to it one at a time:
 * the next callback is only passed to `enqueue` once the previous one has
 * started running.
 *
 * @param {Function} enqueue - Scheduler that accepts a function to run later
 *   (the file passes `process.nextTick` and `setImmediate`).
 * @returns {Function} A scheduler with the same call shape: it accepts a
 *   callback and queues it behind any callbacks still waiting.
 */
function chained(enqueue) {
  const pending = [];
  let isArmed = false;

  // Hands a single drain step to the underlying scheduler, unless one is
  // already outstanding or there is nothing waiting.
  const arm = () => {
    if (isArmed || pending.length === 0) {
      return;
    }
    isArmed = true;
    enqueue(drainOne);
  };

  // Runs the oldest waiting callback. The following drain step is armed
  // BEFORE the callback runs, so the remaining callbacks are already
  // scheduled by the time it executes.
  const drainOne = () => {
    isArmed = false;
    const job = pending.shift();
    arm();
    job();
  };

  return (emit) => {
    pending.push(emit);
    arm();
  };
}
/**
 * Runs every test case one after another, logging a begin/end marker
 * around each.
 *
 * @param {Object<string, Function>} testCases - Map of test name to the
 *   scheduling function passed on to `runTest`.
 * @returns {Promise<void>} Resolves after the last test case has finished.
 */
async function runTests(testCases) {
  for (const [name, scheduleEmit] of Object.entries(testCases)) {
    console.log(`${name}: begin test`);
    await runTest(name, scheduleEmit);
    console.log(`${name}: end test`);
  }
}
/**
 * Runs one scenario: schedules events "A" and "B" through `scheduleEmit`
 * up front, then awaits each event in order. Any error (including the
 * timeout) is logged rather than propagated.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {Function} scheduleEmit - Scheduler that decides when each emit runs.
 * @returns {Promise<void>} The promise returned by `runWithTimeout`.
 */
function runTest(testName, scheduleEmit) {
  const timeoutMs = 1000;

  const scenario = async (raceTimeout) => {
    try {
      const evented = new EventEmitter();
      const events = ["A", "B"];

      // All emits are scheduled before any listener is attached.
      events.forEach((event) => {
        emit(testName, scheduleEmit, evented, event);
      });

      // Listeners are attached one at a time, each only after the previous
      // event has been observed.
      for (const event of events) {
        await until(testName, evented, event, raceTimeout);
      }
    } catch (err) {
      console.log(`${testName}: ${err}`);
    }
  };

  return runWithTimeout(scenario, timeoutMs);
}
/**
 * Logs that an event is being enqueued, then hands `schedule` a callback
 * that logs and emits the event on `evented`.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {Function} schedule - Called with the callback that performs the emit.
 * @param {{emit: Function}} evented - Emitter the event is fired on.
 * @param {string} event - Name of the event to emit.
 */
function emit(testName, schedule, evented, event) {
  const fire = () => {
    console.log(`${testName}: emit event ${event}`);
    evented.emit(event);
  };

  console.log(`${testName}: enqueue event ${event}`);
  schedule(fire);
}
/**
 * Waits for a single occurrence of `event` on `evented`, routed through
 * `raceTimeout`.
 *
 * @param {string} testName - Label prefixed to every log line.
 * @param {{once: Function}} evented - Emitter to subscribe to.
 * @param {string} event - Name of the event to wait for.
 * @param {Function} raceTimeout - Called with a function that creates the
 *   waiting promise; its returned promise is awaited.
 * @returns {Promise<void>}
 */
async function until(testName, evented, event, raceTimeout) {
  // The subscription happens only when this factory is invoked.
  const waitForEvent = () =>
    new Promise((resolve) => {
      const onEvent = () => {
        console.log(`${testName}: resolving Promise of event ${event}`);
        resolve();
      };
      console.log(`${testName}: Promise subscribing to event ${event}`);
      evented.once(event, onEvent);
    });

  console.log(`${testName}: await Promise of event ${event}`);
  await raceTimeout(waitForEvent);
}
/**
 * Runs `task`, giving it a `raceTimeout` helper that races a subtask against
 * a single shared deadline. The deadline timer is created lazily on the first
 * `raceTimeout` call and is cleared once `task` settles.
 *
 * @param {Function} task - Called with `raceTimeout(subtask)`, where
 *   `subtask` is a function returning the promise to race.
 * @param {number} ms - Deadline in milliseconds, counted from the first
 *   `raceTimeout` call.
 * @returns {Promise<void>} Settles when `task` settles; rejects with whatever
 *   `task` rejects with (e.g. the "timed out" error if it is not caught).
 */
async function runWithTimeout(task, ms) {
  let timerId;
  let deadline;

  // Creates the shared deadline promise on first use and reuses it afterwards.
  function armDeadline() {
    if (deadline !== undefined) {
      return deadline;
    }
    const timerElapsed = new Promise((wake) => {
      timerId = setTimeout(wake, ms);
    });
    deadline = timerElapsed.then(() => {
      throw new Error("timed out");
    });
    return deadline;
  }

  // The subtask is started before the deadline is armed.
  const raceTimeout = (subtask) => Promise.race([subtask(), armDeadline()]);

  try {
    await task(raceTimeout);
  } finally {
    // Clearing an undefined or already-fired timer is a no-op.
    clearTimeout(timerId);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment