Created
January 11, 2012 00:51
-
-
Save luciferous/1592265 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
retryable = (fn) -> (args..., callbacks) -> | |
cont = | |
attempts: 0 | |
return: -> callbacks.return?.apply cont, arguments | |
error: -> callbacks.error?.apply cont, arguments | |
retry: -> | |
cont.attempts++ | |
fn.apply cont, args | |
fn.apply cont, args | |
source = (f) -> (args...) -> | |
handlers = [] | |
state = | |
emit: -> | |
h state, arguments... for h in handlers | |
handlers = [] | |
next: (h) -> handlers.push h | |
f.apply state, args | |
state | |
fromObserver = source (emitter, type, filter) -> | |
handler = => this.emit arguments... if filter arguments... | |
emitter.addListener type, handler | |
wait = (source, action) -> source.next (e) -> action e | |
waitOr = (sources..., action) -> | |
actions = [action] | |
handler = (e) -> | |
actions.pop()? e | |
source.next handler for source in sources | |
stream = | |
handlers: {} | |
addListener: (t, f) -> | |
stream.handlers[t] = [] unless stream.handlers[t] | |
stream.handlers[t].push f | |
fire: (t, args...) -> | |
f args... for f in stream.handlers[t] if stream.handlers[t] | |
publish: -> | |
console.log "Publishing...", arguments... | |
timer = source (s) -> setTimeout (=> this.emit()), s * 1000 | |
reliablePublish = retryable (message) -> | |
stream.publish message | |
ack = fromObserver stream, "ack", (m) -> m.id == message.id | |
timeout = timer 1.5 | |
waitOr ack, timeout, (e) => | |
if e == ack then this.return message | |
else if this.attempts < 4 then this.retry() | |
else this.error() | |
reliablePublish id: "000", content: "hi", | |
return: -> console.log "Publish successful!" | |
error: -> console.log "Publish failed!" | |
#setTimeout (-> stream.fire "ack", id: "000"), 4000 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
streamE = events stream | |
reliablePublish = reactive (message) -> | |
stream.publish message | |
checkMessageId = compose (equals message.MessageId), (getAttr "MessageId") | |
acknowledged = filter checkMessageId, (subscribe streamE.acks) | |
timeout = subscribe (timer 1.5) | |
this.wait this.or(timeout, acknowledged), (e) => | |
if e == acknowledged this.return e | |
else this.retry() | |
reliablePublish (new Message "hi") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
wait = (source) -> | |
types = {} | |
make = (type) -> (args...) -> | |
items = types[type] | |
for item in items | |
if item.where args... | |
item.then args... | |
clearTimeout item.tid | |
(type, params={}) -> | |
params.initial or= 0 | |
params.advance or= (x) -> x | |
params.where or= -> true | |
params.timeout or= -> | |
params.then or= -> | |
if not types[type] | |
types[type] = [] | |
source.addListener type, (make type) | |
types[type].push params | |
params.tid = setTimeout (-> | |
params.timeout arguments | |
types[type] = (x for x in types[type] when x != params) | |
), params.initial | |
retryable = (fn) -> (args..., callbacks) -> | |
cont = | |
attempts: 0 | |
return: -> callbacks.return.apply cont, arguments | |
error: -> callbacks.error.apply cont, arguments | |
retry: -> | |
cont.attempts++ | |
fn.apply cont, args | |
fn.apply cont, args | |
delay = (fn, initial, options={}) -> | |
options.attempt or= 0 | |
options.advance or= (x) -> x | |
delayL = initial | |
for i in [0..options.attempt] | |
delayL = options.advance delayL | |
setTimeout fn, delayL | |
foo = | |
listeners: [] | |
addListener: (channel, listener) -> | |
foo.listeners.push listener | |
func2 = retryable -> | |
(wait foo) "ack", | |
initial: 800 | |
advance: (x) -> 2 * x | |
where: (m) -> m == "foo" | |
timeout: this.error | |
then: this.return | |
func2 | |
error: -> | |
if this.attempts < 5 | |
console.log "Timeout expired... retry #{this.attempts}" | |
this.retry() | |
return: (message) -> console.log message | |
setTimeout (-> listener "foo" for listener in foo.listeners), 3000 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment