Created
February 1, 2015 18:01
-
-
Save kiasaki/eae4bfbd89f3884052ca to your computer and use it in GitHub Desktop.
Node.js RethinkDB connect helper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// imports | |
var host = '0.0.0.0'; | |
var port = process.env.PORT || 8080; | |
// ... all kind of setup ... | |
// Routes | |
app.use(rethinkdb.connectMiddleware); | |
app.use(require('./routes')); | |
app.use(rethinkdb.disconnectMiddleware); | |
// Error handling | |
require('./lib/errors-handler').init(app); | |
// Open DB connection and make sure tables and indexes exist | |
rethinkdb.init([{ | |
name: 'notebooks', | |
indexes: ['name'] | |
}, { | |
name: 'notes', | |
indexes: ['name'] | |
}], function(err) { | |
if (err) { | |
console.error(err); | |
return process.exit(1); | |
} | |
console.log('listening on port ' + port); | |
app.listen(port, host); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var url = require('url'); | |
var async = require('async'); | |
var r = require('rethinkdb'); | |
var internals = {}; | |
internals.config = function() { | |
var rethinkdb_url = url.parse( | |
process.env.RETHINKDB_URL || | |
'rethinkdb://localhost:28015/marks' | |
); | |
return { | |
host: rethinkdb_url.hostname, | |
port: rethinkdb_url.port, | |
authKey: rethinkdb_url.auth || '', | |
db: rethinkdb_url.pathname.slice(1) | |
}; | |
}; | |
// Middleware to `.use` before any http handler | |
internals.connectMiddleware = function(req, res, next) { | |
r.connect(internals.config(), function(err, conn) { | |
if (err) return next(err); | |
req.rdbConnection = conn; | |
next(); | |
}); | |
}; | |
// Middleware to `.use` after all http handlers that closes rdb conns | |
internals.disconnectMiddleware = function(req, res, next) { | |
req.rdbConnection.close(); | |
next(); | |
}; | |
// Takes an Array of objects that have a name and indexes: | |
// [{ | |
// name: 'todos', | |
// indexes: ['createdAt'] | |
// }] | |
// | |
// Will ensure configured database exist, all tables exist, | |
// all indexes are created and wait for them to complete then | |
// callback, or, callback early if an error occurs in the process | |
internals.init = function(tableStructure, callback) { | |
var config = internals.config(); | |
async.auto({ | |
// Connect to RethinkDB | |
connection: function(step) { | |
r.connect(config, step); | |
}, | |
// Retrive database list | |
databaseList: ['connection', function(step, results) { | |
r.dbList().run(results.connection, step); | |
}], | |
// Create database as configured only if it's not | |
// already existing | |
createDbIfNotExist: ['databaseList', function(step, results) { | |
if (results.databaseList.indexOf(config.db) === -1) { | |
console.log('[RethinkDB] Creating database ' + config.db); | |
r.dbCreate(config.db).run(results.connection, step); | |
} else { | |
console.log('[RethinkDB] Database ' + config.db + ' exists'); | |
step(); | |
} | |
}], | |
// Get a list of table so we can skip creation of | |
// exiting ones | |
tableList: ['createDbIfNotExist', function(step, results) { | |
r.tableList().run(results.connection, step); | |
}], | |
// For each table definition passed | |
// - create table if needed | |
// - create indexes and wait for completion | |
createTables: ['tableList', function(step, results) { | |
console.log('[RethinkDB] Existing tables: [' + results.tableList.join(', ') + ']'); | |
async.each(tableStructure, function(tableDefinition, nextItem) { | |
async.waterfall([ | |
// If table doesn't exist yet, create it | |
function(cb) { | |
if (results.tableList.indexOf(tableDefinition.name) === -1) { | |
console.log('[RethinkDB] Creating table ' + tableDefinition.name); | |
r.tableCreate(tableDefinition.name).run(results.connection, cb); | |
} else { cb(null, null); } | |
}, | |
// Gather the existing indexes for current table | |
function(lastResult, cb) { | |
r.table(tableDefinition.name).indexList().run(results.connection, cb); | |
}, | |
// Now the main course, create indexes and wait for them if they do | |
// not already exist for table | |
function(existingIndexes, cb) { | |
// Go one level deeper, indexes is an array | |
async.each(tableDefinition.indexes, function(index, nextIndex) { | |
if (existingIndexes.indexOf(index) === -1) { | |
// Ok, now whatever, wont use an other asyns.waterfall or series | |
// let's use plain callbacks | |
console.log('[RethinkDB] Table ' + tableDefinition.name + ': Creating index ' + index); | |
r.table(tableDefinition.name).indexCreate(index) | |
.run(results.connection, function(err) { | |
if (err) return nextIndex(err); | |
r.table(tableDefinition.name).indexWait(index) | |
.run(results.connection, nextIndex); | |
}); | |
} else { nextIndex(); } | |
}, cb); | |
} | |
], nextItem); | |
}, step); | |
}] | |
}, function(err, results) { | |
results.connection.close(); | |
callback(err); | |
}); | |
}; | |
module.exports = internals; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment