Skip to content

Instantly share code, notes, and snippets.

@antonioaguilar
Created January 5, 2017 11:36
Show Gist options
  • Save antonioaguilar/0b89bb8dbf2c7f9b7254a2a72541a99b to your computer and use it in GitHub Desktop.
Save antonioaguilar/0b89bb8dbf2c7f9b7254a2a72541a99b to your computer and use it in GitHub Desktop.
Simple PubSub using KefirJS (version 1)
( function(root, factory) {
root.pubsub = factory(root);
}(this, function() {
var publishEmitter;
var directory;
var eventStream;
var SubscriptionDefinition = function(stream, callback) {
this.stream = stream;
this.callback = callback;
this.onUnsubscribe = null;
this.subscription = stream.observe({ value: function(ev) {callback(ev.data, ev);} });
stream.subscriberCount = stream.subscriberCount ? stream.subscriberCount + 1 : 1;
};
SubscriptionDefinition.prototype.unsubscribe = function() {
if(typeof this.onUnsubscribe === 'function') {
this.onUnsubscribe();
}
this.subscription.unsubscribe();
if(!--this.stream.subscriberCount) {
delete directory[this.stream._source._channelName].topics[this.stream._binding];
}
};
function publish(event) {
event.timestamp = new Date().toISOString();
publishEmitter && publishEmitter.emit(event);
}
function topicRegex(binding) {
var prevSegment;
pattern = '^' + binding.split('.').map(function mapTopicBinding(segment) {
var res = '';
if(!!prevSegment) {
res = prevSegment !== '#' ? '\\.\\b' : '\\b';
}
if(segment === '#') {
res += '[\\s\\S]*';
} else if(segment === '*') {
res += '[^.]+';
} else {
res += segment;
}
prevSegment = segment;
return res;
}).join('') + '$';
return new RegExp(pattern);
}
function topicComparator(binding) {
if(binding.indexOf('#') === -1 && binding.indexOf('*') === -1) {
return (function(ev) { return ev.topic === binding; });
}
else {
var rgx = topicRegex(binding);
return (function(ev) { return rgx.test(ev.topic); });
}
}
function getChannel(name) {
if(!directory[name]) {
directory[name] = {
stream: eventStream.filter(function(ev) { return ev.channel === name; }),
topics: {}
};
directory[name].stream._channelName = name;
}
return directory[name];
}
function getTopicStream(channelName, binding) {
var channel = getChannel(channelName);
var cmp = topicComparator(binding);
var stream = channel.topics[binding] || (channel.topics[binding] = channel.stream.filter(function(ev) { return cmp(ev); }));
stream._binding = binding;
return stream;
}
function subscribe(def) {
return new SubscriptionDefinition(getTopicStream(def.channel, def.topic), def.callback);
}
function addWireTap(callback) {
var subscription = eventStream.observe({ value: function(ev) { callback(ev.data, ev); } });
return function() { subscription.unsubscribe() };
}
function reset() {
directory = {};
publishEmitter && publishEmitter.end();
publishEmitter = null;
eventStream = Kefir.stream(function(emitter) {
publishEmitter = emitter;
return function() { publishEmitter = null; };
});
}
function when(defs, onSuccess, onError, options) {
var streams = [];
var _options = options || {};
defs.forEach(function(def) {
streams.push(getTopicStream(def.channel, def.topic));
});
var aligned = Kefir.zip(streams);
var limited = _options.once ? aligned.take(1) : aligned;
limited.observe({ value: function(data) { onSuccess.apply(this, data); }, error: onError });
}
reset();
return {
publish: publish,
subscribe: subscribe,
addWireTap: addWireTap,
reset: reset,
when: when
};
}) );
@jamesholcomb
Copy link

Thanks for sharing this...Could you add a simple example?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment