Skip to content

Instantly share code, notes, and snippets.

@tommady
Created March 9, 2019 12:40
Show Gist options
  • Save tommady/b6b705fb1e10e14a51db34deffc74acd to your computer and use it in GitHub Desktop.
Save tommady/b6b705fb1e10e14a51db34deffc74acd to your computer and use it in GitHub Desktop.
package sliceSwap
import (
"sync"
"time"
)
// Batcher collects values passed to Append and hands them to a callback in
// batches. A batch is delivered when the number of queued values reaches the
// threshold, or when the periodic ticker fires while the queue is non-empty.
//
// Two buffers are used: queue receives new values, and on every tick it is
// swapped with dirtyQ so the ticker goroutine can copy the swapped-out values
// without holding the lock.
type Batcher struct {
	// cb receives a private copy of every batch.
	cb cb

	// threshold is the queue length at which the collector goroutine flushes
	// without waiting for the ticker.
	threshold int

	// done is closed by Close and tells both goroutines to exit.
	done chan struct{}

	// tunnel carries values from Append to the collector goroutine.
	tunnel chan interface{}

	// dirtyQ is the buffer most recently swapped out of queue. Apart from the
	// swap itself (done under mux) only the ticker goroutine touches it.
	dirtyQ []interface{}

	// queue is the live buffer; every access is guarded by mux.
	queue []interface{}

	// tick drives the periodic flush.
	tick *time.Ticker

	// mux guards queue and the queue/dirtyQ swap.
	mux *sync.RWMutex

	// closeOnce makes Close safe to call more than once.
	closeOnce sync.Once
}

// cb is the callback signature used to deliver a batch. The slice it receives
// is freshly allocated for that call and is not touched by the Batcher
// afterwards. The returned error is ignored by the Batcher.
type cb func([]interface{}) error

// New creates a Batcher that delivers batches to cb and starts its two
// background goroutines.
//
// threshold is the queue length that triggers an immediate flush; a value of
// 1 or less flushes every appended value on its own. within is the period of
// the flush ticker and must be positive, because time.NewTicker panics
// otherwise. cb must be non-nil and must tolerate being called from two
// goroutines at the same time (the collector and the ticker).
//
// Call Close to stop the goroutines and the ticker.
func New(cb cb, threshold int, within time.Duration) *Batcher {
	// Size of the Append channel and initial capacity of both buffers.
	const capacity = 1024

	h := &Batcher{
		cb:        cb,
		threshold: threshold,
		done:      make(chan struct{}),
		tunnel:    make(chan interface{}, capacity),
		dirtyQ:    make([]interface{}, 0, capacity),
		queue:     make([]interface{}, 0, capacity),
		tick:      time.NewTicker(within),
		mux:       new(sync.RWMutex),
	}
	h.start()
	return h
}

// start launches the ticker goroutine and the collector goroutine. Both run
// until done is closed.
func (h *Batcher) start() {
	go h.flushOnTick()
	go h.collect()
}

// flushOnTick swaps the two buffers on every tick and delivers whatever the
// swapped-out buffer contained. It stops the ticker and returns once done is
// closed.
func (h *Batcher) flushOnTick() {
	for {
		select {
		case <-h.tick.C:
			h.mux.Lock()
			h.queue, h.dirtyQ = h.dirtyQ, h.queue
			h.mux.Unlock()

			// From here until the next swap dirtyQ is used by this goroutine
			// only, so it can be read and reset without the lock.
			if len(h.dirtyQ) == 0 {
				continue
			}
			batch := make([]interface{}, len(h.dirtyQ))
			copy(batch, h.dirtyQ)
			h.dirtyQ = h.dirtyQ[:0]
			h.deliver(batch)
		case <-h.done:
			h.tick.Stop()
			return
		}
	}
}

// collect moves values from tunnel into queue and flushes the queue as soon
// as it holds at least threshold values. It returns once done is closed.
func (h *Batcher) collect() {
	for {
		select {
		case v := <-h.tunnel:
			var batch []interface{}

			// Append, check and reset under one exclusive lock so the ticker
			// cannot swap the buffers in between, and so the queue is never
			// written while only a read lock is held.
			h.mux.Lock()
			h.queue = append(h.queue, v)
			if len(h.queue) >= h.threshold {
				batch = make([]interface{}, len(h.queue))
				copy(batch, h.queue)
				h.queue = h.queue[:0]
			}
			h.mux.Unlock()

			// The callback runs outside the lock so a slow callback cannot
			// stall the ticker goroutine.
			if batch != nil {
				h.deliver(batch)
			}
		case <-h.done:
			return
		}
	}
}

// deliver hands one batch to the callback.
func (h *Batcher) deliver(batch []interface{}) {
	// The callback's error is discarded on purpose: the Batcher has no
	// channel through which it could be reported to the caller.
	_ = h.cb(batch)
}

// Append queues v for batching. It blocks while the internal channel is full
// and the Batcher is still running. Once Close has been called Append returns
// without blocking; a value passed at that point is never delivered.
func (h *Batcher) Append(v interface{}) {
	select {
	case h.tunnel <- v:
	case <-h.done:
	}
}

// Close stops the background goroutines and the ticker. It does not wait for
// them to exit, and values that are still buffered are discarded rather than
// delivered. Close may be called more than once.
func (h *Batcher) Close() {
	h.closeOnce.Do(func() {
		close(h.done)
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment