Skip to content

Instantly share code, notes, and snippets.

@hohl
Last active August 29, 2015 14:18
Show Gist options
  • Save hohl/935bcc04151e7831b399 to your computer and use it in GitHub Desktop.
Save hohl/935bcc04151e7831b399 to your computer and use it in GitHub Desktop.
RedisEventEmitter for Node.JS written in TypeScript
/*
*
* █████╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗███████╗ ██████╗ ███╗ ██╗ ██╗██████╗ ██████╗
* ██╔══██╗██║ ██║ ██║██╔══██╗╚██╗ ██╔╝██╔════╝██╔═══██╗████╗ ██║██╗██╗██║██╔══██╗██╔════╝
* ███████║██║ ██║ █╗ ██║███████║ ╚████╔╝ ███████╗██║ ██║██╔██╗ ██║╚═╝╚═╝██║██████╔╝██║
* ██╔══██║██║ ██║███╗██║██╔══██║ ╚██╔╝ ╚════██║██║ ██║██║╚██╗██║██╗██╗██║██╔══██╗██║
* ██║ ██║███████╗╚███╔███╔╝██║ ██║ ██║ ███████║╚██████╔╝██║ ╚████║╚═╝╚═╝██║██║ ██║╚██████╗
* ╚═╝ ╚═╝╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
*
*
* CONFIDENTIAL AND PROPRIETARY
* Copyright (c) 2015 Michael Hohl, All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains the property of MICHAEL HOHL. The intellectual and
* technical concepts contained herein are proprietary to MICHAEL HOHL and are protected by trade secret and copyright
* law. Dissemination of this information or reproduction of this material is strictly forbidden unless prior written
* permission is obtained from MICHAEL HOHL. Access to the source code contained herein is hereby forbidden to anyone
* except MICHAEL HOHL AND CONTRACTORS who have executed Confidentiality and Non-disclosure agreements explicitly
* covering such access.
*
* The copyright notice above does not evidence any actual or intended publication or disclosure of this source code,
* which includes information that is confidential and/or proprietary, and is a trade secret, of MICHAEL HOHL. ANY
* REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE
* CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND
* INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR
* IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT
* MAY DESCRIBE, IN WHOLE OR IN PART.
*/
///<reference path='../../typing/chai/chai.d.ts'/>
///<reference path='../../typing/node/node.d.ts'/>
///<reference path='../../typing/mocha/mocha.d.ts'/>
///<reference path='../../typing/sinon/sinon.d.ts'/>
///<reference path='../../typing/redis/redis.d.ts'/>
import chai = require('chai');
import events = require("events");
import redis = require("redis");
import sinon = require("sinon");
import RedisEventEmitter = require("../../app/models/RedisEventEmitter");
var assert = chai.assert;
describe("RedisEventEmitter", () => {
var redisEmitter = null;
var redisReceiver = null;
var redisEventEmitter = null;
beforeEach(function() {
redisEmitter = new events.EventEmitter();
redisEmitter.publish = sinon.spy();
redisReceiver = new events.EventEmitter();
redisReceiver.psubscribe = sinon.spy();
redisReceiver.punsubscribe = sinon.spy();
redisEventEmitter = new RedisEventEmitter(redisEmitter, redisReceiver);
});
it('publishs an event when publish is called', function() {
redisEventEmitter.publish('test.topic', {foo: 'bar'});
assert.equal(redisEmitter.publish.lastCall.args[0], 'test.topic');
assert.deepEqual(redisEmitter.publish.lastCall.args[1], '{"foo":"bar"}');
});
it('subscribes to topics on the first call of on', function() {
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.on('test.topic.2.*', function() {});
assert.equal(redisReceiver.psubscribe.firstCall.args[0], 'test.topic.1.*');
assert.equal(redisReceiver.psubscribe.secondCall.args[0], 'test.topic.2.*');
});
it('subscribes to topics only once', function() {
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.on('test.topic.1.*', function() {});
assert.equal(redisReceiver.psubscribe.callCount, 1);
});
it('does not unsubscribe if theres still a listener', function() {
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.removeListener('test.topic.1.*', function() {});
assert.equal(redisReceiver.punsubscribe.callCount, 0);
});
it('does unsubscribe if theres no listener left', function() {
var listener = function() {};
redisEventEmitter.on('test.topic.1.*', listener);
redisEventEmitter.removeListener('test.topic.1.*', listener);
assert.equal(redisReceiver.punsubscribe.firstCall.args[0], 'test.topic.1.*');
});
it('does unsubscribe if all listeners get removed', function() {
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.on('test.topic.1.*', function() {});
redisEventEmitter.removeAllListeners('test.topic.1.*');
assert.equal(redisReceiver.punsubscribe.firstCall.args[0], 'test.topic.1.*');
assert.equal(redisReceiver.punsubscribe.callCount, 1);
});
it('does receive data', function(done) {
redisEventEmitter.on('test.topic.1.*', function(channel, data) {
assert.deepEqual(data, {foo: 'bar'});
assert.equal(channel, 'test.topic.1.test');
done();
});
redisReceiver.emit('pmessage', 'test.topic.1.*', 'test.topic.1.test', '{"foo":"bar"}');
});
it('does work with once', function() {
redisEventEmitter.once('test.topic.1.*', function() {});
assert.equal(redisReceiver.punsubscribe.callCount, 0);
redisReceiver.emit('pmessage', 'test.topic.1.*', 'test.topic.1.test', '{}');
assert.equal(redisReceiver.punsubscribe.callCount, 1);
});
it('passes errors through', function(done) {
redisEventEmitter.on('error', function(msg) {
assert.deepEqual(msg, 'test');
done();
});
redisReceiver.emit('error', 'test');
});
});
/*
*
* █████╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗███████╗ ██████╗ ███╗ ██╗ ██╗██████╗ ██████╗
* ██╔══██╗██║ ██║ ██║██╔══██╗╚██╗ ██╔╝██╔════╝██╔═══██╗████╗ ██║██╗██╗██║██╔══██╗██╔════╝
* ███████║██║ ██║ █╗ ██║███████║ ╚████╔╝ ███████╗██║ ██║██╔██╗ ██║╚═╝╚═╝██║██████╔╝██║
* ██╔══██║██║ ██║███╗██║██╔══██║ ╚██╔╝ ╚════██║██║ ██║██║╚██╗██║██╗██╗██║██╔══██╗██║
* ██║ ██║███████╗╚███╔███╔╝██║ ██║ ██║ ███████║╚██████╔╝██║ ╚████║╚═╝╚═╝██║██║ ██║╚██████╗
* ╚═╝ ╚═╝╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝╚═╝ ╚═╝ ╚═════╝
*
*
* CONFIDENTIAL AND PROPRIETARY
* Copyright (c) 2015 Michael Hohl, All Rights Reserved.
*
* NOTICE: All information contained herein is, and remains the property of MICHAEL HOHL. The intellectual and
* technical concepts contained herein are proprietary to MICHAEL HOHL and are protected by trade secret and copyright
* law. Dissemination of this information or reproduction of this material is strictly forbidden unless prior written
* permission is obtained from MICHAEL HOHL. Access to the source code contained herein is hereby forbidden to anyone
* except MICHAEL HOHL AND CONTRACTORS who have executed Confidentiality and Non-disclosure agreements explicitly
* covering such access.
*
* The copyright notice above does not evidence any actual or intended publication or disclosure of this source code,
* which includes information that is confidential and/or proprietary, and is a trade secret, of MICHAEL HOHL. ANY
* REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE
* CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND
* INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR
* IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT
* MAY DESCRIBE, IN WHOLE OR IN PART.
*/
///<reference path='../../typing/node/node.d.ts'/>
///<reference path='../../typing/redis/redis.d.ts'/>
import redis = require("redis");
import events = require("events");
/**
* RedisEventEmitter implements an NodeJS EventEmitter based on Redis pub/sub functionality.
* Redis is an open source, BSD licensed, advanced key-value cache and store.
*
* ATTENTION: HOWEVER TO EMIT A NEW EVENT YOU MUST USE "PUBLISH" !!!
*
* @see {@link http://redis.io/|Redis}
*/
class RedisEventEmitter extends events.EventEmitter {
private _redisEmitter:redis.RedisClient;
private _redisReceiver:redis.RedisClient;
private _prefix:string;
private _subscribedMessages:any;
constructor(redisEmitter:redis.RedisClient, redisReceiver:redis.RedisClient, prefix?:string) {
super();
this._redisEmitter = redisEmitter;
this._redisReceiver = redisReceiver;
this._prefix = prefix || "";
this._subscribedMessages = {};
this.on('newListener', this._onNewListener.bind(this));
this.on('removeListener', this._onRemoveListener.bind(this));
redisReceiver.on('pmessage', this._onPmessage.bind(this));
redisEmitter.on("error", this._onError.bind(this));
redisReceiver.on("error", this._onError.bind(this));
}
/**
* Publishes an event to the Redis server.
*
* @param event the name of the event
* @param args the arguments of the event to pass
* @returns {boolean} always true to be in conformance with NodeJS.EventEmitter
* @private
*/
publish(event:string, value:any):boolean {
this._redisEmitter.publish(this._prefix + event, JSON.stringify(value));
return true;
}
/**
* Triggered on incomming message.
*
* @param pattern the received pattern
* @param channel the received channel
* @param payload the received payload
* @private
*/
private _onPmessage(pattern, channel, payload) {
pattern = pattern.slice(this._prefix.length);
channel = channel.slice(this._prefix.length);
this.emit.apply(this, [pattern, channel].concat(JSON.parse(payload)));
}
/**
* Triggered on new listener.
*
* @param topic the topic the listener listens to.
* @private
*/
private _onNewListener(topic:string):void {
if (topic == 'removeListener') {
return;
}
if (!this._subscribedMessages.hasOwnProperty(topic)) {
this._redisReceiver.psubscribe(this._prefix + topic);
this._subscribedMessages[topic] = true;
}
}
/**
* Triggered once a listener is removed.
*
* @param topic the topic the listener had listend to.
* @private
*/
private _onRemoveListener(topic:string):void {
if (events.EventEmitter.listenerCount(this, topic) == 0 && this._subscribedMessages[topic]) {
this._redisReceiver.punsubscribe(this._prefix + topic);
delete this._subscribedMessages[topic];
}
}
/**
* Triggered on error. Just forwards it to this emitter.
*
* @param message
* @private
*/
private _onError(message):void {
this.emit("error", message);
}
}
export = RedisEventEmitter;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment