Skip to content

Instantly share code, notes, and snippets.

@simcap
Created June 6, 2016 13:01
Show Gist options
  • Save simcap/8958d3de68252fbf7e9b2b5e38566bac to your computer and use it in GitHub Desktop.
Save simcap/8958d3de68252fbf7e9b2b5e38566bac to your computer and use it in GitHub Desktop.
Prez - Replay aggregator
// Run loops until a.closing becomes readable, batching the messages received
// on a.in through a.slicer and sending the batches on a.out.
//
// A batch is sent in two situations:
//   - a.slicer.add reports false for an incoming message: the current batch
//     is sent right away (forced flush);
//   - a.flushFrequency has passed since the timer was armed: the batch is
//     sent as soon as a.out accepts it, while the loop keeps servicing the
//     other channels (timed flush).
//
// Values received on a.errs are logged and otherwise ignored.
//
// NOTE(review): the message for which add returned false is not handed to the
// slicer again after reset. Confirm that add stores the message before
// reporting false; otherwise that message is dropped here.
func (a *replayAggregator) Run() {
	timer := time.NewTimer(a.flushFrequency)
	defer timer.Stop()

	// elapsed and out are nil while disabled: a nil channel never becomes
	// ready in a select, which switches the corresponding case off.
	var elapsed <-chan time.Time
	var out chan []*sarama.ConsumerMessage

	for {
		// Note: a.slicer.flush() in the send case below is evaluated every
		// time the select is entered, even while out is nil.
		select {
		case msg := <-a.in:
			if !a.slicer.add(msg) {
				// Forced flush. Also watch a.closing so that shutdown is not
				// blocked forever when nobody reads a.out any more.
				select {
				case a.out <- a.slicer.flush():
				case <-a.closing:
					log.Println("closing replay aggregator")
					return
				}
				a.slicer.reset()
				elapsed = nil
				// A timed flush that was still pending has just been
				// satisfied; leaving out armed would send the content of the
				// just-reset slicer on the next iteration.
				out = nil
			}
			if elapsed == nil {
				// Reset must only be used on a stopped timer whose channel
				// is drained; otherwise a stale tick (from the initial timer
				// or from the period preceding a forced flush) would be
				// received immediately and trigger a premature flush.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(a.flushFrequency)
				elapsed = timer.C
			}
		case <-elapsed:
			// Flush period is over: enable the send case below.
			out = a.out
			elapsed = nil
		case out <- a.slicer.flush():
			a.slicer.reset()
			out = nil
		case err := <-a.errs:
			log.Printf("consuming error: %s", err)
		case <-a.closing:
			log.Println("closing replay aggregator")
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment