Skip to content

Instantly share code, notes, and snippets.

@felixge
Created December 2, 2009 20:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save felixge/247564 to your computer and use it in GitHub Desktop.
Save felixge/247564 to your computer and use it in GitHub Desktop.
var sys = require('sys');
var path = require('path');
var Db = exports.Db = function(options) {
this.options = {
host: 'localhost',
user: '',
pass: '',
database: '',
workers: 2
};
this.jobs = [];
this.workers = [];
process.mixin(this.options, options);
this.connect();
};
Db.prototype.connect = function() {
for (var i = 0; i < this.options.workers; i++) {
var options = {
host: this.options.host,
user: this.options.user,
pass: this.options.pass,
database: this.options.database
}
this.workers.push(new PhpWorker(options));
}
};
Db.prototype.work = function() {
for (var i = 0; i < this.workers.length; i++) {
var worker = this.workers[i];
if (worker.busy()) {
continue;
}
var job = this.jobs.shift(), self = this;
if (!job) {
break;
}
worker.send(job.params)
.addCallback(function(result) {
job.promise.emitSuccess(result);
setTimeout(function() {
self.work();
}, 1);
})
.addErrback(function(error) {
job.promise.emitError(error);
setTimeout(function() {
self.work();
}, 1)
});
break;
}
};
Db.prototype.query = function(query) {
var promise = new process.Promise();
var args = Array.prototype.slice.call(arguments);
this.jobs.push({
params: ['query'].concat(args),
promise: promise
});
this.work();
return promise;
};
var PhpWorker = function(options) {
this.options = {
host: 'localhost',
user: '',
pass: '',
database: '',
};
process.mixin(this.options, options);
this.job = null;
this.launch();
};
PhpWorker.prototype.relaunch = function() {
this.process.kill();
this.launch();
};
PhpWorker.prototype.launch = function() {
var cmd = [
'php',
path.join(path.dirname(__filename), 'worker.php'),
"'"+JSON.stringify(this.options)+"'"
].join(' ');
this.process = process.createChildProcess('/bin/sh', ['-c', cmd]);
var buffer = '', self = this;
this.process.addListener('output', function(chunk) {
buffer = buffer + (chunk || '');
while (buffer) {
var offset = buffer.indexOf("\n");
if (offset < 0) {
return;
}
var line = buffer.substr(0, offset);
buffer = buffer.substr(offset + 1);
self.onLine(line);
}
});
};
PhpWorker.prototype.onLine = function(line) {
try {
line = JSON.parse(line);
} catch (e) {
return puts('Could not parse line: '+line);
}
if (this.job) {
this.job.emitSuccess(line);
this.job = null;
} else {
puts('unexpected PhpWorker.onLine event, received: '+line);
}
};
PhpWorker.prototype.send = function(job) {
this.job = new process.Promise();
job = JSON.stringify(job);
puts('> '+job);
this.process.write(job+"\n", 'utf8');
return this.job;
};
PhpWorker.prototype.busy = function() {
return !!this.job;
};
process.mixin(require('sys'));
var mysql = require('../lib/mysql');
var db = new mysql.Db({
user: 'root',
pass: 'root',
database: 'mydb'
});
var query = db.query('SELECT %s FROM posts LIMIT 10', '*');
query.addCallback(function(results) {
p(results);
});
query.addErrback(function() {
p('error');
});
<?php
$options = json_decode(stripslashes($argv[1]), true);
function connect($options) {
static $retry = 0;
$db = @mysql_pconnect($options['host'], $options['user'], $options['pass']);
if (!is_resource($db)) {
return false;
}
if (!@mysql_select_db($options['database'], $db)) {
return false;
}
return $db;
}
$db = connect($options);
function send($line) {
$line = json_encode($line);
fwrite(STDOUT, $line."\n");
flush();
}
while($line = rtrim(fgets(STDIN))) {
$args = json_decode($line, true);
$cmd = array_shift($args);
while (true) {
$result = (is_resource($db))
? mysql_query('SELECT 1', $db)
: false;
if (is_resource($result)) {
break;
}
$db = connect($options);
sleep(1);
}
switch ($cmd) {
case 'query':
$template = array_shift($args);
foreach ($args as $i => $arg) {
$args[$i] = "'".mysql_real_escape_string($arg, $db)."'";
}
$query = vsprintf($template, $args);
$result = mysql_query($query, $db);
$rows = array();
while ($row = mysql_fetch_assoc($result)) {
$rows[] = $row;
}
mysql_free_result($result);
send($rows);
break;
default:
send(array('error' => 'unknown command: '.$cmd));
break;
}
}
?>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment