Skip to content

Instantly share code, notes, and snippets.

@getify
Last active August 29, 2015 14:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save getify/bba5ec0de9d6047b720e to your computer and use it in GitHub Desktop.
Save getify/bba5ec0de9d6047b720e to your computer and use it in GitHub Desktop.
asynquence "reactive sequences" for handling http request/response streams
// Inspired by/adapted from: https://gist.github.com/totherik/4fb1784f008815ac82e1
var http = require("http");
var ASQ = require("asynquence-contrib"); // bring in ASQ + optional contrib plugins
var server, source;
server = http.createServer();
server.setTimeout(30000);
server.listen(8000);
// setup reactive listeners
source = ASQ.react(function setup(next,registerTeardown){
server.addListener("request",next);
server.addListener("close",this.stop);
registerTeardown(function(){
server.removeListener("request",next);
server.removeListener("close",source.stop);
});
});
// subscription
source
.then(function onNext(done,req,res){
req.setEncoding("utf8");
res.setHeader("Content-Type","text/html");
var body = "", collector;
// setup reactive listeners
collector = ASQ.react(function setup(next,registerTeardown){
next.onStream(req); // listen for standard stream events
req.on("end",onEnd);
req.resume(); // is this needed?
registerTeardown(function(){
next.unStream(req); // undo standard stream events
req.removeListener("end",onEnd);
});
});
// subscription
collector
.val(onNext)
.or(onError);
function onNext(x){
if (x instanceof Error) throw x;
body += x;
}
function onEnd() {
res.statusCode = 200;
res.end(" .. ");
collector.stop();
done();
}
function onError(e) {
res.statusCode = 500;
res.end();
collector.stop();
done.fail(e);
}
})
.then(function onComplete(){
console.log("complete");
})
.or(function onError(e){
console.error(e);
});
process.on("SIGINT",source.stop);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment