@indexzero
Created August 3, 2011 10:07
EventEmitter2 digs DIRT and ETL applications.

require('child_process').fork() and beyond

Abstract

The purpose of this sample is to show the power of EventEmitter2 in the context of a specific example centered around DIRT (Data-Intensive Realtime) and ETL (Extract, Transform, Load) applications in node.js. Given the hard limits on V8 heap size, any exceptionally large data-processing task in node will require such patterns, and it is in the interest of the community that we start solidifying them.

Scenario

Let's suppose you need to run an ETL job over a large set of logs that has already been partitioned into files which, individually, will not overload the V8 heap. These kinds of size-limited log or data files are common and need no explanation.

This ETL runs with initial conditions (very common), so there may be many sets of worker processes analyzing the same data for different purposes. Being an intelligent developer who knows the blocking nature of in-memory data manipulation, you decide to aggregate data and perform calculations on partitions of the data within individual worker processes, to avoid overloading the parent process with CPU-intensive calculation. In other words, the parent process exists solely to perform fast, non-blocking I/O between the workers, and each worker listens for specific events from its peers.
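The namespaced, wildcard-capable events that EventEmitter2 provides (a `.` delimiter and patterns such as `data.*`) are what make this topology convenient. As a self-contained sketch of those semantics — a toy stand-in, not the library's actual implementation — consider a tiny emitter with one-level wildcard matching:

```javascript
//
// Minimal sketch of namespaced events with a '.' delimiter and a
// wildcard listener, approximating EventEmitter2 semantics. This is
// illustrative only; use the real EventEmitter2 in practice.
//
function TinyEmitter2() {
  this.listeners = [];
}

TinyEmitter2.prototype.on = function (pattern, fn) {
  this.listeners.push({ parts: pattern.split('.'), fn: fn });
};

TinyEmitter2.prototype.emit = function (event) {
  var args = Array.prototype.slice.call(arguments, 1),
      parts = event.split('.');

  this.listeners.forEach(function (listener) {
    var match = listener.parts.length === parts.length &&
      listener.parts.every(function (p, i) {
        return p === '*' || p === parts[i];
      });
    if (match) listener.fn.apply(null, args);
  });
};

var emitter = new TinyEmitter2();
var received = [];

// A wildcard listener sees every topic under `data`...
emitter.on('data.*', function (line) { received.push('any:' + line); });
// ...while a specific listener sees only its own topic.
emitter.on('data.some-topic-1', function (line) { received.push('one:' + line); });

emitter.emit('data.some-topic-0', 'alpha');
emitter.emit('data.some-topic-1', 'beta');

console.log(received.join('|'));
// → any:alpha|any:beta|one:beta
```

With the real EventEmitter2 you would instead construct `new EventEmitter2({ wildcard: true, delimiter: '.' })` and get multi-level patterns for free.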

var fs = require('fs'),
    path = require('path'),
    fork = require('child_process').fork;

var fileDir = '/path/to/data/files',
    files = fs.readdirSync(fileDir),
    children = {};

//
// Application-specific topics across which to partition data calculation.
//
var topics = [
  'some-topic-0',
  'some-topic-1',
  'some-topic-2',
  'some-topic-3',
  'some-topic-4'
  //
  // Etc, etc.
  //
];

function rebroadcast (topic) {
  var args = Array.prototype.slice.call(arguments, 1),
      namespaceEvent = ['data', topic].join('.');

  //
  // Rebroadcast the message from any child to
  // all children using a **namespaced** event: `data.{topic}`.
  //
  topics.forEach(function (target) {
    children[target].send.apply(children[target], [namespaceEvent].concat(args));
  });
}

function spawnChild (topic) {
  if (files.length === 0) {
    //
    // If there are no more files left to
    // process, do nothing and wait for all children to exit.
    //
    return;
  }

  var file = files.shift(),
      child = fork('worker.js', [path.join(fileDir, file), topic]);

  children[topic] = child;
  child.on('message', rebroadcast.bind(null, topic));
  child.on('exit', spawnChild.bind(null, topic));
}

topics.forEach(spawnChild);
//
// worker.js
//
var fs = require('fs');

var argv = process.argv.slice(2),
    file = argv[0],
    topic = argv[1];

//
// Clearly `.readFile()` is not the most efficient here,
// but line-by-line reading is just a little more syntax.
//
fs.readFile(file, function (err, data) {
  if (err) {
    process.send('error', err);
    process.exit(1);
    return;
  }

  data.toString().split('\n').forEach(function (line) {
    //
    // Perform topic extraction and emit blindly. This is the very
    // application-specific stuff that makes ETL such a pain (for everyone).
    // `extractTopicFromLine` is whatever your log format dictates.
    //
    var lineTopic = extractTopicFromLine(line);
    process.send(['data', lineTopic].join('.'), line);
  });
});

//
// Only listen for the `data.{topic}` events that we have been
// assigned to perform calculation on. In the simple case where there
// is only a `data.*` event, this could easily be refactored to listen
// on an event of the topic name alone. However, it is more likely that
// `data.*` is only one type of event pertaining to the {topic}, so
// wildcard / namespaced events are really necessary.
//
process.on(['data', topic].join('.'), function (message) {
  //
  // Perform analysis of this topic here, and only here.
  //
});