Skip to content

Instantly share code, notes, and snippets.

@tommady
Created March 9, 2019 12:34
Show Gist options
  • Save tommady/a97921fb598aab7ed1d43b6ef7fc3db8 to your computer and use it in GitHub Desktop.
Save tommady/a97921fb598aab7ed1d43b6ef7fc3db8 to your computer and use it in GitHub Desktop.
package slice
import (
"sync"
"time"
)
// cb is the signature of the flush callback: it receives one batch of queued
// values and may report an error.
type cb func(batch []interface{}) error

// Batcher collects values sent through Append and hands them to a callback in
// batches, either when enough values have accumulated or when a ticker fires.
type Batcher struct {
	// Flush policy.
	cb        cb           // invoked with a copy of the queued values on every flush
	threshold int          // queue length at which the intake goroutine flushes
	tick      *time.Ticker // each tick flushes whatever is queued, if anything

	// Buffering.
	tunnel chan interface{} // carries values from Append to the intake goroutine
	queue  []interface{}    // values waiting for the next flush

	// Coordination.
	mux  *sync.RWMutex // serialises access to queue between the two goroutines
	done chan struct{} // closed by Close to stop the background goroutines
}
// New creates a Batcher and starts its background goroutines before returning.
//
// cb is called with each flushed batch. A flush happens when the queue holds at
// least threshold values, or on every tick of a ticker with period within if
// the queue is non-empty at that moment. within is passed straight to
// time.NewTicker, which panics for a non-positive duration.
func New(cb cb, threshold int, within time.Duration) *Batcher {
	// Shared capacity for the intake channel buffer and the initial queue.
	const capacity = 1024

	b := new(Batcher)
	b.cb = cb
	b.threshold = threshold
	b.done = make(chan struct{})
	b.tunnel = make(chan interface{}, capacity)
	b.queue = make([]interface{}, 0, capacity)
	b.tick = time.NewTicker(within)
	b.mux = &sync.RWMutex{}

	b.start()
	return b
}
// start launches the two background goroutines that drive the Batcher:
//
//   - a ticker goroutine that, on every tick, flushes the queue if it is
//     non-empty, and
//   - an intake goroutine that moves values from the tunnel channel into the
//     queue and flushes as soon as the queue holds at least threshold values.
//
// Both goroutines exit when done is closed; the ticker goroutine also stops
// the ticker. Values still queued at that point are not flushed.
//
// The queue is only touched while holding the write lock, and the callback is
// always invoked after the lock has been released, so a slow callback never
// blocks the other goroutine from queueing or flushing.
func (h *Batcher) start() {
	go func() {
		for {
			select {
			case <-h.tick.C:
				h.mux.Lock()
				data := h.takeLocked()
				h.mux.Unlock()
				if data != nil {
					// The callback's error has nowhere to be reported from
					// this goroutine, so it is deliberately discarded.
					_ = h.cb(data)
				}
			case <-h.done:
				h.tick.Stop()
				return
			}
		}
	}()

	go func() {
		for {
			select {
			case v := <-h.tunnel:
				var data []interface{}
				// Append and threshold check share one critical section so a
				// concurrent tick cannot empty the queue in between.
				h.mux.Lock()
				h.queue = append(h.queue, v)
				if len(h.queue) >= h.threshold {
					data = h.takeLocked()
				}
				h.mux.Unlock()
				if data != nil {
					// See above: there is no channel to report the error on.
					_ = h.cb(data)
				}
			case <-h.done:
				return
			}
		}
	}()
}

// takeLocked returns a copy of the queued values and empties the queue, or
// nil when nothing is queued. The caller must hold h.mux for writing.
func (h *Batcher) takeLocked() []interface{} {
	if len(h.queue) == 0 {
		return nil
	}
	data := make([]interface{}, len(h.queue))
	copy(data, h.queue)
	// Drop the references held by the reused backing array so flushed values
	// can be garbage collected before their slots are overwritten.
	for i := range h.queue {
		h.queue[i] = nil
	}
	h.queue = h.queue[:0]
	return data
}
// Append hands v to the Batcher for inclusion in a later batch.
//
// It blocks while the tunnel buffer is full. Once Close has been called the
// goroutine that drains the tunnel stops, so Append also watches done and
// returns instead of blocking forever; a value passed after Close may be
// discarded without ever reaching the callback.
func (h *Batcher) Append(v interface{}) {
	select {
	case h.tunnel <- v:
	case <-h.done:
		// Closed: nobody will drain the tunnel any more, give up on v.
	}
}
// Close signals the background goroutines to stop by closing done. It returns
// immediately and does not wait for them to exit; values still queued are not
// flushed.
//
// Calling Close again after it has returned is a no-op rather than a
// "close of closed channel" panic. The check is not synchronised, so Close
// must not be called from several goroutines at the same time.
func (h *Batcher) Close() {
	select {
	case <-h.done:
		// Already closed by an earlier call.
		return
	default:
	}
	close(h.done)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment