Skip to content

Instantly share code, notes, and snippets.

@bcg
Created December 14, 2010 16:54
Show Gist options
  • Save bcg/740690 to your computer and use it in GitHub Desktop.
Save bcg/740690 to your computer and use it in GitHub Desktop.
A stomp.js driver ...
#!/usr/bin/env node
var net = require('net'),
stomp = require('../lib/stomp');
net.createServer(function (stream) {
stomp.createBroker(stream, function(proto) {
proto.on('send', function(queue, message) {
console.log('send: ' + queue + " " + message);
});
});
}).listen(61613, '127.0.0.1');
var net = require('net'),
sys = require('sys'),
util = require('util'),
events = require('events');
function trim (str) {
var str = str.replace(/^\s\s*/, ''),
ws = /\s/,
i = str.length;
while (ws.test(str.charAt(--i)));
return str.slice(0, i + 1);
}
// Parser
Parser = function() {
events.EventEmitter(this);
this.buffer = '';
this.on('frame', function (data) {
//console.log("\n\nFRAME\n" + data + "\n\n\n");
var parts = data.split('\n', -1);
var command = parts.splice(0, 1);
var body = parts.splice(-1, 1);
var headers = new Array();
for (var i = 0; i < parts.length; i++) {
var kv = parts[i].split(':', 2);
if (kv.length == 2)
headers[trim(kv[0])] = trim(kv[1]);
}
//console.log(headers);
this.emit(command, headers, body);
});
};
sys.inherits(Parser, events.EventEmitter);
Parser.prototype.appendString = function (str) {
this.buffer += str;
this.parse();
};
Parser.prototype.parse = function () {
var end = this.buffer.indexOf("\x00");
if (end != -1) {
var consume = this.buffer.substr(0, end),
rest = this.buffer.substr(end + 1, (this.buffer.length - consume.length));
this.buffer = rest;
this.emit('frame', consume);
this.parse();
}
};
// Protocol
Protocol = function(stream) {
events.EventEmitter(this);
this.stream = stream;
}
sys.inherits(Protocol, events.EventEmitter);
Protocol.prototype.receipt = function(id) {
if (id && this.stream.writable)
this.stream.write('RECEIPT\nreceipt-id:'+id+'\n\n\x00');
}
Subscription = function() {
this.queue = '';
this.ack = false;
};
Client = function() {
this.ip = '';
this.port = '';
this.login= '';
this.password = '';
this.session_id = '';
this.subscription = new Array();
};
function Broker () {
events.EventEmitter.call(this);
var stream = arguments[0];
var func = arguments[1];
var parser = new Parser();
var proto = new Protocol(stream);
var client = new Client();
stream.setEncoding('ascii');
stream.on('connect', function(data) {
});
stream.on('data', function(data) {
parser.appendString(data);
});
parser.on('CONNECT', function(headers, body) {
stream.write('CONNECTED\nsession: 12345\n\n\x00'); // XXX unique session id
});
parser.on('SEND', function(headers, body) {
proto.receipt(headers['receipt']);
proto.emit('send', headers['destination'], body);
});
parser.on('SUBSCRIBE', function(headers, body) {
proto.receipt(headers['receipt']);
proto.emit('subscribe', headers['destination']);
});
parser.on('UNSUBSCRIBE', function(headers, body) {
proto.emit('unsubscribe', headers['destination']);
});
func.call(this, proto);
};
util.inherits(Broker, events.EventEmitter);
exports.Broker = Broker;
exports.createBroker = function() {
return new Broker(arguments[0], arguments[1]);
};
@bcg
Copy link
Author

bcg commented Dec 14, 2010

A first cut at a new STOMP driver. The idea here is that it handles the protocol specifics and lets the implementer write their own specialized message brokers / routers. Hoping to give it a ZeroMQ feel.

Still a little new to Node and JS so suggestions welcome ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment