Created
August 17, 2016 17:29
-
-
Save sergiodxa/245515a4673d9f902b608029847269ff to your computer and use it in GitHub Desktop.
Example possible usage of Rx for (fake) WebSocket Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable } from 'rx'; | |
import { EventEmitter } from 'events'; | |
import { Map as map } from 'immutable'; | |
// base para crear sockets | |
class Socket extends EventEmitter { | |
constructor(id) { | |
super(id); | |
this.id = id; | |
} | |
} | |
// servidor de sockets y sistema de colas | |
const wss = new EventEmitter(); | |
const queue = new EventEmitter(); | |
// mapa de sockets activos | |
let sockets = map(); | |
// sockets conectados | |
const sockets$ = Observable | |
.fromEvent(wss, 'connection'); | |
// agregar sockets al mapa | |
sockets$ | |
.forEach(socket => { sockets = sockets.set(socket.id.toString(), socket) }); | |
// mensajes del sistema de colas | |
const queueMessage$ = Observable | |
.fromEvent(queue, 'message'); | |
// mensajes de sockets + sistema de cola | |
const messages$ = sockets$ | |
.map(socket => Observable.fromEvent(socket, 'message')) | |
.mergeAll() | |
.merge(queueMessage$) | |
.filter(message => sockets.has(message.meta.user.id.toString())) | |
// desconexiones de sockets | |
const close = sockets$ | |
.map(socket => Observable.fromEvent(socket, 'close')) | |
.mergeAll(); | |
// mensajes de tipo join:live | |
const joinLive$ = messages$ | |
.filter(message => message.type === 'join:live'); | |
// mensajes de tipo chat:message | |
const chatMessage$ = messages$ | |
.filter(message => message.type === 'chat:message'); | |
// mensajes de tipo notificación | |
const notification$ = messages$ | |
.filter(message => message.type === 'notification'); | |
// procesamiento de tipos de mensajes | |
joinLive$ | |
.bufferWithTime(100) | |
.filter(messages => messages.length > 0) | |
.forEach(::console.log); | |
chatMessage$ | |
.forEach(::console.log); | |
notification$ | |
.map(notification => ({ | |
...notification, | |
socket: sockets.get(notification.meta.user.id.toString()) | |
})) | |
.forEach(::console.log) | |
// .forEach(notification => notification.socket.send(notification.payload)); | |
// ===== | |
// conectar sockets | |
let used = false; | |
for (let i = 0; i < 5; i++) { | |
wss | |
.emit( | |
'connection', | |
new Socket(!used ? 862 : Math.floor(Math.random() * 100) + 1) | |
); | |
used = true | |
} | |
// crear datos de mensajes | |
function createAction() { | |
const n = Math.floor(Math.random() * 2) + 1; | |
if (n === 1) { | |
return { | |
type: 'join:live', | |
payload: 862, | |
meta: { | |
user: { | |
id: 862, | |
}, | |
}, | |
}; | |
} | |
return { | |
type: 'chat:message', | |
payload: 'hello world', | |
meta: { | |
user: { | |
id: Math.floor(Math.random() * 100) + 1, | |
}, | |
}, | |
}; | |
} | |
// enviar mensajes cada 200ms | |
setInterval( | |
() => sockets | |
.forEach(socket => socket.emit('message', createAction())), | |
200 | |
); | |
// enviar notification cada 1s | |
setInterval( | |
() => sockets.forEach(socket => socket.emit('message', { | |
type: 'notification', | |
payload: { | |
message: 'te respondieron una discusión!' | |
}, | |
meta: { | |
user: { | |
id: 862, | |
}, | |
}, | |
})), | |
1000 | |
); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment