Skip to content

Instantly share code, notes, and snippets.

@luciferous
Created January 11, 2012 00:51
Show Gist options
  • Save luciferous/1592265 to your computer and use it in GitHub Desktop.
Save luciferous/1592265 to your computer and use it in GitHub Desktop.
# Wraps `fn` so that it runs with a continuation object bound as `this`.
#
# The returned wrapper accepts any number of arguments followed by a
# `callbacks` object. Inside `fn`, `this` exposes:
#   attempts - number of times `retry` has been invoked so far
#   return   - forwards its arguments to `callbacks.return`, if present
#   error    - forwards its arguments to `callbacks.error`, if present
#   retry    - increments `attempts` and runs `fn` again with the same arguments
# Callbacks are invoked with the continuation object as `this`.
retryable = (fn) ->
  (args..., callbacks) ->
    # Single place that (re)starts the wrapped function.
    run = -> fn.apply continuation, args
    continuation =
      attempts: 0
      return: (values...) -> callbacks.return?.apply continuation, values
      error: (reasons...) -> callbacks.error?.apply continuation, reasons
      retry: ->
        continuation.attempts += 1
        run()
    run()
# Turns `f` into a factory of one-shot event sources.
#
# Calling the factory creates a `state` object, runs `f` with `this` bound to
# that object (passing the factory arguments through) and returns the object.
#   state.next(h)  - registers `h` for the next emission only
#   state.emit(..) - calls every registered handler as h(state, emitted args...)
#                    and forgets them
source = (f) -> (args...) ->
  handlers = []
  state =
    emit: (values...) ->
      # Detach the pending handlers *before* running them: a handler that calls
      # `next` during the emission is kept for the following emission instead of
      # being wiped, and a throwing handler cannot leave stale handlers behind.
      pending = handlers
      handlers = []
      h state, values... for h in pending
      return
    next: (h) -> handlers.push h
  f.apply state, args
  state
# Builds a source that emits whenever `emitter` delivers a `type` event whose
# arguments satisfy `filter`. The event arguments are forwarded to `emit`.
# Every call adds one listener to `emitter`; nothing here removes it again.
fromObserver = source (emitter, type, filter) ->
  # An explicit splat is used instead of `arguments`: inside a `=>` function
  # compiled to an ES arrow function, `arguments` would be the enclosing
  # function's (emitter, type, filter) rather than the event payload.
  handler = (event...) => this.emit event... if filter event...
  emitter.addListener type, handler
# Registers `action` to run on the next emission of `source`.
# Only the first argument delivered by the source is passed on to `action`.
# Returns whatever `source.next` returns.
wait = (source, action) ->
  relay = (e) -> action e
  source.next relay
# Races several sources: `action` runs once, for whichever source emits first.
#
# sources - objects exposing `next(handler)`
# action  - called with the first argument of the winning emission
# Returns the list of values returned by each `next` call.
waitOr = (sources..., action) ->
  # Single-slot stack: the first emission pops the action, later ones find
  # nothing left to call.
  actions = [action]
  handler = (e) ->
    actions.pop()? e
  # The loop variable must not be called `source`: a variable of that name
  # already exists at the top level of this file, and CoffeeScript would assign
  # to that outer variable instead of creating a local one.
  src.next handler for src in sources
# Minimal in-memory event emitter used by the demo.
#   handlers    - map from event type to its list of listeners
#   addListener - appends listener `f` for type `t`
#   fire        - calls every listener of type `t` with the remaining arguments
#   publish     - logs its arguments to the console
stream =
  handlers: {}
  addListener: (t, f) ->
    # Create the listener list on first use, then append to it.
    bucket = (stream.handlers[t] or= [])
    bucket.push f
  fire: (t, args...) ->
    listeners = stream.handlers[t]
    # Nothing registered for this type: nothing to call.
    return unless listeners
    callback args... for callback in listeners
  publish: (parts...) ->
    console.log "Publishing...", parts...
# Source that emits once, with no arguments, after `s` seconds.
timer = source (s) ->
  self = this
  fire = -> self.emit()
  setTimeout fire, s * 1000
# Publishes `message` on `stream` and waits for either a matching "ack" event
# (same `id`) or a 1.5 second timer, whichever emits first.
#   ack first                      -> this.return message
#   timer first, attempts below 4  -> this.retry()
#   timer first otherwise          -> this.error()
reliablePublish = retryable (message) ->
  stream.publish message
  matchesMessage = (m) -> m.id == message.id
  ack = fromObserver stream, "ack", matchesMessage
  timeout = timer 1.5
  waitOr ack, timeout, (winner) =>
    return this.return message if winner == ack
    return this.retry() if this.attempts < 4
    this.error()
# Demo: publish one message and report the outcome.
# The braces keep the message and the callbacks as two separate arguments;
# without them the implicit-object syntax may fold all four keys into a single
# object, leaving the wrapped function without a `message`.
reliablePublish {id: "000", content: "hi"},
  return: -> console.log "Publish successful!"
  error: -> console.log "Publish failed!"
# Uncomment to fire a matching acknowledgement after 4 seconds:
#setTimeout (-> stream.fire "ack", id: "000"), 4000
# Alternative sketch of reliablePublish written against combinator helpers.
# NOTE(review): `events`, `reactive`, `compose`, `equals`, `getAttr`, `filter`,
# `subscribe` and `Message` are not defined in the visible code — confirm where
# they come from before running this.
streamE = events stream
reliablePublish = reactive (message) ->
  stream.publish message
  # Predicate built from `getAttr "MessageId"` and `equals message.MessageId`.
  checkMessageId = compose (equals message.MessageId), (getAttr "MessageId")
  acknowledged = filter checkMessageId, (subscribe streamE.acks)
  timeout = subscribe (timer 1.5)
  this.wait this.or(timeout, acknowledged), (e) =>
    # `then` is required for a single-line `if`; without it the line does not
    # compile.
    if e == acknowledged then this.return e
    else this.retry()
reliablePublish (new Message "hi")
# Creates a registration function bound to `source` (an object exposing
# `addListener(type, listener)`).
#
# The returned function `(type, params)` waits for one `type` event accepted
# by `params.where`, or for `params.initial` milliseconds, whichever is first:
#   params.initial - timeout in milliseconds (default 0)
#   params.advance - stored with a default identity function; not used here
#   params.where   - predicate over the event arguments (default: accept all)
#   params.then    - called with the event arguments on the first match
#   params.timeout - called when the timer expires before any match
# The defaults are written onto `params`, and the timer handle is stored as
# `params.tid`. Returns that timer handle.
wait = (source) ->
  types = {}

  # Drops one registration from its type's list.
  forget = (type, params) ->
    types[type] = (x for x in types[type] when x != params)

  # Builds the single listener attached to `source` for a given event type.
  make = (type) -> (args...) ->
    # Iterate over a copy because matched registrations are removed as we go.
    for item in types[type].slice() when item.where(args...)
      # Cancel the timer and unregister before calling `then`, so a match is
      # delivered exactly once even if `then` throws or further events arrive.
      clearTimeout item.tid
      forget type, item
      item.then args...

  (type, params={}) ->
    params.initial or= 0
    params.advance or= (x) -> x
    params.where or= -> true
    params.timeout or= ->
    params.then or= ->
    # Attach one listener per event type, on first use.
    if not types[type]
      types[type] = []
      source.addListener type, (make type)
    types[type].push params
    params.tid = setTimeout ((timerArgs...) ->
      # Unregister first so an expired registration can no longer match, then
      # forward the timer's arguments individually.
      forget type, params
      params.timeout timerArgs...
    ), params.initial
# Wraps `fn` so that it runs with a continuation object bound as `this`.
#
# The returned wrapper accepts any number of arguments followed by a
# `callbacks` object. Inside `fn`, `this` exposes:
#   attempts - number of times `retry` has been invoked so far
#   return   - forwards its arguments to `callbacks.return`
#   error    - forwards its arguments to `callbacks.error`
#   retry    - increments `attempts` and runs `fn` again with the same arguments
# Callbacks are invoked with the continuation object as `this`.
retryable = (fn) -> (args..., callbacks) ->
  cont =
    attempts: 0
    # Both callbacks are optional: a missing one (or a missing callbacks
    # object) is skipped instead of raising a TypeError.
    return: -> callbacks?.return?.apply cont, arguments
    error: -> callbacks?.error?.apply cont, arguments
    retry: ->
      cont.attempts++
      fn.apply cont, args
  fn.apply cont, args
# Schedules `fn` after a delay derived from `initial` milliseconds.
#
# fn      - function to schedule
# initial - delay used for attempt 0
# options - attempt: number of times `advance` is applied (default 0)
#           advance: maps one delay to the next (default: identity)
# Returns the timer handle from `setTimeout`. `options` is not modified.
delay = (fn, initial, options={}) ->
  attempt = options?.attempt or 0
  advance = options?.advance or (x) -> x
  delayL = initial
  # Exclusive range with an explicit step: attempt 0 leaves `initial`
  # untouched, and a negative attempt never iterates.
  for i in [0...attempt] by 1
    delayL = advance delayL
  setTimeout fn, delayL
# Stand-in event source for the demo: it stores every listener in one list and
# does not use the `channel` argument.
foo =
  listeners: []
  addListener: (channel, listener) ->
    registry = foo.listeners
    registry.push listener
# Retryable operation: waits on `foo` for an "ack" event whose payload equals
# "foo". A match goes to `this.return`; an 800 ms timeout goes to `this.error`.
func2 = retryable ->
  awaitEvent = wait foo
  awaitEvent "ack",
    initial: 800
    advance: (x) -> 2 * x
    where: (m) -> m == "foo"
    timeout: this.error
    then: this.return
# Demo: run func2, retrying on timeout while fewer than 5 retries have been
# made, and print the message once it arrives.
func2
  return: (message) -> console.log message
  error: ->
    return unless this.attempts < 5
    console.log "Timeout expired... retry #{this.attempts}"
    this.retry()
# After 3 seconds, deliver "foo" to every listener registered on `foo`.
setTimeout ->
  listener "foo" for listener in foo.listeners
, 3000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment