Skip to content

Instantly share code, notes, and snippets.

@btmills
Last active July 22, 2019 00:52
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save btmills/5229749 to your computer and use it in GitHub Desktop.
Save btmills/5229749 to your computer and use it in GitHub Desktop.
Use a connection pool to rapidly insert a large number of records into a RethinkDB database.
var r = require('rethinkdb');
// ---- Test configuration ----

// How many connections in the pool?
// 2547 will fail with "(libuv) Failed to create kqueue (24)"
// NOTE(review): presumably this is the per-process open-file limit of the
// machine the test was run on - confirm before relying on this number.
var CONNECTIONS = 2546;

// Insert this many rows
var COUNT = 10000;

// Database to use for the test
var DB = 'test';

// Table to use for the test
var TABLE = 'insertTest';

// Connection info handed to r.connect() for every pooled connection
var CONNINFO = {
    host: 'localhost',
    port: 28015
};
// Minimal FIFO queue kept in an array plus two moving indices.
// Based on http://tomswitzer.net/2011/02/super-simple-javascript-queue/
// Returns an object with:
//   enqueue(val) - append val to the back
//   dequeue()    - remove and return the front value (undefined when empty)
//   length()     - number of values currently queued
var Queue = function () {
    // head is the index of the next value to hand out,
    // tail is the index of the most recently added value
    var items = [], head = 0, tail = -1;

    // How many values are waiting
    function size() {
        return tail - head + 1;
    }

    // Add a value behind everything already queued
    function push(val) {
        tail += 1;
        items[tail] = val;
    }

    // Take the oldest value off the front
    function shift() {
        if (size() <= 0) { // nothing queued
            return undefined;
        }
        var front = items[head];
        // Drop the slot so the queue no longer references the value
        delete items[head];
        head += 1;
        return front;
    }

    return {
        enqueue: push,
        dequeue: shift,
        length: size
    };
};
// Instead of a new connection per request, try pooling connections.
// A fixed set of connections is opened up front by init() and lent out one
// borrower at a time through get()/done().
// Exposes: init(size, callback), get(callback), done(conn), close().
var Pool = (function () {
    // Queue of callbacks awaiting a connection
    var q = new Queue();
    // Stack of available (idle) connections
    var pool = [];

    // Initialize the connections in the pool
    // Must be called first
    // Connections are opened one after another until the pool holds `size`.
    // callback is function (err): called with null once the pool is full, or
    // with the connect error as soon as one connection fails. Connections
    // opened before a failure stay in the pool, so call close() afterwards.
    var init = function (size, callback) {
        if (pool.length < size) { // Add another connection
            r.connect(CONNINFO, function (err, conn) {
                if (err) return callback(err);
                pool.push(conn);
                init(size, callback);
            });
        } else { // Pool has been filled
            callback(null);
        }
    };

    // Get a connection from the pool
    // callback is function (conn)
    // Called synchronously when a connection is idle; otherwise the callback
    // waits in q until another borrower hands a connection back via done().
    var get = function (callback) {
        if (pool.length > 0) {
            callback(pool.pop());
        } else {
            q.enqueue(callback);
        }
    };

    // Return a connection to the pool
    var done = function (conn) {
        // For purposes of experimentation, assume conn is valid
        // Would need to check for real-world use
        // Either give the connection to the next callback in q
        // or return it to the pool
        if (q.length() > 0) {
            // Claim the waiter now, not inside the tick. If the dequeue were
            // deferred, two done() calls made before the tick queue drains
            // would both see the same single waiter; the second tick would
            // then dequeue undefined, throw, and lose its connection.
            var waiter = q.dequeue();
            // Only the hand-off itself is deferred to the next tick.
            process.nextTick(function () {
                waiter(conn);
            });
        } else {
            pool.push(conn);
        }
    };

    // Close all connections in the pool
    // Only idle connections are closed; a connection that is currently lent
    // out is not in the stack and is left untouched.
    var close = function () {
        while (pool.length > 0) {
            (pool.pop()).close();
        }
    };

    return {
        init: init,
        get: get,
        done: done,
        close: close
    };
})();
// Insert a whole bunch
// Requests `count` inserts of { number: n } for n = 1..count, each on a
// connection borrowed from Pool, and starts the 'Insert' console timer.
// callback is function (err, iteration) and is called once per row.
var insert = function (count, callback) {
    // Insert a single row numbered n; being a function parameter, n stays
    // fixed for the asynchronous callbacks below.
    var insertRow = function (n) {
        Pool.get(function (conn) {
            var query = r.db(DB).table(TABLE).insert({ number: n });
            query.run(conn, function (err, res) {
                // Hand the connection back before reporting the outcome
                Pool.done(conn);
                if (err) {
                    console.error('Error inserting #%d', n, err);
                    return callback(err, n);
                }
                callback(null, n);
            });
        });
    };

    console.time('Insert');
    for (var n = 1; n <= count; n++) {
        insertRow(n);
    }
};
// Insert callback maintains success and fail counts
// The counts live in the closure, so they accumulate across every call made
// during the run. Each call logs progress against COUNT.
//   err      - truthy when the insert failed, falsy when it succeeded
//   row      - the row number that was attempted (not used for counting)
//   callback - function (success count, fail count), called only by the
//              call that brings the processed total to exactly COUNT
var inserted = (function () {
    var success = 0;
    var fail = 0;
    return function (err, row, callback) {
        if (err) {
            fail++;
        } else {
            success++;
        }
        var processed = success + fail;
        console.log('Processed %d of %d rows', processed, COUNT);
        // Strict comparison: both sides are numbers, no coercion wanted
        if (processed === COUNT) {
            callback(success, fail);
        }
    };
})();
// Everything's done. Let's show some stats.
// Stops the 'Insert' timer, prints how many inserts were attempted out of
// COUNT along with the success/failure split, then closes the pool.
//   success - number of inserts that succeeded
//   fail    - number of inserts that failed
var done = function (success, fail) {
    var attempted = success + fail;
    var summary = 'Attempted %d of %d inserts with %d successes and %d failures.';
    console.timeEnd('Insert');
    console.log(summary, attempted, COUNT, success, fail);
    Pool.close();
};
// This is almost function composition:
// fill the pool, run every insert, tally each outcome, then report.
Pool.init(CONNECTIONS, function (err) {
    if (!err) {
        insert(COUNT, function (insertErr, row) {
            // Don't check the error here - pass it through to be counted
            inserted(insertErr, row, done);
        });
        return;
    }
    console.error('Could not fill connection pool', err);
    // Some may succeed before failure, so release whatever was opened
    return Pool.close();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment