Created
January 1, 2015 14:41
-
-
Save chreekat/f872747b77bd27767ece to your computer and use it in GitHub Desktop.
Round robin throttle
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# roundRobinThrottle :: Int -> [EventStream] -> EventStream
#
# Combine `streams` into one output stream that emits at most one value every
# `rate` milliseconds, taking values from the inputs in round-robin order.
#
#   rate    - interval, in milliseconds, between output values
#             (passed straight to Bacon.interval).
#   streams - array of input EventStreams.
#
# Returns the throttled EventStream. It ends once every input has ended and
# every queued value has been sent (see handleState).
roundRobinThrottle = (rate, streams) ->
  # Build ONE pipeline per input: wrap each value with the index of the stream
  # it came from, then append a single 'end' marker when that input ends.
  #
  # `mapEnd` forwards every event of its source and only *adds* a Next before
  # End. Applying it to the raw input (as a separate merged branch) would feed
  # unwrapped values -- and duplicate Errors -- into the state machine, so it
  # must be applied after the wrapping `map`.
  keyedInputs = streams.map((s, idx) ->
    tagged = s.map((val) -> { type: 'value', index: idx, value: val })
    tagged.mapEnd(-> { type: 'end' })
  )
  theStream = Bacon.mergeAll(keyedInputs)
    # Merge in 'send' events that come at the desired interval
    .merge(Bacon.interval(rate, { type: 'send' }))
    # Feed into a state machine that keeps queues and only creates
    # output events on 'send' input events.
    .withStateMachine(
      {
        # One FIFO queue per input stream
        queues: streams.map(-> [])
        # Index of the queue to try first on the next 'send'
        toPush: 0
        # Number of inputs that have ended so far
        ended: 0
      }
      handleState
    )
# State-machine step used with `withStateMachine`.
#
#   state      - the round-robin state object (queues / toPush / ended).
#   baconEvent - the incoming Bacon event.
#
# Events that carry a value are round-robin events ('value', 'send' or 'end')
# and are delegated to handleRoundRobinEvent; any other event is forwarded
# unchanged. Returns the pair [state, eventsToEmit].
handleState = (state, baconEvent) ->
  emitted =
    if baconEvent.hasValue()
      handleRoundRobinEvent(state, baconEvent.value())
    else
      [baconEvent]
  [state, emitted]
# Apply one round-robin event to `state` (mutated in place) and return the
# array of Bacon events to emit in response.
#
#   'value' - append the value to the queue of the stream it came from.
#   'send'  - emit the next queued value, scanning the queues round-robin
#             starting at `state.toPush`; emits nothing if all are empty.
#   'end'   - count one more ended input.
#
# After any event, a Bacon.End is appended when every input has ended and
# every queue is drained.
handleRoundRobinEvent = (state, rrEvent) ->
  emitted = []
  queueCount = state.queues.length
  switch rrEvent.type
    when 'value'
      state.queues[rrEvent.index].push(rrEvent.value)
    when 'send'
      # Visit each queue at most once. The cursor always advances past the
      # queue just inspected, so the next 'send' resumes after it.
      for attempt in [0...queueCount]
        candidate = state.queues[state.toPush]
        state.toPush = (state.toPush + 1) % queueCount
        if candidate.length > 0
          emitted.push(new Bacon.Next(candidate.shift()))
          break
    when 'end'
      state.ended++
  # End the round-robin stream if all inputs have ended
  emitted.push(new Bacon.End()) if roundRobinEnded(state)
  emitted
# True when the output stream may end: the number of ended inputs equals the
# number of queues, and no queue still holds a value.
roundRobinEnded = (state) ->
  return false unless state.ended == state.queues.length
  allEmpty(state.queues)
# True when every array in `arrays` has length 0 (also true for an empty list).
allEmpty = (arrays) ->
  for candidate in arrays when candidate.length > 0
    return false
  true
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment