Skip to content

Instantly share code, notes, and snippets.

@benbjohnson
Created March 21, 2016 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benbjohnson/9d2ebbc90b8b52f3fe25 to your computer and use it in GitHub Desktop.
Save benbjohnson/9d2ebbc90b8b52f3fe25 to your computer and use it in GitHub Desktop.
boltpanic
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"os"
"strconv"
"sync"
"time"
"github.com/boltdb/bolt"
)
// Settings shared by the generate and replay commands.
const (
// OplogExt is the suffix appended to a database path to name its operation log file.
OplogExt = ".log"
// BucketName is the name of the single top-level bucket every operation runs against.
BucketName = "kv"
// CommitN is the maximum number of commits generate performs on each database file.
CommitN = 50
)
// main parses the command line and dispatches to the requested subcommand.
//
// Usage:
//
//	boltpanic [-src-db PATH] [-start-txid N] generate
//	boltpanic [-src-db PATH] [-start-txid N] replay LOGPATH
//
// Any failure (flag parsing, missing/unknown command, or an error returned by
// the subcommand) is printed to stderr and the process exits with status 1.
func main() {
	rand.Seed(time.Now().UnixNano())

	// Declare and parse the flags; positional arguments are read afterwards.
	flags := flag.NewFlagSet("boltpanic", flag.ContinueOnError)
	srcDBPath := flags.String("src-db", "", "source db path")
	since := flags.Int("start-txid", 0, "replay from txid")

	err := flags.Parse(os.Args[1:])
	if err == nil {
		// The first positional argument names the subcommand to run.
		switch cmd := flags.Arg(0); cmd {
		case "generate":
			err = generate()
		case "replay":
			err = replay(flags.Arg(1), *srcDBPath, *since)
		case "":
			err = errors.New("command name required")
		default:
			err = fmt.Errorf("unknown command: %q", cmd)
		}
	}

	// Single exit point for every failure mode.
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
// generate repeatedly builds random databases at fresh temporary paths until
// one of them fails.
//
// Each successfully generated database and its operation log are deleted
// before the next attempt. When generateFile reports an error, generate
// returns it to the caller and leaves both files on disk so they can be
// inspected or fed to replay. The function never returns nil: it only stops
// on error (or when the process dies from a panic inside bolt).
func generate() error {
	for {
		// Generate temporary path.
		path := MustTempPath()
		log.Printf("GENERATE: %s", path)

		// Stop at the first error and let the caller decide how to report it.
		if err := generateFile(path, CommitN); err != nil {
			return fmt.Errorf("generate %s: %v", path, err)
		}

		// The database was generated without incident, so discard it and retry.
		// Removal is best-effort cleanup of temp files; failures are ignored.
		log.Printf("GENERATION SUCCESSFUL, RETRYING")
		os.Remove(path)
		os.Remove(path + OplogExt)
		println("")
	}
}
// generateFile creates a bolt database at path and runs up to n write
// transactions of random operations against it.
//
// Every operation is appended to the operation log (path + OplogExt) before
// it is applied, so the log still names the offending operation if bolt
// panics while executing it. Each transaction keeps issuing random
// put/get/delete operations until the one-second ticker fires, and is then
// committed.
//
// It returns an error if the database cannot be opened, a transaction cannot
// be started, or a commit fails. A failing Put or Delete terminates the
// process via log.Fatalf.
func generateFile(path string, n int) error {
	// Create and open database.
	db := NewDB(path)
	if err := db.Open(); err != nil {
		return err
	}
	defer db.Close()

	// The ticker bounds how long each transaction keeps generating operations.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for i := 0; i < n; i++ {
		tx, err := db.db.Begin(true)
		if err != nil {
			return err
		}

	inner:
		for {
			switch rand.Intn(3) {
			case 0: // PUT
				k, v := strconv.Itoa(rand.Intn(65536)), RandBlob()
				db.mustAppendLogEntry(&LogEntry{Op: "put", Key: k, Value: v})
				if err := tx.Bucket([]byte(BucketName)).Put([]byte(k), v); err != nil {
					log.Fatalf("put error: %s", err)
				}
			case 1: // GET
				k := strconv.Itoa(rand.Intn(65536))
				db.mustAppendLogEntry(&LogEntry{Op: "get", Key: k})
				if v := tx.Bucket([]byte(BucketName)).Get([]byte(k)); len(v) > 1000000 { // this is added to avoid optimization
					log.Println("...")
				}
			case 2: // DELETE
				// Deletes draw from half the key range used by puts and gets.
				k := strconv.Itoa(rand.Intn(32768))
				db.mustAppendLogEntry(&LogEntry{Op: "delete", Key: k})
				if err := tx.Bucket([]byte(BucketName)).Delete([]byte(k)); err != nil {
					log.Fatalf("delete error: %s", err)
				}
			}

			// Non-blocking check: end this transaction once the ticker has fired.
			select {
			case <-ticker.C:
				break inner
			default:
			}
		}

		// Record the transaction boundary before committing.
		db.mustAppendLogEntry(&LogEntry{Op: "commit"})
		if err := tx.Commit(); err != nil {
			return err
		}
	}
	return nil
}
// replay re-executes a recorded operation log against a database at a fresh
// temporary path.
//
// logPath is the JSON operation log to read and is required. If srcDBPath is
// non-empty, that file is first copied to the temporary path so the replay
// starts from an existing database. If sinceTxid is greater than zero, log
// entries are consumed without being executed until the skip counter reaches
// sinceTxid.
//
// The remaining entries are applied in write transactions, one per "commit"
// entry, with bolt's StrictMode enabled. On success the temporary database
// and the operation log file that DB.Open creates beside it are removed. On
// error both are left in place.
func replay(logPath, srcDBPath string, sinceTxid int) error {
	if logPath == "" {
		return errors.New("log path required")
	}

	// Generate temporary path.
	dbPath := MustTempPath()
	log.Printf("READ FROM: %s", logPath)
	log.Printf("REPLAY INTO: %s", dbPath)

	// Copy from source database, if specified.
	if srcDBPath != "" {
		log.Printf("COPYING DB FROM: %s", srcDBPath)
		buf, err := ioutil.ReadFile(srcDBPath)
		if err != nil {
			return err
		}
		if err := ioutil.WriteFile(dbPath, buf, 0666); err != nil {
			return err
		}
	}

	// Create and open database.
	db := NewDB(dbPath)
	if err := db.Open(); err != nil {
		return err
	}
	db.db.StrictMode = true
	defer db.Close()

	f, err := os.Open(logPath)
	if err != nil {
		return err
	}
	defer f.Close()
	dec := json.NewDecoder(f)

	// Skip over early transactions if requested.
	if sinceTxid > 0 {
		log.Printf("SKIPPING TO TX: %d", sinceTxid)
		// NOTE(review): the counter starts at 3, presumably the id of the first
		// logged transaction in a freshly created database — confirm.
		for txid := 3; txid < sinceTxid; {
			// Increment txid on commit.
			var entry LogEntry
			if err := dec.Decode(&entry); err == io.EOF {
				break
			} else if err != nil {
				return err
			} else if entry.Op == "commit" {
				log.Printf("[skip #%d]", txid)
				txid++
			}
		}
	}

	// Each Update call replays one logged transaction; done is set when the
	// log is exhausted so the outer loop can stop after the final commit.
	var done bool
	for {
		if err := db.db.Update(func(tx *bolt.Tx) error {
			log.Printf("executing tx: id=%d", tx.ID())
			for {
				// Decode entry.
				var entry LogEntry
				if err := dec.Decode(&entry); err == io.EOF {
					done = true
					return nil
				} else if err != nil {
					return err
				}

				// Execute entry.
				println("op", entry.Op)
				switch entry.Op {
				case "put":
					if err := tx.Bucket([]byte(BucketName)).Put([]byte(entry.Key), entry.Value); err != nil {
						log.Fatalf("put error: %s", err)
					}
				case "get":
					if v := tx.Bucket([]byte(BucketName)).Get([]byte(entry.Key)); len(v) > 1000000 { // this is added to avoid optimization
						log.Println("...")
					}
				case "delete":
					if err := tx.Bucket([]byte(BucketName)).Delete([]byte(entry.Key)); err != nil {
						log.Fatalf("delete error: %s", err)
					}
				case "commit":
					return nil // exit inner function and commit
				default:
					log.Fatalf("invalid op: %s", entry.Op)
				}
			}
		}); err != nil {
			return err
		} else if done {
			break
		}
	}

	log.Printf("SUCCESSFUL REPLAY, DELETING DB: %s", dbPath)
	if err := os.Remove(dbPath); err != nil {
		return err
	}
	// DB.Open always creates an operation log next to the database; replay
	// never writes to it, so remove it as well rather than leaving it behind.
	if err := os.Remove(dbPath + OplogExt); err != nil {
		return err
	}
	return nil
}
// DB pairs a bolt.DB with an operation log file that records every
// operation issued against the database.
type DB struct {
// mu is not locked anywhere in the visible code.
mu sync.Mutex
// path is the database file path; the operation log lives at path + OplogExt.
path string
// db is the underlying bolt database, set by Open.
db *bolt.DB
// oplog is the open operation log file, set by Open.
oplog *os.File
}
// NewDB allocates a DB bound to the given database file path.
// Nothing is opened until Open is called.
func NewDB(path string) *DB {
	d := new(DB)
	d.path = path
	return d
}
// Open opens the bolt database at db.path, ensures the top-level bucket
// exists, and creates the operation log at db.path + OplogExt. An existing
// log at that path is truncated, because it is opened with os.Create.
//
// bolt.Open is given a one-second timeout for acquiring the file lock. If a
// later step fails, the bolt handle is closed before returning so the file
// lock is not leaked; db.db is left pointing at the closed handle.
func (db *DB) Open() error {
	// Open database file.
	d, err := bolt.Open(db.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		return err
	}
	db.db = d

	// Create top level bucket.
	if err := d.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte(BucketName))
		return err
	}); err != nil {
		// The close error is dropped on purpose: the bucket error is the
		// more useful one to report.
		d.Close()
		return err
	}

	// Open operation log.
	f, err := os.Create(db.path + OplogExt)
	if err != nil {
		// As above: report the create error, not a secondary close error.
		d.Close()
		return err
	}
	db.oplog = f
	return nil
}
// Close closes the bolt database and the operation log.
//
// Both resources are always closed, even if the first close fails. The
// first error encountered is returned. Fields that were never set (for
// example because Open was not called) are skipped.
func (db *DB) Close() error {
	var firstErr error
	if db.db != nil {
		if err := db.db.Close(); err != nil {
			firstErr = err
		}
	}
	if db.oplog != nil {
		if err := db.oplog.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// blobs holds the pool of pre-generated values that Put operations draw from.
var blobs [][]byte

// init fills blobs with 1024 byte slices. Each slice has a random length of
// 100 to 4195 bytes and contains the repeating pattern 0, 1, ..., 253.
func init() {
	const count = 1024

	blobs = make([][]byte, count)
	for i := range blobs {
		size := 100 + rand.Intn(4096)
		b := make([]byte, size)
		for j := range b {
			b[j] = byte(j % 254)
		}
		blobs[i] = b
	}
}
// RandBlob returns a randomly chosen element of blobs. The slice is returned
// as-is, not copied, so callers share its backing array with the pool.
func RandBlob() []byte {
	idx := rand.Intn(len(blobs))
	return blobs[idx]
}
// mustAppendLogEntry JSON-encodes e onto the operation log and syncs the file
// so the entry is on disk before the caller proceeds. It panics if either the
// encode or the sync fails.
func (db *DB) mustAppendLogEntry(e *LogEntry) {
	enc := json.NewEncoder(db.oplog)
	err := enc.Encode(e)
	if err == nil {
		err = db.oplog.Sync()
	}
	if err != nil {
		panic(err)
	}
}
// LogEntry represents an entry to the operation log.
// Entries are written as a stream of JSON objects; empty fields are omitted.
type LogEntry struct {
// Op is the operation name; the code in this file uses "put", "get", "delete" and "commit".
Op string `json:"op,omitempty"`
// Key is the bucket key the operation targets; it is left empty for "commit".
Key string `json:"key,omitempty"`
// Value is the data written by a "put" (encoding/json stores []byte as a base64 string).
Value []byte `json:"value,omitempty"`
}
// MustTempPath returns a unique path in the default temp directory with the
// prefix "boltpanic-". The placeholder file used to reserve the name is
// closed and removed before returning, so nothing exists at the returned
// path. It panics if the temporary file cannot be created.
func MustTempPath() string {
	tmp, err := ioutil.TempFile("", "boltpanic-")
	if err != nil {
		panic(err)
	}

	name := tmp.Name()
	tmp.Close()
	os.Remove(name)
	return name
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment