Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
RxJS HTTP Server
// Rx HTTP Server
// first attempt at creating a minimal reactive nodejs HTTP server
//
// has:
// * url/querystring parsing
// * body parsing
// * some kind of routing
// * some kind of error handling
// * minimalist request logging
//
// has not:
// * robust routing
// * cookie parsing
// * solid error handling
// * file uploads
const Rx = require("rxjs/Rx"),
fs = require("fs"),
http = require("http"),
url = require("url"),
mime = require("mime");
// creates Observable from a nodejs ReadableStream
const fromReadableStream = stream => {
stream.pause();
return Rx.Observable.create(observer => {
let next = chunk => observer.next(chunk),
complete = () => observer.complete(),
error = err => observer.error(err);
stream
.on('data', next)
.on('error', error)
.on('end', complete)
.resume();
return () => {
stream.removeListener('data',next);
stream.removeListener('error',error);
stream.removeListener('end',complete);
};
}).share();
};
// creates an HTTP server observable
// I wonder if this is a case where I should be considering doing some OOP stuff with a RxServer class or some nonesense... nah
const createHttpRxStream = (http,port) => {
return Rx.Observable
.create(observer => {
// create a http server that emits a connection event of the request and response objects
const server = http.createServer((req,res) => observer.next({req,res})).listen(port);
// close the server as our unsubscriber fn
return server.close.bind(server);
});
};
// url parsing middleware
// add querystring object and other URL parsed data to the request object
const urlParser = ({req}) => {
const urlObj = url.parse(req.url,true);
req.query = urlObj.query;
req.hash = urlObj.hash;
req.pathname = urlObj.pathname;
req.search = urlObj.search;
};
// request logging middleware
// log the incoming request data
const logger = ({req}) => console.log(`${req.headers.host} - - ${req.method} ${req.headers['content-type'] || '-'} ${req.url} - ${req.headers['user-agent'] || '-'}`);
// body parsing middleware
// adds the `rawBody` buffer and the parsed `body` object/string to the request object
// this returns an observable so it needs to be added to the stream using `flatMap` rather than `do` or regular `map`
const bodyParser = (conn) => {
let { req, res } = conn;
if(req.method!="POST" && req.method != "PUT" && req.method != "PATCH") return Rx.Observable.of(conn);
// PRO - will allow subsequent handlers to have ready access to the body data
// CON - waits on body parsing before continuing operation, so small lag on requests that have no need for the body data
let body_ = fromReadableStream(req)
.toArray()
.map(chunks => Buffer.concat(chunks))
.do(rawBody => req.rawBody=rawBody)
.map(rawBody => {
switch(req.headers["content-type"]) {
case "application/json":
case "application/javascript":
return JSON.parse(rawBody);
default:
return rawBody.toString();
}
})
.do(body => req.body=body)
.do(body => console.log("request body: ", req.body, typeof req.body))
.map(body => conn)
.catch(err => {
console.log("Error caught: ", err);
if(err instanceof SyntaxError) {
res.writeHead(400, { 'Content-Type': 'text/plain' });
res.end("Bad JSON");
} else {
res.writeHead(500, { 'Content-Type': 'text/plain' });
res.end("Internal Server Error");
}
return body_;
});
return body_;
};
// this is our trunkline subscription endpoint (sink)
const _404 = {
"next": ({res}) => {
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end("Not Found");
},
"error": err => console.log("default server connection stream error: ", err),
"complete": () => console.log("default server connection stream completed")
};
// this is our index handler, this is where we'd deliver the root of a client web app
const _index = {
"next": ({res}) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end("Hello World\n");
},
"error": err => console.log("index server connection stream error: ", err),
"complete": () => console.log("index server connection stream completed")
};
// this our interaction tracking handler, all tracking requests (POST /interaction) should come here
const _tracker = {
"next": ({res}) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end('{ "ok": true }');
},
"error": err => console.log("`tracking` resource connection stream error: ", err),
"complete": () => console.log("`tracking` resource connection stream completed")
}
// take a folder path and make a subscriber which will server matching files from that path
// ideally also has a mechanism to re-emit connections into another stream
const createStaticSubscriber = (dir) => {
return {
"next": (conn) => {
let {req,res} = conn,
pathname = __dirname + dir + (req.pathname=="/" ? "/index.html" : req.pathname);
console.log("get static file at ", pathname);
fs.readFile(pathname, (err,file) => {
if(err) {
if(err.code=="ENOENT") {
// TODO - fix up this stuff here
console.log("static file 404");
return _404.next(conn);
}
console.log("problem getting the file", err);
res.writeHead(400, { 'Content-Type': 'text/plain' });
res.end(err.message);
return;
}
res.writeHead(200, { 'Content-Type': mime.lookup(pathname) });
res.end(file.toString());
});
},
"error": err => console.log(`${dir} static resource connection stream error: `, err),
"complete": () => console.log(`${dir} static resource connection stream completed`)
}
};
// this actually creates our server stream and sets it up to share the events
const server_ = Rx.Observable
.onErrorResumeNext(
createHttpRxStream(http,8000)
.do(urlParser)
.do(logger)
.flatMap(bodyParser))
.share();
// Rx Routing
// take a trunk stream and a dictionary of branching predicate functions
// return a matching dictionary of branch streams which produce events from the trunk stream if passing the predicate
// adds a default branch to the returned dictionary which produces all the events that matched none of the predicates
// to consider, adding some kind of "break" or stopPropagation functionality to stop the event if it matches
const branchStream = (trunk$,cases) => {
let branches = {},
branchList = [];
Object.keys(cases)
.forEach(k=>{
let predicate = cases[k];
branch = new Rx.Subject();
branches[k] = branch;
branchList.push([predicate,branch]);
});
branches.default = new Rx.Subject();
trunk$.subscribe({
next: (e)=>{
let gutter = true;
branchList.forEach(([predicate,branch])=>{
if(predicate(e)) {
branch.next(e);
gutter=false;
}
});
if(gutter) {
branches.default.next(e);
}
},
error: err=>console.error(err),
complete: ()=>{
branchList.forEach(([predicate,branch])=>branch.complete());
}
});
return branches;
}
const routes = {
"getAll$": ({req})=>(req.method=="GET"),
"postInteraction$": ({req})=>(req.pathname=="/interaction"&&req.method=="POST")
},
Router = branchStream(server_,routes);
// console.log("our router!", Router);
// GET static subscription
// should handle all requests in this stream with a static file or 404
// need to consider various test cases for propagating events that match multiple routes
Router.getAll$.subscribe(createStaticSubscriber("/public"));
// POST Interaction subscription
// we should try to collect over a time interval and process all the interaction requests at once
Router.postInteraction$.subscribe(_tracker);
// Send everything that isn't routed somewhere to 404ville
Router.default.subscribe(_404);
;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.