Skip to content

Instantly share code, notes, and snippets.

@bmeck
Created June 23, 2018 13:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmeck/7b43e9dffa114f658563a0e9e0f694ce to your computer and use it in GitHub Desktop.
Save bmeck/7b43e9dffa114f658563a0e9e0f694ce to your computer and use it in GitHub Desktop.
it is a better API at least
// Example way to construct a Bus
//
// const ctrl = new SharedArrayBuffer(12);
// const size = new SharedArrayBuffer(4);
// let busSize = 4096;
// const data = new SharedArrayBuffer(busSize);
// const bufs = {ctrl,size,data,};
// const bus = new Bus(bufs);
'use strict';
const CONSUMER_INDEX = 0;
const PRODUCER_INDEX = 1;
const COMMAND_INDEX = 2;
const READER_PENDING = 0;
const READER_READY = 1;
const WRITER_DATA = 2;
const WRITER_DONE = 3;
const UNUSED = 0;
const USED = 1;
const spinOn = (buf, index, whileValue) => {
while (Atomics.wait(buf, index, whileValue) === 'ok') {
Atomics.wake(buf, index);
}
};
class Mutex {
constructor(cmd, index) {
this.cmd = cmd;
this.index = index;
}
acquire() {
const {cmd, index} = this;
const prev = Atomics.compareExchange(cmd, index, UNUSED, USED);
if (prev === UNUSED) {
Atomics.wake(cmd, index);
return true;
}
return false;
}
release() {
const {cmd, index} = this;
Atomics.store(cmd, index, UNUSED);
Atomics.wake(cmd, index);
return true;
}
wait() {
const {cmd, index} = this;
while (Atomics.wait(cmd, index, USED) === 'ok') {
Atomics.wake(cmd, index);
}
}
}
module.exports = class Bus {
constructor(bufs) {
const ctrl = new Int32Array(bufs.ctrl);
if (ctrl.length !== 3) throw Error();
// fast fail on dual writing
this.writing = new Mutex(ctrl, PRODUCER_INDEX);
// fast fail on dual reading
this.reading = new Mutex(ctrl, CONSUMER_INDEX);
const size = new Uint32Array(bufs.size);
const data = new Uint8Array(bufs.data);
Object.assign(this, {ctrl, size, data});
}
*read() {
const { ctrl, size, data } = this;
while (true) {
if (this.reading.acquire()) {
try {
spinOn(ctrl, PRODUCER_INDEX, UNUSED);
while (true) {
Atomics.store(ctrl, COMMAND_INDEX, READER_READY);
Atomics.wake(ctrl, COMMAND_INDEX);
spinOn(ctrl, COMMAND_INDEX, READER_READY);
const req = Atomics.load(ctrl, COMMAND_INDEX);
if (req === WRITER_DATA) {
const byteLength = Atomics.load(size, 0);
const part = new Uint8Array(byteLength);
part.set(data.slice(0, byteLength), 0);
yield part;
} else if (req === WRITER_DONE) {
Atomics.store(ctrl, COMMAND_INDEX, READER_PENDING);
Atomics.wake(ctrl, COMMAND_INDEX);
return true;
} else {
throw new Error('supposedly unreachable');
}
}
} finally {
this.reading.release();
}
} else {
this.reading.wait();
}
}
}
write(bufs) {
const { ctrl, size, data } = this;
while (true) {
if (this.writing.acquire()) {
try {
spinOn(ctrl, CONSUMER_INDEX, UNUSED);
for (var buf of bufs) {
buf = new Uint8Array(buf.buffer, 0, buf.byteLength);
let i = 0;
const push = (slice) => {
Atomics.store(size, 0, slice.byteLength);
data.set(slice, 0);
Atomics.store(ctrl, COMMAND_INDEX, WRITER_DATA);
Atomics.wake(ctrl, COMMAND_INDEX);
spinOn(ctrl, COMMAND_INDEX, WRITER_DATA);
}
while (i + data.byteLength < buf.byteLength) {
spinOn(ctrl, COMMAND_INDEX, READER_PENDING);
const req = Atomics.load(ctrl, COMMAND_INDEX);
if (req === READER_READY) {
const slice = buf.slice(i, i + data.byteLength);
i += data.byteLength;
push(slice);
}
}
spinOn(ctrl, COMMAND_INDEX, READER_PENDING);
const remaining = buf.byteLength % data.byteLength;
const slice = buf.slice(i, i + remaining);
push(slice);
}
Atomics.store(ctrl, COMMAND_INDEX, WRITER_DONE);
Atomics.wake(ctrl, COMMAND_INDEX);
return;
} finally {
this.writing.release();
}
} else {
this.writing.wait();
}
}
}
}
// dumps files using readFileSyncViaWorker
//
// node --experimental-modules --experimental-worker ./main.mjs $PATH_TO_DUMP
import worker from 'worker_threads';
import Bus from './bus.js';
import util from 'util';
const {TextDecoder, TextEncoder} = util;
const {Worker} = worker;
const child = new Worker(new URL('./worker.js', import.meta.url).pathname);
const ctrl = new SharedArrayBuffer(12);
const size = new SharedArrayBuffer(4);
let busSize = 4096;
const data = new SharedArrayBuffer(busSize);
const bufs = {ctrl,size,data,};
const bus = new Bus(bufs);
child.postMessage(bufs);
child.on('error', console.dir);
function readFileSyncViaWorker(file) {
bus.write([new TextEncoder().encode(file)]);
let parts = [...bus.read()];
const body = Buffer.concat(parts);
const isError = body[body.length - 1] === 1;
const text = new TextDecoder().decode(body.slice(0, -1));
if (isError) {
throw new Error(text);
}
return text;
}
for (const file of process.argv.slice(2)) {
console.log(readFileSyncViaWorker(file));
}
child.terminate();
const fs = require('fs');
const {TextDecoder, TextEncoder} = require('util');
const {parentPort} = require('worker_threads');
const Bus = require('./bus.js');
parentPort.on('message', (bufs) => {
const bus = new Bus(bufs);
function action() {
const filename = new TextDecoder().decode(Buffer.concat([...bus.read()]));
fs.readFile(filename, (err, body) => {
if (err) {
bus.write([
new TextEncoder().encode(err.message),
new Uint8Array([1]),
]);
} else {
bus.write([body]);
}
action();
});
}
action();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment