Skip to content

Instantly share code, notes, and snippets.

@feliperohdee
Last active May 9, 2016 14:36
Show Gist options
  • Save feliperohdee/45d46ce949a44324f4bfcfe994e213af to your computer and use it in GitHub Desktop.
Save feliperohdee/45d46ce949a44324f4bfcfe994e213af to your computer and use it in GitHub Desktop.
import * as _ from 'lodash';
import * as ws from 'ws';
import {Observable, Subject} from 'rxjs';
import {Request} from 'restify';
import {Socket as NetSocket} from 'net';
import {Errors, Redis} from './index';
/**
 * WebSocket hub for one node.
 *
 * Tracks which connected clients are subscribed to which channel, pushes
 * `{ e: event, p: payload }` frames to them and relays broadcasts to the
 * other nodes through redis.
 */
export class Socket {
    /** channel id -> clients subscribed to that channel on this node */
    public channels: Map<string, Set<ws>> = new Map<string, Set<ws>>();
    /** lazily created by `server()` */
    public wss: ws.Server;
    public redis: Redis = Redis.instance;
    /** receives `{ client, event, payload }` for every incoming message that is not a ping */
    public eventStream: Subject<any> = new Subject<any>();
    public errors: Errors = Errors.instance;

    constructor() {}

    /**
     * create the socket server instance (only once) and start relaying
     * redis publications to the local clients
     * @return {ws.Server}
     */
    server(): ws.Server {
        if (this.wss) {
            return this.wss;
        }

        this.wss = new ws.Server({
            noServer: true
        });

        // listen redis events; the trailing `false` stops the relayed message
        // from being published to redis again
        this.redis.createClient();
        this.redis.onChannel('broadcast')
            .subscribe(response => this.broadcast(response.event, response.payload, response.ignoreClient, false));
        this.redis.onChannel('broadcastToChannel')
            .subscribe(response => this.broadcastToChannel(response.id, response.event, response.payload, response.ignoreClient, false));

        return this.wss;
    }

    /**
     * handle HTTP upgrade request, it is done after the request was authenticated by bearer
     * @param {Request} req
     * @param {Socket as NetSocket} socket
     * @param {Buffer} upgradeHead
     */
    handleUpgrade(req: Request, socket: NetSocket, upgradeHead: Buffer): void {
        this.wss.handleUpgrade(req, socket, upgradeHead, client => {
            let auth: any = req.params.auth;

            // attach useful data to client
            this.setClient(client, 'auth', auth);
            this.setClient(client, 'channels', new Set());
            this.handleConnection(client);
        });
    }

    /**
     * subscribe client to a channel
     * @param {ws} client
     * @param {string} id
     */
    subscribe(client: ws, id: string): void {
        let channelsIn: Set<string> = _.get(client, 'channels', null);
        let channel: Set<ws> = this.channels.get(id);

        // a client that did not come through handleUpgrade has no channel set yet
        if (!_.isSet(channelsIn)) {
            channelsIn = new Set<string>();
            this.setClient(client, 'channels', channelsIn);
        }

        // add channel to client channels hash
        channelsIn.add(id);

        if (_.isSet(channel)) {
            // add client to the existing channel
            channel.add(client);
        } else {
            // create new channel
            this.channels.set(id, new Set<ws>([client]));
        }
    }

    /**
     * unsubscribe client from channel; does nothing when the channel does not exist
     * @param {ws} client
     * @param {string} id
     */
    unsubscribe(client: ws, id: string): void {
        let channelsIn: Set<string> = _.get(client, 'channels', null);
        let channel: Set<ws> = this.channels.get(id);

        if (!_.isSet(channel)) {
            return;
        }

        // remove channel from client channels hash
        if (_.isSet(channelsIn)) {
            channelsIn.delete(id);
        }

        // remove client from channel, dropping the channel once it is empty
        channel.delete(client);

        if (!channel.size) {
            this.channels.delete(id);
        }
    }

    /**
     * set key / value to a client
     * @param {ws} client
     * @param {string} key
     * @param {any} value
     */
    setClient(client: ws, key: string, value: any): void {
        _.set(client, key, value);
    }

    /**
     * send an event to a client
     * @param {ws} client
     * @param {string} event
     * @param {any = null} payload
     */
    sendToClient(client: ws, event: string, payload: any = null): void {
        this.send(client, {
            e: event,
            p: payload
        });
    }

    /**
     * broadcast to all
     * @param {string} event
     * @param {any = null} payload
     * @param {string} ignoreClient
     * @param {boolean = true} broadcastToRedis
     */
    broadcast(event: string, payload: any = null, ignoreClient?: string, broadcastToRedis: boolean = true): void {
        let data: any = {
            e: event,
            p: payload
        };

        // `clients` is an array in old `ws` releases and a Set in newer ones;
        // both expose forEach, whereas lodash's each does not walk Set entries
        this.wss.clients.forEach((client: ws) => this.send(client, data, ignoreClient));

        // avoid reverb
        if (broadcastToRedis) {
            // broadcast to other nodes
            this.redis.publish('broadcast', { event, payload, ignoreClient });
        }
    }

    /**
     * broadcast to channel
     * @param {string} id
     * @param {string} event
     * @param {any = null} payload
     * @param {string} ignoreClient
     * @param {boolean = true} broadcastToRedis
     */
    broadcastToChannel(id: string, event: string, payload: any = null, ignoreClient?: string, broadcastToRedis: boolean = true): void {
        let channel: Set<ws> = this.channels.get(id);
        let data: any = {
            e: event,
            p: payload,
            c: id
        };

        if (_.isSet(channel)) {
            channel.forEach(client => this.send(client, data, ignoreClient));
        }

        // avoid reverb
        if (broadcastToRedis) {
            // broadcast to other nodes; this must not depend on the channel
            // existing locally, its subscribers may all live on other nodes
            this.redis.publish('broadcastToChannel', { id, event, payload, ignoreClient });
        }
    }

    /**
     * helper get client auth
     * @param {ws} client
     * @param {string} key
     * @return {any} the auth object (or `auth[key]` when a key is given), `false` when missing
     */
    getAuth(client: ws, key?: string): any {
        let auth: any = (<any>client).auth;

        if (!auth) {
            return false;
        }

        if (key) {
            return auth[key] || false;
        }

        return auth;
    }

    /**
     * handle client connection; a client without auth data is closed right away
     * @param {ws} client
     */
    handleConnection(client: ws): void {
        let auth: any = this.getAuth(client);

        if (!auth) {
            client.close();
            return;
        }

        // handle close
        Observable.fromEvent(client, 'close')
            .subscribe(() => {
                let channelsIn: Set<string> = _.get(client, 'channels', null);

                if (!_.isSet(channelsIn)) {
                    return;
                }

                // remove client from all channels; unsubscribe deletes from this
                // same Set, which Set#forEach tolerates for the visited entry
                channelsIn.forEach(id => this.unsubscribe(client, id));
            });

        // handle message
        Observable.fromEvent(client, 'message', response => response)
            .subscribe(response => {
                let parsed: any = this.parseIncomingMessage(response);

                // malformed JSON (already logged) or a literal `null` carries no event
                if (parsed === undefined || parsed === null) {
                    return;
                }

                let event: string = parsed.e;
                let payload: any = parsed.p;

                if (event === 'ping') {
                    this.sendToClient(client, 'pong');
                } else {
                    this.eventStream.next({ client, event, payload });
                }
            });
    }

    /**
     * parse incoming message
     * @param {string} data raw JSON text
     * @return {any} the parsed value, `undefined` when the text is not valid JSON (the error is logged)
     */
    parseIncomingMessage(data: string): any {
        try {
            return JSON.parse(data);
        } catch (err) {
            this.errors.log(err);
            return undefined;
        }
    }

    /**
     * send message; objects are serialized to JSON first, send errors are logged
     * @param {ws} client
     * @param {any = null} data
     * @param {string} ignoreClient skip the client whose `auth.id` or `auth.id/auth.unique` equals this value
     */
    send(client: ws, data: any = null, ignoreClient?: string): void {
        if (_.isObject(data)) {
            data = JSON.stringify(data);
        }

        try {
            if (ignoreClient) {
                let auth: any = this.getAuth(client);

                if (auth && (auth.id === ignoreClient || `${auth.id}/${auth.unique}` === ignoreClient)) {
                    return;
                }
            }

            client.send(data);
        } catch (err) {
            this.errors.log(err);
        }
    }
}
// A message published on the redis `broadcastToChannel` channel must be handed
// to Socket#broadcastToChannel with broadcastToRedis = false and must reach the
// connected client as a serialized `{ e, p, c }` frame.
it('should call broadcast when redis publishes', done => {
    const channelId = 'someChannel';
    const eventName = 'someEvent';
    const body = {
        key: 'value'
    };
    const expectedFrame: string = JSON.stringify({
        e: eventName,
        p: body,
        c: channelId
    });

    const spy: Sinon.SinonSpy = sinon.spy(socket, 'broadcastToChannel');
    const incoming: Observable<any> = Observable.fromEvent(client, 'message');

    // zipping the stream with itself delivers each message as a pair,
    // which is why the assertion below reads index 0
    incoming
        .zip(incoming)
        .first()
        .subscribe(pair => {
            expect(spy).to.have.been.calledWith(channelId, eventName, body, undefined, false);
            expect(pair[0]).to.equal(expectedFrame);
        }, null, done);

    // NOTE(review): presumably lets this node receive its own publication - confirm against Redis
    socket.redis.ignoreSameNode = false;
    socket.redis.publish('broadcastToChannel', {
        id: channelId,
        event: eventName,
        payload: body
    });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment