Created
October 22, 2012 15:38
-
-
Save kiwanami/3932122 to your computer and use it in GitHub Desktop.
HTTP proxy for GridFS (Node.js)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// -*- coding: utf-8; -*- | |
var http = require('http'); | |
var Deferred = require('jsdeferred').Deferred; | |
var mongo = require('mongodb'); | |
var config; | |
// Load the configuration module named by the first command-line argument
// into the file-level `config` variable. Prints a message and exits with
// status 1 when no argument is given or the module exports no `config`.
(function() {
  // slice() copies the arguments; the previous splice() removed them from
  // the shared process.argv array as a side effect.
  var cmdArguments = process.argv.slice(2);
  if (cmdArguments.length === 0) {
    console.log("usage: node node-proxy.js (config.js)");
    process.exit(1);
  }
  config = require(cmdArguments[0]).config;
  if (!config) {
    // Fail here with a clear message instead of a TypeError later, when
    // config.http / config.mongo are first dereferenced.
    console.log("error: " + cmdArguments[0] + " does not export 'config'");
    process.exit(1);
  }
})();
/**
 * Wraps a plain value in a deferred chain: schedules, through Deferred.next,
 * a task whose only job is to return the value.
 *
 * @param {*} x value to hand to the subsequent task.
 * @return {Deferred} the object returned by Deferred.next for that task.
 */
Deferred.constVal = function (x) {
  var produce = function() {
    return x;
  };
  return Deferred.next(produce);
};
// Deferred.onerror = function(e) { | |
// console.log("Deferred error: "+e); | |
// }; | |
// GridFS API wrapped by JSDeferred | |
var GridFS = {

  dbMap: {}, // dbname -> opened db object cache

  /**
   * Builds the server descriptor from config.mongo.host / config.mongo.port.
   *
   * @return {mongo.Server} a new server descriptor with auto_reconnect on.
   */
  _getServerConfig: function() {
    return new mongo.Server(config.mongo.host, config.mongo.port, {auto_reconnect: true});
  },

  /**
   * Connects the DB and returns a deferred object for the subsequent tasks.
   * An already opened DB is served from dbMap.
   *
   * @param {String} dbname DB name to connect.
   * @return {Deferred} a deferred object called with the db object, or
   * failed with the error reported by the open call.
   */
  getDB: function(dbname) {
    var cached = this.dbMap[dbname];
    if (cached) return Deferred.constVal(cached);
    var d = new Deferred();
    // Not cached: share the open/cache logic with getDBC.
    this.getDBC(dbname,
                function(db) { d.call(db); },
                function(err) { d.fail(err); });
    return d;
  },

  /**
   * Connects the DB and hands it to a plain callback. When the DB is
   * already cached the callback runs synchronously, before this method
   * returns.
   *
   * @param {String} dbname DB name to connect.
   * @param {Function} callback called with the db object.
   * @param {Function} errorback called with the error if opening fails.
   */
  getDBC: function(dbname,callback,errorback) {
    var that = this;
    var db = this.dbMap[dbname];
    if (db) {
      callback(db);
      return;
    }
    db = new mongo.Db(dbname, this._getServerConfig());
    db.open(function(err,db) {
      if (!err) {
        that.dbMap[dbname] = db;
        callback(db);
      } else {
        errorback(err);
      }
    });
  },

  /**
   * Closes the DB connection cached under the given name and removes it
   * from the cache.
   *
   * @param {String} dbname DB name to close.
   * @return {Deferred} a deferred object called with the close result, or
   * with false when no connection is cached for dbname. It fails with the
   * close error (the cache entry is then kept).
   */
  closeDB: function(dbname) {
    var that = this;
    // The cache is the only place opened connections are stored.
    var db = this.dbMap[dbname];
    if (!db) return Deferred.constVal(false);
    var d = new Deferred();
    db.close(true,function(err,result) {
      if (!err) delete that.dbMap[dbname];
      // The close callback may run before `d` has been returned to the
      // caller, so the result is delivered through Deferred.next (same
      // precaution as in getGridStore).
      Deferred.next(function() {
        if (!err) d.call(result);
        else d.fail(err);
      });
    });
    return d;
  },

  /**
   * Closes all cached connections.
   *
   * @return {Deferred} a deferred object called with true once every
   * cached connection has been closed, or with false when nothing was
   * open. If any close fails, the deferred object calls the error
   * callback with the first error (after all closes have reported).
   */
  closeAll: function() {
    var that = this;
    var names = Object.keys(this.dbMap).filter(function(name) {
      return that.dbMap[name];
    });
    if (names.length === 0) return Deferred.constVal(false);
    var d = new Deferred();
    var remaining = names.length;
    var firstError = null;
    names.forEach(function(name) {
      that.dbMap[name].close(true,function(err) {
        if (err) {
          if (!firstError) firstError = err;
        } else {
          delete that.dbMap[name];
        }
        remaining--;
        if (remaining > 0) return;
        // Deliver asynchronously; see closeDB.
        Deferred.next(function() {
          if (firstError) d.fail(firstError);
          else d.call(true);
        });
      });
    });
    return d;
  },

  /**
   * Retrieves the file and returns a deferred object with the file data
   * for the subsequent tasks.
   *
   * @param {String} dbname DB name to connect
   * @param {String} id Object ID
   * @return {Deferred} a deferred object for the subsequent
   * tasks. If the connection fails or the file is not found, the
   * deferred object calls the error callback.
   */
  getFileAll: function(dbname,id) {
    var d = new Deferred();
    this.getDBC(dbname, function(db) {
      try {
        var grid = new mongo.Grid(db,'fs');
        var oid = new mongo.ObjectID(id);
        grid.get(oid, function(err, data) {
          if (!err) d.call(data);
          else d.fail(err);
        });
      } catch (e) {
        // getDBC runs this callback synchronously for a cached db, so a
        // throw here happens before `d` is returned. Report it on a later
        // tick, exactly as getGridStore does.
        Deferred.next(function() {
          d.fail(e);
        });
      }
    }, function(e) { d.fail(e); });
    return d;
  },

  /**
   * Opens the GridStore and returns a deferred object with a
   * GridStore instance for the subsequent tasks.
   *
   * @param {String} dbname DB name to connect
   * @param {String} id Object ID
   * @return {Deferred} a deferred object for the subsequent
   * tasks. If the connection fails or the file is not found, the
   * deferred object calls the error callback.
   */
  getGridStore: function(dbname, id) {
    var d = new Deferred();
    this.getDBC(dbname,function(db) {
      try {
        var oid = new mongo.ObjectID(id);
        var gs = new mongo.GridStore(db, oid, 'r');
        gs.open(function(err, gridStore) {
          if (!err) d.call(gridStore);
          else d.fail(err);
        });
      } catch (e) {
        // May be reached synchronously (cached db); defer the failure so
        // the caller has received `d` first.
        Deferred.next(function() {
          d.fail(e);
        });
      }
    }, function(e) { d.fail(e); });
    return d;
  },

  /**
   * Prepares an iteration over the chunks of a grid file, starting at the
   * given chunk number.
   *
   * @param {GridStore} gridStore opened grid store.
   * @param {Number} chunkNumber first chunk number ('n') to include.
   * @return {Deferred} a deferred object called with a
   * GridFS.CursorMachine wrapping the chunk cursor, or failed with the
   * error reported by the driver.
   */
  eachChunks: function(gridStore, chunkNumber) {
    var d = new Deferred();
    var waitTime = 0; // delay (ms) the machine uses between chunks
    gridStore.chunkCollection(function(err, collection) {
      if (err) return d.fail(err);
      var query = {'files_id':gridStore.fileId, 'n':{'$gte':chunkNumber}};
      collection.find(query, function(err, cursor) {
        // 'find' may call back synchronously; always deliver later.
        Deferred.next(function() {
          try {
            if (err) d.fail(err);
            else d.call(new GridFS.CursorMachine(cursor,waitTime));
          } catch (e) {
            d.fail(e);
          }
        });
      });
      return false;
    });
    return d;
  },

  // Values a CursorMachine callback returns to steer the iteration:
  // MORE fetches the next chunk, PAUSE waits for cont(), END stops.
  CURSOR_STATES: {MORE: 0, PAUSE: 1, END: 2},

  /**
   * Pull-style iterator over a chunk cursor.
   *
   * start(callback) fetches the first chunk and returns a deferred object
   * that is called when the iteration ends (cursor exhausted, or callback
   * returned END / an unknown value) and failed on a cursor error or when
   * the callback throws. The callback receives each chunk and returns one
   * of GridFS.CURSOR_STATES. cont() resumes after PAUSE; it does nothing
   * while a fetch is outstanding or after the iteration has ended.
   *
   * @param {Object} cursor object providing nextObject(callback).
   * @param {Number} waitTime delay in ms before fetching after MORE.
   */
  CursorMachine: function(cursor,waitTime) {
    var done = new Deferred();
    var that = this;
    var waiting = false;  // a nextObject call is outstanding or scheduled
    var finished = false; // `done` has been settled; ignore further events

    var finish = function() {
      if (finished) return;
      finished = true;
      done.call();
    };
    var abort = function(err) {
      if (finished) return;
      finished = true;
      done.fail(err);
    };
    var onChunk = function(err, chunk) {
      if (finished) return false;
      if (err) {
        abort(err);
        return false;
      }
      if (!chunk) { // cursor exhausted
        finish();
        return false;
      }
      var ret;
      waiting = false;
      try {
        ret = that.callback(chunk);
      } catch (e) {
        abort(e);
        return false;
      }
      switch (ret) {
      case GridFS.CURSOR_STATES.MORE:
        waiting = true;
        setTimeout(function() {
          cursor.nextObject(onChunk);
        },waitTime);
        break;
      case GridFS.CURSOR_STATES.PAUSE:
        break;
      case GridFS.CURSOR_STATES.END:
      default:
        finish();
      }
      return false;
    };
    this.cont = function() {
      if (finished || waiting) return;
      waiting = true;
      cursor.nextObject(onChunk);
    };
    this.start = function(callback) {
      this.callback = callback;
      waiting = true;
      cursor.nextObject(onChunk);
      return done;
    };
  }
};
// response implementations | |
/**
 * Answers the request with a plain-text 500 response.
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {*} err the error that occurred; accepted but not used here.
 */
function responseServerError(res,err) {
  var plainText = {'Content-Type': 'text/plain'};
  res.writeHead(500, plainText);
  res.end("Internal Server Error");
}
/**
 * Answers the request with a plain-text 400 response.
 *
 * @param {http.ServerResponse} res response to write to.
 */
function responseBadRequest(res) {
  var status = 400;
  var body = "Bad request";
  res.writeHead(status, {'Content-Type': 'text/plain'});
  res.end(body);
}
/**
 * Answers the request with a plain-text 404 response.
 *
 * @param {http.ServerResponse} res response to write to.
 */
function responseFileNotFound(res) {
  var headers = {};
  headers['Content-Type'] = 'text/plain';
  res.writeHead(404, headers);
  res.end("File not found");
}
/**
 * Streams the bytes [beginPos, endPos) of an opened grid file to the
 * response, chunk by chunk, then ends the response and closes the store.
 *
 * Back-pressure: when res.write() returns false the chunk iteration is
 * paused and resumed from the response's 'drain' event. A 'close' event on
 * the response stops the iteration at the next chunk.
 *
 * @param {http.ServerResponse} res response to write to (headers are
 * expected to have been written by the caller).
 * @param {GridStore} gridStore opened store; chunkSize is read from it.
 * @param {Number} totalLength file length; accepted for interface
 * compatibility, not used by the streaming logic.
 * @param {Number} beginPos offset of the first byte to send.
 * @param {Number} endPos offset one past the last byte to send.
 * @return {Deferred} deferred object for the whole transfer. Errors during
 * streaming are absorbed here: the response is ended and the store closed.
 */
function responseFileGen(res, gridStore, totalLength, beginPos, endPos) {
  var chunkSize = gridStore.chunkSize;
  var beginChunk = Math.floor(beginPos/chunkSize);
  var endChunk = Math.floor(endPos/chunkSize);
  var curChunk = beginChunk;
  var closed = false;
  var machine = null;

  // Common epilogue for success and failure.
  var finish = function() {
    // Drop the machine so a late 'drain' event cannot resume a finished
    // iteration.
    machine = null;
    res.end();
    if (gridStore.close) gridStore.close(function() {});
  };

  res.on('close',function(e) {
    closed = true;
  });
  res.on('drain',function(e) {
    if (machine) machine.cont();
  });

  var d = GridFS.eachChunks(gridStore,beginChunk).next(function(m) {
    machine = m;
    return machine.start(function(chunk) {
      var writeState;
      var s;
      if (closed) return GridFS.CURSOR_STATES.END;
      // assert that the chunk is an instance of Binary class.
      if (beginChunk == endChunk) {
        // the whole range lies inside one chunk
        s = beginPos - curChunk*chunkSize;
        res.write(chunk.data.read(s, endPos-beginPos));
        return GridFS.CURSOR_STATES.END;
      } else if (curChunk == beginChunk) {
        // first chunk: from the start offset to the end of the chunk
        s = beginPos - curChunk*chunkSize;
        writeState = res.write(chunk.data.read(s, chunkSize - s));
      } else if (curChunk == endChunk) {
        // last chunk: only the leading bytes up to endPos
        var len = endPos - curChunk*chunkSize;
        if (len > 0) {
          res.write(chunk.data.read(0, len));
        }
        return GridFS.CURSOR_STATES.END;
      } else {
        // mid chunks are sent whole
        writeState = res.write(chunk.data.read(0,chunkSize));
      }
      curChunk++;
      return (writeState) ? GridFS.CURSOR_STATES.MORE : GridFS.CURSOR_STATES.PAUSE;
    });
  }).next(function() {
    finish();
  }).error(function(e) {
    finish();
  });
  return d;
}
/**
 * Sends the whole grid file as a 200 response.
 *
 * Headers are built from the opened store: Content-Type, Content-Length,
 * and, when the store provides them, a quoted ETag (from md5) and a
 * Last-Modified date in HTTP-date form (from uploadDate).
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {String} dbname DB name to read from.
 * @param {String} id Object ID of the file.
 * @return {Deferred} deferred object for the transfer. If the store cannot
 * be opened a 404 is sent and the error is re-thrown to the caller's
 * error handler.
 */
function responseAll(res, dbname, id) {
  return GridFS.getGridStore(dbname,id).next(function(gs) {
    var len = gs.length;
    var headers = {'Content-Type': gs.contentType,
                   'Content-Length': len};
    if (gs.md5) {
      // Entity tags are quoted strings on the wire.
      headers['ETag'] = '"' + gs.md5 + '"';
    }
    if (gs.uploadDate) {
      // A Date would otherwise be stringified in its local, non-HTTP form.
      headers['Last-Modified'] = (typeof gs.uploadDate.toUTCString === 'function')
        ? gs.uploadDate.toUTCString()
        : gs.uploadDate;
    }
    res.writeHead(200, headers);
    // The stream method is slower than chunk loop...
    return responseFileGen(res,gs,len,0,len);
  }).error(function(err) {
    responseFileNotFound(res);
    throw err;
  });
}
/**
 * Sends part of a grid file as a 206 response.
 *
 * beginStr / endStr are the two captures of a "bytes=<begin>-<end>" Range
 * header; either may be empty:
 *   "5-9"  bytes 5..9
 *   "5-"   byte 5 to the end of the file
 *   "-3"   the last 3 bytes (suffix range)
 * The end position is clamped to the last byte of the file. A range that
 * cannot be served (start beyond the file, start after end, empty file,
 * non-numeric input) gets a 416 response and fails the returned deferred.
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {String} dbname DB name to read from.
 * @param {String} id Object ID of the file.
 * @param {String} beginStr first byte position, or empty.
 * @param {String} endStr last byte position (inclusive), or empty.
 * @return {Deferred} deferred object for the transfer. If the store cannot
 * be opened a 404 is sent; in every failure case the error is re-thrown to
 * the caller's error handler.
 */
function responseRanged(res, dbname, id, beginStr, endStr) {
  // Set once a status line has been written, so the error handler does not
  // try to send a second one.
  var headersWritten = false;
  return GridFS.getGridStore(dbname,id).next(function(gs) {
    var totalLength = gs.length;
    var lastPos = totalLength - 1;
    var beginPos, endPos;
    if (!beginStr && endStr) {
      // suffix range: endStr is a byte count measured from the end
      beginPos = Math.max(totalLength - parseInt(endStr, 10), 0);
      endPos = lastPos;
    } else {
      beginPos = (beginStr) ? parseInt(beginStr, 10) : 0;
      endPos = (endStr) ? Math.min(parseInt(endStr, 10), lastPos) : lastPos;
    }
    if (isNaN(beginPos) || isNaN(endPos) ||
        beginPos > endPos || beginPos >= totalLength) {
      headersWritten = true;
      res.writeHead(416, {'Content-Type': 'text/plain',
                          'Content-Range': "bytes */"+totalLength});
      res.end("Requested Range Not Satisfiable");
      // responseFileGen would normally close the store; do it here instead.
      if (gs.close) gs.close(function() {});
      throw new Error("Requested range not satisfiable");
    }
    headersWritten = true;
    res.writeHead(206, {'Content-Type': gs.contentType,
                        'Content-Length': endPos - beginPos + 1,
                        'Content-Range': "bytes "+beginPos+"-"+endPos+"/"+totalLength});
    // responseFileGen takes an exclusive end offset.
    return responseFileGen(res,gs,totalLength,beginPos,endPos+1);
  }).error(function(err) {
    if (!headersWritten) responseFileNotFound(res);
    throw err;
  });
}
// building server | |
// Request handler: map the URL to a (db, file id) pair through
// config.dispatch, serve the file whole or by byte range, and print one
// tab-separated log line per request.
http.createServer(function (req, res) {
  try {
    var dateTime = new Date().toLocaleString();
    var log = function(status,memo) {
      var remote = req.headers['x-forwarded-for'] || req.client.remoteAddress;
      var referer = req.headers.referer || '-';
      var agent = req.headers['user-agent'] || '-';
      var fields = [remote, dateTime, req.method, req.url, status,
                    referer, agent, memo || ''];
      console.log(fields.join('\t'));
    };

    // dispatch db query: the first rule whose regexp matches the URL wins
    var id, dbname;
    config.dispatch.some(function(rule) {
      var hit = req.url.match(rule.regexp);
      if (!hit) return false;
      id = hit[1];
      dbname = rule.db;
      return true;
    });

    if (!id) {
      responseFileNotFound(res);
      log(404);
      return;
    }

    var range = req.headers['range'];
    var mt = null;
    if (range) mt = range.match(/^bytes=(\d*)-(\d*)$/);

    if (mt) {
      // single byte range requested
      responseRanged(res,dbname,id,mt[1],mt[2]).next(function() {
        log(206,range);
      }).error(function(e) {
        log(404);
      });
    } else {
      // no Range header, or one this server does not understand
      responseAll(res,dbname,id).next(function() {
        log(200);
      }).error(function(e) {
        log(404);
      });
    }
  } catch (e) {
    //console.log(e);
    responseServerError(res,e);
    log(500,e);
  }
}).listen(config.http.port, '127.0.0.1', function() {
  console.log("OK");
});
// Last-resort handler: print the message of any exception that nothing
// else caught.
process.on('uncaughtException', function (err) {
  var text = 'Caught exception: ' + err.message;
  console.log(text);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.config = { | |
mongo: { | |
host: "localhost", port: 28017 | |
}, | |
http: { | |
port: 5001 | |
}, | |
dispatch: [ | |
{regexp: /\/test\/([^?\/.]+)/, db:"test"}, | |
] | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment