Skip to content

Instantly share code, notes, and snippets.

@tommady
Created March 9, 2019 12:43
Show Gist options
  • Save tommady/37e61e6bce18f167ea301b144296765b to your computer and use it in GitHub Desktop.
Save tommady/37e61e6bce18f167ea301b144296765b to your computer and use it in GitHub Desktop.
package channelSwap
import (
"sync"
"time"
)
// Batcher collects values handed to Append and passes them to a callback in
// batches, either when enough values have accumulated or when a ticker fires.
type Batcher struct {
	// Configuration.
	cb        cb  // callback that receives each flushed batch
	threshold int // queue length that triggers a size-based flush

	// Time-based flushing and shutdown.
	tick *time.Ticker  // every tick swaps the two queues and flushes the retired one
	done chan struct{} // closed by Close to stop the background goroutines

	// Data path: Append -> tunnel -> queue.
	tunnel chan interface{} // values written by Append, read by the intake goroutine
	queue  chan interface{} // active buffer the intake goroutine fills
	dirtyQ chan interface{} // spare buffer; swapped with queue on each tick and then drained

	// mux guards the queue/dirtyQ swap: the ticker goroutine takes the write
	// lock to swap, the intake goroutine takes the read lock while it uses queue.
	mux *sync.RWMutex
}
// cb is the signature of the function that receives each flushed batch.
// The returned error reports whether the batch could be handled.
type cb func(batch []interface{}) error
// New builds a Batcher and starts its background goroutines before returning.
//
// cb receives every flushed batch. threshold is the number of queued values
// that triggers a size-based flush. within is the period of the ticker that
// triggers time-based flushes; it must be greater than zero because
// time.NewTicker panics otherwise.
//
// Call Close on the returned Batcher to stop the goroutines.
func New(cb cb, threshold int, within time.Duration) *Batcher {
	// All three internal channels share the same buffer size.
	const buffered = 1024

	b := new(Batcher)
	b.cb = cb
	b.threshold = threshold
	b.done = make(chan struct{})
	b.tunnel = make(chan interface{}, buffered)
	b.dirtyQ = make(chan interface{}, buffered)
	b.queue = make(chan interface{}, buffered)
	b.tick = time.NewTicker(within)
	b.mux = &sync.RWMutex{}

	b.start()
	return b
}
// start launches the two background goroutines that drive the batcher; both
// run until h.done is closed.
//
// The first goroutine performs the time-based flush: on every tick it swaps
// h.queue with h.dirtyQ under the write lock and passes whatever the retired
// queue holds to the callback. It stops the ticker when it exits.
//
// The second goroutine moves values from h.tunnel into h.queue and performs
// the size-based flush once the queue holds h.threshold values. On shutdown it
// also delivers the values still waiting in h.queue and h.tunnel, so closing
// the batcher does not silently drop them; that last batch may be larger than
// h.threshold.
//
// The callback can be invoked from both goroutines, possibly at the same
// time, and is never invoked while h.mux is held. Its error result is
// discarded because there is no channel through which to report it.
func (h *Batcher) start() {
	go func() {
		defer h.tick.Stop()
		for {
			select {
			case <-h.tick.C:
				h.mux.Lock()
				h.queue, h.dirtyQ = h.dirtyQ, h.queue
				h.mux.Unlock()

				// After the swap only this goroutine touches dirtyQ, so it
				// can be emptied without holding the lock.
				if batch := drainQueue(h.dirtyQ); len(batch) > 0 {
					_ = h.cb(batch) // no way to report the error; see doc comment
				}
			case <-h.done:
				return
			}
		}
	}()

	go func() {
		for {
			select {
			case v := <-h.tunnel:
				var batch []interface{}

				h.mux.RLock()
				h.queue <- v
				// Flush at the threshold, and also whenever the queue is
				// full: with a threshold above the queue capacity the next
				// send would otherwise block while holding the read lock,
				// which blocks the ticker goroutine's swap forever.
				// Draining len(queue) values instead of h.threshold also
				// keeps a zero or negative threshold from producing empty
				// batches or a negative slice length.
				if n := len(h.queue); n >= h.threshold || n == cap(h.queue) {
					batch = drainQueue(h.queue)
				}
				h.mux.RUnlock()

				if len(batch) > 0 {
					_ = h.cb(batch) // no way to report the error; see doc comment
				}
			case <-h.done:
				// Deliver what is still pending: first the values already
				// queued, then the ones still sitting in the tunnel.
				h.mux.RLock()
				rest := drainQueue(h.queue)
				h.mux.RUnlock()

				for more := true; more; {
					select {
					case v := <-h.tunnel:
						rest = append(rest, v)
					default:
						more = false
					}
				}
				if len(rest) > 0 {
					_ = h.cb(rest) // no way to report the error; see doc comment
				}
				return
			}
		}
	}()
}

// drainQueue removes the values currently buffered in q and returns them in
// order, or nil when q is empty. The caller must ensure that no other
// goroutine receives from q at the same time, otherwise a receive could block.
func drainQueue(q chan interface{}) []interface{} {
	n := len(q)
	if n == 0 {
		return nil
	}
	batch := make([]interface{}, n)
	for i := range batch {
		batch[i] = <-q
	}
	return batch
}
// Append hands v to the batcher for delivery in a later batch.
//
// It blocks while the intake buffer (h.tunnel) is full. Once Close has been
// called the value is discarded and Append returns immediately; without that
// check a caller would block forever as soon as the buffer filled up, because
// nothing reads from h.tunnel after shutdown. A value appended concurrently
// with Close may or may not be delivered.
func (h *Batcher) Append(v interface{}) {
	// Checked first so that a closed batcher never accepts new values, even
	// while the buffer still has room.
	select {
	case <-h.done:
		return
	default:
	}

	select {
	case h.tunnel <- v:
	case <-h.done:
		// Closed while waiting for room in the buffer: give up on v.
	}
}
// Close signals the batcher's background goroutines to stop by closing
// h.done. It does not wait for them to exit.
//
// Calling Close again after it has returned is a no-op rather than a
// "close of closed channel" panic. The guard is not synchronized, so Close
// must not be called from several goroutines at the same time.
func (h *Batcher) Close() {
	select {
	case <-h.done:
		// Already closed; nothing to do.
	default:
		close(h.done)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment