Skip to content

Instantly share code, notes, and snippets.

@sbrl
Created May 26, 2020 19:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sbrl/33bc3afc170a12f6eaf49b4ca8e03602 to your computer and use it in GitHub Desktop.
Save sbrl/33bc3afc170a12f6eaf49b4ca8e03602 to your computer and use it in GitHub Desktop.
Multi-process line-by-line reading from a single pipe in Node.js
"use strict";
import EventEmitter from 'events';
import child_process from 'child_process';
/**
 * Returns a promise for the next occurrence of an event on an emitter.
 * The listener is registered with `once`, so it removes itself after firing.
 * @param {EventEmitter} obj The emitter to listen on - must inherit from EventEmitter.
 * @param {string} event_name The name of the event to wait for.
 * @return {Promise} Resolves with the first argument of the next emission of `event_name`.
 */
function wait_for_event(obj, event_name) {
	const executor = (resolve) => {
		obj.once(event_name, resolve);
	};
	return new Promise(executor);
}
/**
 * Coordinates a pool of worker processes that all inherit the same stdin.
 * Only one worker may read at a time: the controller grants the read slot by
 * sending a "work" message and waits for that worker's "read_complete" before
 * granting the slot to anyone else.
 */
class Controller extends EventEmitter {
	constructor() {
		super();
		this.worker_count = 4;
		// Live workers. A worker is removed (by identity) once it reports "end".
		this.workers = [];
		// Number of "work" messages handed out so far.
		this.counter = 0;
		this.counter_last = 0;
		this.stats_update_last = 0;
		// True while some worker holds the exclusive read slot on the shared stdin.
		this.someone_is_reading = false;
	}

	log_msg(...msg) {
		console.log(`[master] `, ...msg);
	}

	/**
	 * Prints how many work items were handed out since the previous report,
	 * at most once per second. It only runs when a worker reports "ready" or
	 * "done", so the interval may exceed one second and the rate is approximate.
	 */
	stats_update() {
		const now = Date.now();
		if(now - this.stats_update_last < 1000) return;
		const counter_diff = this.counter - this.counter_last;
		console.log(`Rate: ${counter_diff} items / sec`);
		this.counter_last = this.counter;
		this.stats_update_last = now;
	}

	/**
	 * Waits until the read slot is free, then grants it to `worker` by sending
	 * it a "work" message.
	 * @param {ChildProcess} worker The worker to give the next read to.
	 * @return {Promise<void>} Resolves once the message is sent, or immediately
	 *     if the worker is no longer live.
	 */
	async send_work(worker) {
		while(this.someone_is_reading) {
			await wait_for_event(this, "reading_complete");
		}
		// The worker may have reported "end" before or while we waited. Granting
		// it the slot would leave someone_is_reading stuck at true (it will never
		// send "read_complete"), deadlocking every other worker.
		if(!this.workers.includes(worker)) return;
		this.someone_is_reading = true;
		worker.send({ event: "work", count: this.counter });
		this.counter++;
	}

	/**
	 * Handles one IPC message from a worker.
	 * @param {ChildProcess} worker The worker that sent the message.
	 * @param {number} i The worker's spawn index (used only for logging).
	 * @param {{event: string}} message The message received.
	 */
	async handle_message(worker, i, message) {
		switch(message.event) {
			case "read_complete":
				this.someone_is_reading = false;
				this.emit("reading_complete");
				break;
			case "ready":
			case "done":
				this.stats_update();
				process.nextTick(() => this.send_work(worker));
				break;
			case "end": {
				worker.send({ event: "exit" });
				this.log_msg(`Worker ${i} exited`);
				// `i` is the spawn index, which stops matching the array position as
				// soon as an earlier worker has been removed - look up by identity.
				const index = this.workers.indexOf(worker);
				if(index !== -1) this.workers.splice(index, 1);
				break;
			}
		}
	}

	/**
	 * Forks `worker_count` copies of worker.mjs. Each shares this process's
	 * stdin/stdout/stderr and has an IPC channel back to the controller.
	 */
	start() {
		for(let i = 0; i < this.worker_count; i++) {
			this.log_msg(`Spawning worker ${i} / ${this.worker_count}`);
			const next = child_process.fork("worker.mjs", {
				stdio: [ 0, 1, 2, "ipc" ]
			});
			next.on("message", this.handle_message.bind(this, next, i));
			this.workers.push(next);
		}
	}
}
// Entry point: build the controller and fork its worker pool.
const controller = new Controller();
controller.start();
#!/usr/bin/env bash
# Demonstrates several Node.js processes reading lines from one shared pipe:
# one million generated lines are piped into main.mjs.
# License: Mozilla Public Licence 2.0 (MPL-2.0)
seq 1 1000000 \
	| awk '{ printf("LINE_START This is a number [%s] more text more text LINE_END\n", $0) }' \
	| node main.mjs
"use strict";
import fs from 'fs';
// Global buffer to avoid unnecessary memory churn. It grows (never shrinks) to
// fit the longest line read so far.
let buffer = Buffer.alloc(4096);

/**
 * Reads one line from a file descriptor, one byte at a time.
 * Because it never reads past the newline, nothing beyond the current line is
 * consumed, so other processes sharing the same fd continue exactly where this
 * one stopped.
 * @param {number} fd The file descriptor to read from (e.g. 0 for stdin).
 * @return {string|null} The line decoded as UTF-8 without its trailing "\n",
 *     or null at end of input. A final line with no trailing newline is still
 *     returned before null.
 */
function read_line_unbuffered(fd) {
	let i = 0;
	while(true) {
		const bytes_read = fs.readSync(fd, buffer, i, 1);
		if(bytes_read !== 1 || buffer[i] === 0x0A) {
			// fs.readSync reports end of input by returning 0 (never null). Only
			// signal EOF when nothing has been read for this line yet.
			if(i === 0 && bytes_read === 0) return null;
			return buffer.toString("utf-8", 0, i); // End index is exclusive, which trims the \n off the end
		}
		i++;
		if(i === buffer.length) {
			const new_buffer = Buffer.alloc(Math.ceil(buffer.length * 1.5));
			buffer.copy(new_buffer);
			buffer = new_buffer;
		}
	}
}
/**
 * Logs to stdout with a prefix identifying this worker by its process id.
 * @param {...*} msg Values passed through to console.log after the prefix.
 */
function log_msg(...msg) {
	const prefix = `[worker ${process.pid}] `;
	console.log(prefix, ...msg);
}

log_msg("hello, world");
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 * @param {number} ms Delay in milliseconds, passed to setTimeout.
 * @return {Promise<void>}
 */
function sleep_async(ms) {
	const schedule = (done) => {
		setTimeout(done, ms);
	};
	return new Promise(schedule);
}
/**
 * Handles instructions from the controller:
 *  - "work": read one line from the shared stdin (fd 0), release the read
 *    slot with "read_complete", then either report end of input with "end"
 *    or process the line and ask for more with "done".
 *  - "exit": terminate this worker.
 */
process.on("message", async (message) => {
	switch(message.event) {
		case "work": {
			const next = read_line_unbuffered(0);
			// Release the read slot before processing so another worker can read meanwhile.
			process.send({ event: "read_complete" });
			if(next == null) {
				log_msg(`Done reading`);
				process.send({ event: "end" });
				// Stop here: also sending "done" would make the controller hand
				// more work to a worker it is about to tell to exit.
				break;
			}
			// To simulate doing hard work, uncomment a busy-wait or the sleep below.
			log_msg(`[${message.count}] Processed '${next}'`);
			// await sleep_async(500 + Math.random() * 1000);
			process.send({ event: "done" });
			break;
		}
		case "exit":
			log_msg(`exiting`);
			process.exit(0);
	}
});

// Tell the controller we are ready to receive work.
process.send({ event: "ready" });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment