Created
September 11, 2017 10:48
-
-
Save FZambia/c3c77aa541de2647a95e20f26e99c130 to your computer and use it in GitHub Desktop.
Smartbatching go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smartbatching | |
import ( | |
"errors" | |
"sync" | |
) | |
// T is a type Batcher works with. | |
type T interface{} | |
// Config contains options to configure batching process. | |
type Config struct { | |
MaxBatchSize int | |
ChannelBufferSize int | |
BlockOnPut bool | |
} | |
// Batcher allows to collect batch of items and then call callback function with that batch. | |
type Batcher interface { | |
Run() error | |
Put(data T) chan error | |
Stop() error | |
} | |
// Item represents single item in batch slice passed to callback when batch ready. | |
type Item interface { | |
Data() T | |
} | |
type batchItem struct { | |
data interface{} | |
errCh chan error | |
} | |
// Data allows to extract original data from batch item. | |
func (i *batchItem) Data() T { | |
return i.data | |
} | |
type batcher struct { | |
mu sync.RWMutex | |
config *Config | |
cb func([]Item) error | |
itemCh chan *batchItem | |
closeCh chan struct{} | |
closed bool | |
} | |
// New creates new Batcher. | |
func New(config *Config, batchCallback func([]Item) error) Batcher { | |
return &batcher{ | |
config: config, | |
cb: batchCallback, | |
itemCh: make(chan *batchItem, config.ChannelBufferSize), | |
closeCh: make(chan struct{}), | |
} | |
} | |
// Run runs batcher routine. | |
func (b *batcher) Run() error { | |
go b.process() | |
return nil | |
} | |
// ErrQueueFull returned from Put method in case of internal batcher | |
// channel buffer is full. | |
// Enable BlockOnPut to prevent returning this error and block on | |
// sending to internal batcher channel. | |
var ErrQueueFull = errors.New("batch queue is full") | |
func (b *batcher) Put(data T) chan error { | |
errCh := make(chan error, 1) | |
item := &batchItem{ | |
data: data, | |
errCh: errCh, | |
} | |
if b.config.BlockOnPut { | |
b.itemCh <- item | |
return item.errCh | |
} | |
select { | |
case b.itemCh <- item: | |
return item.errCh | |
default: | |
errCh <- ErrQueueFull | |
return errCh | |
} | |
} | |
func (b *batcher) Stop() error { | |
b.mu.Lock() | |
defer b.mu.Unlock() | |
if b.closed { | |
return nil | |
} | |
close(b.closeCh) | |
b.closed = true | |
return nil | |
} | |
func (b *batcher) collect(items *[]Item) { | |
for { | |
select { | |
case item := <-b.itemCh: | |
*items = append(*items, item) | |
if len(*items) >= b.config.MaxBatchSize { | |
return | |
} | |
default: | |
return | |
} | |
} | |
} | |
func (b *batcher) process() { | |
for { | |
select { | |
case item := <-b.itemCh: | |
items := []Item{item} | |
b.collect(&items) | |
err := b.cb(items) | |
for _, i := range items { | |
i.(*batchItem).errCh <- err | |
} | |
case <-b.closeCh: | |
return | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package smartbatching | |
import ( | |
"testing" | |
) | |
func TestBatcher(t *testing.T) { | |
c := &Config{ | |
MaxBatchSize: 10, | |
ChannelBufferSize: 128, | |
BlockOnPut: true, | |
} | |
cb := func(items []Item) error { | |
return nil | |
} | |
b := New(c, cb) | |
b.Run() | |
defer b.Stop() | |
errs := []chan error{} | |
for i := 0; i < 10000; i++ { | |
err := b.Put("data") | |
errs = append(errs, err) | |
} | |
for _, errCh := range errs { | |
err := <-errCh | |
if err != nil { | |
t.Error(err) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment