Skip to content

Instantly share code, notes, and snippets.

@chreekat
Created January 1, 2015 14:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chreekat/f872747b77bd27767ece to your computer and use it in GitHub Desktop.
Save chreekat/f872747b77bd27767ece to your computer and use it in GitHub Desktop.
Round robin throttle
# roundRobinThrottle :: Int -> [EventStream] -> EventStream
roundRobinThrottle = (rate, streams) ->
# Merge the input streams into a single, keyed stream
theStream = Bacon.mergeAll(streams.map((s, idx) ->
s.map((val) -> {
type: 'value'
index: idx
value: val
})
))
# Merge in 'end' events
.merge(Bacon.mergeAll(streams.map((s) ->
s.mapEnd(-> {
type: 'end'
})
)))
# Merge in 'send' events that come at the desired interval
.merge(Bacon.interval(rate, { type: 'send' }))
# Feed into a state machine that keeps queues and only creates
# output events on 'send' input events.
.withStateMachine(
{
queues: streams.map(-> [])
toPush: 0
ended: 0
}
handleState
)
handleState = (state, baconEvent) ->
outEvents = []
if baconEvent.hasValue()
# Handle a round robin event of 'value', 'send', or 'end'
outEvents = handleRoundRobinEvent(state, baconEvent.value())
else
outEvents = [baconEvent]
[state, outEvents]
handleRoundRobinEvent = (state, rrEvent) ->
outEvents = []
# 'value' : push onto queue
if rrEvent.type == 'value'
state.queues[rrEvent.index].push(rrEvent.value)
# 'send' : send the next value by round-robin selection
else if rrEvent.type == 'send'
# Here's a sentinel for empty queues
noValue = {}
nextValue = noValue
triedQueues = 0
while nextValue == noValue && triedQueues < state.queues.length
if state.queues[state.toPush].length > 0
nextValue = state.queues[state.toPush].shift()
state.toPush = (state.toPush + 1) % state.queues.length
triedQueues++
if nextValue != noValue
outEvents.push(new Bacon.Next(nextValue))
# 'end': Keep track of ended streams
else if rrEvent.type == 'end'
state.ended++
# End the round-robin stream if all inputs have ended
if roundRobinEnded(state)
outEvents.push(new Bacon.End())
outEvents
roundRobinEnded = (state) ->
emptyQueues = allEmpty(state.queues)
emptyQueues && state.ended == state.queues.length
allEmpty = (arrays) ->
for a in arrays
return false if a.length > 0
return true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment