Skip to content

Instantly share code, notes, and snippets.

@brzez
Created July 5, 2018 05:59
Show Gist options
  • Save brzez/7ccb625f7f431b95f16f13b6c54c6b0e to your computer and use it in GitHub Desktop.
Save brzez/7ccb625f7f431b95f16f13b6c54c6b0e to your computer and use it in GitHub Desktop.
hapi 16 sse
// @flow
import {PassThrough} from 'stream';
class SSEStream extends PassThrough {
_compressor = null;
_read (size) {
super._read(size);
if (this._compressor) {
this._compressor.flush();
}
}
setCompressor (compressor) {
this._compressor = compressor;
}
}
type Emit = {
(type: string, payload: any): void,
};
export default {
name: 'sse',
version: '0.0.1',
async register (server: any, options: any) {
server.decorate('toolkit', 'sse', function (onReady: Emit) {
/*
usage
in response handler:
return h.sse(emit => ...)
emit(type, data)
*/
const stream = new SSEStream();
let onErrorListener = () => {};
let onEndListener = () => {};
function emit (type, payload) {
stream.write(`event: ${type}\ndata: ${typeof payload === 'string' ? payload : JSON.stringify(payload)}\n\n`);
}
this.request.raw.req.socket.on('error', (...args) => onErrorListener.apply(null, args));
this.request.raw.req.socket.on('end', (...args) => onEndListener.apply(null, args));
setTimeout(() => onReady({
emit,
onError: callback => {
onErrorListener = callback;
},
onEnd: callback => {
onEndListener = callback;
}
}), 0);
return this.response(stream)
.type('text/event-stream')
.header('Cache-Control', 'no-cache, no-transform')
.header('Connection', 'keep-alive')
.header('X-Accel-Buffering', 'no')
.header('access-control-allow-origin', '*');
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment