Skip to content

Instantly share code, notes, and snippets.

@joonas-fi
Created March 14, 2019 13:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joonas-fi/c56c0f6fa23edac436609af3b22a5c46 to your computer and use it in GitHub Desktop.
Save joonas-fi/c56c0f6fa23edac436609af3b22a5c46 to your computer and use it in GitHub Desktop.
Storm perf issue
package main
import (
"github.com/asdine/storm"
"github.com/asdine/storm/codec/msgpack"
"os"
"encoding/json"
"io"
"fmt"
"bufio"
)
// run this with: $ ./stormexample < export.log
// export log is a file with 587900 lines (much smaller might demonstrate this as well) of
// data like this:
//
// {"Ref":"AAAtrqmSAynWdvl/C+VDH/qvYBjalOai6fSYX708iFw=","Volumes":[1],"VolumesPendingReplication":[2],"IsPendingReplication":true,"Referenced:true}
// {"Ref":"AAAyIKB9cG/Lkp5Ja1FGRMfETTX7RKOrse0xlX4+lQc=","Volumes":[1,2],"VolumesPendingReplication":[],"IsPendingReplication":false,"Referenced:true}
func main() {
if err := importDb(os.Stdin); err != nil {
panic(err)
}
}
type Blob struct {
Ref []byte `storm:"id"`
Volumes []int
VolumesPendingReplication []int
IsPendingReplication bool
Referenced bool
}
func importDbInternal(content io.Reader, withTx func(fn func(tx storm.Node) error) error) error {
scanner := bufio.NewScanner(content)
// by default craps out on lines > 64k. set max line to many megabytes
buf := make([]byte, 0, 8*1024*1024)
scanner.Buffer(buf, cap(buf))
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
// init empty record
record := &Blob{}
if err := json.Unmarshal([]byte(line), record); err != nil {
return err
}
if err := withTx(func(tx storm.Node) error {
return tx.Save(record)
}); err != nil {
return err
}
}
if err := scanner.Err(); err != nil {
return err
}
return nil
}
func importDb(content io.Reader) error {
db, err := storm.Open("stormexample.db", storm.Codec(msgpack.Codec))
if err != nil {
return err
}
defer db.Close()
var openTx storm.Node
commitOpenTx := func() error {
if openTx == nil {
return nil
}
return openTx.Commit()
}
txUseCount := 0
// automatically commits every N calls
withTx := func(fn func(tx storm.Node) error) error {
txUseCount++
if (txUseCount % 1000) == 0 {
if err := commitOpenTx(); err != nil {
return err
}
openTx = nil
fmt.Printf(".")
}
if openTx == nil {
var errTxOpen error
openTx, errTxOpen = db.Begin(true)
if errTxOpen != nil {
return errTxOpen
}
}
return fn(openTx)
}
defer func() {
if openTx == nil {
return
}
if err := openTx.Rollback(); err != nil {
panic(fmt.Errorf("rollback failed: %v", err))
}
}()
if err := importDbInternal(content, withTx); err != nil {
return err
}
return commitOpenTx()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment