Created
May 24, 2012 04:29
-
-
Save timothyjoelwright/2779421 to your computer and use it in GitHub Desktop.
De-multiplexing Redis transport for SocketStream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Publish Event - Redis Transport (de-multiplexed: one Redis channel per user, channel and socketId)
redis = require('redis') | |
subscriptions = require('../../websocket/subscriptions') | |
# Builds the Redis publish transport.
#
# Instead of pushing every event through a single Redis channel, each event is
# published on a channel named after its target ('ss:event:user:<id>',
# 'ss:event:channel:<name>', 'ss:event:socketId:<id>' or 'ss:event:all') and
# this process only subscribes to the targets it currently tracks.
#
# config - optional object:
#          port    - Redis port (default 6379)
#          host    - Redis host (default "127.0.0.1")
#          options - passed straight to redis.createClient (default {})
#          pass    - if set, sent with AUTH on both connections
#          db      - if set, selected on both connections
#
# Returns an object exposing `listen(cb)` and `send(obj)`.
# Throws an Error if the legacy `config.redis` wrapper is supplied.
module.exports = (config = {}) ->

  # REMOVE_BEFORE_0.3.0
  throw new Error("Note the {redis: {}} object wrapper was removed in 0.3 alpha3. Please pass any Redis server options to ss.session.store.use('redis') and ss.publish.transport.use('redis') directly.") if config.redis

  # Set options or use the defaults
  port = config.port || 6379
  host = config.host || "127.0.0.1"
  options = config.options || {}

  # Redis requires a separate connection for pub/sub
  conn = {}
  ['pub','sub'].forEach (name) ->
    conn[name] = redis.createClient(port, host, options)
    conn[name].auth(config.pass) if config.pass
    conn[name].select(config.db) if config.db

  # Basic subscription management by counting references.
  # Maps '<type>:<id>' to the number of local holders. A plain object is used
  # because the keys are strings, not array indices.
  channels = {}

  # Take one reference on '<type>:<channel>'; the Redis SUBSCRIBE is only
  # issued for the first reference.
  subscribe = (channel, type) ->
    channel = type+':'+channel
    if channels[channel]?
      channels[channel] += 1
      console.log channel+" refs: "+channels[channel]
    else
      channels[channel] = 1
      conn.sub.subscribe 'ss:event:'+channel
      console.log "channel added: "+channel

  # Drop one reference on '<type>:<channel>'; the Redis UNSUBSCRIBE is issued
  # once the last reference is gone.
  unsubscribe = (channel, type) ->
    channel = type+':'+channel
    # A release for a channel that was never counted here (e.g. a subscription
    # that existed before this transport was created) must be ignored.
    # Decrementing `undefined` would store NaN, which never satisfies `<= 0`
    # and would make every later subscribe() skip the real Redis SUBSCRIBE.
    return unless channels[channel]?
    channels[channel] -= 1
    console.log channel+" refs: "+channels[channel]
    if channels[channel] <= 0
      delete channels[channel]
      conn.sub.unsubscribe 'ss:event:'+channel
      console.log "channel deleted: "+channel

  # Broadcast events are always of interest
  conn.sub.subscribe 'ss:event:all'

  # React to subscribe/unsubscribe events.
  # Keys are user ids / channel names; values are socket ids. Socket ids share
  # one counter across both sets, so a socket stays subscribed until it has
  # been removed from every user and channel entry.
  subscriptions.user.events.on 'addKey', (key, value) -> subscribe key, 'user'
  subscriptions.user.events.on 'addValue', (key, value) -> subscribe value, 'socketId'
  subscriptions.user.events.on 'removeKey', (key, value) -> unsubscribe key, 'user'
  subscriptions.user.events.on 'removeValue', (key, value) -> unsubscribe value, 'socketId'

  subscriptions.channel.events.on 'addKey', (key, value) -> subscribe key, 'channel'
  subscriptions.channel.events.on 'addValue', (key, value) -> subscribe value, 'socketId'
  subscriptions.channel.events.on 'removeKey', (key, value) -> unsubscribe key, 'channel'
  subscriptions.channel.events.on 'removeValue', (key, value) -> unsubscribe value, 'socketId'

  # NOTE(review): the disabled block below was left in by the author. As
  # written it calls subscribe without a type, so it would need
  # 'user' / 'channel' / 'socketId' arguments before being enabled.
  ### Add any existing channels ( Is this needed? )
  for key in subscriptions.user.keys()
    subscribe key
    subscribe value for value in subscriptions.user.members key

  for key in subscriptions.channel.keys()
    subscribe key
    subscribe value for value in subscriptions.channel.members key
  ###

  # Register `cb` to receive every message arriving on a subscribed Redis
  # channel, already parsed from JSON.
  listen: (cb) ->
    conn.sub.on 'message', (channel, msg) ->
      cb JSON.parse(msg)

  # Publish `obj` on the Redis channel(s) matching its target type `obj.t`.
  # 'user' and 'channel' events are split into one message per target so each
  # lands on that target's own Redis channel.
  send: (obj) ->
    switch obj.t
      when 'all'
        conn.pub.publish 'ss:event:all', JSON.stringify(obj)
      when 'socketId'
        conn.pub.publish 'ss:event:socketId:'+obj.socketId, JSON.stringify(obj)
      when 'user'
        for user in obj.users
          msg =
            t: obj.t
            users: [user]
            e: obj.e
            p: obj.p
          conn.pub.publish 'ss:event:user:'+user, JSON.stringify(msg)
      when 'channel'
        for channel in obj.channels
          msg =
            t: obj.t
            channels: [channel]
            e: obj.e
            p: obj.p
          conn.pub.publish 'ss:event:channel:'+channel, JSON.stringify(msg)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Maintains the mappings from userIds to socketIds and from channelIds to socketIds
EventEmitter2 = require('eventemitter2').EventEmitter2 | |
# A map from key to a list of unique values that announces every change.
#
# Events emitted on `@events`, each with arguments (key, value):
#   'addKey'      - a key was created (fired before its first 'addValue')
#   'addValue'    - a value was added under a key
#   'removeValue' - a value was removed from a key
#   'removeKey'   - a key lost its last value (fired after 'removeValue')
class exports.UniqueSet

  constructor: ->
    # Prototype-less dictionary: keys are arbitrary strings, so names such as
    # 'constructor' or 'length' must not collide with inherited properties.
    @data = Object.create(null)
    @events = new EventEmitter2({wildcard: false})

  # Add `value` under `key`, creating the key if needed.
  # Returns false when either argument is null/undefined. Adding a value that
  # is already present is a no-op and emits nothing.
  add: (key, value) ->
    return false unless key? and value?
    if set = @data[key]
      unless set.indexOf(value) >= 0
        set.push(value)
        @events.emit 'addValue', key, value
    else
      @data[key] = [value]
      @events.emit 'addKey', key, value
      @events.emit 'addValue', key, value

  # Remove `value` from `key`; the key itself is dropped once it is empty.
  # Returns false when `key` is unknown (this used to throw a TypeError).
  remove: (key, value) ->
    set = @data[key]
    return false unless set?
    if (i = set.indexOf(value)) >= 0
      set.splice(i, 1)
      @events.emit 'removeValue', key, value
      if set.length == 0
        delete @data[key]
        @events.emit 'removeKey', key, value

  # Remove `value` from every key that holds it.
  # keys() returns a snapshot, so keys deleted along the way are safe.
  removeFromAll: (value) ->
    @keys().forEach (key) =>
      @remove(key, value)

  # List of all keys currently holding at least one value.
  keys: ->
    Object.keys(@data)

  # Values stored under `key`, or an empty list for an unknown key.
  members: (key) ->
    @data[key] || []
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment