Skip to content

Instantly share code, notes, and snippets.

@localhots
Last active January 21, 2016 23:29
Show Gist options
  • Save localhots/447023a78e09966db294 to your computer and use it in GitHub Desktop.
Save localhots/447023a78e09966db294 to your computer and use it in GitHub Desktop.
package main
import "time"
// Uint32BatchWithTimeout collects uint32 items and hands them to a callback in
// batches. A batch is flushed when it reaches the configured size, when the
// timeout elapses since the previous flush attempt (or since creation), or
// when the batch is closed.
//
// All callback invocations happen on one internal goroutine, so the callback
// is never run concurrently with itself.
type Uint32BatchWithTimeout struct {
	size    int                  // number of items that triggers a flush; < 1 disables size-based flushing
	timeout time.Duration        // maximum time between flush attempts
	fun     func(batch []uint32) // callback that receives every non-empty batch
	input   chan uint32          // unbuffered hand-off from Add to the collector goroutine
	items   []uint32             // items accumulated since the last flush
	timer   *time.Timer          // single reusable timer that drives timeout flushes
	done    chan struct{}        // closed once the collector goroutine has exited
}

// NewUint32BatchWithTimeout creates a new instance of Uint32BatchWithTimeout
// and starts its collector goroutine. The timeout starts counting immediately.
// The goroutine runs until Close is called.
//
// size is the number of items that triggers a flush, timeout is the maximum
// delay between flush attempts, and fun is invoked with every non-empty batch.
func NewUint32BatchWithTimeout(
	size int,
	timeout time.Duration,
	fun func(batch []uint32),
) *Uint32BatchWithTimeout {
	b := &Uint32BatchWithTimeout{
		size:    size,
		timeout: timeout,
		fun:     fun,
		input:   make(chan uint32),
		timer:   time.NewTimer(timeout),
		done:    make(chan struct{}),
	}
	go b.collect()
	return b
}

// Add adds an item to the batch. It blocks until the collector goroutine
// accepts the item, which may mean waiting for a running callback to return.
// Add will panic if batch is already closed.
func (b *Uint32BatchWithTimeout) Add(item uint32) {
	b.input <- item
}

// Close stops accepting items to the batch, processes remaining items and
// returns only after that final flush has completed and the collector
// goroutine has exited.
//
// Close must be called at most once (a second call panics) and must not be
// called from inside the callback, since it waits for the goroutine that runs
// the callback.
func (b *Uint32BatchWithTimeout) Close() {
	close(b.input)
	<-b.done
}

// collect is the collector goroutine: it accumulates incoming items and
// flushes them on size, on timeout, or when the input channel is closed.
func (b *Uint32BatchWithTimeout) collect() {
	// Deferred calls run in reverse order: the timer is released first, then
	// done is closed so that Close can return.
	defer close(b.done)
	defer b.timer.Stop()

	for {
		select {
		case item, ok := <-b.input:
			if !ok {
				// Input closed: flush whatever is left and terminate.
				b.process()
				return
			}
			b.items = append(b.items, item)
			if len(b.items) == b.size {
				b.process()
			}
		case <-b.timer.C:
			b.process()
		}
	}
}

// process hands the accumulated items to the callback (if there are any) and
// restarts the timeout.
func (b *Uint32BatchWithTimeout) process() {
	if len(b.items) > 0 {
		b.fun(b.items)
		// Drop the slice instead of truncating it: the callback may have kept
		// a reference to the batch it was given.
		b.items = nil
	}
	b.resetTimer()
}

// resetTimer restarts the timeout using the single reusable timer rather than
// allocating a new one for every batch.
func (b *Uint32BatchWithTimeout) resetTimer() {
	if !b.timer.Stop() {
		// The timer already fired. Discard a tick that nobody consumed so the
		// next period does not end immediately with a stale value.
		select {
		case <-b.timer.C:
		default:
		}
	}
	b.timer.Reset(b.timeout)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment