Last active
May 9, 2016 14:36
-
-
Save feliperohdee/45d46ce949a44324f4bfcfe994e213af to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as _ from 'lodash'; | |
import * as ws from 'ws'; | |
import {Observable, Subject} from 'rxjs'; | |
import {Request} from 'restify'; | |
import {Socket as NetSocket} from 'net'; | |
import {Errors, Redis} from './index'; | |
/**
 * WebSocket hub.
 *
 * Keeps track of which connected clients are subscribed to which channel,
 * delivers events to local clients and mirrors every broadcast through Redis
 * so that the other nodes listening on the same Redis channels can deliver it
 * to their own clients.
 */
export class Socket {
    /** channel id -> clients subscribed to it on this node */
    public channels: Map<string, Set<ws>> = new Map<string, Set<ws>>();
    /** lazily created by `server()` */
    public wss: ws.Server;
    public redis: Redis = Redis.instance;
    /** every non-ping message received from a client is pushed here as `{ client, event, payload }` */
    public eventStream: Subject<any> = new Subject<any>();
    public errors: Errors = Errors.instance;

    constructor() {}

    /**
     * create socket server instance (only once; later calls return the same instance)
     * @return {ws.Server}
     */
    server(): ws.Server {
        if (this.wss) {
            return this.wss;
        }

        this.wss = new ws.Server({
            noServer: true
        });

        // listen redis events; relay them locally with broadcastToRedis = false
        // so an event received from redis is never published back (avoid reverb)
        this.redis.createClient();

        this.redis.onChannel('broadcast')
            .subscribe(response => this.broadcast(response.event, response.payload, response.ignoreClient, false));

        this.redis.onChannel('broadcastToChannel')
            .subscribe(response => this.broadcastToChannel(response.id, response.event, response.payload, response.ignoreClient, false));

        return this.wss;
    }

    /**
     * handle HTTP upgrade request; `req.params.auth` is expected to be already
     * populated by the authentication step that ran before the upgrade
     * @param {Request} req
     * @param {Socket as NetSocket} socket
     * @param {Buffer} upgradeHead
     */
    handleUpgrade(req: Request, socket: NetSocket, upgradeHead: Buffer): void {
        // go through server() so the upgrade also works when it is the first call made on this instance
        this.server().handleUpgrade(req, socket, upgradeHead, client => {
            // attach useful data to client
            this.setClient(client, 'auth', req.params.auth);
            this.setClient(client, 'channels', new Set<string>());
            this.handleConnection(client);
        });
    }

    /**
     * subscribe client to a channel
     * @param {ws} client
     * @param {string} id channel id
     */
    subscribe(client: ws, id: string): void {
        let channelsIn: Set<string> = (<any>client).channels;
        const channel: Set<ws> = this.channels.get(id);

        // clients that did not come through handleUpgrade have no channels set yet
        if (!channelsIn) {
            channelsIn = new Set<string>();
            (<any>client).channels = channelsIn;
        }

        // add channel to client channels hash
        channelsIn.add(id);

        if (channel) {
            // add client to existing channel
            channel.add(client);
        } else {
            // create new channel
            this.channels.set(id, new Set<ws>([client]));
        }
    }

    /**
     * unsubscribe client from channel; empty channels are removed from the registry
     * @param {ws} client
     * @param {string} id channel id
     */
    unsubscribe(client: ws, id: string): void {
        const channelsIn: Set<string> = (<any>client).channels;
        const channel: Set<ws> = this.channels.get(id);

        // remove channel from client channels hash
        if (channelsIn) {
            channelsIn.delete(id);
        }

        if (!channel) {
            return;
        }

        // remove client from channel
        channel.delete(client);

        if (!channel.size) {
            this.channels.delete(id);
        }
    }

    /**
     * set key / value to a client
     * @param {ws} client
     * @param {string} key
     * @param {any} value
     */
    setClient(client: ws, key: string, value: any): void {
        _.set(client, key, value);
    }

    /**
     * send an event to a client, wrapped as `{ e: event, p: payload }`
     * @param {ws} client
     * @param {string} event
     * @param {any = null} payload
     */
    sendToClient(client: ws, event: string, payload: any = null): void {
        this.send(client, {
            e: event,
            p: payload
        });
    }

    /**
     * broadcast to all clients connected to this node and, optionally, to the other nodes
     * @param {string} event
     * @param {any = null} payload
     * @param {string} ignoreClient client that must not receive the event (see `send`)
     * @param {boolean = true} broadcastToRedis false when the event itself came from redis
     */
    broadcast(event: string, payload: any = null, ignoreClient?: string, broadcastToRedis: boolean = true): void {
        if (this.wss) {
            // serialize once instead of once per client
            const data: string = JSON.stringify({
                e: event,
                p: payload
            });

            // `clients` is an array or a Set depending on the ws version; both expose forEach
            (<any>this.wss.clients).forEach((client: ws) => this.send(client, data, ignoreClient));
        }

        // avoid reverb
        if (broadcastToRedis) {
            // broadcast to other nodes
            this.redis.publish('broadcast', { event, payload, ignoreClient });
        }
    }

    /**
     * broadcast to the clients subscribed to a channel on this node and, optionally, to the other nodes
     * @param {string} id channel id
     * @param {string} event
     * @param {any = null} payload
     * @param {string} ignoreClient client that must not receive the event (see `send`)
     * @param {boolean = true} broadcastToRedis false when the event itself came from redis
     */
    broadcastToChannel(id: string, event: string, payload: any = null, ignoreClient?: string, broadcastToRedis: boolean = true): void {
        const channel: Set<ws> = this.channels.get(id);

        if (channel && channel.size) {
            // serialize once instead of once per client
            const data: string = JSON.stringify({
                e: event,
                p: payload,
                c: id
            });

            channel.forEach(client => this.send(client, data, ignoreClient));
        }

        // avoid reverb
        if (broadcastToRedis) {
            // broadcast to other nodes even when nobody is subscribed here:
            // the subscribers of this channel may all be connected to another node
            this.redis.publish('broadcastToChannel', { id, event, payload, ignoreClient });
        }
    }

    /**
     * helper get client auth
     * @param {ws} client
     * @param {string} key when given, only that property of the auth object is returned
     * @return {any} the auth object (or the requested property), `false` when missing or falsy
     */
    getAuth(client: ws, key?: string): any {
        const auth: any = (<any>client).auth;

        if (!auth) {
            return false;
        }

        if (key) {
            return auth[key] || false;
        }

        return auth;
    }

    /**
     * handle client connection: unauthenticated clients are closed right away,
     * the others get their close and message events wired
     * @param {ws} client
     */
    handleConnection(client: ws): void {
        if (!this.getAuth(client)) {
            client.close();
            return;
        }

        // handle close
        Observable.fromEvent(client, 'close')
            .subscribe(() => {
                const channelsIn: Set<string> = (<any>client).channels;

                // remove client from all channels
                if (channelsIn) {
                    channelsIn.forEach(id => this.unsubscribe(client, id));
                }
            });

        // handle message
        Observable.fromEvent(client, 'message', response => response)
            .subscribe(response => {
                const parsed: any = this.parseIncomingMessage(response);

                // invalid JSON was already logged by parseIncomingMessage (and a literal
                // `null` carries no event); do not let one bad frame throw in here
                if (parsed === undefined || parsed === null) {
                    return;
                }

                const event: string = parsed.e;
                const payload: any = parsed.p;

                if (event === 'ping') {
                    this.sendToClient(client, 'pong');
                } else {
                    this.eventStream.next({ client, event, payload });
                }
            });
    }

    /**
     * parse incoming message
     * @param {string} data raw JSON text received from a client
     * @return {any} the parsed value, `undefined` (after logging the error) when data is not valid JSON
     */
    parseIncomingMessage(data: string): any {
        try {
            return JSON.parse(data);
        } catch (err) {
            this.errors.log(err);
        }

        return undefined;
    }

    /**
     * send message; failures are logged, never thrown
     * @param {ws} client
     * @param {any = null} data objects are sent as JSON text, anything else is sent as is
     * @param {string} ignoreClient skip the client when this equals its `auth.id` or `auth.id/auth.unique`
     */
    send(client: ws, data: any = null, ignoreClient?: string): void {
        try {
            if (ignoreClient) {
                const auth: any = this.getAuth(client);

                // getAuth returns false for clients without auth; those can never match
                if (auth && (auth.id === ignoreClient || `${auth.id}/${auth.unique}` === ignoreClient)) {
                    return;
                }
            }

            client.send(_.isObject(data) ? JSON.stringify(data) : data);
        } catch (err) {
            this.errors.log(err);
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Publishes a 'broadcastToChannel' message through redis and checks that the
// socket relays it locally: broadcastToChannel must be invoked with
// broadcastToRedis = false and the connected client must receive the
// serialized event. Relies on `client` and `socket` prepared by the enclosing suite.
it('should call broadcastToChannel when redis publishes', done => {
    const clientMessages: Observable<any> = Observable.fromEvent(client, 'message');
    const broadcastToChannel: Sinon.SinonSpy = sinon.spy(socket, 'broadcastToChannel');

    clientMessages
        .first()
        // assert inside the stream so a failed expectation reaches done(err)
        // instead of leaving the test to time out
        .do(message => {
            expect(broadcastToChannel).to.have.been.calledWith('someChannel', 'someEvent', {
                key: 'value'
            }, undefined, false);

            expect(message).to.equal(JSON.stringify({
                e: 'someEvent',
                p: {
                    key: 'value'
                },
                c: 'someChannel'
            }));
        })
        // always unwrap the spy so later tests can spy on the method again
        .finally(() => broadcastToChannel.restore())
        .subscribe(null, done, done);

    // let this node receive its own redis messages
    socket.redis.ignoreSameNode = false;
    socket.redis.publish('broadcastToChannel', {
        id: 'someChannel',
        event: 'someEvent',
        payload: {
            key: 'value'
        }
    });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment