Skip to content

Instantly share code, notes, and snippets.

@nickaknudson
Last active September 29, 2015 19:38
Show Gist options
  • Save nickaknudson/1655610 to your computer and use it in GitHub Desktop.
Save nickaknudson/1655610 to your computer and use it in GitHub Desktop.
A Faye hack that provides a simple message queue upon channel subscription.
/**
 * Overrides `subscribe` and `publish` on the Faye Redis engine so that each
 * channel keeps a bounded history list (`<ns>/channels<channel>/msg_queue`)
 * and every new subscription gets that history copied into the subscribing
 * client's message list (`<ns>/clients/<clientId>/messages`).
 */
Faye.extend(Faye.Engine.Redis.prototype, {
  /**
   * Registers `clientId` as a subscriber of `channel`, then replays the
   * channel's stored history into the client's message list and publishes
   * the client id on `<ns>/notifications`.
   *
   * @param {String} clientId - id of the subscribing client.
   * @param {String} channel - channel name; appended to the key as-is.
   * @param {Function} [callback] - called with `scope` as `this` once the
   *   client id has been added to the channel's subscriber set.
   * @param {Object} [scope] - `this` value for `callback`.
   */
  subscribe: function(clientId, channel, callback, scope) {
    var self = this;
    var clientPrefix = this._ns + '/clients/' + clientId;
    var channelKey = this._ns + '/channels' + channel;
    var historyKey = channelKey + '/msg_queue';

    this._redis.sadd(clientPrefix + '/channels', channel, function(error, added) {
      // Only fire the event when the membership is new, not on a repeat.
      if (added === 1) return self.trigger('subscribe', clientId, channel);
    });

    this._redis.sadd(channelKey, clientId, function() {
      self.debug('Subscribed client ? to channel ?', clientId, channel);
      if (callback) return callback.call(scope);
    });

    // MODIFICATION: queue all messages for new subscription
    this._redis.lrange(historyKey, 0, -1, function(error, jsonMessages) {
      if (error) {
        // Nothing was enqueued, so there is nothing to announce.
        self.debug('MOD USER Q: could not read ? for ?', historyKey, clientId);
        return;
      }

      Faye.each(jsonMessages, function(jsonMessage) {
        self.debug('MOD USER Q: ? with ?', clientId, jsonMessage);
        self._redis.rpush(clientPrefix + '/messages', jsonMessage);
      });

      // The notification is issued after the pushes above. Issuing it outside
      // this callback would announce the client before any replayed message
      // had been pushed. Assumes the Redis client sends commands in the order
      // they are issued -- TODO confirm for the client library in use.
      self._redis.publish(self._ns + '/notifications', clientId);
    });
  },

  /**
   * Appends `message` to the history list of every channel it expands to,
   * pushes it onto the message list of every client subscribed to any of
   * those channels, and fires the engine's `publish` event.
   *
   * @param {Object} message - message to publish; `channel`, `clientId` and
   *   `data` are read from it and the whole object is stored as JSON.
   * @returns {*} whatever `this.trigger('publish', ...)` returns.
   */
  publish: function(message) {
    var self = this;
    // Maximum number of messages retained per channel history list.
    var maxQueued = 100;

    this.debug('Publishing message ?', message);

    var jsonMessage = JSON.stringify(message);
    var channels = Faye.Channel.expand(message.channel);
    var keys = Faye.map(channels, function(c) {
      return self._ns + '/channels' + c;
    });

    // Receives the union of subscriber ids and delivers to each of them.
    var notify = function(error, clients) {
      return Faye.each(clients, function(clientId) {
        self.debug('Queueing for client ?: ?', clientId, message);
        self._redis.rpush(self._ns + '/clients/' + clientId + '/messages', jsonMessage);
        return self._redis.publish(self._ns + '/notifications', clientId);
      });
    };

    // MODIFICATION: Add to queue
    Faye.each(keys, function(channelKey) {
      var historyKey = channelKey + '/msg_queue';
      self.debug('MOD MSG Q: ?, ?', historyKey, message);
      // Push first, then keep only the newest `maxQueued` entries. Trimming
      // with a start index of 1 before pushing dropped the oldest entry on
      // every publish, so the list never held more than one message.
      self._redis.rpush(historyKey, jsonMessage);
      self._redis.ltrim(historyKey, -maxQueued, -1);
    });

    // SUNION takes the keys followed by the reply callback as its last argument.
    keys.push(notify);
    this._redis.sunion.apply(this._redis, keys);

    return this.trigger('publish', message.clientId, message.channel, message.data);
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment