Skip to content

Instantly share code, notes, and snippets.

@tlivings
Last active April 18, 2016 16:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tlivings/4c0a5ce06d2d86667a8194bae381fe0a to your computer and use it in GitHub Desktop.
Save tlivings/4c0a5ce06d2d86667a8194bae381fe0a to your computer and use it in GitHub Desktop.
RxJS-based web server basics
// Example: serve on port 3000. Each value emitted by the server is one
// `{ request, response }` pair.
const server = new RxServer(Http.createServer(), 3000);

// Read request content: collect every body chunk, join them into a single
// Buffer stored on `request.payload`, then pass the pair along.
const readBody = ({ request, response }) =>
    request.toArray().map((chunks) => {
        request.payload = Buffer.concat(chunks);
        return { request, response };
    });

// Write a fixed body and finish the response.
const reply = ({ response }) => {
    response.next('success.');
    response.complete();
};

server.flatMap(readBody).subscribe(reply);
import Http from 'http';
import { Observable, Subscriber} from 'rxjs/Rx';
import { debuglog as Debuglog } from 'util';
/**
 * Observable view of an HTTP server.
 *
 * Every 'request' event is emitted as a `{ request, response }` pair, with the
 * request wrapped in an RxRequest and the response wrapped in an RxResponse.
 * The stream completes when the server emits 'close' and errors when the
 * server emits 'error'. The wrapped server is exposed as `raw`.
 */
class RxServer extends Observable {
    /**
     * @param {object} [server] - Server to observe; defaults to `Http.createServer()`.
     * @param {*} [on=80] - Value handed unchanged to `server.listen()`.
     */
    constructor(server = Http.createServer(), on = 80) {
        const log = Debuglog('rxserver');

        // Shared across all subscriptions of this observable so the server is
        // started once and only stopped when nobody is listening any more.
        let subscriptions = 0;
        let startedListening = false;

        super((observer) => {
            const onRequest = (request, response) => {
                log(`request (url=${request.url}).`);
                observer.next({ request: new RxRequest(request), response: new RxResponse(response) });
            };
            const onClose = () => {
                log('closed.');
                observer.complete();
            };
            const onError = (error) => {
                log(`error (message=${error.message}).`);
                observer.error(error);
            };

            server.on('request', onRequest);
            server.on('close', onClose);
            server.on('error', onError);

            // Calling listen() again on a server that is already listening is an
            // error, so only the first subscriber starts it, and only when the
            // caller has not started it already.
            if (subscriptions === 0 && !server.listening) {
                server.listen(on);
                startedListening = true;
            }
            subscriptions += 1;

            return () => {
                log('disposed.');
                server.removeListener('request', onRequest);
                server.removeListener('close', onClose);
                server.removeListener('error', onError);

                subscriptions -= 1;

                // Release the port once the last subscriber is gone, but only if
                // this observable was the one that started the server.
                if (subscriptions === 0 && startedListening) {
                    startedListening = false;
                    if (server.listening) {
                        server.close();
                    }
                }
            };
        });

        this.raw = server;
    }
}
/**
 * Observable wrapper around an incoming request stream.
 *
 * On subscription it listens for 'readable' and emits every chunk that
 * `request.read()` returns until it yields `null`. 'end' completes the stream
 * and 'error' fails it. The wrapped request is exposed as `raw`.
 */
class RxRequest extends Observable {
    /**
     * @param {object} request - Stream providing `read()`, `on()`, `once()` and `removeListener()`.
     */
    constructor(request) {
        const log = Debuglog('rxserver/request');

        super((observer) => {
            // Pull everything currently buffered; read() returns null when drained.
            const drain = function () {
                log('readable.');
                for (let data = request.read(); data !== null; data = request.read()) {
                    observer.next(data);
                }
            };

            const finish = function () {
                log('end.');
                observer.complete();
            };

            const fail = function (error) {
                log(`error (message=${error.message}).`);
                observer.error(error);
            };

            request.on('readable', drain);
            request.once('error', fail);
            request.once('end', finish);

            // Teardown: detach every listener registered above.
            return function dispose() {
                log('disposed.');
                request.removeListener('readable', drain);
                request.removeListener('end', finish);
                request.removeListener('error', fail);
            };
        });

        this.raw = request;
    }
}
/**
 * Subscriber that writes what it receives to an HTTP response.
 *
 * - next(chunk): sends the headers if they have not been sent, then writes the chunk.
 * - error(err): sets `status` to 500 and emits 'error' on the raw response.
 * - complete(): sends the headers if they have not been sent, then ends the response.
 *
 * `status` (falls back to 200 when unset) and `headers` may be assigned before
 * the first write. The wrapped response is exposed as `raw`.
 */
class RxResponse extends Subscriber {
    /**
     * @param {object} response - Response providing `headersSent`, `writeHead()`, `write()`, `end()` and `emit()`.
     */
    constructor(response) {
        const log = Debuglog('rxserver/response');

        // `this` may not be touched before super() in a derived constructor.
        // The arrow functions below capture it lexically and only run after
        // construction has finished, so no `self` alias is needed.
        const sendHeaders = () => {
            if (!response.headersSent) {
                const status = this.status || 200;
                log(`write head (status=${status}).`);
                response.writeHead(status, this.headers);
            }
        };

        super(
            (chunk) => {
                sendHeaders();
                log(`write (length=${chunk ? chunk.length : 0}).`);
                response.write(chunk);
            },
            (error) => {
                log(`error (message=${error.message}).`);
                this.status = 500;
                // NOTE(review): the response is neither written nor ended here;
                // presumably an 'error' listener on the raw response finishes it. Confirm.
                response.emit('error', error);
            },
            () => {
                log('end.');
                sendHeaders();
                response.end();
            }
        );

        this.raw = response;
        this.status = undefined;
        this.headers = {};
    }
}
export { RxServer as default, RxRequest, RxResponse };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment