Skip to content

Instantly share code, notes, and snippets.

@timothyjoelwright
Created May 24, 2012 04:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timothyjoelwright/2779421 to your computer and use it in GitHub Desktop.
Save timothyjoelwright/2779421 to your computer and use it in GitHub Desktop.
De-multiplexing Redis transport for SocketStream
# Publish Event - Redis Transport
redis = require('redis')
subscriptions = require('../../websocket/subscriptions')
module.exports = (config = {}) ->
# REMOVE_BEFORE_0.3.0
throw new Error("Note the {redis: {}} object wrapper was removed in 0.3 alpha3. Please pass any Redis server options to ss.session.store.use('redis') and ss.publish.transport.use('redis') directly.") if config.redis
# Set options or use the defaults
port = config.port || 6379
host = config.host || "127.0.0.1"
options = config.options || {}
# Redis requires a separate connection for pub/sub
conn = {}
['pub','sub'].forEach (name) ->
conn[name] = redis.createClient(port, host, options)
conn[name].auth(config.pass) if config.pass
conn[name].select(config.db) if config.db
# Basic subscription management by counting references
channels = []
subscribe = (channel, type) ->
channel = type+':'+channel
if channels[channel]?
channels[channel] += 1
console.log channel+" refs: "+channels[channel]
else
channels[channel] = 1
conn.sub.subscribe 'ss:event:'+channel
console.log channel+" added: "+channel
unsubscribe = (channel, type) ->
channel = type+':'+channel
channels[channel] -= 1
console.log channel+" refs: "+channels[channel]
if channels[channel] <= 0
delete channels[channel]
conn.sub.unsubscribe 'ss:event:'+channel
console.log "channel deleted: "+channel
conn.sub.subscribe 'ss:event:all'
# React to subscribe/unsubscribe events
subscriptions.user.events.on 'addKey', (key, value) -> subscribe key, 'user'
subscriptions.user.events.on 'addValue', (key, value) -> subscribe value, 'socketId'
subscriptions.user.events.on 'removeKey', (key, value) -> unsubscribe key, 'user'
subscriptions.user.events.on 'removeValue', (key, value) -> unsubscribe value, 'socketId'
subscriptions.channel.events.on 'addKey', (key, value) -> subscribe key, 'channel'
subscriptions.channel.events.on 'addValue', (key, value) -> subscribe value, 'socketId'
subscriptions.channel.events.on 'removeKey', (key, value) -> unsubscribe key, 'channel'
subscriptions.channel.events.on 'removeValue', (key, value) -> unsubscribe value, 'socketId'
### Add any existing channels ( Is this needed? )
for key in subscriptions.user.keys()
subscribe key
subscribe value for value in subscriptions.user.members key
for key in subscriptions.channel.keys()
subscribe key
subscribe value for value in subscriptions.channel.members key
###
listen: (cb) ->
conn.sub.on 'message', (channel, msg) ->
cb JSON.parse(msg)
send: (obj) ->
switch obj.t
when 'all'
conn.pub.publish 'ss:event:all', JSON.stringify(obj)
when 'socketId'
conn.pub.publish 'ss:event:socketId:'+obj.socketId, JSON.stringify(obj)
when 'user'
for user in obj.users
msg =
t: obj.t
users: [user]
e: obj.e
p: obj.p
conn.pub.publish 'ss:event:user:'+user, JSON.stringify(msg)
when 'channel'
for channel in obj.channels
msg =
t: obj.t
channels: [channel]
e: obj.e
p: obj.p
conn.pub.publish 'ss:event:channel:'+channel, JSON.stringify(msg)
# This is used to maintain lists of userIds to socketIds and channelIds to socketIds
EventEmitter2 = require('eventemitter2').EventEmitter2
class exports.UniqueSet
constructor: ->
@data = []
@events = new EventEmitter2({wildcard: false})
add: (key, value) ->
return false unless key? and value?
if set = @data[key]
unless set.indexOf(value) >= 0
set.push(value)
@events.emit 'addValue', key, value
else
@data[key] = [value]
@events.emit 'addKey', key, value
@events.emit 'addValue', key, value
remove: (key, value) ->
if (i = @data[key].indexOf(value)) >= 0
@data[key].splice(i, 1)
@events.emit 'removeValue', key, value
if @data[key].length == 0
delete @data[key]
@events.emit 'removeKey', key, value
removeFromAll: (value) ->
@keys().forEach (key) =>
@remove(key, value)
keys: ->
Object.keys(@data)
members: (key) ->
@data[key] || []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment