Skip to content

Instantly share code, notes, and snippets.

@hagino3000
Created April 12, 2011 04:17
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save hagino3000/914916 to your computer and use it in GitHub Desktop.
Save hagino3000/914916 to your computer and use it in GitHub Desktop.
Mongo transaction module
// For node-mongodb-native (npm install mongodb)
var mongodb = require('mongodb');
var MongoTransaction = function(db) {
this.db = db;
this.queue = [];
this.running = false;
this.failureFn = function(){};
}
MongoTransaction.prototype = {
openDB: function() {
this.queue.push(function(next) {
this.db.open(function(err, client) {
if (err) { throw err; }
next(client);
});
});
return this;
},
insert: function(collectionName, data) {
this.queue.push(function(next, client) {
var collection = new mongodb.Collection(client, collectionName);
data.updatedAt = new Date();
collection.insert(data, function(err, objects) {
if (err) {
console.warn(err.message);
}
if (err && err.message.indexOf('E11000 ') !== -1) {
console.error('this _id was already inserted in the database');
}
next(client);
});
});
return this;
},
removeAll: function(collectionName) {
this.queue.push(function(next, client) {
var collection = new mongodb.Collection(client, collectionName);
collection.remove(function(err, result) {
if (err) {
console.warn(err.message);
}
next(client);
});
});
return this;
},
find: function(collectionName, condition, option) {
this.queue.push(function(next, client) {
var collection = new mongodb.Collection(client, collectionName);
collection.find(condition, option, function(err, cursor) {
if (err) {
throw err;
}
next(cursor);
});
});
return this;
},
exec: function(fn) {
var _this = this;
this.queue.push(function(next, cx) {
fn.call(_this, next, cx);
});
return this;
},
openCursor: function(method) {
this.queue.push(function(next, cursor) {
if (method) {
cursor[method](function(err, arr) {
if (err) {
throw err;
}
next({
arr: arr,
db: cursor.db
});
});
} else {
next({
cursor: cursor,
db: cursor.db
});
}
});
return this;
},
closeDB: function() {
this.queue.push(function(next) {
this.db.close();
next();
});
return this;
},
onFailure: function(fn) {
this.failureFn = fn;
return this;
},
start: function() {
if (!this.running) {
this.running = true;
this._start();
}
},
_start: function(p) {
var fn = this.queue.shift();
if (fn) {
var _this = this;
var callback = function(res) {
_this._start(res);
}
try {
fn.call(this, callback, p);
} catch(e) {
this.running = false;
if (this.db && this.db.state == 'connected') {
this.db.close();
}
this.failureFn(e);
}
} else {
this.running = false;
}
}
}
exports.Transaction = MongoTransaction;
var mongodb = require('mongodb');
var Transaction = require('./mongotransaction').Transaction;
var db;
var case1 = {};
/**
*
*/
case1.setUp = function(next) {
var server = new mongodb.Server("127.0.0.1", 27017, {});
db = new mongodb.Db('test', server, {});
next();
}
/**
*
*/
case1["Open and close DB"] = function(test) {
new Transaction(db)
.exec(function(next) {
test.equal(this.db.state, "notConnected");
next();
})
.openDB()
.exec(function(next, client) {
test.equal(this.db.state, "connected");
next(client);
})
.closeDB()
.exec(function(next, client) {
test.equal(this.db.state, "notConnected");
test.done();
})
.start();
};
/**
*
*/
case1["Insert document"] = function(test) {
var colName = 'unitTest';
new Transaction(db)
.openDB()
.removeAll(colName)
.insert(colName, {name: 'Insert test'})
.find(colName, {}, {})
.openCursor('toArray')
.exec(function(next, cx) {
test.equal(cx.arr.length, 1);
test.equal(cx.arr[0].name, 'Insert test');
test.done();
next(cx.db);
})
.closeDB()
.start();
}
case1["Handle Exception and close DB"] = function(test) {
var colName = 'unitTest';
new Transaction(db)
.openDB()
.removeAll(colName)
.find(colName, {}, {})
.openCursor('toArray')
.exec(function(next, cx) {
// Throws exception (cx.arr[10] is undefined)
cx.arr[10].toString();
test.ok(false, "This line shoud not be called");
})
.onFailure(function(e) {
test.equal(this.db.state, 'notConnected');
test.ok(true, "handle failure succeeded");
test.done();
})
.start();
}
module.exports = require('nodeunit').testCase(case1);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment