Skip to content

Instantly share code, notes, and snippets.

@vvo
Created May 27, 2014 06:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vvo/e06d9aec7c22e710543d to your computer and use it in GitHub Desktop.
Save vvo/e06d9aec7c22e710543d to your computer and use it in GitHub Desktop.
rethinkdb native driver
var async = require('async');
var r = require('rethinkdb');
var debug = require('debug')('rethinkdb');
// Warm-up payload: 10000 generated messages, bulk-inserted once (see `warm`
// inside `run`) before the timed write benchmark starts.
var warmMessages = require('./generate-messages.js')(10000);
// RethinkDB server host: the HOST environment variable, or 'localhost' when
// it is unset or empty.
var host = process.env.HOST || 'localhost';
// Benchmark payload: the messages inserted one by one during the timed run.
// NOTE(review): NB_MESSAGES is passed through exactly as read from the
// environment (a string, or undefined when unset) — presumably the generator
// coerces/defaults it; confirm in ./generate-messages.js.
var messages = require('./generate-messages.js')(process.env.NB_MESSAGES);
// Recorders for the two measured phases; `hit()` is called once per completed
// operation and `end()` once the phase is over (see `run`).
var writes = require('./result')('rethinkdb_writes');
var reads = require('./result')('rethinkdb_reads');
// NOTE(review): never assigned or read in the visible code; the connection is
// threaded through the waterfall steps as an argument instead.
var connection;
debug('Start');
// Run the benchmark phases strictly in order. Each step passes the open
// connection on to the next one through its callback.
async.waterfall([
connect,
setup,
run,
teardown
], function stop(err) {
if (err) {
// Rethrow so that a failure in any phase is not silently ignored.
throw err;
}
debug('Stop');
});
/**
 * Waterfall step: open a connection to the RethinkDB server.
 *
 * The server address is the module-level `host` on port 28015.
 *
 * @param {Function} cb - Node-style callback, handed untouched to
 *   `r.connect`; the next waterfall step (`setup`) expects it to yield the
 *   open connection.
 */
function connect(cb) {
  debug('Connecting');
  var target = {
    host: host,
    port: 28015
  };
  r.connect(target, cb);
}
/**
 * Waterfall step: make sure the benchmark starts from an empty `message`
 * table in the `test` database.
 *
 * Lists the existing tables, drops `message` if it is already there, then
 * (re)creates it.
 *
 * @param {Object} conn - Open RethinkDB connection.
 * @param {Function} cb - Called with `(err)` on the first failure, or with
 *   `(null, conn)` once the fresh table exists.
 */
function setup(conn, cb) {
  var TABLE = 'message';

  // Last link of the chain: report the outcome of the table creation.
  function onCreated(createErr) {
    if (createErr) {
      cb(createErr);
      return;
    }
    cb(null, conn);
  }

  // Doubles as the callback of the optional drop, hence the error argument.
  function createTable(dropErr) {
    if (dropErr) {
      cb(dropErr);
      return;
    }
    debug('Creating `message` table');
    r.tableCreate(TABLE).run(conn, onCreated);
  }

  function onTables(listErr, tables) {
    if (listErr) {
      cb(listErr);
      return;
    }
    var alreadyThere = tables.indexOf(TABLE) >= 0;
    if (!alreadyThere) {
      createTable(null);
      return;
    }
    debug('Deleting previous `message` table');
    r.tableDrop(TABLE).run(conn, createTable);
  }

  conn.use('test');
  r.tableList().run(conn, onTables);
}
/**
 * Waterfall step: run the three benchmark phases in series against the
 * `message` table.
 *
 *   1. `warm`  - one bulk insert of `warmMessages` (timed as "warming").
 *   2. `write` - one insert per entry of `messages` (timed as
 *                "benchmarking", each success recorded via `writes.hit()`).
 *   3. `read`  - one `get` per key collected during `write` (timed as
 *                "reading", each success recorded via `reads.hit()`).
 *
 * @param {Object} conn - Open RethinkDB connection used for every query.
 * @param {Function} cb - Called with `(err, conn)` when the phases are done
 *   or as soon as one of them fails; `conn` is always passed so the caller
 *   can still hand it on.
 */
function run(conn, cb) {
  // Upper bound on the number of queries `async.eachLimit` keeps in flight at
  // the same time. This is a concurrency cap, not a per-second rate.
  var MAX_IN_FLIGHT = 10000;

  // Primary keys of the benchmarked documents: filled by `write`, replayed by
  // `read`. Initialised before the series is started so that no phase can
  // observe it unset, even if an earlier phase completes synchronously.
  var keys = [];

  debug('Inserting');
  async.series([warm, write, read], function(err) {
    return cb(err, conn);
  });

  // RethinkDB reports per-document insert failures in the result object
  // (`errors` / `first_error`) rather than through the callback error, so
  // they have to be turned into an Error explicitly.
  function insertFailure(res) {
    if (res && res.errors > 0) {
      return new Error(res.first_error || 'RethinkDB insert reported errors');
    }
    return null;
  }

  function warm(done) {
    debug('Warming with %d messages', warmMessages.length);
    console.time('warming');
    r
      .table('message')
      .insert(warmMessages)
      .run(conn, function(err, res) {
        console.timeEnd('warming');
        done(err || insertFailure(res));
      });
  }

  function write(done) {
    debug('Benchmarking %d messages', messages.length);
    console.time('benchmarking');
    async.eachLimit(messages, MAX_IN_FLIGHT, insert, end);

    function insert(message, inserted) {
      r
        .table('message')
        .insert(message)
        // some options define how data store command is acknowledged
        // http://rethinkdb.com/api/javascript/run/
        // this is similar to MongoDB "write-concern"
        // It all boils down to whether or not we want to wait
        // for our data to be written to an actual disk to get a response
        .run(conn, {
          // durability: 'soft'
          // noreply: true
        }, function messageInserted(err, res) {
          var failure = err || insertFailure(res);
          if (failure) return inserted(failure);
          writes.hit();
          // `generated_keys` is only present when the server generated the
          // key. Fall back to the document's own `id` (assumes the default
          // `id` primary key, which is what a plain `tableCreate` produces).
          var generated = res && res.generated_keys;
          var key = generated && generated.length ? generated[0] : message.id;
          if (key !== undefined && key !== null) {
            keys.push(key);
          }
          inserted();
        });
    }

    function end(err) {
      if (err) return done(err);
      console.timeEnd('benchmarking');
      writes.end();
      done();
    }
  }

  function read(done) {
    debug('Reading');
    console.time('reading');
    // Keep at most MAX_IN_FLIGHT reads pending at once, so we won't burn too
    // much cpu.
    async.eachLimit(keys, MAX_IN_FLIGHT, get, end);

    function get(key, fetched) {
      r
        .table('message')
        .get(key)
        .run(conn, function messageRead(err) {
          if (err) return fetched(err);
          reads.hit();
          fetched();
        });
    }

    function end(err) {
      if (err) return done(err);
      console.timeEnd('reading');
      reads.end();
      done();
    }
  }
}
/**
 * Waterfall step: close the RethinkDB connection.
 *
 * @param {Object} conn - Connection to close.
 * @param {Function} cb - Handed untouched to `conn.close`, which invokes it
 *   when the close operation is over.
 */
function teardown(conn, cb) {
  var phase = 'Closing';
  debug(phase);
  conn.close(cb);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment