Created
April 4, 2017 17:45
-
-
Save washu/5a5c7e7eea829149ad37c5e3a80b747b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Message Bus Web Worker Implmentation. | |
# Performs the polling in a web worker. | |
# Input Messages | |
# start (options same as message bus) | |
# stop | |
# pause | |
# resume | |
# subscribe | |
# unsubscribe | |
# publish (hard coded to /_ap/ endpoint) sends a message to a backend channel via the worker. | |
# Output Messages (format cmd: 'key', options) | |
# ack [options: none] send a ping to the worker controller. fires in onProgress and onSuccess | |
# process [options: channel: 'chan', message: 'json data'] send the work out to another controller to handle the callback. needed as | |
# worker doesnt have dom access | |
self.cacheBuster = Math.random() * 10000 | 0 | |
self.pubsub = {} | |
self.pubsub_options = {} | |
self.stopped = false | |
self.started = false | |
self.paused = false | |
self.hasonprogress = (new XMLHttpRequest()).onprogress == null; | |
self.chunkedBackoff = 0 | |
self.later = [] | |
self.interval = null | |
self.failCount = 0 | |
self.totalAjaxCalls = 0 | |
self.subscribers = {} | |
self.call_lock = false | |
# polfill assign, workers are limited | |
if typeof Object.assign != 'function' | |
Object.assign = (target, varArgs) -> | |
'use strict' | |
if target == null | |
throw new TypeError('Cannot convert undefined or null to object') | |
to = Object(target) | |
index = 1 | |
while index < arguments.length | |
nextSource = arguments[index] | |
if nextSource != null | |
for nextKey of nextSource | |
if Object::hasOwnProperty.call(nextSource, nextKey) | |
to[nextKey] = nextSource[nextKey] | |
index++ | |
to | |
self.pubsub.ajax = (options) -> | |
call_lock = true | |
xhr = new XMLHttpRequest | |
xhr.dataType = options.dataType | |
url = options.url | |
if !options.cache | |
url += (if -1 == url.indexOf('?') then '?' else '&') + '_=' + self.cacheBuster++ | |
xhr.open 'POST', url | |
for name of options.headers | |
xhr.setRequestHeader name, options.headers[name] | |
xhr.setRequestHeader 'Content-Type', 'application/x-www-form-urlencoded' | |
if options.messageBus.chunked | |
options.messageBus.onProgressListener xhr | |
xhr.onreadystatechange = -> | |
if xhr.readyState == 4 | |
status = xhr.status | |
if status >= 200 and status < 300 or status == 304 | |
options.success xhr.responseText | |
else | |
options.error xhr, xhr.statusText | |
options.complete() | |
call_lock = false | |
if xhr.readyState == 0 | |
options.error xhr, 'Server not available' | |
options.complete() | |
call_lock = false | |
return | |
ds_string = Object.keys(options.data).reduce(((a, k) -> | |
a.push k + '=' + encodeURIComponent(options.data[k]) | |
a | |
), []).join '&' | |
xhr.send ds_string | |
xhr | |
self.addEventListener 'message', ((e) -> | |
me = self | |
data = e.data | |
me.pubsub_options = me.pubsub_options || {} | |
switch data.cmd | |
when 'start' | |
# start the polling | |
if me.longPoll && me.call_lock == false | |
me.stopped = true | |
me.longPoll.abort() | |
# current pub sub options | |
me.pubsub_options = Object.assign({},data.options) | |
# subscriptions we know about at start all channel data. | |
me.subscribers = data.data | |
me.started = true | |
me.stopped = false | |
me.paused = false | |
me.poll() | |
when 'stop' | |
# stop the polling | |
me.stopped = true | |
me.started = false | |
me.paused = false | |
# abort current running poll | |
if me.longPoll | |
me.longPoll.abort() | |
when 'resume' | |
# restart the polling | |
me.processMessages(me.later); | |
me.later = []; | |
when 'pause' | |
# pause the polling | |
me.paused = true | |
when 'subscribe' | |
me.subscribers = data.data | |
if me.longPoll | |
me.longPoll.abort() | |
when 'publish' | |
me.publishMessage(data.data.channel,data.data.message) | |
when 'unsubscribe' | |
for key in Object.keys(me.subscribers) | |
if !data.hasOwnProperty(key) | |
me.subscribers.delete(key) | |
if me.longPoll | |
me.longPoll.abort() | |
when 'abort' | |
if me.longPoll | |
me.longPoll.abort() | |
else | |
console.log("Unknown Command "+data.cmd) | |
return | |
), false | |
self.poll= ()-> | |
if stopped | |
return | |
if Object.keys(self.subscribers).length == 0 | |
setTimeout(self.poll,500) | |
return | |
if self.call_lock | |
setTimeout(self.poll, 500) | |
return | |
else | |
self.longPoll = self.longPoller() | |
return | |
self.publishMessage= (channel,message)-> | |
me = self | |
headers = 'X-SILENCE-LOGGER': 'true' | |
for name of me.pubsub_options.headers | |
headers[name] = me.pubsub_options.headers[name] | |
req = me.pubsub.ajax( | |
url: me.pubsub_options.baseUrl + '_ap/' + channel | |
data: { | |
message: JSON.stringify(message) | |
} | |
cache: false | |
async: false | |
dataType: 'json' | |
type: 'POST' | |
headers: headers | |
messageBus: {}, | |
success: ()-> | |
# noi op | |
error: (xhr, textStatus, err) -> | |
# no op | |
complete: ()-> | |
# no op | |
) | |
self.longPoller= () -> | |
gotData = false | |
aborted = false | |
lastAjax = new Date | |
me = self | |
me.totalAjaxCalls += 1 | |
me.subscribers.__seq = me.totalAjaxCalls | |
me.longPoll = me.pubsub_options.alwaysLongPoll and me.pubsub_options.enableLongPolling | |
chunked = longPoll and (me.pubsub_options.enableChunkedEncoding && me.hasonprogress) | |
if me.chunkedBackoff > 0 | |
me.chunkedBackoff-- | |
chunked = false | |
headers = 'X-SILENCE-LOGGER': 'true' | |
for name of me.pubsub_options.headers | |
headers[name] = me.pubsub_options.headers[name] | |
if !chunked | |
headers['Dont-Chunk'] = 'true' | |
dataType = if chunked then 'text' else 'json' | |
if dataType == "text" | |
headers['Accept'] = 'text/plain, */*; q=0.01' | |
headers['Content-Type'] ='application/x-www-form-urlencoded; charset=UTF-8' | |
handle_progress = (payload, position) -> | |
separator = '\u000d\n|\u000d\n' | |
endChunk = payload.indexOf(separator, position) | |
self.postMessage({cmd: 'ack'}) | |
if endChunk == -1 | |
return position | |
chunk = payload.substring(position, endChunk) | |
chunk = chunk.replace(/\r\n\|\|\r\n/g, separator) | |
try | |
me.reqSuccess JSON.parse(chunk) | |
catch e | |
if console.log | |
console.log 'FAILED TO PARSE CHUNKED REPLY' | |
console.log e | |
handle_progress payload, endChunk + separator.length | |
disableChunked = ()-> | |
if me.longPoll | |
if !self.call_lock | |
me.longPoll.abort() | |
chunkedBackoff = 30 | |
return | |
setOnProgressListener = (xhr) -> | |
position = 0 | |
# if it takes longer than 3000 ms to get first chunk, we have some proxy | |
# this is messing with us, so just backoff from using chunked for now | |
chunkedTimeout = setTimeout(disableChunked, 30000) | |
xhr.onprogress = ()-> | |
clearTimeout chunkedTimeout | |
if xhr.getResponseHeader('Content-Type') == 'application/json; charset=utf-8' | |
# not chunked we are sending json back | |
chunked = false | |
return | |
position = handle_progress(xhr.responseText, position) | |
return | |
return | |
req = me.pubsub.ajax( | |
url: me.pubsub_options.baseUrl + 'message-bus/' + me.pubsub_options.clientId + '/poll' + (if !longPoll then '?dlp=t' else '') | |
data: me.subscribers | |
cache: false | |
async: false | |
dataType: dataType | |
type: 'POST' | |
headers: headers | |
messageBus: | |
chunked: chunked | |
onProgressListener: (xhr) -> | |
position = 0 | |
# if it takes longer than 3000 ms to get first chunk, we have some proxy | |
# this is messing with us, so just backoff from using chunked for now | |
chunkedTimeout = setTimeout(disableChunked, 30000) | |
return xhr.onprogress = -> | |
clearTimeout chunkedTimeout | |
if xhr.getResponseHeader('Content-Type') == 'application/json; charset=utf-8' | |
chunked = false | |
# not chunked, we are sending json back | |
else | |
position = handle_progress(xhr.responseText, position) | |
beforeSend: (xhr,options)-> | |
if !chunked | |
return true | |
@messageBus.onProgressListener xhr | |
true | |
success: (messages) -> | |
if !chunked | |
# we may have requested text so jQuery will not parse | |
if typeof messages == 'string' | |
messages = JSON.parse(messages) | |
gotData = me.reqSuccess(messages) | |
me.failCount = 0 | |
me.call_lock = false | |
return | |
error: (xhr, textStatus, err) -> | |
if textStatus == 'abort' | |
aborted = true | |
else | |
me.failCount += 1 | |
me.totalAjaxFailures += 1 | |
me.call_lock = false | |
return | |
complete: ()-> | |
me.call_lock = false | |
interval = undefined | |
try | |
if gotData or aborted | |
interval = 100 | |
else | |
interval = me.pubsub_options.callbackInterval | |
if me.failCount > 30 | |
interval = interval * failCount | |
else if !(me.pubsub_options.alwaysLongPoll) | |
interval = me.backgroundCallbackInterval | |
if interval > me.pubsub_options.maxPollInterval | |
interval = me.pubsub_options.maxPollInterval | |
interval -= new Date - lastAjax | |
if interval < 100 | |
interval = 100 | |
catch e | |
if console && console.log | |
console.log('MESSAGE BUS FAIL: ' + e.message) | |
pollTimeout = setTimeout((-> | |
pollTimeout = null | |
me.poll() | |
return | |
), interval) | |
me.longPoll = null | |
return | |
) | |
self.postMessage({cmd: 'ack'}) | |
req | |
self.processMessages= (messages)-> | |
gotData = false | |
if typeof(messages) != 'object' | |
return false | |
i = 0 | |
while i < messages.length | |
message = messages[i] | |
for cb in Object.keys(self.subscribers) | |
if cb == message.channel | |
self.subscribers[cb] = message.message_id | |
self.postMessage({cmd:'process',channel:cb,message:message}) | |
if message.channel == "/__status" | |
if message.data[cb] != undefined | |
self.subscribers[cb] = message.data[cb] | |
gotData = true | |
i++ | |
gotData | |
self.reqSuccess= (messages)-> | |
failCount = 0 | |
if self.paused | |
if messages | |
i = 0 | |
while i < messages.length | |
self.later.push messages[i] | |
i++ | |
else | |
return self.processMessages(messages) | |
false |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment