Skip to content

Instantly share code, notes, and snippets.

@larry0x
Last active February 12, 2024 18:39
Show Gist options
  • Save larry0x/4832f11c6547686ce9e648116f11293f to your computer and use it in GitHub Desktop.
Save larry0x/4832f11c6547686ce9e648116f11293f to your computer and use it in GitHub Desktop.
How to use RocksDB's user-defined timestamp feature in Go
module playground
go 1.21
require github.com/linxGnu/grocksdb v1.8.4
// How to run this on macOS (taught by ChatGPT):
//
// 1. Install dependencies using Homebrew:
//
// ```bash
// brew install rocksdb snappy lz4 zstd
// ```
//
// 2. Add dependencies to CGO flags:
//
// ```bash
// export ROCKSDB_DIR=$(brew --prefix rocksdb)
// export SNAPPY_DIR=$(brew --prefix snappy)
// export LZ4_DIR=$(brew --prefix lz4)
// export ZSTD_DIR=$(brew --prefix zstd)
// export CGO_LDFLAGS="-L$ROCKSDB_DIR/lib -L$SNAPPY_DIR/lib -L$LZ4_DIR/lib -L$ZSTD_DIR/lib -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd"
// export CGO_CFLAGS="-I$ROCKSDB_DIR/include -I$SNAPPY_DIR/include -I$LZ4_DIR/include -I$ZSTD_DIR/include"
// ```
//
// 3. Finally, run the file:
//
// ```bash
// go run main.go
// ```
//
// I still get the following linker warning. I'm not sure how to resolve it, but
// the program still builds and runs:
//
// > ld: warning: ignoring duplicate libraries: '-lbz2', '-lc++', '-llz4', '-lm', '-lrocksdb', '-lsnappy', '-lz', '-lzstd'
package main
import (
"bytes"
"encoding/binary"
"encoding/hex"
"fmt"
"os"
"github.com/linxGnu/grocksdb"
)
// Names and sizes shared by the whole demo.
const (
	// dataDir is where the demo database lives; init() wipes it on startup and
	// the database is created there because newDBOptions sets CreateIfMissing.
	dataDir = "./testdata"

	// cFNameDefault is RocksDB's default column family. This demo opens it but
	// never writes to it, so it is opened without timestamping.
	cFNameDefault = "default"

	// cFNameTest is the column family opened with timestamp-aware options.
	cFNameTest = "test"

	// timestampSize is the byte width of the timestamp suffix on every key in
	// the "test" column family (encoded as a little-endian uint64).
	timestampSize = 8
)
// KVPair is one expected key/value entry, used when checking what the database
// returns for a key at a particular timestamp.
type KVPair struct {
	// Key is the user key, without any timestamp suffix.
	Key []byte
	// Value is the expected value; nil means the key is expected not to exist
	// at the timestamp being checked.
	Value []byte
}
// init deletes the data directory left over from a previous run, so every run
// starts from an empty database.
//
// os.RemoveAll already returns nil when the path does not exist, so a separate
// os.Stat existence check is unnecessary; dropping it also avoids a
// check-then-act race between the Stat and the removal.
func init() {
	if err := os.RemoveAll(dataDir); err != nil {
		panic(err)
	}
}
func main() {
// open the database with a "test" column family
db, cfHandles, err := grocksdb.OpenDbColumnFamilies(
newDBOptions(),
dataDir,
[]string{
cFNameDefault,
cFNameTest,
},
[]*grocksdb.Options{
// for this demo, we don't use the default colume family, so no need to
// enable timestamping on it
grocksdb.NewDefaultOptions(),
// enable timestamping on our "test" column family
newCFOptionsWithTS(),
},
)
if err != nil {
panic(err)
}
cfHandle := cfHandles[1]
// create two timestamps
ts1 := [timestampSize]byte{}
binary.LittleEndian.PutUint64(ts1[:], uint64(10000))
ts2 := [timestampSize]byte{}
binary.LittleEndian.PutUint64(ts2[:], uint64(20000))
// write a batch under the first timestamp
{
batch1 := grocksdb.NewWriteBatch()
batch1.PutCFWithTS(cfHandle, []byte("donald"), ts1[:], []byte("trump"))
batch1.PutCFWithTS(cfHandle, []byte("jake"), ts1[:], []byte("shepherd"))
batch1.PutCFWithTS(cfHandle, []byte("joe"), ts1[:], []byte("biden"))
batch1.PutCFWithTS(cfHandle, []byte("larry"), ts1[:], []byte("engineer"))
db.Write(grocksdb.NewDefaultWriteOptions(), batch1)
}
// write another batch under the second timestamp
// for testing purpose, this batch 1) deletes a key from the previous batch,
// 2) overwrites a value from the previous batch, 3) inserts a new key not
// present in the previous batch.
{
batch2 := grocksdb.NewWriteBatch()
batch2.PutCFWithTS(cfHandle, []byte("donald"), ts2[:], []byte("duck"))
batch2.DeleteCFWithTS(cfHandle, []byte("joe"), ts2[:])
batch2.PutCFWithTS(cfHandle, []byte("pumpkin"), ts2[:], []byte("cat"))
db.Write(grocksdb.NewDefaultWriteOptions(), batch2)
}
// check data at the two timestamps, respectively
checkDataAtTimestamp(db, cfHandle, ts1[:], []KVPair{
{
Key: []byte("donald"),
Value: []byte("trump"),
},
{
Key: []byte("jake"),
Value: []byte("shepherd"),
},
{
Key: []byte("joe"),
Value: []byte("biden"),
},
{
Key: []byte("larry"),
Value: []byte("engineer"),
},
{
Key: []byte("pumpkin"),
Value: nil,
},
})
checkDataAtTimestamp(db, cfHandle, ts2[:], []KVPair{
{
Key: []byte("donald"),
Value: []byte("duck"),
},
{
Key: []byte("jake"),
Value: []byte("shepherd"),
},
{
Key: []byte("joe"),
Value: nil,
},
{
Key: []byte("larry"),
Value: []byte("engineer"),
},
{
Key: []byte("pumpkin"),
Value: []byte("cat"),
},
})
}
// checkDataAtTimestamp reads each key in expected from cfHandle as of
// timestamp ts and prints the expected value next to the value found.
//
// A key that does not exist at ts is printed with an empty found value, which
// matches an expected nil Value. After every line has been printed, the
// function panics if any found value differed from its expected value.
//
// The ReadOptions and the returned value/timestamp slices wrap native memory,
// so they are released here. Each found value is copied into a Go string
// before its slice is freed.
func checkDataAtTimestamp(db *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle, ts []byte, expected []KVPair) {
	fmt.Printf("checking data at timestamp %s\n", hex.EncodeToString(ts))

	opts := newReadOptionsWithTS(ts)
	defer opts.Destroy()

	mismatches := 0
	for _, kv := range expected {
		value, foundTS, err := db.GetCFWithTS(opts, cfHandle, kv.Key)
		if err != nil {
			panic(err)
		}
		// Copy out of native memory before freeing it.
		found := string(value.Data())
		value.Free()
		foundTS.Free()

		fmt.Printf(
			"expect key = %s, value = %s, found value = %s\n",
			string(kv.Key),
			string(kv.Value),
			found,
		)
		if found != string(kv.Value) {
			mismatches++
		}
	}

	if mismatches > 0 {
		panic(fmt.Sprintf("%d unexpected value(s) at timestamp %s", mismatches, hex.EncodeToString(ts)))
	}
}
// newDBOptions returns database-wide options that create both the database and
// any missing column families on open. The returned Options wraps native
// memory; the caller should Destroy it when it is no longer needed.
func newDBOptions() *grocksdb.Options {
	o := grocksdb.NewDefaultOptions()
	o.SetCreateIfMissingColumnFamilies(true)
	o.SetCreateIfMissing(true)
	return o
}
// newCFOptionsWithTS returns column family options whose comparator
// understands a timestampSize-byte timestamp suffix on every key. Key ordering
// is defined by compare, timestamp ordering by compareTS, and timestamp-less
// key comparison by compareWithoutTS.
func newCFOptionsWithTS() *grocksdb.Options {
	cmp := grocksdb.NewComparatorWithTimestamp(
		"leveldb.BytewiseComparator.u64ts",
		timestampSize,
		compare,
		compareTS,
		compareWithoutTS,
	)

	cfOpts := grocksdb.NewDefaultOptions()
	cfOpts.SetComparator(cmp)
	return cfOpts
}
// newReadOptionsWithTS returns read options that make reads see the database
// as of timestamp ts. ts is presumably timestampSize bytes long (as produced
// in main). The caller should Destroy the returned object when done.
func newReadOptionsWithTS(ts []byte) *grocksdb.ReadOptions {
	ro := grocksdb.NewDefaultReadOptions()
	ro.SetTimestamp(ts)
	return ro
}
// compare orders two timestamped keys. The user-key portion (timestamp suffix
// excluded) is compared bytewise in ascending order. Ties are broken by
// timestamp in descending order, so for the same user key the larger timestamp
// sorts first.
func compare(a []byte, b []byte) int {
	if byKey := compareWithoutTS(a, true, b, true); byKey != 0 {
		return byKey
	}

	tsA := extractTimestampFromUserKey(a)
	tsB := extractTimestampFromUserKey(b)
	// negated: descending timestamp order
	return -compareTS(tsA, tsB)
}
// compareTS decodes two timestamps as little-endian uint64 values and returns
// -1, 0, or 1 as bz1 is less than, equal to, or greater than bz2. It panics if
// either slice is shorter than 8 bytes.
func compareTS(bz1, bz2 []byte) int {
	left, right := binary.LittleEndian.Uint64(bz1), binary.LittleEndian.Uint64(bz2)
	if left == right {
		return 0
	}
	if left < right {
		return -1
	}
	return 1
}
// compareWithoutTS compares the user-key portions of a and b bytewise,
// ignoring timestamps. aHasTS and bHasTS say whether the corresponding key
// carries a timestamp suffix that must be stripped before comparing.
func compareWithoutTS(a []byte, aHasTS bool, b []byte, bHasTS bool) int {
	left, right := a, b
	if aHasTS {
		left = stripTimestampfromUserKey(left)
	}
	if bHasTS {
		right = stripTimestampfromUserKey(right)
	}
	return bytes.Compare(left, right)
}
// extractTimestampFromUserKey returns the trailing timestampSize bytes of
// userKey. The result shares memory with userKey (no copy). It panics if
// userKey is shorter than timestampSize.
func extractTimestampFromUserKey(userKey []byte) []byte {
	start := len(userKey) - timestampSize
	return userKey[start:]
}
// stripTimestampfromUserKey returns userKey without its trailing
// timestampSize-byte timestamp. The result shares memory with userKey (no
// copy). It panics if userKey is shorter than timestampSize.
func stripTimestampfromUserKey(userKey []byte) []byte {
	end := len(userKey) - timestampSize
	return userKey[:end]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment