Skip to content

Instantly share code, notes, and snippets.

@burke
Created June 21, 2013 18:41
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save burke/5833358 to your computer and use it in GitHub Desktop.
Save burke/5833358 to your computer and use it in GitHub Desktop.
// package spooler implements a disk-persistent queue.
//
// Spooler uses MDB (LMDB) to implement a queue of byteslices. Its intended usecase
// is to enqueue work items received by a service before later working them off.
// Note that Spooler only flushes to disk up to once every 25ms. As a result,
// if the process or machine crashes unexpectedly, the most recent 25ms of work
// can be lost. This decision effectively increases throughput by 10,000%,
// but makes spooler unsuitable for jobs that absolutely cannot be allowed to fail
// under any circumstances.
package spooler
import (
"encoding/binary"
mdb "github.com/szferi/gomdb"
"sync"
"time"
)
type Spooler struct {
db *mdb.Env
namespace string
dirty *sync.Cond
}
func (s *Spooler) Close() {
s.db.Close()
}
func New(path, namespace string) *Spooler {
env, _ := mdb.NewEnv()
env.SetMapSize(10485760)
env.SetMaxDBs(1)
// WRITEMAP | NOSYNC basically means we're using mmap and suppressing
// automatic fsync. We are responsible for flushing.
env.Open(path, mdb.NOSYNC|mdb.WRITEMAP, 0644)
var m sync.Mutex
dirty := sync.NewCond(&m)
// synchronously flushing to disk amounts to terrible performance.
// If we happen to crash hard, we lose up to 25ms of enqueued work.
// ...so don't use this for collecting money. Pretty much anything
// else is fine.
go func() {
for {
m.Lock()
dirty.Wait()
env.Sync(1)
time.Sleep(25 * time.Millisecond)
}
}()
return &Spooler{db: env, namespace: namespace, dirty: dirty}
}
func (s *Spooler) Enqueue(el []byte) error {
txn, err := s.db.BeginTxn(nil, 0)
if err != nil {
return err
}
dbi, err := txn.DBIOpen(&s.namespace, mdb.CREATE)
if err != nil {
txn.Abort()
return err
}
defer s.db.DBIClose(dbi)
err = txn.Put(dbi, generateMonotonicKey(), el, mdb.APPEND)
if err != nil {
txn.Abort()
return err
}
err = txn.Commit()
if err != nil {
txn.Abort()
return err
}
s.dirty.Signal()
return nil
}
func (s *Spooler) Dequeue() ([]byte, error) {
txn, err := s.db.BeginTxn(nil, 0)
if err != nil {
return nil, err
}
dbi, err := txn.DBIOpen(&s.namespace, 0)
if err != nil {
txn.Abort()
return nil, err
}
defer s.db.DBIClose(dbi)
curs, err := txn.CursorOpen(dbi)
if err != nil {
txn.Abort()
return nil, err
}
_, v, err := curs.Get([]byte{}, mdb.FIRST)
if err != nil {
txn.Abort()
return nil, err
}
err = curs.Del(0)
if err != nil {
txn.Abort()
return nil, err
}
err = txn.Commit()
if err != nil {
txn.Abort()
return nil, err
}
s.dirty.Signal()
return v, nil
}
var lastTime int64
// In the fairly-inoncievable scenario that this is called twice in the same
// nanosecond, the next ID will be higher. This safety is more useful when the
// clock runs backwards.
func generateMonotonicKey() []byte {
t := time.Now().UnixNano()
if t <= lastTime {
t = lastTime + 1
}
lastTime = t
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(t))
return buf
}
package spooler
import (
"github.com/bmizerany/assert"
mdb "github.com/szferi/gomdb"
"io/ioutil"
"os"
"testing"
)
func makeSpooler(ns string) (*Spooler, string) {
path, err := ioutil.TempDir("/tmp", "spooler_test")
if err != nil {
panic(err)
}
err = os.MkdirAll(path, 0770)
if err != nil {
panic(err)
}
return New(path, ns), path
}
func TestSpooler(t *testing.T) {
spooler, p := makeSpooler("spoolertest3")
defer os.RemoveAll(p)
defer spooler.Close()
spooler.Enqueue([]byte("YOLO 1"))
spooler.Enqueue([]byte("YOLO 2"))
spooler.Enqueue([]byte("YOLO 3"))
spooler.Enqueue([]byte("YOLO 4"))
spooler.Enqueue([]byte("YOLO 5"))
output, _ := spooler.Dequeue()
assert.Equal(t, []byte("YOLO 1"), output)
output, _ = spooler.Dequeue()
assert.Equal(t, []byte("YOLO 2"), output)
output, _ = spooler.Dequeue()
assert.Equal(t, []byte("YOLO 3"), output)
output, _ = spooler.Dequeue()
assert.Equal(t, []byte("YOLO 4"), output)
output, _ = spooler.Dequeue()
assert.Equal(t, []byte("YOLO 5"), output)
}
func TestEmpty(t *testing.T) {
spooler, p := makeSpooler("spoolertest2")
defer os.RemoveAll(p)
defer spooler.Close()
_, err := spooler.Dequeue()
assert.Equal(t, err, mdb.NotFound)
}
func BenchmarkSpooler(b *testing.B) {
spooler, p := makeSpooler("spoolertest")
defer os.RemoveAll(p)
defer spooler.Close()
for i := 0; i < b.N; i++ {
spooler.Enqueue([]byte("YOLO 1"))
spooler.Dequeue()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment