@apk
Created April 11, 2012 13:08
A (very) simple pub/sub server for node.js

Trivial pubsub in node.js: performance

This is a performance analysis of a simple pubsub service written in node.js. It accepts connections on two ports: clients on one port are expected to send lines of text, which are the messages. All such messages are forwarded to every connection made on the other port. There is no way to subscribe to a subset of the messages.

All measurements are done with 100 'pub' clients and 20 'sub' clients, which means that every incoming message is sent out twenty times.

The source code is in pubsub.js; the code that breaks the input into messages (lines, that is) is in split.js. I have done the measurements with four different versions of the splitter (see the history of this gist).

All graphs have 'message size' on the x axis.

The first one, marked raw in the graphs, does no actual splitting, and thus isn't correct: there is no guarantee that node.js will deliver the data in the same chunks as they were sent on the other side.
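A minimal sketch of what the raw variant amounts to (illustrative, not the exact code from the gist's history): the "splitter" simply forwards each incoming chunk unchanged.

```javascript
// 'raw' pseudo-splitter: forwards each chunk as-is. Only correct if
// TCP happens to deliver the data in exactly the chunks it was sent
// in, which is not guaranteed.
function makeRawSplitter() {
  return function (data, send) {
    send(data);
  };
}

// A chunk holding one and a half messages passes through untouched,
// so message framing is lost:
var raw = makeRawSplitter();
raw(Buffer.from("msg1\nmsg"), function (b) {
  console.log(JSON.stringify(b.toString())); // "msg1\nmsg"
});
```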

The next one (1) is the simplest possible correct one; it has the unfortunate effect that it always does an empty send when a message came in in one piece, and that shows in a clearly lower achievable rate.
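That behaviour can be sketched as follows (an illustration of the empty-send problem, not the exact version-1 code): the buffered partial line is sent unconditionally before each complete line, so when a message arrives in one piece the buffered part is an empty buffer.

```javascript
function makeNaiveSplitter() {
  var last = Buffer.alloc(0); // partial line left over from previous chunks
  return function (data, send) {
    var p = 0;
    for (var i = 0; i < data.length; i++) {
      if (data[i] === 10 || data[i] === 13) {
        send(last);                 // empty send when the line came in one piece
        send(data.slice(p, i + 1));
        last = Buffer.alloc(0);
        p = i + 1;
      }
    }
    last = Buffer.concat([last, data.slice(p)]);
  };
}

var out = [];
makeNaiveSplitter()(Buffer.from("abc\n"), function (b) { out.push(b.toString()); });
console.log(out); // [ '', 'abc\n' ] -- the leading '' is the wasted empty send
```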

2 and 3 improve on that; the most significant improvement is simply not sending the empty buffer, which yields an improvement of 30%.

Also apparent is that the processing is dominated by the per-message handling overhead; there is only a slight drop towards bigger messages (best seen in the 'messages per second' diagrams), and that drop is probably mostly due to the loop looking for line ends.

The drop in message throughput with messages bigger than 1000 bytes is caused by saturation of the 1 Gbit/s network interface: it can only deliver about 120 MBytes/s, and 1 kByte messages duplicated to 20 'sub' clients are already 20 kBytes each, so there is a limit of about 5000 messages per second. Below that, throughput is limited by the (single-core) CPU consumption of node.js.
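As a back-of-envelope check (assuming the ~120 MBytes/s usable rate quoted above, and counting the incoming copy plus the 20 outgoing copies, as the bytes/s figure in Plain.java does):

```javascript
var wireBytesPerSec = 120e6;  // usable 1 Gbit/s throughput (assumed)
var msgSize = 1000;           // bytes per published message
var subs = 20;                // 'sub' clients, i.e. outgoing copies
var bytesPerMsg = msgSize * (1 + subs); // 1 in + 20 out = 21 kBytes on the wire
console.log(Math.floor(wireBytesPerSec / bytesPerMsg)); // 5714, i.e. roughly 5000-6000 msgs/s
```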

For comparison, there is a measurement (marked C) of a similar setup implemented in C (with a more complex message format), which still outperforms node.js by a factor of two to three. (This will probably get worse once the server actually starts looking into the messages.)

The test tool used to generate input is in Plain.java; it opens the required number of connections to the node.js process, sends messages to it, and counts the number of messages it receives on the 'sub' connections. To avoid overrunning the server it limits the number of outstanding messages.

tl;dr

node.js is actually remarkably fast.

It also greatly reduces coding overhead compared to a C library designed on very similar principles, simply due to garbage collection and functional programming; and it makes very simple interfaces possible, such as the one to the splitter (a function taking the next input block and a delivery function for outputting blocks).
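To illustrate how simple that interface is, here is a self-contained sketch (a simplified copy of the split.js idea, splitting only on LF, with a toy usage):

```javascript
// A splitter is just: function(nextChunk, deliver)
function makeSplitter() {
  var last = null; // partial line from previous chunks
  return function (data, send) {
    var p = 0;
    for (var i = 0; i < data.length; i++) {
      if (data[i] === 10) {         // LF ends a message
        if (last) send(last);
        send(data.slice(p, i + 1));
        last = null;
        p = i + 1;
      }
    }
    if (p < data.length) {
      var rest = data.slice(p);
      last = last ? Buffer.concat([last, rest]) : rest;
    }
  };
}

// Usage: feed arbitrary chunks in, complete pieces come out.
var sp = makeSplitter();
var out = [];
sp(Buffer.from("hel"), function (b) { out.push(b.toString()); });   // nothing yet
sp(Buffer.from("lo\nhi\n"), function (b) { out.push(b.toString()); });
console.log(out); // [ 'hel', 'lo\n', 'hi\n' ]
```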

Plain.java:

// -*- mode: Java; c-basic-offset: 3; tab-width: 8; indent-tabs-mode: nil -*-
import java.io.*;
import java.net.*;

public class Plain {
   public static class Client {
      protected Socket s;
      public Client (String host, int port) throws IOException {
         synchronized (this) {
            try {
               wait (200); // Stagger connection attempts a little
            } catch (InterruptedException e) {
               System.out.println ("IntEx: " + e);
            }
         }
         s = new Socket (host, port);
         s.setTcpNoDelay (true);
      }
   }

   public static int NREC = 20;     // Number of 'sub' connections
   public static int NSND = 100;    // Number of 'pub' connections
   public static int NMSG = 500000; // Total messages to send
   public static int MSGSZ = 125;   // Message size in bytes (overridden via -Dmsgsz)

   int cnt = 0; // Outstanding connections/messages
   int num = 0; // For numbering the connections
   int down = 0;
   String host = "192.168.56.102";

   public synchronized void inc_cnt (int n) {
      cnt += n;
   }

   public synchronized void dec_cnt () {
      cnt --;
      down ++;
      notify ();
   }

   public synchronized void wait_cnt (int max) {
      while (cnt > max) {
         try {
            wait ();
         } catch (InterruptedException e) {
            System.out.println ("IntEx: " + e);
         }
      }
   }

   public static void main (String [] args) {
      try {
         String ms = System.getProperty ("msgsz", "1000");
         int sz = Integer.parseInt (ms);
         if (sz < 2) sz = 2;
         MSGSZ = sz;
         new Plain ().run ();
      } catch (IOException e) {
         System.out.println ("IoEx: " + e);
         System.exit (1);
      }
   }

   public class Receiver extends Client implements Runnable {
      public Receiver () throws IOException {
         super (host, 4456);
         new Thread (this).start ();
      }
      public void run () {
         try {
            InputStream is = s.getInputStream ();
            byte [] a = new byte [10000];
            while (true) {
               int r = is.read (a);
               if (r < 0) break; // EOF: server closed the connection
               for (int i = 0; i < r; i ++) {
                  if (a [i] == 13) { // CR marks the end of a message
                     dec_cnt ();
                  }
               }
            }
         } catch (IOException e) {
            System.out.println ("IoEx: " + e);
            System.exit (1);
         }
      }
   }

   public class Sender extends Client {
      public Sender () throws IOException {
         super (host, 4455);
      }
      public void write (byte [] a) throws IOException {
         s.getOutputStream ().write (a);
      }
   }

   public void run () throws IOException {
      Sender [] senders = new Sender [NSND];
      try {
         for (int i = 0; i < NREC; i ++) {
            new Receiver ();
         }
         for (int i = 0; i < senders.length; i ++) {
            senders [i] = new Sender ();
         }
         wait_cnt (0);
      } catch (IOException e) {
         System.out.println ("Base ex: " + e);
         System.exit (1);
      }
      System.out.println ("Ok");
      long t = System.currentTimeMillis ();
      for (int i = 0; i < NMSG; i ++) {
         Sender s = senders [i % senders.length];
         byte [] a = new byte [MSGSZ];
         for (int k = 0; k < MSGSZ - 1; k ++) {
            a [k] = 32;
         }
         a [MSGSZ - 1] = 13;
         s.write (a);
         inc_cnt (NREC);
         wait_cnt (500); // Flow control: at most 500 outstanding receives
      }
      System.out.println ("Almost...");
      wait_cnt (0);
      long dur = System.currentTimeMillis () - t;
      System.out.println ("Done " + down + " recvs in " + dur + " ms");
      System.out.println ("msgsz: " + MSGSZ);
      if (dur > 0) {
         System.out.println ("=> " + ((NMSG * 1000L) / dur) + " msgs/s");
         System.out.println ("=> " + ((NMSG * (long)MSGSZ * (1 + NREC) * 1000) / dur) + " bytes/s");
      }
      Writer w = new FileWriter ("of-" + (100000 + MSGSZ) + ".txt");
      w.write ("" + MSGSZ + " " + ((NMSG * 1000L) / dur) + " " + ((NMSG * (long)MSGSZ * (1 + NREC) * 1000) / dur) + "\n");
      w.close ();
      System.exit (0);
   }
}
pubsub.js:

var net = require('net');
var splitter = require('./split');

var receivers = [];

function sendout(data) {
  // This code is missing two relevant things:
  // - It needs to catch EPIPE and similar
  // - It needs to monitor the output queue size,
  //   and kill the connection when it gets too big;
  //   otherwise a dead receiver can cause us to eat
  //   massive memory (at high load).
  for (var i = 0; i < receivers.length; i ++) {
    var r = receivers [i];
    if (r) {
      r.write(data);
    }
  }
}

function sub_hdlr(c) {
  // Put the new receiver into the first free slot
  for (var i = 0; ; i ++) {
    if (i == receivers.length || !receivers [i]) {
      receivers [i] = c;
      break;
    }
  }
  c.on('data', function(data) {
  });
  c.on('end', function(data) {
  });
  c.on('close', function(data) {
    for (var i = 0; i < receivers.length; i ++) {
      if (receivers[i] === c) {
        receivers[i] = 0;
      }
    }
  });
}

function pub_hdlr(c) {
  var sp = splitter.makeSplitter ();
  c.on('data', function(data) {
    sp(data, sendout);
  });
  c.on('end', function(data) {
  });
  c.on('close', function(data) {
  });
}

var server = net.createServer(pub_hdlr);
server.listen(4455);
var subsrv = net.createServer(sub_hdlr);
subsrv.listen(4456);
// -*- mode: Javascript; js-indent-level:2; indent-tabs-mode: nil; -*-
var splitterMaker = require("./split").makeSplitter;
var util = require("util");

function testSplitter(arr) {
  var splitter = splitterMaker ();
  for (var i in arr) {
    var io = arr[i];
    var res = [];
    console.log ("In: " + util.inspect(io));
    splitter (
      new Buffer (io.in),
      function (r) { res.push(r.toString()); }
    );
    var want = util.inspect(io.out);
    var have = util.inspect(res);
    if (want == have) {
      console.log ("Good: " + want);
    } else {
      console.log ("*** Want " + want + "; have " + have + ".");
    }
  }
}

testSplitter(
  [
    { in: "asdf", out: [] },
    { in: "as\ndf", out: ["asdf", "as\n"] },
    { in: "df\ner\nst", out: ["df", "df\n", "er\n"] },
    { in: "zxcv", out: [] },
    { in: "\n\n", out: ["stzxcv", "\n", "\n"] },
    { in: "uiop\n", out: ["uiop\n"] },
    { in: "aio\ntyu\njkl\n", out: ["aio\n", "tyu\n", "jkl\n"] },
    { in: "as", out: [] },
    { in: "df", out: [] },
    { in: "jk\nrs", out: ["asdf", "jk\n"] },
    { in: "jk\n", out: ["rs", "jk\n"] },
  ]
);
split.js:

// -*- mode: Javascript; js-indent-level:2; indent-tabs-mode: nil; -*-
(function () {
  function makeSplitter() {
    var last = null;
    return function(data, send) {
      var p = 0;
      for (var i = 0; i < data.length; i ++) {
        if (data[i] == 10 || data[i] == 13) {
          if (last) send(last);
          send(data.slice(p, i + 1));
          last = null;
          p = i + 1;
        }
      }
      if (p < data.length) {
        // Remaining data
        if (last) {
          // Previous remaining data -> join
          var n = new Buffer(last.length + data.length - p);
          last.copy(n);
          data.copy(n, last.length, p);
          last = n;
        } else {
          last = data.slice(p);
        }
      }
    };
  }
  exports.makeSplitter = makeSplitter;
}) ()