Skip to content

Instantly share code, notes, and snippets.

@olivere
Created November 26, 2015 08:50
Show Gist options
  • Save olivere/a6dc27ae961736721d2c to your computer and use it in GitHub Desktop.
Very simple bulk pool for elastic/Elasticsearch.
package bulk
import (
"errors"
"sync"
"time"
"gopkg.in/olivere/elastic.v3"
)
const (
	// NumRetries is the default number of retries for a bulk request.
	// A request is therefore attempted up to NumRetries+1 times in total
	// before the final error is reported.
	NumRetries = 5
	// SleepMillis is the number of milliseconds to wait between retries (initially).
	// The sleep interval doubles after each failed retry (see commit).
	SleepMillis = 1000
)
// Pool is a pool of workers performing bulk requests to Elasticsearch.
// A pool ensures that Elasticsearch doesn't get overloaded with many
// individual bulk requests.
//
// There is typically just one pool of index workers for the whole application.
//
// Example:
//	// Set up a pool of 5 workers
//	pool, err := bulk.NewPool(5)
//	if err != nil { ... }
//
//	// Create a response channel for requests
//	responseCh := make(chan error)
//	defer close(responseCh)
//
//	// Create a bulk service from Elasticsearch
//	svc, err := env.ES.Bulk().Index(...).Type(...)
//	if err != nil { ... }
//
//	for {
//		// Break on some condition
//		if stop {
//			break
//		}
//
//		// Fill svc with bulk requests here
//		indexReq := elastic.NewBulkIndexRequest().Id(...).Type(...).Doc(...)
//		svc.Add(indexReq)
//
//		// Commit every 1000 docs
//		if svc.NumberOfActions() >= 1000 {
//			// Enqueue a request
//			pool.Commit(svc, responseCh)
//
//			// Wait for response, either success or failure
//			err = <-responseCh
//			if err != nil { ... }
//
//			// Set up new bulk service
//			svc, err = env.ES.Bulk().Index(...).Type(...)
//			if err != nil { ... }
//		}
//	}
//
//	// Final flush
//	if svc.NumberOfActions() > 0 {
//		// Enqueue a request
//		pool.Commit(svc, responseCh)
//
//		// Wait for response, either success or failure
//		err = <-responseCh
//		if err != nil { ... }
//
//		// Set up new bulk service
//		svc, err = env.ES.Bulk().Index(...).Type(...)
//		if err != nil { ... }
//	}
type Pool struct {
	sync.Mutex           // guards running
	workers  int         // number of worker goroutines
	requests chan *request // unbuffered queue of pending bulk requests
	done     chan bool    // shutdown handshake channel (see Close/worker)
	running  bool         // false once Close has completed
}
// request encapsulates a single bulk insert request.
type request struct {
	service     *elastic.BulkService // bulk service holding the queued actions
	retries     int                  // number of retries before giving up
	sleepMillis int                  // time to sleep between retries (initial; doubles per retry)
	// Response channel that emits nil on success and an error on failure.
	// Exactly one value is sent per committed request.
	Response chan error
}
// NewPool sets up a new pool for bulk insertion into Elasticsearch.
// It immediately starts the given number of worker goroutines, which
// run until the pool is shut down via Close. An error is returned if
// workers is not positive.
func NewPool(workers int) (*Pool, error) {
	if workers <= 0 {
		return nil, errors.New("invalid number of workers")
	}
	// The pool is not yet visible to any other goroutine, so running can
	// be set in the literal without taking the mutex. This also ensures
	// the flag is true before the first worker starts, rather than after.
	p := &Pool{
		workers:  workers,
		requests: make(chan *request),
		done:     make(chan bool),
		running:  true,
	}
	for i := 0; i < p.workers; i++ {
		go p.worker(p.requests, p.done)
	}
	return p, nil
}
// Close shuts the pool down: it signals every worker to stop, waits
// until each worker has acknowledged, and then closes the internal
// channels. The pool is unusable afterwards; set up a new pool if you
// need one. Calling Close on an already-closed pool is a no-op.
func (p *Pool) Close() error {
	p.Lock()
	defer p.Unlock()
	if !p.running {
		return nil
	}
	p.running = false
	// Hand one stop signal to each worker ...
	for n := 0; n < p.workers; n++ {
		p.done <- true
	}
	// ... then collect one acknowledgement per worker so we know all of
	// them have returned before the channels are closed.
	for n := 0; n < p.workers; n++ {
		<-p.done
	}
	close(p.requests)
	close(p.done)
	return nil
}
// isRunning reports whether the pool has not been closed yet.
func (p *Pool) isRunning() bool {
	p.Lock()
	running := p.running
	p.Unlock()
	return running
}
// Commit will commit a single bulk to Elasticsearch by enqueueing it
// for one of the pool workers. The outcome is delivered on responseCh:
// nil on success, or the final error once all retries are exhausted.
//
// Commit returns an error if the pool has already been closed; it does
// not wait for the bulk request itself to complete.
func (p *Pool) Commit(service *elastic.BulkService, responseCh chan error) error {
	// Guard against sending on the closed requests channel, which would
	// panic. NOTE(review): this check is best-effort — a Close that runs
	// concurrently with Commit can still race past it.
	if !p.isRunning() {
		return errors.New("bulk: pool is closed")
	}
	p.requests <- &request{
		service:     service,
		retries:     NumRetries,
		sleepMillis: SleepMillis,
		Response:    responseCh,
	}
	return nil
}
// worker is an individual worker goroutine. It consumes bulk requests
// from the requests channel and commits them to Elasticsearch until a
// stop signal arrives on done; the signal is echoed back on done so
// that Close can wait for the worker to finish.
func (p *Pool) worker(requests chan *request, done chan bool) {
	for {
		select {
		case <-done:
			// Acknowledge shutdown, then exit.
			done <- true
			return
		case req := <-requests:
			p.commit(req)
		}
	}
}
// commit executes the bulk request against Elasticsearch, retrying
// automatically on error with an exponentially growing sleep interval.
// Exactly one value is sent on r.Response: nil on success, or the last
// error once the retry budget is spent.
func (p *Pool) commit(r *request) {
	backoff := r.sleepMillis
	for {
		_, err := r.service.Do()
		if err == nil {
			// Success: signal the caller and stop.
			r.Response <- nil
			return
		}
		r.retries--
		if r.retries < 0 {
			// Retry budget exhausted: report the last error.
			r.Response <- err
			return
		}
		// Wait before the next attempt, doubling the interval each time.
		time.Sleep(time.Duration(backoff) * time.Millisecond)
		backoff *= 2
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment