Skip to content

Instantly share code, notes, and snippets.

@eiriklv
Last active March 9, 2019 14:14
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save eiriklv/ea8c9dbbae3069930054d3393b076808 to your computer and use it in GitHub Desktop.
Save eiriklv/ea8c9dbbae3069930054d3393b076808 to your computer and use it in GitHub Desktop.
Channels and websockets
/**
* Import dependencies
*/
const { EventEmitter } = require('events');
const ReconnectingWebSocket = require('reconnecting-websocket');
/**
 * Create a reconnecting websocket connection interface.
 *
 * Outgoing messages are serialized with `JSON.stringify`. Messages sent while
 * the socket is not open are queued and flushed, in order, the next time the
 * socket opens. Each queued message is sent at most once, even when the
 * underlying socket reconnects several times.
 *
 * Incoming messages are re-emitted on the 'message' channel, parsed from JSON
 * when possible.
 *
 * @param {string} uri - Address passed to the `ReconnectingWebSocket` constructor.
 * @returns {{send: Function, close: Function, on: Function, removeListener: Function}}
 *   Interface for sending messages, closing the connection and (un)subscribing listeners.
 */
export default function createReconnectingWebsocket(uri) {
  /**
   * Flag to determine if the socket is open and ready for sending messages
   */
  let isOpen = false;

  /**
   * Queue of (unserialized) messages requested sent while the socket was not open
   */
  const messageQueue = [];

  /**
   * Emitter used to fan incoming messages out to listeners
   */
  const emitter = new EventEmitter();

  /**
   * Default 'error' listener, so an emitted 'error' is logged instead of thrown
   */
  emitter.on('error', console.log);

  /**
   * Create a new websocket connection instance
   */
  const websocket = new ReconnectingWebSocket(uri);

  /**
   * Handle opening of the websocket: flush the queue.
   * A message is removed from the queue only after `send` returned, so a
   * message whose send throws stays queued for the next open, while messages
   * already sent are never sent again on a later (re)open.
   */
  websocket.onopen = () => {
    isOpen = true;

    while (messageQueue.length > 0) {
      websocket.send(JSON.stringify(messageQueue[0]));
      messageQueue.shift();
    }
  };

  /**
   * Handle closing of the websocket
   */
  websocket.onclose = () => {
    isOpen = false;
  };

  /**
   * Handle websocket messages
   */
  websocket.onmessage = (evt) => {
    let parsedMessage;

    try {
      parsedMessage = JSON.parse(evt.data);
    } catch (e) {
      // Deliberate best-effort: data that is not valid JSON is forwarded raw below.
    }

    // A falsy parse result (e.g. `0`, `false`, `null`, `""`) also falls back to the raw data.
    emitter.emit('message', parsedMessage || evt.data);
  };

  /**
   * Handle errors on the websocket: stop sending directly, queue instead
   */
  websocket.onerror = () => {
    isOpen = false;
  };

  /**
   * Return the interface
   */
  return {
    /**
     * Send a message now if the socket is open, otherwise queue it.
     */
    send(msg) {
      if (isOpen) {
        websocket.send(JSON.stringify(msg));
      } else {
        messageQueue.push(msg);
      }
    },

    /**
     * Close the websocket and drop every listener registered on the emitter.
     */
    close() {
      websocket.close();
      return emitter.removeAllListeners();
    },

    /**
     * Register a listener on the given channel (e.g. 'message').
     */
    on(chan, listener) {
      return emitter.on(chan, listener);
    },

    /**
     * Remove a previously registered listener from the given channel.
     */
    removeListener(chan, listener) {
      return emitter.removeListener(chan, listener);
    },
  };
}
/**
* Dependencies
*/
import { eventChannel } from 'redux-saga';
/**
 * Abstraction for creating socket channels.
 *
 * Wraps a socket interface in a redux-saga event channel: every 'message'
 * event on the socket is emitted into the channel. The function handed back
 * to `eventChannel` is the unsubscribe step, which removes the listener again.
 *
 * Exported as the module default because the consuming saga imports it as
 * `import createSocketChannel from '../utils/create-socket-channel'`.
 *
 * @param {{on: Function, removeListener: Function}} socket - Socket interface
 *   exposing `on(chan, listener)` and `removeListener(chan, listener)`.
 * @returns {*} Whatever `eventChannel` returns for the subscription.
 */
export default function createSocketChannel(socket) {
  return eventChannel((emit) => {
    const messageHandler = (data) => emit(data);

    socket.on('message', messageHandler);

    return () => socket.removeListener('message', messageHandler);
  });
}
/**
* Import dependencies
*/
import { take, call, apply, put, fork } from 'redux-saga/effects';
/**
* Import helpers
*/
import createReconnectingWebsocket from '../utils/create-reconnecting-websocket';
import createSocketChannel from '../utils/create-socket-channel';
/**
* Import action types
*/
import {
SEND_SOCKET_MESSAGE,
} from '../ducks/whatever';
/**
* Import action creators
*/
import {
someAction,
someOtherAction,
} from '../ducks/whatever';
/**
 * Initialization saga.
 *
 * Creates the reconnecting websocket, wraps it in a socket channel, then
 * starts one worker saga for outgoing messages and one for incoming messages.
 */
export default function* initializeSocket() {
  // Use the helper that is actually imported; `createWebsocket` is not defined anywhere.
  const socket = yield call(createReconnectingWebsocket, 'ws://socket.url');
  const socketChannel = yield call(createSocketChannel, socket);

  // `fork` is non-blocking, so yielding the forks one after the other starts
  // both workers without relying on the bare-array parallel effect form.
  yield fork(sendSocketMessages, socket);
  yield fork(receiveSocketMessages, socketChannel);
}
/**
 * Receive socket messages.
 *
 * Loops forever: takes the next message from the socket channel and, when its
 * `type` is a known one, dispatches the matching action built from the
 * message payload. Messages with any other type are ignored.
 */
function* receiveSocketMessages(socketChannel) {
  /**
   * Map websocket message types to action builders.
   * The builders are wrapped so the action creators are only touched when a
   * message of that type actually arrives.
   */
  const actionBuilders = new Map([
    ['some-message', (payload) => someAction(payload)],
    ['some-other-message', (payload) => someOtherAction(payload)],
  ]);

  for (;;) {
    const incoming = yield take(socketChannel);
    const buildAction = actionBuilders.get(incoming.type);

    if (buildAction === undefined) {
      continue;
    }

    yield put(buildAction(incoming.payload));
  }
}
/**
 * Send socket messages.
 *
 * Loops forever: waits for a SEND_SOCKET_MESSAGE action and passes the taken
 * value to `socket.send`, invoked with the socket as its context.
 */
function* sendSocketMessages(socket) {
  for (;;) {
    const outgoing = yield take(SEND_SOCKET_MESSAGE);
    const sendArgs = [outgoing];

    yield apply(socket, socket.send, sendArgs);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment