Skip to content

Instantly share code, notes, and snippets.

@bmeck
Last active June 29, 2018 15:44
Show Gist options
  • Save bmeck/fb90b23234aee41f392d4a19b61ba6eb to your computer and use it in GitHub Desktop.
Save bmeck/fb90b23234aee41f392d4a19b61ba6eb to your computer and use it in GitHub Desktop.
Show fs.readFile callback acting as if it was sync in another thread (naive impl)
module.exports = {
__proto__: null,
START: 0,
WORKER_READY: 2,
BUS_READY: 3,
BUS_DRAINED: 4,
BUS_END: 5,
WORKER_ERROR: 6,
};
// run via node --experimental-modules --experimental-worker ./main.mjs
import worker from 'worker_threads';
import commands from './commands.js';
import util from 'util';
const {TextDecoder} = util;
const child = new worker.Worker(new URL('./worker.js', import.meta.url).pathname);
const cmd = new Int32Array(new SharedArrayBuffer(8));
const size = new Uint32Array(new SharedArrayBuffer(4));
let busSize = 4096;
const data = new Uint8Array(new SharedArrayBuffer(busSize));
child.postMessage({
cmd: cmd.buffer,
size: size.buffer,
data: data.buffer,
});
let error = false;
let parts = [];
let bodySize = 0;
Atomics.wait(cmd, 0, commands.START);
while (true) {
if (cmd[0] === commands.BUS_READY) {
const part = new Uint8Array(size[0]);
bodySize += part.byteLength;
part.set(data.slice(0, part.byteLength), 0);
parts.push(part);
cmd[0] = commands.BUS_DRAINED;
Atomics.wake(cmd, 0);
Atomics.wait(cmd, 0, commands.BUS_DRAINED);
} else if (cmd[0] === commands.WORKER_ERROR) {
error = true;
parts = [];
bodySize = 0;
cmd[0] = commands.BUS_DRAINED;
Atomics.wake(cmd, 0);
Atomics.wait(cmd, 0, commands.BUS_DRAINED);
} else if (cmd[0] === commands.BUS_END) {
break;
}
}
const body = new Uint8Array(bodySize);
let i = 0;
for (const part of parts) {
body.set(part, i);
i += part.byteLength;
}
const text = new TextDecoder().decode(body);
if (error) {
throw new Error(text);
} else {
console.log(text);
}
const fs = require('fs');
const {TextEncoder} = require('util');
const {parentPort} = require('worker_threads');
const commands = require('./commands.js');
parentPort.on('message', (bufs) => {
const cmd = new Int32Array(bufs.cmd);
const size = new Uint32Array(bufs.size);
const data = new Uint8Array(bufs.data);
const write = (buf) => {
buf = new Uint8Array(buf.buffer, 0, buf.byteLength);
let i = 0;
while (i + data.byteLength < buf.byteLength) {
size[0] = data.byteLength;
data.set(buf.slice(i, i + data.byteLength), 0);
cmd[0] = commands.BUS_READY;
Atomics.wake(cmd, 0);
Atomics.wait(cmd, 0, commands.BUS_READY);
i += data.byteLength;
}
size[0] = buf.byteLength % data.byteLength;
data.set(buf.slice(i, i + size[0]), 0);
cmd[0] = commands.BUS_READY;
Atomics.wake(cmd, 0);
Atomics.wait(cmd, 0, commands.BUS_READY);
cmd[0] = commands.BUS_END;
Atomics.wake(cmd, 0);
}
fs.readFile(__filename, (err, body) => {
if (err) {
cmd[0] = commands.WORKER_ERROR;
Atomics.wake(cmd, 0);
Atomics.wait(cmd, 0, commands.WORKER_ERROR);
write(new TextEncoder().encode(err.message));
process.exit(1);
} else {
write(body);
process.exit();
}
})
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment