Skip to content

Instantly share code, notes, and snippets.

@FZambia
Created September 11, 2017 10:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FZambia/c3c77aa541de2647a95e20f26e99c130 to your computer and use it in GitHub Desktop.
Smartbatching go
package smartbatching
import (
"errors"
"sync"
)
// T is a type Batcher works with.
// It is an empty interface so any value can be batched; callers must
// type-assert back to their concrete type inside the batch callback.
type T interface{}
// Config contains options to configure batching process.
type Config struct {
	// MaxBatchSize is the maximum number of items collected into a
	// single batch before the callback is invoked.
	MaxBatchSize int
	// ChannelBufferSize is the capacity of the internal item channel.
	// When the buffer is full and BlockOnPut is false, Put reports
	// ErrQueueFull.
	ChannelBufferSize int
	// BlockOnPut makes Put block until the item fits into the internal
	// channel instead of failing fast with ErrQueueFull.
	BlockOnPut bool
}
// Batcher allows to collect batch of items and then call callback function with that batch.
type Batcher interface {
	// Run starts the background batching goroutine.
	Run() error
	// Put submits one item for batching. The returned channel receives
	// the result of the batch callback for the batch containing data.
	Put(data T) chan error
	// Stop signals the batching goroutine to exit.
	Stop() error
}
// Item represents single item in batch slice passed to callback when batch ready.
type Item interface {
	// Data returns the value originally passed to Batcher.Put.
	Data() T
}
// batchItem pairs a submitted value with the channel used to deliver
// the batch callback's result back to the Put caller.
// NOTE(review): data is declared interface{} while Data() returns T
// (also interface{}); consider declaring it as T for consistency.
type batchItem struct {
	data interface{}
	// errCh is buffered (capacity 1) by Put, so the processing
	// goroutine never blocks when delivering the callback result.
	errCh chan error
}
// Data allows to extract original data from batch item.
// It returns the value passed to Batcher.Put unchanged.
func (i *batchItem) Data() T {
	return i.data
}
// batcher is the default Batcher implementation: Put feeds items into
// itemCh and a single goroutine (started by Run) drains them in batches.
type batcher struct {
	// mu guards closed and the close of closeCh. A plain Mutex is used
	// because only exclusive locking is ever needed (no RLock callers).
	mu      sync.Mutex
	config  *Config
	cb      func([]Item) error
	itemCh  chan *batchItem
	closeCh chan struct{}
	closed  bool
}
// New creates new Batcher.
// config controls batch sizing and backpressure behavior, and
// batchCallback is invoked with every collected batch.
func New(config *Config, batchCallback func([]Item) error) Batcher {
	b := &batcher{
		closeCh: make(chan struct{}),
		itemCh:  make(chan *batchItem, config.ChannelBufferSize),
		cb:      batchCallback,
		config:  config,
	}
	return b
}
// Run runs batcher routine.
// It spawns the processing goroutine and returns immediately; the error
// return is always nil and exists to satisfy the Batcher interface.
func (b *batcher) Run() error {
	go b.process()
	return nil
}
// ErrQueueFull is returned from the Put method when the internal
// batcher channel buffer is full.
// Enable BlockOnPut to block on sending to the internal batcher
// channel instead of receiving this error.
var ErrQueueFull = errors.New("batch queue is full")
// ErrBatcherStopped is returned from Put when the batcher has been
// stopped: its processing goroutine has exited and no longer drains
// the internal queue.
var ErrBatcherStopped = errors.New("batcher stopped")

// Put submits data for batching and returns a buffered channel that
// will receive the batch callback's result (or ErrQueueFull /
// ErrBatcherStopped). The channel has capacity 1, so the caller may
// read it at any time without blocking the batcher.
func (b *batcher) Put(data T) chan error {
	errCh := make(chan error, 1)
	item := &batchItem{
		data:  data,
		errCh: errCh,
	}
	if b.config.BlockOnPut {
		// Block until the item is accepted — but not forever: once
		// Stop closes closeCh nothing drains itemCh anymore, and
		// without this arm the caller would hang on a full channel.
		select {
		case b.itemCh <- item:
		case <-b.closeCh:
			errCh <- ErrBatcherStopped
		}
		return errCh
	}
	select {
	case b.itemCh <- item:
	case <-b.closeCh:
		errCh <- ErrBatcherStopped
	default:
		errCh <- ErrQueueFull
	}
	return errCh
}
// Stop signals the processing goroutine to exit by closing closeCh.
// It is idempotent: calls after the first are no-ops, and it always
// returns nil.
func (b *batcher) Stop() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.closed {
		b.closed = true
		close(b.closeCh)
	}
	return nil
}
// collect appends immediately-available items from the internal channel
// to items, stopping as soon as the channel is empty or the batch has
// reached MaxBatchSize.
//
// The size check runs before each receive: the original checked only
// after appending, so a batch that was already at the limit on entry
// (e.g. MaxBatchSize == 1) grew one item past it.
func (b *batcher) collect(items *[]Item) {
	for len(*items) < b.config.MaxBatchSize {
		select {
		case item := <-b.itemCh:
			*items = append(*items, item)
		default:
			// Channel drained — return with a partial batch.
			return
		}
	}
}
// process is the batcher's main loop: it turns incoming items into
// batches and runs the callback, until Stop closes closeCh.
func (b *batcher) process() {
	for {
		select {
		case item := <-b.itemCh:
			b.flush(item)
		case <-b.closeCh:
			// Drain items that were accepted into itemCh before Stop;
			// otherwise their callers would block forever waiting on
			// error channels nobody writes to.
			for {
				select {
				case item := <-b.itemCh:
					b.flush(item)
				default:
					return
				}
			}
		}
	}
}

// flush builds a batch starting from item, invokes the callback once,
// and fans its result out to every item's (buffered) error channel.
func (b *batcher) flush(item *batchItem) {
	items := []Item{item}
	b.collect(&items)
	err := b.cb(items)
	for _, i := range items {
		i.(*batchItem).errCh <- err
	}
}
package smartbatching
import (
"testing"
)
// TestBatcher submits many items in blocking mode and verifies every
// Put eventually receives a nil result from the batch callback.
func TestBatcher(t *testing.T) {
	c := &Config{
		MaxBatchSize:      10,
		ChannelBufferSize: 128,
		BlockOnPut:        true,
	}
	cb := func(items []Item) error {
		return nil
	}
	b := New(c, cb)
	if err := b.Run(); err != nil {
		t.Fatal(err)
	}
	defer b.Stop()
	const numItems = 10000
	// Pre-size: every Put returns exactly one result channel.
	resultChs := make([]chan error, 0, numItems)
	for i := 0; i < numItems; i++ {
		resultChs = append(resultChs, b.Put("data"))
	}
	for _, ch := range resultChs {
		if err := <-ch; err != nil {
			t.Error(err)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment