Skip to content

Instantly share code, notes, and snippets.

@thanhpp
Last active September 13, 2022 06:42
Show Gist options
  • Save thanhpp/9b6146682f287672754b00e50e67d33a to your computer and use it in GitHub Desktop.
Save thanhpp/9b6146682f287672754b00e50e67d33a to your computer and use it in GitHub Desktop.
time series price cache
package timeseriescache
import (
"sync"
"time"
"go.uber.org/zap"
)
const (
bucketTimeSize = 60 // minute in seconds
)
type CacheElem struct {
Timestamp time.Time
Val uint64
}
type TimeSeriesPriceCache struct {
cache map[int64][]CacheElem // bucket key = minute (unix)
oldestRecord time.Time
lastestRecord time.Time
expireDur time.Duration
slicePool sync.Pool
l sync.RWMutex
logger *zap.SugaredLogger
}
func New(expireDur time.Duration) *TimeSeriesPriceCache {
return &TimeSeriesPriceCache{
cache: make(map[int64][]CacheElem),
slicePool: sync.Pool{
New: func() any {
return make([]CacheElem, 0)
},
},
expireDur: expireDur,
logger: zap.NewNop().Sugar(),
}
}
func (t *TimeSeriesPriceCache) genBucketKey(ti time.Time) int64 {
unixTime := ti.UTC().Unix()
return unixTime - unixTime%bucketTimeSize
}
func (t *TimeSeriesPriceCache) unsafeInvalidateCache() {
validTime := t.lastestRecord.Add(-1 * t.expireDur)
if validTime.Before(t.oldestRecord) { // no invalid cache
return
}
validBucketKey := t.genBucketKey(validTime)
oldestBucketKey := t.genBucketKey(t.oldestRecord)
for k := oldestBucketKey; k <= validBucketKey; k += bucketTimeSize {
if _, ok := t.cache[k]; ok {
t.logger.Debugw("unsafeInvalidateCache - remove", "key", k)
t.slicePool.Put(t.cache[k])
delete(t.cache, k)
t.oldestRecord = time.Unix(k, 0)
}
}
}
func (t *TimeSeriesPriceCache) SetLogger(logger *zap.SugaredLogger) {
if logger == nil {
return
}
const loggerName = "TimeSeriesPriceCache"
t.logger = logger.Named(loggerName)
}
func (t *TimeSeriesPriceCache) Put(timestamp time.Time, val uint64) {
t.l.Lock()
defer t.l.Unlock()
if t.oldestRecord.IsZero() {
t.oldestRecord = timestamp
}
bucketKey := t.genBucketKey(timestamp)
if timestamp.After(t.lastestRecord) {
t.lastestRecord = timestamp
}
if timestamp.Before(t.oldestRecord) {
t.oldestRecord = timestamp
}
t.logger.Debugw("Put", "lastestRecord", t.lastestRecord, "oldestRecord", t.oldestRecord)
if _, ok := t.cache[bucketKey]; ok {
t.logger.Debugw("Put - bucket found", "key", bucketKey, "timestamp", timestamp, "val", val)
t.cache[bucketKey] = append(t.cache[bucketKey], CacheElem{
Timestamp: timestamp,
Val: val,
})
return
}
t.unsafeInvalidateCache()
poolElem := t.slicePool.Get()
elem, ok := poolElem.([]CacheElem)
if !ok {
panic("wrong elem pool type")
}
elem = append(elem, CacheElem{
Timestamp: timestamp,
Val: val,
})
t.logger.Debugw("Put - new bucket", "key", bucketKey)
t.cache[bucketKey] = elem
}
func (t *TimeSeriesPriceCache) Get(from, to time.Time) []CacheElem {
t.l.RLock()
defer t.l.RUnlock()
fromBucketKey := t.genBucketKey(from)
toBucketKey := t.genBucketKey(to)
var result []CacheElem
for k := fromBucketKey; k <= toBucketKey; k += bucketTimeSize {
if v, ok := t.cache[k]; ok {
t.logger.Debugw("Get - found bucket", "key", k, "from", from, "to", to)
for i := range v {
if v[i].Timestamp.After(from) && v[i].Timestamp.Before(to) {
result = append(result, v[i])
}
}
}
}
return result
}
package timeseriescache_test
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/thanhpp/gopher/pkg/test/timeseriescache"
)
func TestTimeSeriesPriceCache(t *testing.T) {
// insert data
var (
expireDur = time.Hour
dataCount = 10_000
timestamp = time.Now()
)
cache := timeseriescache.New(expireDur)
// logger, err := zap.NewDevelopment()
// require.NoError(t, err)
// cache.SetLogger(logger.Sugar())
// fill data
for i := 0; i < dataCount; i++ {
cache.Put(timestamp, 1)
timestamp = timestamp.Add(time.Second * time.Duration(rand.Int63n(15)))
}
val := cache.Get(timestamp.Add(time.Minute*-50), timestamp)
t.Log("len(val)", len(val))
for i := 1; i < len(val); i++ {
assert.True(t, val[i].Timestamp.Unix() >= val[i-1].Timestamp.Unix())
}
}
func BenchmarkTimeSeriesPriceCache(b *testing.B) {
// insert data
var (
expireDur = time.Hour
timestamp = time.Now()
)
cache := timeseriescache.New(expireDur)
// fill data
for i := 0; i < b.N; i++ {
cache.Put(timestamp, 1)
timestamp = timestamp.Add(time.Second * time.Duration(rand.Int63n(180)))
}
/*
[thanhpp@x1carbon] [~/go/src/github.com/thanhpp/gopher/pkg/test/timeseriescache] [main  ]
[13:34:51] $ go test -bench=TimeSeriesPriceCache -benchtime=100000000x -benchmem
goos: linux
goarch: amd64
pkg: github.com/thanhpp/gopher/pkg/test/timeseriescache
cpu: 11th Gen Intel(R) Core(TM) i7-1185G7 @ 3.00GHz
BenchmarkTimeSeriesPriceCache-8 100000000 358.3 ns/op 270 B/op 4 allocs/op
PASS
ok github.com/thanhpp/gopher/pkg/test/timeseriescache 35.911s
*/
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment