Last active
April 20, 2016 04:09
-
-
Save saiqulhaq/6277da01c372cd886529bbbccae77dc6 to your computer and use it in GitHub Desktop.
Message Bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*jshint bitwise: false*/ | |
// please take a look at line numbers 80-85, and 172-177 | |
(function(global, document, undefined) { | |
'use strict'; | |
longPoller = function(poll,data){ | |
var gotData = false; | |
var aborted = false; | |
lastAjax = new Date(); | |
totalAjaxCalls += 1; | |
data.__seq = totalAjaxCalls; | |
var longPoll = shouldLongPoll() && me.enableLongPolling; | |
var chunked = longPoll && allowChunked(); | |
if (chunkedBackoff > 0) { | |
chunkedBackoff--; | |
chunked = false; | |
} | |
var headers = { | |
'X-SILENCE-LOGGER': 'true' | |
}; | |
if (!chunked){ | |
headers["Dont-Chunk"] = 'true'; | |
} | |
var dataType = chunked ? "text" : "json"; | |
var handle_progress = function(payload, position) { | |
var separator = "\r\n|\r\n"; | |
var endChunk = payload.indexOf(separator, position); | |
if (endChunk === -1) { | |
return position; | |
} | |
var chunk = payload.substring(position, endChunk); | |
chunk = chunk.replace(/\r\n\|\|\r\n/g, separator); | |
try { | |
reqSuccess(JSON.parse(chunk)); | |
} catch(e) { | |
if (console.log) { | |
console.log("FAILED TO PARSE CHUNKED REPLY"); | |
console.log(data); | |
} | |
} | |
return handle_progress(payload, endChunk + separator.length); | |
} | |
var disableChunked = function(){ | |
if (me.longPoll) { | |
me.longPoll.abort(); | |
chunkedBackoff = 30; | |
} | |
}; | |
var setOnProgressListener = function(xhr) { | |
var position = 0; | |
// if it takes longer than 3000 ms to get first chunk, we have some proxy | |
// this is messing with us, so just backoff from using chunked for now | |
var chunkedTimeout = setTimeout(disableChunked,3000); | |
xhr.onprogress = function () { | |
clearTimeout(chunkedTimeout); | |
if(xhr.getResponseHeader('Content-Type') === 'application/json; charset=utf-8') { | |
// not chunked we are sending json back | |
chunked = false; | |
return; | |
} | |
position = handle_progress(xhr.responseText, position); | |
} | |
}; | |
if (!me.ajax){ | |
throw new Error("Either jQuery or the ajax adapter must be loaded"); | |
} | |
var url = me.baseUrl + "message-bus/" + me.clientId + "/poll" + (!longPoll ? "?dlp=t" : ""); | |
if(typeof me.additionalData !== 'undefined'){ | |
url += me.additionalData() | |
} | |
var req = me.ajax({ | |
url: url, | |
data: data, | |
cache: false, | |
dataType: dataType, | |
type: 'POST', | |
headers: headers, | |
messageBus: { | |
chunked: chunked, | |
onProgressListener: function(xhr) { | |
var position = 0; | |
// if it takes longer than 3000 ms to get first chunk, we have some proxy | |
// this is messing with us, so just backoff from using chunked for now | |
var chunkedTimeout = setTimeout(disableChunked,3000); | |
return xhr.onprogress = function () { | |
clearTimeout(chunkedTimeout); | |
if(xhr.getResponseHeader('Content-Type') === 'application/json; charset=utf-8') { | |
chunked = false; // not chunked, we are sending json back | |
} else { | |
position = handle_progress(xhr.responseText, position); | |
} | |
} | |
} | |
}, | |
xhr: function() { | |
var xhr = jQuery.ajaxSettings.xhr(); | |
if (!chunked) { | |
return xhr; | |
} | |
this.messageBus.onProgressListener(xhr); | |
return xhr; | |
}, | |
success: function(messages) { | |
if (!chunked) { | |
// we may have requested text so jQuery will not parse | |
if (typeof(messages) === "string") { | |
messages = JSON.parse(messages); | |
} | |
gotData = reqSuccess(messages); | |
} | |
}, | |
error: function(xhr, textStatus, err) { | |
if(textStatus === "abort") { | |
aborted = true; | |
} else { | |
failCount += 1; | |
totalAjaxFailures += 1; | |
} | |
}, | |
complete: function() { | |
var interval; | |
try { | |
if (gotData || aborted) { | |
interval = 100; | |
} else { | |
interval = me.callbackInterval; | |
if (failCount > 2) { | |
interval = interval * failCount; | |
} else if (!shouldLongPoll()) { | |
interval = me.backgroundCallbackInterval; | |
} | |
if (interval > me.maxPollInterval) { | |
interval = me.maxPollInterval; | |
} | |
interval -= (new Date() - lastAjax); | |
if (interval < 100) { | |
interval = 100; | |
} | |
} | |
} catch(e) { | |
if(console.log && e.message) { | |
console.log("MESSAGE BUS FAIL: " + e.message); | |
} | |
} | |
pollTimeout = setTimeout(function(){pollTimeout=null; poll();}, interval); | |
me.longPoll = null; | |
} | |
}); | |
return req; | |
}; | |
me = { | |
enableChunkedEncoding: true, | |
enableLongPolling: true, | |
additionalData: function(){ | |
if(window.location.pathname === '/'){ | |
return '&home=true' | |
} | |
return '' | |
}, | |
callbackInterval: 15000, | |
backgroundCallbackInterval: 60000, | |
maxPollInterval: 3 * 60 * 1000, | |
callbacks: callbacks, | |
clientId: clientId, | |
alwaysLongPoll: false, | |
baseUrl: baseUrl, | |
ajax: (jQuery && jQuery.ajax), | |
noConflict: function(){ | |
global.MessageBus = global.MessageBus.previousMessageBus; | |
return this; | |
}, | |
diagnostics: function(){ | |
console.log("Stopped: " + stopped + " Started: " + started); | |
console.log("Current callbacks"); | |
console.log(callbacks); | |
console.log("Total ajax calls: " + totalAjaxCalls + " Recent failure count: " + failCount + " Total failures: " + totalAjaxFailures); | |
console.log("Last ajax call: " + (new Date() - lastAjax) / 1000 + " seconds ago") ; | |
}, | |
pause: function() { | |
paused = true; | |
}, | |
resume: function() { | |
paused = false; | |
processMessages(later); | |
later = []; | |
}, | |
stop: function() { | |
stopped = true; | |
started = false; | |
}, | |
// Start polling | |
start: function() { | |
var poll, delayPollTimeout; | |
if (started) return; | |
started = true; | |
stopped = false; | |
poll = function() { | |
var data; | |
if(stopped) { | |
return; | |
} | |
if (callbacks.length === 0) { | |
if(!delayPollTimeout) { | |
delayPollTimeout = setTimeout(function(){ delayPollTimeout = null; poll();}, 500); | |
} | |
return; | |
} | |
data = {}; | |
for (var i=0;i<callbacks.length;i++) { | |
data[callbacks[i].channel] = callbacks[i].last_id; | |
} | |
me.longPoll = longPoller(poll,data); | |
}; | |
// monitor visibility, issue a new long poll when the page shows | |
if(document.addEventListener && 'hidden' in document){ | |
me.visibilityEvent = global.document.addEventListener('visibilitychange', function(){ | |
if(!document.hidden && !me.longPoll && pollTimeout){ | |
clearTimeout(pollTimeout); | |
pollTimeout = null; | |
poll(); | |
} | |
}); | |
} | |
poll(); | |
}, | |
// Subscribe to a channel | |
subscribe: function(channel, func, lastId) { | |
if(!started && !stopped){ | |
me.start(); | |
} | |
if (typeof(lastId) !== "number" || lastId < -1){ | |
lastId = -1; | |
} | |
callbacks.push({ | |
channel: channel, | |
func: func, | |
last_id: lastId | |
}); | |
if (me.longPoll) { | |
me.longPoll.abort(); | |
} | |
return func; | |
}, | |
// Unsubscribe from a channel | |
unsubscribe: function(channel, func) { | |
// TODO allow for globbing in the middle of a channel name | |
// like /something/*/something | |
// at the moment we only support globbing /something/* | |
var glob; | |
if (channel.indexOf("*", channel.length - 1) !== -1) { | |
channel = channel.substr(0, channel.length - 1); | |
glob = true; | |
} | |
var removed = false; | |
for (var i=callbacks.length-1; i>=0; i--) { | |
var callback = callbacks[i]; | |
var keep; | |
if (glob) { | |
keep = callback.channel.substr(0, channel.length) !== channel; | |
} else { | |
keep = callback.channel !== channel; | |
} | |
if(!keep && func && callback.func !== func){ | |
keep = true; | |
} | |
if (!keep) { | |
callbacks.splice(i,1); | |
removed = true; | |
} | |
} | |
if (removed && me.longPoll) { | |
me.longPoll.abort(); | |
} | |
return removed; | |
} | |
}; | |
global.MessageBus = me; | |
})(window, document); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Please take a look at line numbers 7 and 93 | |
module MessageBus::Rack; end | |
class MessageBus::Rack::Middleware | |
def my_callback(env) | |
# this method will be overriden by plugin, I do something like in my plugin | |
# params = env['rack.input'].deep_dup | |
# params = URI.decode_www_form(params.gets) | |
# topic_id = params.select {|par| par.first.match /\/topic\// }.first.first.split('/').last rescue nil | |
# user_id = env['HTTP_COOKIE'].split('; ').first.split('=').last | |
# PresenceTracker::Topic.add_user(topic_id: topic_id, user_id: user_id) if topic_id && user_id | |
# PresenceTracker::Topic.notify_subscribers topic_id if topic_id | |
end | |
def call(env) | |
return @app.call(env) unless env['PATH_INFO'] =~ /^\/message-bus\// | |
# special debug/test route | |
if @bus.allow_broadcast? && env['PATH_INFO'] == '/message-bus/broadcast'.freeze | |
parsed = Rack::Request.new(env) | |
@bus.publish parsed["channel".freeze], parsed["data".freeze] | |
return [200,{"Content-Type".freeze => "text/html".freeze},["sent"]] | |
end | |
if env['PATH_INFO'].start_with? '/message-bus/_diagnostics'.freeze | |
diags = MessageBus::Rack::Diagnostics.new(@app, message_bus: @bus) | |
return diags.call(env) | |
end | |
client_id = env['PATH_INFO'].split("/")[2] | |
return [404, {}, ["not found"]] unless client_id | |
user_id = @bus.user_id_lookup.call(env) if @bus.user_id_lookup | |
group_ids = @bus.group_ids_lookup.call(env) if @bus.group_ids_lookup | |
site_id = @bus.site_id_lookup.call(env) if @bus.site_id_lookup | |
# close db connection as early as possible | |
close_db_connection! | |
client = MessageBus::Client.new(message_bus: @bus, client_id: client_id, | |
user_id: user_id, site_id: site_id, group_ids: group_ids) | |
if channels = env['message_bus.channels'] | |
if seq = env['message_bus.seq'] | |
client.seq = seq.to_i | |
end | |
channels.each do |k, v| | |
client.subscribe(k, v) | |
end | |
else | |
request = Rack::Request.new(env) | |
data = request.content_type.include?('application/json') ? JSON.parse(request.body.read) : request.POST | |
data.each do |k,v| | |
if k == "__seq".freeze | |
client.seq = v.to_i | |
else | |
client.subscribe(k, v) | |
end | |
end | |
end | |
headers = {} | |
headers["Cache-Control"] = "must-revalidate, private, max-age=0" | |
headers["Content-Type"] = "application/json; charset=utf-8" | |
headers["Pragma"] = "no-cache" | |
headers["Expires"] = "0" | |
if @bus.extra_response_headers_lookup | |
@bus.extra_response_headers_lookup.call(env).each do |k,v| | |
headers[k] = v | |
end | |
end | |
if env["REQUEST_METHOD"] == "OPTIONS" | |
return [200, headers, ["OK"]] | |
end | |
long_polling = @bus.long_polling_enabled? && | |
env['QUERY_STRING'] !~ /dlp=t/.freeze && | |
@connection_manager.client_count < @bus.max_active_clients | |
allow_chunked = env['HTTP_VERSION'.freeze] == 'HTTP/1.1'.freeze | |
allow_chunked &&= !env['HTTP_DONT_CHUNK'.freeze] | |
allow_chunked &&= @bus.chunked_encoding_enabled? | |
client.use_chunked = allow_chunked | |
backlog = client.backlog | |
my_callback(env) | |
if backlog.length > 0 && !allow_chunked | |
client.cancel | |
@bus.logger.debug "Delivering backlog #{backlog} to client #{client_id} for user #{user_id}" | |
[200, headers, [self.class.backlog_to_json(backlog)] ] | |
elsif long_polling && env['rack.hijack'] && @bus.rack_hijack_enabled? | |
io = env['rack.hijack'].call | |
# TODO disable client till deliver backlog is called | |
client.io = io | |
client.headers = headers | |
client.synchronize do | |
client.deliver_backlog(backlog) | |
add_client_with_timeout(client) | |
client.ensure_first_chunk_sent | |
end | |
[418, {}, ["I'm a teapot, undefined in spec"]] | |
elsif long_polling && env['async.callback'] | |
response = nil | |
# load extension if needed | |
begin | |
response = Thin::AsyncResponse.new(env) | |
rescue NameError | |
require 'message_bus/rack/thin_ext' | |
response = Thin::AsyncResponse.new(env) | |
end | |
headers.each do |k,v| | |
response.headers[k] = v | |
end | |
if allow_chunked | |
response.headers["X-Content-Type-Options"] = "nosniff" | |
response.headers["Transfer-Encoding"] = "chunked" | |
response.headers["Content-Type"] = "text/plain; charset=utf-8" | |
end | |
response.status = 200 | |
client.async_response = response | |
client.synchronize do | |
add_client_with_timeout(client) | |
client.deliver_backlog(backlog) | |
client.ensure_first_chunk_sent | |
end | |
throw :async | |
else | |
[200, headers, ["[]"]] | |
end | |
end | |
# ...... | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment