Last active
August 29, 2015 14:18
-
-
Save hohl/935bcc04151e7831b399 to your computer and use it in GitHub Desktop.
RedisEventEmitter for Node.JS written in TypeScript
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* | |
* █████╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗███████╗ ██████╗ ███╗ ██╗ ██╗██████╗ ██████╗ | |
* ██╔══██╗██║ ██║ ██║██╔══██╗╚██╗ ██╔╝██╔════╝██╔═══██╗████╗ ██║██╗██╗██║██╔══██╗██╔════╝ | |
* ███████║██║ ██║ █╗ ██║███████║ ╚████╔╝ ███████╗██║ ██║██╔██╗ ██║╚═╝╚═╝██║██████╔╝██║ | |
* ██╔══██║██║ ██║███╗██║██╔══██║ ╚██╔╝ ╚════██║██║ ██║██║╚██╗██║██╗██╗██║██╔══██╗██║ | |
* ██║ ██║███████╗╚███╔███╔╝██║ ██║ ██║ ███████║╚██████╔╝██║ ╚████║╚═╝╚═╝██║██║ ██║╚██████╗ | |
* ╚═╝ ╚═╝╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ | |
* | |
* | |
* CONFIDENTIAL AND PROPRIETARY | |
* Copyright (c) 2015 Michael Hohl, All Rights Reserved. | |
* | |
* NOTICE: All information contained herein is, and remains the property of MICHAEL HOHL. The intellectual and | |
* technical concepts contained herein are proprietary to MICHAEL HOHL and are protected by trade secret and copyright | |
* law. Dissemination of this information or reproduction of this material is strictly forbidden unless prior written | |
* permission is obtained from MICHAEL HOHL. Access to the source code contained herein is hereby forbidden to anyone | |
* except MICHAEL HOHL AND CONTRACTORS who have executed Confidentiality and Non-disclosure agreements explicitly | |
* covering such access. | |
* | |
* The copyright notice above does not evidence any actual or intended publication or disclosure of this source code, | |
* which includes information that is confidential and/or proprietary, and is a trade secret, of MICHAEL HOHL. ANY | |
* REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE | |
* CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND | |
* INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR | |
* IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT | |
* MAY DESCRIBE, IN WHOLE OR IN PART. | |
*/ | |
///<reference path='../../typing/chai/chai.d.ts'/> | |
///<reference path='../../typing/node/node.d.ts'/> | |
///<reference path='../../typing/mocha/mocha.d.ts'/> | |
///<reference path='../../typing/sinon/sinon.d.ts'/> | |
///<reference path='../../typing/redis/redis.d.ts'/> | |
import chai = require('chai'); | |
import events = require("events"); | |
import redis = require("redis"); | |
import sinon = require("sinon"); | |
import RedisEventEmitter = require("../../app/models/RedisEventEmitter"); | |
var assert = chai.assert; | |
describe("RedisEventEmitter", () => { | |
var redisEmitter = null; | |
var redisReceiver = null; | |
var redisEventEmitter = null; | |
beforeEach(function() { | |
redisEmitter = new events.EventEmitter(); | |
redisEmitter.publish = sinon.spy(); | |
redisReceiver = new events.EventEmitter(); | |
redisReceiver.psubscribe = sinon.spy(); | |
redisReceiver.punsubscribe = sinon.spy(); | |
redisEventEmitter = new RedisEventEmitter(redisEmitter, redisReceiver); | |
}); | |
it('publishs an event when publish is called', function() { | |
redisEventEmitter.publish('test.topic', {foo: 'bar'}); | |
assert.equal(redisEmitter.publish.lastCall.args[0], 'test.topic'); | |
assert.deepEqual(redisEmitter.publish.lastCall.args[1], '{"foo":"bar"}'); | |
}); | |
it('subscribes to topics on the first call of on', function() { | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.on('test.topic.2.*', function() {}); | |
assert.equal(redisReceiver.psubscribe.firstCall.args[0], 'test.topic.1.*'); | |
assert.equal(redisReceiver.psubscribe.secondCall.args[0], 'test.topic.2.*'); | |
}); | |
it('subscribes to topics only once', function() { | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
assert.equal(redisReceiver.psubscribe.callCount, 1); | |
}); | |
it('does not unsubscribe if theres still a listener', function() { | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.removeListener('test.topic.1.*', function() {}); | |
assert.equal(redisReceiver.punsubscribe.callCount, 0); | |
}); | |
it('does unsubscribe if theres no listener left', function() { | |
var listener = function() {}; | |
redisEventEmitter.on('test.topic.1.*', listener); | |
redisEventEmitter.removeListener('test.topic.1.*', listener); | |
assert.equal(redisReceiver.punsubscribe.firstCall.args[0], 'test.topic.1.*'); | |
}); | |
it('does unsubscribe if all listeners get removed', function() { | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.on('test.topic.1.*', function() {}); | |
redisEventEmitter.removeAllListeners('test.topic.1.*'); | |
assert.equal(redisReceiver.punsubscribe.firstCall.args[0], 'test.topic.1.*'); | |
assert.equal(redisReceiver.punsubscribe.callCount, 1); | |
}); | |
it('does receive data', function(done) { | |
redisEventEmitter.on('test.topic.1.*', function(channel, data) { | |
assert.deepEqual(data, {foo: 'bar'}); | |
assert.equal(channel, 'test.topic.1.test'); | |
done(); | |
}); | |
redisReceiver.emit('pmessage', 'test.topic.1.*', 'test.topic.1.test', '{"foo":"bar"}'); | |
}); | |
it('does work with once', function() { | |
redisEventEmitter.once('test.topic.1.*', function() {}); | |
assert.equal(redisReceiver.punsubscribe.callCount, 0); | |
redisReceiver.emit('pmessage', 'test.topic.1.*', 'test.topic.1.test', '{}'); | |
assert.equal(redisReceiver.punsubscribe.callCount, 1); | |
}); | |
it('passes errors through', function(done) { | |
redisEventEmitter.on('error', function(msg) { | |
assert.deepEqual(msg, 'test'); | |
done(); | |
}); | |
redisReceiver.emit('error', 'test'); | |
}); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* | |
* █████╗ ██╗ ██╗ ██╗ █████╗ ██╗ ██╗███████╗ ██████╗ ███╗ ██╗ ██╗██████╗ ██████╗ | |
* ██╔══██╗██║ ██║ ██║██╔══██╗╚██╗ ██╔╝██╔════╝██╔═══██╗████╗ ██║██╗██╗██║██╔══██╗██╔════╝ | |
* ███████║██║ ██║ █╗ ██║███████║ ╚████╔╝ ███████╗██║ ██║██╔██╗ ██║╚═╝╚═╝██║██████╔╝██║ | |
* ██╔══██║██║ ██║███╗██║██╔══██║ ╚██╔╝ ╚════██║██║ ██║██║╚██╗██║██╗██╗██║██╔══██╗██║ | |
* ██║ ██║███████╗╚███╔███╔╝██║ ██║ ██║ ███████║╚██████╔╝██║ ╚████║╚═╝╚═╝██║██║ ██║╚██████╗ | |
* ╚═╝ ╚═╝╚══════╝ ╚══╝╚══╝ ╚═╝ ╚═╝ ╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═══╝ ╚═╝╚═╝ ╚═╝ ╚═════╝ | |
* | |
* | |
* CONFIDENTIAL AND PROPRIETARY | |
* Copyright (c) 2015 Michael Hohl, All Rights Reserved. | |
* | |
* NOTICE: All information contained herein is, and remains the property of MICHAEL HOHL. The intellectual and | |
* technical concepts contained herein are proprietary to MICHAEL HOHL and are protected by trade secret and copyright | |
* law. Dissemination of this information or reproduction of this material is strictly forbidden unless prior written | |
* permission is obtained from MICHAEL HOHL. Access to the source code contained herein is hereby forbidden to anyone | |
* except MICHAEL HOHL AND CONTRACTORS who have executed Confidentiality and Non-disclosure agreements explicitly | |
* covering such access. | |
* | |
* The copyright notice above does not evidence any actual or intended publication or disclosure of this source code, | |
* which includes information that is confidential and/or proprietary, and is a trade secret, of MICHAEL HOHL. ANY | |
* REPRODUCTION, MODIFICATION, DISTRIBUTION, PUBLIC PERFORMANCE, OR PUBLIC DISPLAY OF OR THROUGH USE OF THIS SOURCE | |
* CODE WITHOUT THE EXPRESS WRITTEN CONSENT OF COMPANY IS STRICTLY PROHIBITED, AND IN VIOLATION OF APPLICABLE LAWS AND | |
* INTERNATIONAL TREATIES. THE RECEIPT OR POSSESSION OF THIS SOURCE CODE AND/OR RELATED INFORMATION DOES NOT CONVEY OR | |
* IMPLY ANY RIGHTS TO REPRODUCE, DISCLOSE OR DISTRIBUTE ITS CONTENTS, OR TO MANUFACTURE, USE, OR SELL ANYTHING THAT IT | |
* MAY DESCRIBE, IN WHOLE OR IN PART. | |
*/ | |
///<reference path='../../typing/node/node.d.ts'/> | |
///<reference path='../../typing/redis/redis.d.ts'/> | |
import redis = require("redis"); | |
import events = require("events"); | |
/** | |
* RedisEventEmitter implements an NodeJS EventEmitter based on Redis pub/sub functionality. | |
* Redis is an open source, BSD licensed, advanced key-value cache and store. | |
* | |
* ATTENTION: HOWEVER TO EMIT A NEW EVENT YOU MUST USE "PUBLISH" !!! | |
* | |
* @see {@link http://redis.io/|Redis} | |
*/ | |
class RedisEventEmitter extends events.EventEmitter { | |
private _redisEmitter:redis.RedisClient; | |
private _redisReceiver:redis.RedisClient; | |
private _prefix:string; | |
private _subscribedMessages:any; | |
constructor(redisEmitter:redis.RedisClient, redisReceiver:redis.RedisClient, prefix?:string) { | |
super(); | |
this._redisEmitter = redisEmitter; | |
this._redisReceiver = redisReceiver; | |
this._prefix = prefix || ""; | |
this._subscribedMessages = {}; | |
this.on('newListener', this._onNewListener.bind(this)); | |
this.on('removeListener', this._onRemoveListener.bind(this)); | |
redisReceiver.on('pmessage', this._onPmessage.bind(this)); | |
redisEmitter.on("error", this._onError.bind(this)); | |
redisReceiver.on("error", this._onError.bind(this)); | |
} | |
/** | |
* Publishes an event to the Redis server. | |
* | |
* @param event the name of the event | |
* @param args the arguments of the event to pass | |
* @returns {boolean} always true to be in conformance with NodeJS.EventEmitter | |
* @private | |
*/ | |
publish(event:string, value:any):boolean { | |
this._redisEmitter.publish(this._prefix + event, JSON.stringify(value)); | |
return true; | |
} | |
/** | |
* Triggered on incomming message. | |
* | |
* @param pattern the received pattern | |
* @param channel the received channel | |
* @param payload the received payload | |
* @private | |
*/ | |
private _onPmessage(pattern, channel, payload) { | |
pattern = pattern.slice(this._prefix.length); | |
channel = channel.slice(this._prefix.length); | |
this.emit.apply(this, [pattern, channel].concat(JSON.parse(payload))); | |
} | |
/** | |
* Triggered on new listener. | |
* | |
* @param topic the topic the listener listens to. | |
* @private | |
*/ | |
private _onNewListener(topic:string):void { | |
if (topic == 'removeListener') { | |
return; | |
} | |
if (!this._subscribedMessages.hasOwnProperty(topic)) { | |
this._redisReceiver.psubscribe(this._prefix + topic); | |
this._subscribedMessages[topic] = true; | |
} | |
} | |
/** | |
* Triggered once a listener is removed. | |
* | |
* @param topic the topic the listener had listend to. | |
* @private | |
*/ | |
private _onRemoveListener(topic:string):void { | |
if (events.EventEmitter.listenerCount(this, topic) == 0 && this._subscribedMessages[topic]) { | |
this._redisReceiver.punsubscribe(this._prefix + topic); | |
delete this._subscribedMessages[topic]; | |
} | |
} | |
/** | |
* Triggered on error. Just forwards it to this emitter. | |
* | |
* @param message | |
* @private | |
*/ | |
private _onError(message):void { | |
this.emit("error", message); | |
} | |
} | |
export = RedisEventEmitter; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment