Skip to content

Instantly share code, notes, and snippets.

@sergiodxa
Created August 17, 2016 17:29
Show Gist options
  • Save sergiodxa/245515a4673d9f902b608029847269ff to your computer and use it in GitHub Desktop.
Save sergiodxa/245515a4673d9f902b608029847269ff to your computer and use it in GitHub Desktop.
Example possible usage of Rx for (fake) WebSocket Server
import { Observable } from 'rx';
import { EventEmitter } from 'events';
import { Map as map } from 'immutable';
// base para crear sockets
class Socket extends EventEmitter {
constructor(id) {
super(id);
this.id = id;
}
}
// servidor de sockets y sistema de colas
const wss = new EventEmitter();
const queue = new EventEmitter();
// mapa de sockets activos
let sockets = map();
// sockets conectados
const sockets$ = Observable
.fromEvent(wss, 'connection');
// agregar sockets al mapa
sockets$
.forEach(socket => { sockets = sockets.set(socket.id.toString(), socket) });
// mensajes del sistema de colas
const queueMessage$ = Observable
.fromEvent(queue, 'message');
// mensajes de sockets + sistema de cola
const messages$ = sockets$
.map(socket => Observable.fromEvent(socket, 'message'))
.mergeAll()
.merge(queueMessage$)
.filter(message => sockets.has(message.meta.user.id.toString()))
// desconexiones de sockets
const close = sockets$
.map(socket => Observable.fromEvent(socket, 'close'))
.mergeAll();
// mensajes de tipo join:live
const joinLive$ = messages$
.filter(message => message.type === 'join:live');
// mensajes de tipo chat:message
const chatMessage$ = messages$
.filter(message => message.type === 'chat:message');
// mensajes de tipo notificación
const notification$ = messages$
.filter(message => message.type === 'notification');
// procesamiento de tipos de mensajes
joinLive$
.bufferWithTime(100)
.filter(messages => messages.length > 0)
.forEach(::console.log);
chatMessage$
.forEach(::console.log);
notification$
.map(notification => ({
...notification,
socket: sockets.get(notification.meta.user.id.toString())
}))
.forEach(::console.log)
// .forEach(notification => notification.socket.send(notification.payload));
// =====
// conectar sockets
let used = false;
for (let i = 0; i < 5; i++) {
wss
.emit(
'connection',
new Socket(!used ? 862 : Math.floor(Math.random() * 100) + 1)
);
used = true
}
// crear datos de mensajes
function createAction() {
const n = Math.floor(Math.random() * 2) + 1;
if (n === 1) {
return {
type: 'join:live',
payload: 862,
meta: {
user: {
id: 862,
},
},
};
}
return {
type: 'chat:message',
payload: 'hello world',
meta: {
user: {
id: Math.floor(Math.random() * 100) + 1,
},
},
};
}
// enviar mensajes cada 200ms
setInterval(
() => sockets
.forEach(socket => socket.emit('message', createAction())),
200
);
// enviar notification cada 1s
setInterval(
() => sockets.forEach(socket => socket.emit('message', {
type: 'notification',
payload: {
message: 'te respondieron una discusión!'
},
meta: {
user: {
id: 862,
},
},
})),
1000
);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment