Skip to content

Instantly share code, notes, and snippets.

@washu
Created April 4, 2017 17:45
Show Gist options
  • Save washu/5a5c7e7eea829149ad37c5e3a80b747b to your computer and use it in GitHub Desktop.
Save washu/5a5c7e7eea829149ad37c5e3a80b747b to your computer and use it in GitHub Desktop.
# Message Bus Web Worker Implmentation.
# Performs the polling in a web worker.
# Input Messages
# start (options same as message bus)
# stop
# pause
# resume
# subscribe
# unsubscribe
# publish (hard coded to /_ap/ endpoint) sends a message to a backend channel via the worker.
# Output Messages (format cmd: 'key', options)
# ack [options: none] send a ping to the worker controller. fires in onProgress and onSuccess
# process [options: channel: 'chan', message: 'json data'] send the work out to another controller to handle the callback. needed as
# worker doesnt have dom access
self.cacheBuster = Math.random() * 10000 | 0
self.pubsub = {}
self.pubsub_options = {}
self.stopped = false
self.started = false
self.paused = false
self.hasonprogress = (new XMLHttpRequest()).onprogress == null;
self.chunkedBackoff = 0
self.later = []
self.interval = null
self.failCount = 0
self.totalAjaxCalls = 0
self.subscribers = {}
self.call_lock = false
# polfill assign, workers are limited
if typeof Object.assign != 'function'
Object.assign = (target, varArgs) ->
'use strict'
if target == null
throw new TypeError('Cannot convert undefined or null to object')
to = Object(target)
index = 1
while index < arguments.length
nextSource = arguments[index]
if nextSource != null
for nextKey of nextSource
if Object::hasOwnProperty.call(nextSource, nextKey)
to[nextKey] = nextSource[nextKey]
index++
to
self.pubsub.ajax = (options) ->
call_lock = true
xhr = new XMLHttpRequest
xhr.dataType = options.dataType
url = options.url
if !options.cache
url += (if -1 == url.indexOf('?') then '?' else '&') + '_=' + self.cacheBuster++
xhr.open 'POST', url
for name of options.headers
xhr.setRequestHeader name, options.headers[name]
xhr.setRequestHeader 'Content-Type', 'application/x-www-form-urlencoded'
if options.messageBus.chunked
options.messageBus.onProgressListener xhr
xhr.onreadystatechange = ->
if xhr.readyState == 4
status = xhr.status
if status >= 200 and status < 300 or status == 304
options.success xhr.responseText
else
options.error xhr, xhr.statusText
options.complete()
call_lock = false
if xhr.readyState == 0
options.error xhr, 'Server not available'
options.complete()
call_lock = false
return
ds_string = Object.keys(options.data).reduce(((a, k) ->
a.push k + '=' + encodeURIComponent(options.data[k])
a
), []).join '&'
xhr.send ds_string
xhr
self.addEventListener 'message', ((e) ->
me = self
data = e.data
me.pubsub_options = me.pubsub_options || {}
switch data.cmd
when 'start'
# start the polling
if me.longPoll && me.call_lock == false
me.stopped = true
me.longPoll.abort()
# current pub sub options
me.pubsub_options = Object.assign({},data.options)
# subscriptions we know about at start all channel data.
me.subscribers = data.data
me.started = true
me.stopped = false
me.paused = false
me.poll()
when 'stop'
# stop the polling
me.stopped = true
me.started = false
me.paused = false
# abort current running poll
if me.longPoll
me.longPoll.abort()
when 'resume'
# restart the polling
me.processMessages(me.later);
me.later = [];
when 'pause'
# pause the polling
me.paused = true
when 'subscribe'
me.subscribers = data.data
if me.longPoll
me.longPoll.abort()
when 'publish'
me.publishMessage(data.data.channel,data.data.message)
when 'unsubscribe'
for key in Object.keys(me.subscribers)
if !data.hasOwnProperty(key)
me.subscribers.delete(key)
if me.longPoll
me.longPoll.abort()
when 'abort'
if me.longPoll
me.longPoll.abort()
else
console.log("Unknown Command "+data.cmd)
return
), false
self.poll= ()->
if stopped
return
if Object.keys(self.subscribers).length == 0
setTimeout(self.poll,500)
return
if self.call_lock
setTimeout(self.poll, 500)
return
else
self.longPoll = self.longPoller()
return
self.publishMessage= (channel,message)->
me = self
headers = 'X-SILENCE-LOGGER': 'true'
for name of me.pubsub_options.headers
headers[name] = me.pubsub_options.headers[name]
req = me.pubsub.ajax(
url: me.pubsub_options.baseUrl + '_ap/' + channel
data: {
message: JSON.stringify(message)
}
cache: false
async: false
dataType: 'json'
type: 'POST'
headers: headers
messageBus: {},
success: ()->
# noi op
error: (xhr, textStatus, err) ->
# no op
complete: ()->
# no op
)
self.longPoller= () ->
gotData = false
aborted = false
lastAjax = new Date
me = self
me.totalAjaxCalls += 1
me.subscribers.__seq = me.totalAjaxCalls
me.longPoll = me.pubsub_options.alwaysLongPoll and me.pubsub_options.enableLongPolling
chunked = longPoll and (me.pubsub_options.enableChunkedEncoding && me.hasonprogress)
if me.chunkedBackoff > 0
me.chunkedBackoff--
chunked = false
headers = 'X-SILENCE-LOGGER': 'true'
for name of me.pubsub_options.headers
headers[name] = me.pubsub_options.headers[name]
if !chunked
headers['Dont-Chunk'] = 'true'
dataType = if chunked then 'text' else 'json'
if dataType == "text"
headers['Accept'] = 'text/plain, */*; q=0.01'
headers['Content-Type'] ='application/x-www-form-urlencoded; charset=UTF-8'
handle_progress = (payload, position) ->
separator = '\u000d\n|\u000d\n'
endChunk = payload.indexOf(separator, position)
self.postMessage({cmd: 'ack'})
if endChunk == -1
return position
chunk = payload.substring(position, endChunk)
chunk = chunk.replace(/\r\n\|\|\r\n/g, separator)
try
me.reqSuccess JSON.parse(chunk)
catch e
if console.log
console.log 'FAILED TO PARSE CHUNKED REPLY'
console.log e
handle_progress payload, endChunk + separator.length
disableChunked = ()->
if me.longPoll
if !self.call_lock
me.longPoll.abort()
chunkedBackoff = 30
return
setOnProgressListener = (xhr) ->
position = 0
# if it takes longer than 3000 ms to get first chunk, we have some proxy
# this is messing with us, so just backoff from using chunked for now
chunkedTimeout = setTimeout(disableChunked, 30000)
xhr.onprogress = ()->
clearTimeout chunkedTimeout
if xhr.getResponseHeader('Content-Type') == 'application/json; charset=utf-8'
# not chunked we are sending json back
chunked = false
return
position = handle_progress(xhr.responseText, position)
return
return
req = me.pubsub.ajax(
url: me.pubsub_options.baseUrl + 'message-bus/' + me.pubsub_options.clientId + '/poll' + (if !longPoll then '?dlp=t' else '')
data: me.subscribers
cache: false
async: false
dataType: dataType
type: 'POST'
headers: headers
messageBus:
chunked: chunked
onProgressListener: (xhr) ->
position = 0
# if it takes longer than 3000 ms to get first chunk, we have some proxy
# this is messing with us, so just backoff from using chunked for now
chunkedTimeout = setTimeout(disableChunked, 30000)
return xhr.onprogress = ->
clearTimeout chunkedTimeout
if xhr.getResponseHeader('Content-Type') == 'application/json; charset=utf-8'
chunked = false
# not chunked, we are sending json back
else
position = handle_progress(xhr.responseText, position)
beforeSend: (xhr,options)->
if !chunked
return true
@messageBus.onProgressListener xhr
true
success: (messages) ->
if !chunked
# we may have requested text so jQuery will not parse
if typeof messages == 'string'
messages = JSON.parse(messages)
gotData = me.reqSuccess(messages)
me.failCount = 0
me.call_lock = false
return
error: (xhr, textStatus, err) ->
if textStatus == 'abort'
aborted = true
else
me.failCount += 1
me.totalAjaxFailures += 1
me.call_lock = false
return
complete: ()->
me.call_lock = false
interval = undefined
try
if gotData or aborted
interval = 100
else
interval = me.pubsub_options.callbackInterval
if me.failCount > 30
interval = interval * failCount
else if !(me.pubsub_options.alwaysLongPoll)
interval = me.backgroundCallbackInterval
if interval > me.pubsub_options.maxPollInterval
interval = me.pubsub_options.maxPollInterval
interval -= new Date - lastAjax
if interval < 100
interval = 100
catch e
if console && console.log
console.log('MESSAGE BUS FAIL: ' + e.message)
pollTimeout = setTimeout((->
pollTimeout = null
me.poll()
return
), interval)
me.longPoll = null
return
)
self.postMessage({cmd: 'ack'})
req
self.processMessages= (messages)->
gotData = false
if typeof(messages) != 'object'
return false
i = 0
while i < messages.length
message = messages[i]
for cb in Object.keys(self.subscribers)
if cb == message.channel
self.subscribers[cb] = message.message_id
self.postMessage({cmd:'process',channel:cb,message:message})
if message.channel == "/__status"
if message.data[cb] != undefined
self.subscribers[cb] = message.data[cb]
gotData = true
i++
gotData
self.reqSuccess= (messages)->
failCount = 0
if self.paused
if messages
i = 0
while i < messages.length
self.later.push messages[i]
i++
else
return self.processMessages(messages)
false
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment