Skip to content

Instantly share code, notes, and snippets.

@kiwanami
Created October 22, 2012 15:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kiwanami/3932122 to your computer and use it in GitHub Desktop.
Save kiwanami/3932122 to your computer and use it in GitHub Desktop.
HTTP proxy for GridFS (Node.js)
// -*- coding: utf-8; -*-
var http = require('http');
var Deferred = require('jsdeferred').Deferred;
var mongo = require('mongodb');
var config;
// Loads the configuration module named by the first command-line argument
// into the module-level `config` variable, or exits with status 1.
(function() {
  // slice() copies the user arguments; the original splice() removed them
  // from process.argv itself, which any later reader of argv would notice.
  var cmdArguments = process.argv.slice(2);
  if (cmdArguments.length === 0) {
    // Usage errors belong on stderr so they are not mixed into the access log
    // that the server writes to stdout.
    console.error("usage: node node-proxy.js (config.js)");
    process.exit(1);
  }
  config = require(cmdArguments[0]).config;
  if (!config) {
    // Fail here with a clear message instead of a TypeError on the first
    // `config.*` access further down.
    console.error("config file must export a 'config' object: " + cmdArguments[0]);
    process.exit(1);
  }
})();
/**
 * Returns a deferred that yields the given value through Deferred.next,
 * so an already-known value can be used where a deferred is expected.
 *
 * @param {*} x value to hand to the subsequent task.
 * @return {Deferred} the deferred returned by Deferred.next.
 */
Deferred.constVal = function (x) {
  var yieldValue = function() {
    return x;
  };
  return Deferred.next(yieldValue);
};
// Deferred.onerror = function(e) {
// console.log("Deferred error: "+e);
// };
// GridFS API wrapped by JSDeferred
var GridFS = {
  // dbname -> opened mongo.Db instance. Filled by getDB/getDBC and emptied by
  // closeDB/closeAll.
  dbMap: {},

  /**
   * Builds the server descriptor for a new connection from the module-level
   * `config.mongo` host and port.
   *
   * @return {mongo.Server} a server descriptor with auto_reconnect enabled.
   */
  _getServerConfig: function() {
    return new mongo.Server(config.mongo.host, config.mongo.port, {auto_reconnect: true});
  },

  /**
   * Connects the DB and returns a deferred object for the subsequent tasks.
   * An already opened connection is served from the cache.
   *
   * @param {String} dbname DB name to connect.
   * @return {Deferred} a deferred object that receives the opened DB. If the
   * connection fails, the deferred object calls the error callback.
   */
  getDB: function(dbname) {
    var that = this;
    var cached = this.dbMap[dbname];
    if (cached) return Deferred.constVal(cached);
    var d = new Deferred();
    var db = new mongo.Db(dbname, this._getServerConfig());
    db.open(function(err, opened) {
      if (!err) {
        that.dbMap[dbname] = opened;
        d.call(opened);
      } else {
        d.fail(err);
      }
    });
    return d;
  },

  /**
   * Callback flavour of getDB. Note that a cached connection invokes
   * `callback` synchronously, before this method returns.
   *
   * @param {String} dbname DB name to connect.
   * @param {Function} callback receives the opened DB.
   * @param {Function} errorback receives the connection error.
   */
  getDBC: function(dbname, callback, errorback) {
    var that = this;
    var cached = this.dbMap[dbname];
    if (cached) {
      callback(cached);
      return;
    }
    var db = new mongo.Db(dbname, this._getServerConfig());
    db.open(function(err, opened) {
      if (!err) {
        that.dbMap[dbname] = opened;
        callback(opened);
      } else {
        errorback(err);
      }
    });
  },

  /**
   * Closes the cached DB connection for the given name.
   *
   * @param {String} dbname DB name to close.
   * @return {Deferred} a deferred object for the subsequent tasks. It yields
   * `false` when no connection is cached for `dbname`, otherwise the result
   * passed to the driver's close callback. If closing fails, the deferred
   * object calls the error callback and the connection stays in the cache.
   */
  closeDB: function(dbname) {
    var that = this;
    // The original tested `this._db`, which is never assigned, so nothing was
    // ever closed; the connection lives in dbMap.
    var db = this.dbMap[dbname];
    if (!db) return Deferred.constVal(false);
    var d = new Deferred();
    db.close(true, function(err, result) {
      if (!err) delete that.dbMap[dbname];
      // Settle through Deferred.next so that a caller chaining on the returned
      // deferred still gets the outcome in case the driver invokes this
      // callback before closeDB has returned.
      Deferred.next(function() {
        if (!err) d.call(result);
        else d.fail(err);
      });
    });
    return d;
  },

  /**
   * Closes every cached DB connection.
   *
   * @return {Deferred} a deferred object for the subsequent tasks. It yields
   * `false` when there was nothing to close and `true` once all connections
   * are closed. If any close fails, the deferred object calls the error
   * callback with the first error after all close attempts have finished.
   */
  closeAll: function() {
    var that = this;
    var names = Object.keys(this.dbMap).filter(function(name) {
      return that.dbMap[name];
    });
    if (names.length === 0) return Deferred.constVal(false);
    var d = new Deferred();
    var remaining = names.length;
    var failed = false;
    var firstError;
    names.forEach(function(name) {
      that.dbMap[name].close(true, function(err) {
        if (!err) {
          delete that.dbMap[name];
        } else if (!failed) {
          failed = true;
          firstError = err;
        }
        remaining--;
        if (remaining > 0) return;
        // See closeDB: settle on a later step so late chaining still works.
        Deferred.next(function() {
          if (failed) d.fail(firstError);
          else d.call(true);
        });
      });
    });
    return d;
  },

  /**
   * Retrieves the file and returns a deferred object with the file data
   * for the subsequent tasks.
   *
   * @param {String} dbname DB name to connect
   * @param {String} id Object ID
   * @return {Deferred} a deferred object for the subsequent
   * tasks. If the connection fails or the file is not found, the
   * deferred object calls the error callback.
   */
  getFileAll: function(dbname, id) {
    var d = new Deferred();
    this.getDBC(dbname, function(db) {
      try {
        var grid = new mongo.Grid(db, 'fs');
        var oid = new mongo.ObjectID(id);
        grid.get(oid, function(err, data) {
          if (!err) d.call(data);
          else d.fail(err);
        });
      } catch (e) {
        // With a cached DB this code runs before `d` has been returned, so
        // the failure is postponed (as getGridStore does) to let the caller
        // attach its error handler first.
        Deferred.next(function() {
          d.fail(e);
        });
      }
    }, function(e) { d.fail(e); });
    return d;
  },

  /**
   * Opens the GridStore in read mode and returns a deferred object with a
   * GridStore instance for the subsequent tasks.
   *
   * @param {String} dbname DB name to connect
   * @param {String} id Object ID
   * @return {Deferred} a deferred object for the subsequent
   * tasks. If the connection fails or the file is not found, the
   * deferred object calls the error callback.
   */
  getGridStore: function(dbname, id) {
    var d = new Deferred();
    this.getDBC(dbname, function(db) {
      try {
        var oid = new mongo.ObjectID(id);
        var gs = new mongo.GridStore(db, oid, 'r');
        gs.open(function(err, gridStore) {
          if (!err) d.call(gridStore);
          else d.fail(err);
        });
      } catch (e) {
        // May run synchronously (cached DB); postpone the failure so the
        // caller has a chance to chain on `d`.
        Deferred.next(function() {
          d.fail(e);
        });
      }
    }, function(e) { d.fail(e); });
    return d;
  },

  /**
   * Prepares an iteration over the chunks of the grid file.
   *
   * @param {GridStore} gridStore an opened grid store.
   * @param {Number} chunkNumber index of the first chunk to visit.
   * @return {Deferred} a deferred object that receives a
   * GridFS.CursorMachine over the chunks whose `n` is >= chunkNumber.
   */
  eachChunks: function(gridStore, chunkNumber) {
    var d = new Deferred();
    var waitTime = 0; // delay in ms between two chunk fetches
    gridStore.chunkCollection(function(err, collection) {
      if (err) return d.fail(err);
      var query = {'files_id': gridStore.fileId, 'n': {'$gte': chunkNumber}};
      collection.find(query, function(err, cursor) {
        // 'find' may call back synchronously, so the result is handed over
        // through Deferred.next.
        Deferred.next(function() {
          try {
            if (err) d.fail(err);
            else d.call(new GridFS.CursorMachine(cursor, waitTime));
          } catch (e) {
            d.fail(e);
          }
        });
      });
      return false;
    });
    return d;
  },

  // Values a chunk callback returns to steer the CursorMachine:
  // MORE = fetch the next chunk, PAUSE = wait for cont(), END = stop.
  CURSOR_STATES: {MORE: 0, PAUSE: 1, END: 2},

  /**
   * Constructor of a small state machine that feeds the cursor's objects to
   * a callback one by one.
   *
   * - start(callback): begins the iteration and returns a deferred object
   *   that is called when the iteration ends and failed on error. The
   *   callback receives each chunk and returns one of CURSOR_STATES; any
   *   other return value ends the iteration.
   * - cont(): resumes after the callback returned PAUSE. It does nothing
   *   while a fetch is pending or after the iteration has finished.
   *
   * @param {Cursor} cursor cursor providing nextObject(callback).
   * @param {Number} waitTime delay in ms before fetching after MORE.
   */
  CursorMachine: function(cursor, waitTime) {
    var nextDeferred = new Deferred();
    var that = this;
    var waiting = false;
    // The original nulled nextDeferred at the end and dereferenced it later,
    // so a late cont() or cursor callback raised a TypeError. This flag makes
    // settling happen once and turns everything afterwards into a no-op.
    var finished = false;

    var succeed = function() {
      if (finished) return;
      finished = true;
      nextDeferred.call();
    };
    var failWith = function(e) {
      if (finished) return;
      finished = true;
      nextDeferred.fail(e);
    };

    var f = function(err, chunk) {
      if (finished) return false;
      if (err) {
        failWith(err);
        return false;
      }
      if (!chunk) {
        // cursor exhausted
        succeed();
        return false;
      }
      try {
        waiting = false;
        var ret = that.callback(chunk);
        switch (ret) {
        case GridFS.CURSOR_STATES.MORE:
          waiting = true;
          setTimeout(function() {
            cursor.nextObject(f);
          }, waitTime);
          break;
        case GridFS.CURSOR_STATES.PAUSE:
          // stay idle until cont() is called
          break;
        case GridFS.CURSOR_STATES.END:
        default:
          succeed();
        }
      } catch (e) {
        failWith(e);
      }
      return false;
    };

    this.cont = function() {
      if (finished || waiting) return;
      waiting = true;
      cursor.nextObject(f);
    };

    this.start = function(callback) {
      this.callback = callback;
      waiting = true;
      cursor.nextObject(f);
      return nextDeferred;
    };
  }
};
// response implementations
/**
 * Answers the request with a plain-text 500 response.
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {*} err the error that occurred; it is not used by this function.
 */
function responseServerError(res,err) {
  var headers = {'Content-Type': 'text/plain'};
  res.writeHead(500, headers);
  res.end("Internal Server Error");
}
/**
 * Answers the request with a plain-text 400 response.
 *
 * @param {http.ServerResponse} res response to write to.
 */
function responseBadRequest(res) {
  var headers = {'Content-Type': 'text/plain'};
  res.writeHead(400, headers);
  res.end("Bad request");
}
/**
 * Answers the request with a plain-text 404 response.
 *
 * @param {http.ServerResponse} res response to write to.
 */
function responseFileNotFound(res) {
  var headers = {'Content-Type': 'text/plain'};
  res.writeHead(404, headers);
  res.end("File not found");
}
/**
 * Streams the bytes [beginPos, endPos) of the grid file to the response,
 * chunk by chunk, honouring the response's back-pressure, then ends the
 * response and closes the grid store.
 *
 * @param {http.ServerResponse} res response to write to; headers must
 * already have been written by the caller.
 * @param {GridStore} gridStore opened grid store (chunkSize is read from it).
 * @param {Number} totalLength file length; kept for the callers, not used.
 * @param {Number} beginPos offset of the first byte to send.
 * @param {Number} endPos offset just past the last byte to send.
 * @return {Deferred} a deferred object for the subsequent tasks. Streaming
 * errors are handled here (the response is ended), not passed on.
 */
function responseFileGen(res, gridStore, totalLength, beginPos, endPos) {
  var STATES = GridFS.CURSOR_STATES;
  var chunkSize = gridStore.chunkSize;
  var beginChunk = Math.floor(beginPos / chunkSize);
  var endChunk = Math.floor(endPos / chunkSize);
  var curChunk = beginChunk;
  var closed = false;   // the client went away
  var finished = false; // the response was ended and the store released
  var machine = null;

  // Asks the cursor machine for the next chunk unless streaming is over.
  var resume = function() {
    if (machine && !finished) machine.cont();
  };

  // Ends the response and releases the grid store.
  var cleanup = function() {
    finished = true;
    res.end();
    if (gridStore.close) gridStore.close(function() {});
  };

  res.on('close', function() {
    closed = true;
    // If the machine is paused waiting for a 'drain' that will never come,
    // nudge it once more: the chunk callback then sees `closed`, returns END
    // and the cleanup below runs. Without this the grid store stayed open.
    resume();
  });
  res.on('drain', resume);

  return GridFS.eachChunks(gridStore, beginChunk).next(function(m) {
    machine = m;
    return machine.start(function(chunk) {
      if (closed) return STATES.END;
      // The chunk is expected to carry its bytes in `chunk.data`, an object
      // offering read(position, length).
      var chunkStart = curChunk * chunkSize;
      var isLast = (curChunk === endChunk);
      // Only the first chunk starts mid-way and only the last one ends early.
      var from = (curChunk === beginChunk) ? beginPos - chunkStart : 0;
      var to = isLast ? endPos - chunkStart : chunkSize;
      var writeState = true;
      if (to > from) {
        writeState = res.write(chunk.data.read(from, to - from));
      }
      if (isLast) return STATES.END;
      curChunk++;
      // A false result from write() means the socket buffer is full: pause
      // until 'drain'.
      return writeState ? STATES.MORE : STATES.PAUSE;
    });
  }).next(function() {
    cleanup();
  }).error(function(e) {
    cleanup();
  });
}
/**
 * Sends the whole grid file with a 200 response.
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {String} dbname DB name to read from.
 * @param {String} id Object ID of the file.
 * @return {Deferred} a deferred object for the subsequent tasks. If the
 * file cannot be opened, a 404 is sent and the error is passed on to the
 * caller's error callback.
 */
function responseAll(res, dbname, id) {
  return GridFS.getGridStore(dbname,id).next(function(gs) {
    var len = gs.length;
    var headers = {
      'Content-Type': gs.contentType,
      'Content-Length': len
    };
    // An entity tag has to be a quoted string; skip the header when the
    // store has no md5 rather than sending an undefined value.
    if (gs.md5) headers['ETag'] = '"' + gs.md5 + '"';
    if (gs.uploadDate) {
      // Last-Modified must be an HTTP-date; a Date object would be
      // stringified in the local, non-HTTP format.
      headers['Last-Modified'] = (gs.uploadDate instanceof Date)
        ? gs.uploadDate.toUTCString()
        : gs.uploadDate;
    }
    res.writeHead(200, headers);
    // The stream method is slower than chunk loop...
    return responseFileGen(res,gs,len,0,len);
  }).error(function(err) {
    responseFileNotFound(res,err);
    throw err;
  });
}
/**
 * Sends a single byte range of the grid file with a 206 response.
 *
 * The two strings are the captures of `bytes=<begin>-<end>`; either may be
 * empty. `<begin>-` means "from begin to the end of the file", `-<n>` means
 * "the last n bytes", and an end beyond the file is clamped to its last
 * byte. A range that selects nothing is answered with 416; the returned
 * deferred still completes normally in that case.
 *
 * @param {http.ServerResponse} res response to write to.
 * @param {String} dbname DB name to read from.
 * @param {String} id Object ID of the file.
 * @param {String} beginStr first byte position, digits or empty.
 * @param {String} endStr last byte position (inclusive), digits or empty.
 * @return {Deferred} a deferred object for the subsequent tasks. If the
 * file cannot be opened, a 404 is sent and the error is passed on to the
 * caller's error callback.
 */
function responseRanged(res, dbname, id, beginStr, endStr) {
  return GridFS.getGridStore(dbname,id).next(function(gs) {
    var totalLength = gs.length;
    var lastByte = totalLength - 1;
    var beginPos, endPos;
    if (!beginStr && endStr) {
      // suffix range: the final N bytes (the original served bytes 0..N)
      beginPos = Math.max(totalLength - parseInt(endStr, 10), 0);
      endPos = lastByte;
    } else {
      beginPos = (beginStr) ? parseInt(beginStr, 10) : 0;
      // never announce or read past the end of the file
      endPos = (endStr) ? Math.min(parseInt(endStr, 10), lastByte) : lastByte;
    }
    if (beginPos > endPos) {
      // Covers a start beyond the file, an empty file, `-0` and begin > end.
      res.writeHead(416, {'Content-Type': 'text/plain',
                          'Content-Range': "bytes */"+totalLength});
      res.end("Requested Range Not Satisfiable");
      if (gs.close) gs.close(function() {});
      return undefined;
    }
    res.writeHead(206, {'Content-Type': gs.contentType,
                        'Content-Length': endPos - beginPos + 1,
                        'Content-Range': "bytes "+beginPos+"-"+endPos+"/"+totalLength});
    // responseFileGen takes an exclusive end position
    return responseFileGen(res,gs,totalLength,beginPos,endPos+1);
  }).error(function(err) {
    responseFileNotFound(res,err);
    throw err;
  });
}
// building server
// Builds the HTTP server: every request is matched against the dispatch
// rules of the configuration; a match is served from GridFS (entirely, or
// partially when a single `bytes=` range is requested), anything else is
// answered with 404. One tab-separated access-log line is written per
// request.
http.createServer(function (req, res) {
  try {
    var dateTime = new Date().toLocaleString();
    // Writes the access-log line for this request to stdout.
    var log = function(status,memo) {
      var fields = [
        req.headers['x-forwarded-for'] || req.client.remoteAddress,
        dateTime,
        req.method,
        req.url,
        status,
        req.headers.referer || '-',
        req.headers['user-agent'] || '-',
        memo || ''
      ];
      console.log(fields.join('\t'));
    };
    var logNotFound = function() {
      log(404);
    };

    // The first rule whose regexp matches the URL decides the DB name and,
    // through its first capture group, the file id.
    var id, dbname;
    config.dispatch.some(function(rule) {
      var hit = req.url.match(rule.regexp);
      if (!hit) return false;
      id = hit[1];
      dbname = rule.db;
      return true;
    });

    if (!id) {
      responseFileNotFound(res);
      log(404);
      return;
    }

    var range = req.headers['range'];
    var mt = null;
    if (range) {
      mt = range.match(/^bytes=(\d*)-(\d*)$/);
    }
    if (mt) {
      responseRanged(res,dbname,id,mt[1],mt[2]).next(function() {
        log(206,range);
      }).error(logNotFound);
      return;
    }
    // no usable Range header: send the whole file
    responseAll(res,dbname,id).next(function() {
      log(200);
    }).error(logNotFound);
  } catch (e) {
    //console.log(e);
    responseServerError(res,e);
    log(500,e);
  }
}).listen(config.http.port, '127.0.0.1', function() {
  console.log("OK");
});
// Last-resort handler that keeps the proxy running after an unexpected
// error. The stack is logged when there is one, because the message alone
// does not say where the error came from; values that are not Error objects
// (which have no `message`) are printed as they are.
process.on('uncaughtException', function (err) {
  var detail = (err && err.stack) ? err.stack : err;
  console.error('Caught exception: ' + detail);
});
exports.config = {
mongo: {
host: "localhost", port: 28017
},
http: {
port: 5001
},
dispatch: [
{regexp: /\/test\/([^?\/.]+)/, db:"test"},
]
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment