Created
May 27, 2014 06:45
-
-
Save vvo/e06d9aec7c22e710543d to your computer and use it in GitHub Desktop.
rethinkdb native driver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var async = require('async'); | |
var r = require('rethinkdb'); | |
var debug = require('debug')('rethinkdb'); | |
// first get some warming data to insert | |
var warmMessages = require('./generate-messages.js')(10000); | |
var host = process.env.HOST || 'localhost'; | |
// get real data to insert | |
var messages = require('./generate-messages.js')(process.env.NB_MESSAGES); | |
var writes = require('./result')('rethinkdb_writes'); | |
var reads = require('./result')('rethinkdb_reads'); | |
var connection; | |
debug('Start'); | |
async.waterfall([ | |
connect, | |
setup, | |
run, | |
teardown | |
], function stop(err) { | |
if (err) { | |
throw err; | |
} | |
debug('Stop'); | |
}); | |
function connect(cb) { | |
debug('Connecting'); | |
r.connect({host: host, port: 28015}, cb); | |
} | |
function setup(conn, cb) { | |
conn.use('test'); | |
r.tableList().run(conn, function foundTables(err, tables) { | |
if (err) return cb(err); | |
if (tables.indexOf('message') !== -1) { | |
debug('Deleting previous `message` table'); | |
r.tableDrop('message').run(conn, createTable); | |
} else { | |
createTable(null); | |
} | |
function createTable(err) { | |
if (err) return cb(err); | |
debug('Creating `message` table'); | |
r.tableCreate('message').run(conn, function created(err) { | |
if (err) return cb(err); | |
cb(null, conn); | |
}); | |
} | |
}); | |
} | |
function run(conn, cb) { | |
debug('Inserting'); | |
async.series([warm, write, read], function(err) { | |
return cb(err, conn); | |
}); | |
var keys = []; | |
function warm(cb) { | |
debug('Warming with %d messages', warmMessages.length); | |
console.time('warming'); | |
r | |
.table('message') | |
.insert(warmMessages) | |
.run(conn, function(err) { | |
console.timeEnd('warming'); | |
cb(err); | |
}); | |
} | |
function write(cb) { | |
debug('Benchmarking %d messages', messages.length); | |
console.time('benchmarking'); | |
async.eachLimit(messages, 10000, insert, end); | |
function insert(message, cb) { | |
r | |
.table('message') | |
.insert(message) | |
// some options define how data store command is acknowledged | |
// http://rethinkdb.com/api/javascript/run/ | |
// this is similar to MongoDB "write-concern" | |
// It all boils down to whether or not we want to wait | |
// for our data to be writen to an actual disk to get a response | |
.run(conn, { | |
// durability: 'soft' | |
// noreply: true | |
}, function messageInserted(err, res) { | |
if (err) return cb(err); | |
writes.hit(); | |
keys.push(res.generated_keys[0]); | |
cb(); | |
}); | |
} | |
function end(err) { | |
if (err) return cb(err); | |
console.timeEnd('benchmarking'); | |
writes.end(); | |
cb(null, conn); | |
} | |
} | |
function read(cb) { | |
debug('Reading'); | |
console.time('reading'); | |
// limit to 10000 async ops per second, so we won't burn too much cpu | |
async.eachLimit(keys, 10000, get, end); | |
function get(key, cb) { | |
r | |
.table('message') | |
.get(key) | |
.run(conn, function messageRead(err) { | |
if (err) return cb(err); | |
reads.hit(); | |
cb(); | |
}); | |
} | |
function end(err) { | |
if (err) return cb(err); | |
console.timeEnd('reading'); | |
reads.end(); | |
cb(null, conn); | |
} | |
} | |
} | |
function teardown(conn, cb) { | |
debug('Closing'); | |
conn.close(cb); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment