Created
June 23, 2018 13:51
-
-
Save bmeck/7b43e9dffa114f658563a0e9e0f694ce to your computer and use it in GitHub Desktop.
it is a better API at least
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example way to construct a Bus | |
// | |
// const ctrl = new SharedArrayBuffer(12); | |
// const size = new SharedArrayBuffer(4); | |
// let busSize = 4096; | |
// const data = new SharedArrayBuffer(busSize); | |
// const bufs = {ctrl,size,data,}; | |
// const bus = new Bus(bufs); | |
'use strict';

// Slot layout of the three-element Int32 control array shared by both ends
// of the bus: one lock slot per role plus one slot for the command word.
const CONSUMER_INDEX = 0;
const PRODUCER_INDEX = 1;
const COMMAND_INDEX = 2;

// Values a lock slot (CONSUMER_INDEX / PRODUCER_INDEX) can hold.
const UNUSED = 0;
const USED = 1;

// Values the command slot (COMMAND_INDEX) can hold. The reader stores the
// READER_* values and the writer stores the WRITER_* values.
const READER_PENDING = 0;
const READER_READY = 1;
const WRITER_DATA = 2;
const WRITER_DONE = 3;
/**
 * Blocks the calling thread for as long as `buf[index]` equals `whileValue`.
 *
 * `Atomics.wait` returns 'ok' only when the thread was actually woken by a
 * notification; in that case other waiters on the same slot are notified too
 * and the value is checked again. The loop ends as soon as the wait reports
 * anything else (e.g. 'not-equal' because the slot holds a different value).
 *
 * Uses `Atomics.notify`; the original `Atomics.wake` name was removed from
 * the language and is undefined in current engines.
 *
 * @param {Int32Array} buf - integer view over shared memory usable with `Atomics.wait`
 * @param {number} index - slot in `buf` to watch
 * @param {number} whileValue - value that keeps the caller blocked
 * @returns {void}
 */
const spinOn = (buf, index, whileValue) => {
  while (Atomics.wait(buf, index, whileValue) === 'ok') {
    Atomics.notify(buf, index);
  }
};
/**
 * A try-lock stored in a single slot of a shared integer array.
 *
 * The slot holds UNUSED while free and USED while held. Every state change
 * is followed by a notification so threads blocked on that slot re-check it.
 * Notifications use `Atomics.notify`; the original `Atomics.wake` name was
 * removed from the language and is undefined in current engines.
 */
class Mutex {
  /**
   * @param {Int32Array} cmd - integer view over shared memory holding the lock slot
   * @param {number} index - position of the lock slot inside `cmd`
   */
  constructor(cmd, index) {
    this.cmd = cmd;
    this.index = index;
  }

  /**
   * Attempts to take the lock without blocking.
   *
   * @returns {boolean} true when the slot was UNUSED and is now USED,
   *   false when it was already held
   */
  acquire() {
    const {cmd, index} = this;
    if (Atomics.compareExchange(cmd, index, UNUSED, USED) !== UNUSED) {
      return false;
    }
    // Threads blocked while the slot was UNUSED need to see it became USED.
    Atomics.notify(cmd, index);
    return true;
  }

  /**
   * Marks the slot UNUSED unconditionally (no ownership check is made) and
   * notifies waiters.
   *
   * @returns {boolean} always true
   */
  release() {
    const {cmd, index} = this;
    Atomics.store(cmd, index, UNUSED);
    Atomics.notify(cmd, index);
    return true;
  }

  /**
   * Blocks while the slot holds USED. Returns immediately when it does not.
   *
   * @returns {void}
   */
  wait() {
    const {cmd, index} = this;
    while (Atomics.wait(cmd, index, USED) === 'ok') {
      Atomics.notify(cmd, index);
    }
  }
}
module.exports = class Bus { | |
constructor(bufs) { | |
const ctrl = new Int32Array(bufs.ctrl); | |
if (ctrl.length !== 3) throw Error(); | |
// fast fail on dual writing | |
this.writing = new Mutex(ctrl, PRODUCER_INDEX); | |
// fast fail on dual reading | |
this.reading = new Mutex(ctrl, CONSUMER_INDEX); | |
const size = new Uint32Array(bufs.size); | |
const data = new Uint8Array(bufs.data); | |
Object.assign(this, {ctrl, size, data}); | |
} | |
*read() { | |
const { ctrl, size, data } = this; | |
while (true) { | |
if (this.reading.acquire()) { | |
try { | |
spinOn(ctrl, PRODUCER_INDEX, UNUSED); | |
while (true) { | |
Atomics.store(ctrl, COMMAND_INDEX, READER_READY); | |
Atomics.wake(ctrl, COMMAND_INDEX); | |
spinOn(ctrl, COMMAND_INDEX, READER_READY); | |
const req = Atomics.load(ctrl, COMMAND_INDEX); | |
if (req === WRITER_DATA) { | |
const byteLength = Atomics.load(size, 0); | |
const part = new Uint8Array(byteLength); | |
part.set(data.slice(0, byteLength), 0); | |
yield part; | |
} else if (req === WRITER_DONE) { | |
Atomics.store(ctrl, COMMAND_INDEX, READER_PENDING); | |
Atomics.wake(ctrl, COMMAND_INDEX); | |
return true; | |
} else { | |
throw new Error('supposedly unreachable'); | |
} | |
} | |
} finally { | |
this.reading.release(); | |
} | |
} else { | |
this.reading.wait(); | |
} | |
} | |
} | |
write(bufs) { | |
const { ctrl, size, data } = this; | |
while (true) { | |
if (this.writing.acquire()) { | |
try { | |
spinOn(ctrl, CONSUMER_INDEX, UNUSED); | |
for (var buf of bufs) { | |
buf = new Uint8Array(buf.buffer, 0, buf.byteLength); | |
let i = 0; | |
const push = (slice) => { | |
Atomics.store(size, 0, slice.byteLength); | |
data.set(slice, 0); | |
Atomics.store(ctrl, COMMAND_INDEX, WRITER_DATA); | |
Atomics.wake(ctrl, COMMAND_INDEX); | |
spinOn(ctrl, COMMAND_INDEX, WRITER_DATA); | |
} | |
while (i + data.byteLength < buf.byteLength) { | |
spinOn(ctrl, COMMAND_INDEX, READER_PENDING); | |
const req = Atomics.load(ctrl, COMMAND_INDEX); | |
if (req === READER_READY) { | |
const slice = buf.slice(i, i + data.byteLength); | |
i += data.byteLength; | |
push(slice); | |
} | |
} | |
spinOn(ctrl, COMMAND_INDEX, READER_PENDING); | |
const remaining = buf.byteLength % data.byteLength; | |
const slice = buf.slice(i, i + remaining); | |
push(slice); | |
} | |
Atomics.store(ctrl, COMMAND_INDEX, WRITER_DONE); | |
Atomics.wake(ctrl, COMMAND_INDEX); | |
return; | |
} finally { | |
this.writing.release(); | |
} | |
} else { | |
this.writing.wait(); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// dumps files using readFileSyncViaWorker | |
// | |
// node --experimental-modules --experimental-worker ./main.mjs $PATH_TO_DUMP | |
import worker from 'worker_threads'; | |
import Bus from './bus.js'; | |
import util from 'util'; | |
const {TextDecoder, TextEncoder} = util;
const {Worker} = worker;

// Hand the file: URL object straight to Worker. Its `.pathname` is not a
// usable file path when the location contains percent-encoded characters
// (such as spaces) or on Windows, where it has a leading slash before the
// drive letter.
const child = new Worker(new URL('./worker.js', import.meta.url));

// Shared memory for the bus: 3 Int32 control slots, 1 Uint32 chunk length,
// and the chunk payload area.
const ctrl = new SharedArrayBuffer(12);
const size = new SharedArrayBuffer(4);
const busSize = 4096;
const data = new SharedArrayBuffer(busSize);
const bufs = {ctrl, size, data};
const bus = new Bus(bufs);

// The worker builds its own Bus over the same shared buffers.
child.postMessage(bufs);
child.on('error', console.dir);
/**
 * Synchronously asks the worker for a file by sending its name over the bus
 * and blocking until the full reply has been read back.
 *
 * The final byte of the reply is treated as a status flag (1 means error)
 * and is always stripped before the rest is decoded as text.
 *
 * @param {string} file - name sent to the worker
 * @returns {string} the decoded reply text
 * @throws {Error} carrying the decoded reply text when the status flag is 1
 */
function readFileSyncViaWorker(file) {
  const request = new TextEncoder().encode(file);
  bus.write([request]);

  const reply = Buffer.concat(Array.from(bus.read()));
  const statusByte = reply[reply.length - 1];
  const message = new TextDecoder().decode(reply.slice(0, -1));

  if (statusByte === 1) {
    throw new Error(message);
  }
  return message;
}
// Print every file named on the command line, in order, then stop the worker.
process.argv.slice(2).forEach((file) => {
  console.log(readFileSyncViaWorker(file));
});
child.terminate();
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const {TextDecoder, TextEncoder} = require('util'); | |
const {parentPort} = require('worker_threads'); | |
const Bus = require('./bus.js'); | |
// On receiving the shared buffers, serve file requests over the bus forever:
// read a filename, load the file, and write back the payload followed by a
// one-byte status flag.
parentPort.on('message', (bufs) => {
  const bus = new Bus(bufs);

  // Handles one request, then re-arms itself from the readFile callback.
  const serveNext = () => {
    // Blocks this thread until a complete filename message has arrived.
    const filename = new TextDecoder().decode(Buffer.concat([...bus.read()]));
    fs.readFile(filename, (err, body) => {
      // Every reply ends with a status byte: 1 for an error message, 0 for
      // file contents. Without the 0 a successful reply would be mistaken
      // for an error whenever the file ends in 0x01, and a receiver that
      // strips the trailing status byte would cut off the file's last byte.
      const payload = err ? new TextEncoder().encode(err.message) : body;
      const status = new Uint8Array([err ? 1 : 0]);
      bus.write([payload, status]);
      serveNext();
    });
  };

  serveNext();
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment