Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@saiqulhaq
Last active April 20, 2016 04:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saiqulhaq/6277da01c372cd886529bbbccae77dc6 to your computer and use it in GitHub Desktop.
Save saiqulhaq/6277da01c372cd886529bbbccae77dc6 to your computer and use it in GitHub Desktop.
Message Bus
/*jshint bitwise: false*/
// please take a look at line numbers 80-85, and 172-177
(function(global, document, undefined) {
'use strict';
longPoller = function(poll,data){
var gotData = false;
var aborted = false;
lastAjax = new Date();
totalAjaxCalls += 1;
data.__seq = totalAjaxCalls;
var longPoll = shouldLongPoll() && me.enableLongPolling;
var chunked = longPoll && allowChunked();
if (chunkedBackoff > 0) {
chunkedBackoff--;
chunked = false;
}
var headers = {
'X-SILENCE-LOGGER': 'true'
};
if (!chunked){
headers["Dont-Chunk"] = 'true';
}
var dataType = chunked ? "text" : "json";
var handle_progress = function(payload, position) {
var separator = "\r\n|\r\n";
var endChunk = payload.indexOf(separator, position);
if (endChunk === -1) {
return position;
}
var chunk = payload.substring(position, endChunk);
chunk = chunk.replace(/\r\n\|\|\r\n/g, separator);
try {
reqSuccess(JSON.parse(chunk));
} catch(e) {
if (console.log) {
console.log("FAILED TO PARSE CHUNKED REPLY");
console.log(data);
}
}
return handle_progress(payload, endChunk + separator.length);
}
var disableChunked = function(){
if (me.longPoll) {
me.longPoll.abort();
chunkedBackoff = 30;
}
};
var setOnProgressListener = function(xhr) {
var position = 0;
// if it takes longer than 3000 ms to get first chunk, we have some proxy
// this is messing with us, so just backoff from using chunked for now
var chunkedTimeout = setTimeout(disableChunked,3000);
xhr.onprogress = function () {
clearTimeout(chunkedTimeout);
if(xhr.getResponseHeader('Content-Type') === 'application/json; charset=utf-8') {
// not chunked we are sending json back
chunked = false;
return;
}
position = handle_progress(xhr.responseText, position);
}
};
if (!me.ajax){
throw new Error("Either jQuery or the ajax adapter must be loaded");
}
var url = me.baseUrl + "message-bus/" + me.clientId + "/poll" + (!longPoll ? "?dlp=t" : "");
if(typeof me.additionalData !== 'undefined'){
url += me.additionalData()
}
var req = me.ajax({
url: url,
data: data,
cache: false,
dataType: dataType,
type: 'POST',
headers: headers,
messageBus: {
chunked: chunked,
onProgressListener: function(xhr) {
var position = 0;
// if it takes longer than 3000 ms to get first chunk, we have some proxy
// this is messing with us, so just backoff from using chunked for now
var chunkedTimeout = setTimeout(disableChunked,3000);
return xhr.onprogress = function () {
clearTimeout(chunkedTimeout);
if(xhr.getResponseHeader('Content-Type') === 'application/json; charset=utf-8') {
chunked = false; // not chunked, we are sending json back
} else {
position = handle_progress(xhr.responseText, position);
}
}
}
},
xhr: function() {
var xhr = jQuery.ajaxSettings.xhr();
if (!chunked) {
return xhr;
}
this.messageBus.onProgressListener(xhr);
return xhr;
},
success: function(messages) {
if (!chunked) {
// we may have requested text so jQuery will not parse
if (typeof(messages) === "string") {
messages = JSON.parse(messages);
}
gotData = reqSuccess(messages);
}
},
error: function(xhr, textStatus, err) {
if(textStatus === "abort") {
aborted = true;
} else {
failCount += 1;
totalAjaxFailures += 1;
}
},
complete: function() {
var interval;
try {
if (gotData || aborted) {
interval = 100;
} else {
interval = me.callbackInterval;
if (failCount > 2) {
interval = interval * failCount;
} else if (!shouldLongPoll()) {
interval = me.backgroundCallbackInterval;
}
if (interval > me.maxPollInterval) {
interval = me.maxPollInterval;
}
interval -= (new Date() - lastAjax);
if (interval < 100) {
interval = 100;
}
}
} catch(e) {
if(console.log && e.message) {
console.log("MESSAGE BUS FAIL: " + e.message);
}
}
pollTimeout = setTimeout(function(){pollTimeout=null; poll();}, interval);
me.longPoll = null;
}
});
return req;
};
me = {
enableChunkedEncoding: true,
enableLongPolling: true,
additionalData: function(){
if(window.location.pathname === '/'){
return '&home=true'
}
return ''
},
callbackInterval: 15000,
backgroundCallbackInterval: 60000,
maxPollInterval: 3 * 60 * 1000,
callbacks: callbacks,
clientId: clientId,
alwaysLongPoll: false,
baseUrl: baseUrl,
ajax: (jQuery && jQuery.ajax),
noConflict: function(){
global.MessageBus = global.MessageBus.previousMessageBus;
return this;
},
diagnostics: function(){
console.log("Stopped: " + stopped + " Started: " + started);
console.log("Current callbacks");
console.log(callbacks);
console.log("Total ajax calls: " + totalAjaxCalls + " Recent failure count: " + failCount + " Total failures: " + totalAjaxFailures);
console.log("Last ajax call: " + (new Date() - lastAjax) / 1000 + " seconds ago") ;
},
pause: function() {
paused = true;
},
resume: function() {
paused = false;
processMessages(later);
later = [];
},
stop: function() {
stopped = true;
started = false;
},
// Start polling
start: function() {
var poll, delayPollTimeout;
if (started) return;
started = true;
stopped = false;
poll = function() {
var data;
if(stopped) {
return;
}
if (callbacks.length === 0) {
if(!delayPollTimeout) {
delayPollTimeout = setTimeout(function(){ delayPollTimeout = null; poll();}, 500);
}
return;
}
data = {};
for (var i=0;i<callbacks.length;i++) {
data[callbacks[i].channel] = callbacks[i].last_id;
}
me.longPoll = longPoller(poll,data);
};
// monitor visibility, issue a new long poll when the page shows
if(document.addEventListener && 'hidden' in document){
me.visibilityEvent = global.document.addEventListener('visibilitychange', function(){
if(!document.hidden && !me.longPoll && pollTimeout){
clearTimeout(pollTimeout);
pollTimeout = null;
poll();
}
});
}
poll();
},
// Subscribe to a channel
subscribe: function(channel, func, lastId) {
if(!started && !stopped){
me.start();
}
if (typeof(lastId) !== "number" || lastId < -1){
lastId = -1;
}
callbacks.push({
channel: channel,
func: func,
last_id: lastId
});
if (me.longPoll) {
me.longPoll.abort();
}
return func;
},
// Unsubscribe from a channel
unsubscribe: function(channel, func) {
// TODO allow for globbing in the middle of a channel name
// like /something/*/something
// at the moment we only support globbing /something/*
var glob;
if (channel.indexOf("*", channel.length - 1) !== -1) {
channel = channel.substr(0, channel.length - 1);
glob = true;
}
var removed = false;
for (var i=callbacks.length-1; i>=0; i--) {
var callback = callbacks[i];
var keep;
if (glob) {
keep = callback.channel.substr(0, channel.length) !== channel;
} else {
keep = callback.channel !== channel;
}
if(!keep && func && callback.func !== func){
keep = true;
}
if (!keep) {
callbacks.splice(i,1);
removed = true;
}
}
if (removed && me.longPoll) {
me.longPoll.abort();
}
return removed;
}
};
global.MessageBus = me;
})(window, document);
# Please take a look at line numbers 7 and 93
module MessageBus::Rack; end
class MessageBus::Rack::Middleware
def my_callback(env)
# this method will be overriden by plugin, I do something like in my plugin
# params = env['rack.input'].deep_dup
# params = URI.decode_www_form(params.gets)
# topic_id = params.select {|par| par.first.match /\/topic\// }.first.first.split('/').last rescue nil
# user_id = env['HTTP_COOKIE'].split('; ').first.split('=').last
# PresenceTracker::Topic.add_user(topic_id: topic_id, user_id: user_id) if topic_id && user_id
# PresenceTracker::Topic.notify_subscribers topic_id if topic_id
end
def call(env)
return @app.call(env) unless env['PATH_INFO'] =~ /^\/message-bus\//
# special debug/test route
if @bus.allow_broadcast? && env['PATH_INFO'] == '/message-bus/broadcast'.freeze
parsed = Rack::Request.new(env)
@bus.publish parsed["channel".freeze], parsed["data".freeze]
return [200,{"Content-Type".freeze => "text/html".freeze},["sent"]]
end
if env['PATH_INFO'].start_with? '/message-bus/_diagnostics'.freeze
diags = MessageBus::Rack::Diagnostics.new(@app, message_bus: @bus)
return diags.call(env)
end
client_id = env['PATH_INFO'].split("/")[2]
return [404, {}, ["not found"]] unless client_id
user_id = @bus.user_id_lookup.call(env) if @bus.user_id_lookup
group_ids = @bus.group_ids_lookup.call(env) if @bus.group_ids_lookup
site_id = @bus.site_id_lookup.call(env) if @bus.site_id_lookup
# close db connection as early as possible
close_db_connection!
client = MessageBus::Client.new(message_bus: @bus, client_id: client_id,
user_id: user_id, site_id: site_id, group_ids: group_ids)
if channels = env['message_bus.channels']
if seq = env['message_bus.seq']
client.seq = seq.to_i
end
channels.each do |k, v|
client.subscribe(k, v)
end
else
request = Rack::Request.new(env)
data = request.content_type.include?('application/json') ? JSON.parse(request.body.read) : request.POST
data.each do |k,v|
if k == "__seq".freeze
client.seq = v.to_i
else
client.subscribe(k, v)
end
end
end
headers = {}
headers["Cache-Control"] = "must-revalidate, private, max-age=0"
headers["Content-Type"] = "application/json; charset=utf-8"
headers["Pragma"] = "no-cache"
headers["Expires"] = "0"
if @bus.extra_response_headers_lookup
@bus.extra_response_headers_lookup.call(env).each do |k,v|
headers[k] = v
end
end
if env["REQUEST_METHOD"] == "OPTIONS"
return [200, headers, ["OK"]]
end
long_polling = @bus.long_polling_enabled? &&
env['QUERY_STRING'] !~ /dlp=t/.freeze &&
@connection_manager.client_count < @bus.max_active_clients
allow_chunked = env['HTTP_VERSION'.freeze] == 'HTTP/1.1'.freeze
allow_chunked &&= !env['HTTP_DONT_CHUNK'.freeze]
allow_chunked &&= @bus.chunked_encoding_enabled?
client.use_chunked = allow_chunked
backlog = client.backlog
my_callback(env)
if backlog.length > 0 && !allow_chunked
client.cancel
@bus.logger.debug "Delivering backlog #{backlog} to client #{client_id} for user #{user_id}"
[200, headers, [self.class.backlog_to_json(backlog)] ]
elsif long_polling && env['rack.hijack'] && @bus.rack_hijack_enabled?
io = env['rack.hijack'].call
# TODO disable client till deliver backlog is called
client.io = io
client.headers = headers
client.synchronize do
client.deliver_backlog(backlog)
add_client_with_timeout(client)
client.ensure_first_chunk_sent
end
[418, {}, ["I'm a teapot, undefined in spec"]]
elsif long_polling && env['async.callback']
response = nil
# load extension if needed
begin
response = Thin::AsyncResponse.new(env)
rescue NameError
require 'message_bus/rack/thin_ext'
response = Thin::AsyncResponse.new(env)
end
headers.each do |k,v|
response.headers[k] = v
end
if allow_chunked
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["Transfer-Encoding"] = "chunked"
response.headers["Content-Type"] = "text/plain; charset=utf-8"
end
response.status = 200
client.async_response = response
client.synchronize do
add_client_with_timeout(client)
client.deliver_backlog(backlog)
client.ensure_first_chunk_sent
end
throw :async
else
[200, headers, ["[]"]]
end
end
# ......
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment