Skip to content

Instantly share code, notes, and snippets.

@kurokikaze
Created May 5, 2010 09:24
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kurokikaze/390566 to your computer and use it in GitHub Desktop.
Save kurokikaze/390566 to your computer and use it in GitHub Desktop.
A simple work queue implementation for Node.js
var net = require('net'),
sys = require('sys');
// Server-wide state and the address the queue server listens on.
var messages = [],              // jobs waiting to be handed to a worker
    work_in_progress = [],      // {client, message} records for jobs handed out but not yet acknowledged
    host = '192.168.175.128',
    port = 8999;
// Work-queue server. Every connected client may:
//   {action: 'put', message: ...}    - append a job to the queue
//   {action: 'get'}                  - receive the next job (empty 'contents' when none is left)
//   {action: 'notify', message: ...} - acknowledge a job it was given earlier
// Jobs handed to a client that disconnects before acknowledging them are
// returned to the queue.
var server = net.createServer(function(client_stream) {
    client_stream.setEncoding('utf8');

    // Handle a single JSON-encoded request coming from this client.
    var process_message = function(message_raw) {
        var message;
        try {
            message = JSON.parse(message_raw);
        } catch (err) {
            // One malformed frame from one client must not bring the server down.
            sys.puts('Ignoring malformed message: ' + err.message);
            return;
        }
        // Payloads such as `null` or `42` are valid JSON but carry no action.
        if (!message || typeof message !== 'object') {
            return;
        }

        if (message.action === 'put' && message.message) {
            messages.push(message.message);
        }

        if (message.action === 'get') {
            var work_message = messages.shift();
            if (work_message) {
                work_in_progress.push({'client': client_stream, 'message': work_message});
                client_stream.write(JSON.stringify({'action': 'work', 'contents': work_message.toString()}));
            } else {
                // No more work
                client_stream.write(JSON.stringify({'action': 'work', 'contents': ''}));
            }
        }

        // Work done: drop the matching entry from the 'work in progress' list.
        if (message.action === 'notify' && message.message) {
            for (var wip_index = 0; wip_index < work_in_progress.length; wip_index++) {
                var entry = work_in_progress[wip_index];
                // Loose == is deliberate: the job was sent out via toString(),
                // so the acknowledgement is a string while the stored job may not be.
                if (entry.client === client_stream && message.message == entry.message) {
                    // One notification acknowledges one job; stopping here also
                    // avoids iterating over an array that was just spliced.
                    work_in_progress.splice(wip_index, 1);
                    break;
                }
            }
        }
    };

    client_stream.addListener('data', function(data) {
        // A chunk may carry several back-to-back JSON objects; cut it at every
        // '}{' seam. NOTE: this framing assumes a chunk ends on a message
        // boundary and that no string value contains '}{'.
        var frames = [];
        var rest = data;
        var seam = rest.indexOf('}{');
        while (seam !== -1) {
            frames.push(rest.slice(0, seam + 1));
            rest = rest.slice(seam + 1);
            seam = rest.indexOf('}{');
        }
        frames.push(rest);

        for (var frame_index = 0; frame_index < frames.length; frame_index++) {
            process_message(frames[frame_index]);
        }
    });

    // Return all unfinished work of a disconnected client to the queue.
    client_stream.addListener('end', function() {
        var wip_index = 0;
        while (wip_index < work_in_progress.length) {
            if (work_in_progress[wip_index].client === client_stream) {
                messages.push(work_in_progress[wip_index].message);
                // After the splice the next entry sits at the same index,
                // so the index is advanced only when nothing was removed.
                work_in_progress.splice(wip_index, 1);
            } else {
                wip_index++;
            }
        }
    });
});

server.listen(port, host);
// Client library for the work-queue server; exposes `connect`.
var queues = {};
(function(){
    var net = require('net'),
        events = require('events');

    // Cut a chunk that may hold several back-to-back JSON objects at each
    // '}{' seam and return the pieces in arrival order.
    var split_frames = function(data) {
        var frames = [];
        var rest = data;
        var seam = rest.indexOf('}{');
        while (seam !== -1) {
            frames.push(rest.slice(0, seam + 1));
            rest = rest.slice(seam + 1);
            seam = rest.indexOf('}{');
        }
        frames.push(rest);
        return frames;
    };

    /**
     * Open a connection to a work-queue server.
     *
     * @param {number} port - server port
     * @param {string} host - server host
     * @param {function} callback - called as callback(false, queue) once
     *     connected, or as callback(err) when the connection attempt fails.
     *
     * The queue object is an EventEmitter ('connect', 'close', and 'error'
     * for socket errors after connecting) with three methods:
     *   add_job(message)  - put a job on the server queue
     *   get_job(callback) - request a job; callback(false, contents) on
     *                       success, callback(true) when no work is left,
     *                       callback(err) on a bad reply or lost connection
     *   notify(message)   - acknowledge a finished job
     */
    queues.connect = function(port, host, callback) {
        var queue = new events.EventEmitter;
        var server_conn = net.createConnection(port, host);
        var connected = false;
        // get_job callbacks still waiting for a reply, oldest first. The
        // only replies this client consumes are answers to 'get', so
        // replies are matched to callbacks in request order.
        var waiting = [];

        server_conn.setEncoding('utf-8');

        queue.add_job = function(message) {
            server_conn.write(JSON.stringify({'action':'put', 'message':message}));
        };

        queue.get_job = function(callback) {
            waiting.push(callback);
            server_conn.write(JSON.stringify({'action':'get'}));
        };

        queue.notify = function(message) {
            server_conn.write(JSON.stringify({'action':'notify', 'message':message}));
        };

        // A single shared listener: registering one per get_job call (and
        // removing all of them afterwards) delivered the same reply to every
        // overlapping call and stripped unrelated 'data' listeners.
        server_conn.addListener('data', function(data) {
            var frames = split_frames(data);
            for (var frame_index = 0; frame_index < frames.length; frame_index++) {
                var reply_to = waiting.shift();
                if (!reply_to) {
                    // Nobody asked for this frame; nothing to deliver it to.
                    continue;
                }
                var message;
                try {
                    message = JSON.parse(frames[frame_index]);
                } catch (err) {
                    reply_to(err);
                    continue;
                }
                if (message && message.contents) {
                    reply_to(false, message.contents);
                } else {
                    reply_to(true);
                }
            }
        });

        server_conn.addListener('error', function(err) {
            if (!connected) {
                // The connection attempt itself failed: tell the caller.
                callback(err);
            } else {
                queue.emit('error', err);
            }
        });

        server_conn.addListener('close', function(code) {
            // Fail every request that can no longer be answered.
            var stranded = waiting;
            waiting = [];
            for (var i = 0; i < stranded.length; i++) {
                stranded[i](new Error('Connection closed before a reply arrived'));
            }
            if (connected) {
                queue.emit('close');
            }
        });

        server_conn.addListener('connect', function() {
            connected = true;
            queue.emit('connect');
            callback(false, queue);
        });
    };
})();

// Copy the public API onto the module exports. A plain property copy is used
// because process.mixin is not available in current Node.js releases.
for (var export_name in queues) {
    if (Object.prototype.hasOwnProperty.call(queues, export_name)) {
        exports[export_name] = queues[export_name];
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment