Skip to content

Instantly share code, notes, and snippets.

@eaigner
Created August 21, 2013 15:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eaigner/6295652 to your computer and use it in GitHub Desktop.
Save eaigner/6295652 to your computer and use it in GitHub Desktop.
Coalesce DB writes using a coordinator
package coord
import (
"sync"
)
// FlushFunc persists one batch of pending values in a single operation.
// On success it returns the results positionally: results[i] belongs to
// pending[i]. On failure it returns a non-nil error instead.
type FlushFunc func(pending []interface{}) (results []interface{}, err error)
// WCoord is a write coordinator for efficient write operation batching.
// Write appends values to a queue; Flush hands the whole queue to the
// FlushFunc in a single call and then releases the waiting writers.
type WCoord struct {
// f performs the batched write for everything Flush takes off the queue.
f FlushFunc
// mtx guards queue.
mtx sync.Mutex
// queue holds the jobs accepted by Write that have not been flushed yet.
queue []*job
}
// NewWCoord returns a write coordinator that passes every flushed batch to f.
// The pending queue starts out empty with room for 100 jobs.
func NewWCoord(f FlushFunc) *WCoord {
	return &WCoord{
		queue: make([]*job, 0, 100),
		f:     f,
	}
}
// Write queues v for the next Flush and blocks the caller until that flush
// has processed it. It returns the result recorded for v, or the error the
// flush failed with.
func (c *WCoord) Write(v interface{}) (interface{}, error) {
	// Arm the job before it becomes visible in the queue, so that Flush
	// never calls Done on a job whose counter is still zero.
	pending := &job{v: v}
	pending.Add(1)

	c.mtx.Lock()
	c.queue = append(c.queue, pending)
	c.mtx.Unlock()

	// Flush fills in r / err and then calls Done, which releases this Wait.
	pending.Wait()
	return pending.r, pending.err
}
// flushError is a string-backed error type. It lets Flush report contract
// violations of the FlushFunc without needing any additional import.
type flushError string

// Error implements the error interface.
func (e flushError) Error() string { return string(e) }

// errShortResult is delivered to every writer of a batch for which the
// FlushFunc reported success but returned fewer results than values.
const errShortResult = flushError("coord: flush func returned fewer results than values")

// Flush takes every job queued so far, passes their values to the FlushFunc
// in a single call, and then unblocks the corresponding Write callers.
//
// Each writer receives either its positional result or, if the flush failed,
// the flush error. A FlushFunc that returns a nil error together with fewer
// results than values is treated as a failed flush: the whole batch receives
// an error instead of Flush panicking half-way through and leaving the
// remaining writers blocked forever. Surplus results are ignored.
//
// When nothing is queued, Flush returns without calling the FlushFunc.
func (c *WCoord) Flush() {
	c.mtx.Lock()
	if len(c.queue) == 0 {
		c.mtx.Unlock()
		return
	}
	// Swap in a fresh queue so writers can keep enqueueing while the
	// (potentially slow) FlushFunc runs outside the lock.
	jobs := c.queue
	c.queue = make([]*job, 0, 100)
	c.mtx.Unlock()

	vals := make([]interface{}, len(jobs))
	for i, j := range jobs {
		vals[i] = j.v
	}

	res, err := c.f(vals)
	if err == nil && len(res) < len(jobs) {
		// Indexing res below would go out of range; fail the batch instead.
		err = errShortResult
	}

	for i, j := range jobs {
		if err != nil {
			j.err = err
		} else {
			j.r = res[i]
		}
		// Done must run for every job, otherwise its Write call never returns.
		j.Done()
	}
}
// job is one queued Write call: the value to persist plus the slots that
// Flush fills in before releasing the waiting caller.
type job struct {
// The embedded WaitGroup is the completion signal: Write calls Add(1) and
// then Wait, Flush calls Done once the outcome has been recorded.
sync.WaitGroup
// v is the value that was passed to Write.
v interface{}
// r is the result for v; Flush sets it when the flush succeeded.
r interface{}
// err is the flush error; Flush sets it when the flush failed.
err error
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment