Skip to content

Instantly share code, notes, and snippets.

@aLekSer
Last active December 8, 2020 11:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aLekSer/392cf3b4cf8cb2c754b632a635c3e7f4 to your computer and use it in GitHub Desktop.
Save aLekSer/392cf3b4cf8cb2c754b632a635c3e7f4 to your computer and use it in GitHub Desktop.
--- TicketCache.go
+++ BackfillCache.go
@@ -1,39 +1,19 @@
-/////////////////////////////////////////////////////////////////////
-/////////////////////////////////////////////////////////////////////
-
-// ticketCache unifies concurrent requests into a single cache update, and
+// cache unifies concurrent requests into a single cache update, and
// gives a safe view into that map cache.
-type ticketCache struct {
- store statestore.Service
-
+type cache struct {
+ store statestore.Service
requests chan *cacheRequest
-
// Single item buffered channel. Holds a value when runQuery can be safely
// started. Basically a channel/select friendly mutex around runQuery
// running.
startRunRequest chan struct{}
-
- wg sync.WaitGroup
-
+ wg sync.WaitGroup
// Mutlithreaded unsafe fields, only to be written by update, and read when
// request given the ok.
- tickets map[string]*pb.Ticket
- err error
-}
-
-func newTicketCache(b *appmain.Bindings, cfg config.View) *ticketCache {
- tc := &ticketCache{
- store: statestore.New(cfg),
- requests: make(chan *cacheRequest),
- startRunRequest: make(chan struct{}, 1),
- tickets: make(map[string]*pb.Ticket),
- }
-
- tc.startRunRequest <- struct{}{}
- b.AddHealthCheckFunc(tc.store.HealthCheck)
-
- return tc
+ value interface{}
+ update func(statestore.Service, interface{}) error
+ err error
}
type cacheRequest struct {
@@ -41,7 +21,7 @@
runNow chan struct{}
}
-func (tc *ticketCache) request(ctx context.Context, f func(map[string]*pb.Ticket)) error {
+func (c *cache) request(ctx context.Context, f func(interface{})) error {
cr := &cacheRequest{
ctx: ctx,
runNow: make(chan struct{}),
@@ -51,69 +31,73 @@
for {
select {
case <-ctx.Done():
- return errors.Wrap(ctx.Err(), "ticket cache request canceled before reuest sent.")
- case <-tc.startRunRequest:
- go tc.runRequest()
- case tc.requests <- cr:
+ return errors.Wrap(ctx.Err(), "cache request canceled before request sent.")
+ case <-c.startRunRequest:
+ go c.runRequest()
+ case c.requests <- cr:
break sendRequest
}
}
select {
case <-ctx.Done():
- return errors.Wrap(ctx.Err(), "ticket cache request canceled waiting for access.")
+ return errors.Wrap(ctx.Err(), "cache request canceled waiting for access.")
case <-cr.runNow:
- defer tc.wg.Done()
- }
-
- if tc.err != nil {
- return tc.err
- }
-
- f(tc.tickets)
+ defer c.wg.Done()
+ }
+
+ if c.err != nil {
+ return c.err
+ }
+
+ f(c.value)
return nil
}
-func (tc *ticketCache) runRequest() {
+func (c *cache) runRequest() {
defer func() {
- tc.startRunRequest <- struct{}{}
+ c.startRunRequest <- struct{}{}
}()
// Wait for first query request.
- reqs := []*cacheRequest{<-tc.requests}
+ reqs := []*cacheRequest{<-c.requests}
// Collect all waiting queries.
collectAllWaiting:
for {
select {
- case req := <-tc.requests:
+ case req := <-c.requests:
reqs = append(reqs, req)
default:
break collectAllWaiting
}
}
- tc.update()
+ c.err = c.update(c.store, c.value)
stats.Record(context.Background(), cacheWaitingQueries.M(int64(len(reqs))))
- // Send WaitGroup to query calls, letting them run their query on the ticket
- // cache.
+ // Signal each collected request via runNow so it can run its query on the cache; the WaitGroup tracks those still using it.
for _, req := range reqs {
- tc.wg.Add(1)
+ c.wg.Add(1)
select {
case req.runNow <- struct{}{}:
case <-req.ctx.Done():
- tc.wg.Done()
- }
- }
-
- // wait for requests to finish using ticket cache.
- tc.wg.Wait()
-}
-
-func (tc *ticketCache) update() {
- st := time.Now()
- previousCount := len(tc.tickets)
+ c.wg.Done()
+ }
+ }
+
+ // Wait for all signaled requests to finish using the cache.
+ c.wg.Wait()
+}
+
+func newTicketCache(b *appmain.Bindings, store statestore.Service) *cache {
+ c := &cache{
+ store: store,
+ requests: make(chan *cacheRequest),
+ startRunRequest: make(chan struct{}, 1),
+ value: make(map[string]*pb.Ticket),
+ update: updateTicketCache,
+ }
c.startRunRequest <- struct{}{}
b.AddHealthCheckFunc(c.store.HealthCheck)
@@ -135,40 +119,102 @@
previousCount := len(tickets)
currentAll, err := store.GetIndexedIDSet(context.Background())
if err != nil {
- tc.err = err
- return
+ return err
}
deletedCount := 0
- for id := range tc.tickets {
+ for id := range tickets {
if _, ok := currentAll[id]; !ok {
- delete(tc.tickets, id)
+ delete(tickets, id)
deletedCount++
}
}
toFetch := []string{}
-
for id := range currentAll {
- if _, ok := tc.tickets[id]; !ok {
+ if _, ok := tickets[id]; !ok {
toFetch = append(toFetch, id)
}
}
- newTickets, err := tc.store.GetTickets(context.Background(), toFetch)
- if err != nil {
- tc.err = err
- return
+ newTickets, err := store.GetTickets(context.Background(), toFetch)
+ if err != nil {
+ return err
}
for _, t := range newTickets {
- tc.tickets[t.Id] = t
+ tickets[t.Id] = t
}
stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount)))
stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch))))
- stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(st))/float64(time.Millisecond)))
-
- logger.Debugf("Ticket Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(tc.tickets))
- tc.err = nil
-}
+ stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(t))/float64(time.Millisecond)))
+
+ logger.Debugf("Ticket Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(tickets))
+ return nil
+}
+
+func newBackfillCache(b *appmain.Bindings, store statestore.Service) *cache {
+ c := &cache{
+ store: store,
+ requests: make(chan *cacheRequest),
+ startRunRequest: make(chan struct{}, 1),
+ value: make(map[string]*pb.Backfill),
+ update: updateBackfillCache,
+ }
+
+ c.startRunRequest <- struct{}{}
+ b.AddHealthCheckFunc(c.store.HealthCheck)
+
+ return c
+}
+
+func updateBackfillCache(store statestore.Service, value interface{}) error {
+ if value == nil {
+ return fmt.Errorf("expecting not nil value")
+ }
+
+ backfills, ok := value.(map[string]*pb.Backfill)
+ if !ok {
+ return fmt.Errorf("expecting value type map[string]*pb.Backfill, but got: %T", value)
+ }
+
+ t := time.Now()
+ previousCount := len(backfills)
+ index, err := store.GetIndexedBackfills(context.Background())
+ if err != nil {
+ return err
+ }
+
+ deletedCount := 0
+ for id, backfill := range backfills {
+ generation, ok := index[id]
+ if !ok || backfill.Generation < int64(generation) {
+ delete(backfills, id)
+ deletedCount++
+ }
+ }
+
+ toFetch := []string{}
+ for id := range index {
+ if _, ok := backfills[id]; !ok {
+ toFetch = append(toFetch, id)
+ }
+ }
+
+ fetchedBackfills, err := store.GetBackfills(context.Background(), toFetch)
+ if err != nil {
+ return err
+ }
+
+ for _, b := range fetchedBackfills {
+ backfills[b.Id] = b
+ }
+
+ stats.Record(context.Background(), cacheTotalItems.M(int64(previousCount)))
+ stats.Record(context.Background(), cacheFetchedItems.M(int64(len(toFetch))))
+ stats.Record(context.Background(), cacheUpdateLatency.M(float64(time.Since(t))/float64(time.Millisecond)))
+
+ logger.Debugf("Backfill Cache update: Previous %d, Deleted %d, Fetched %d, Current %d", previousCount, deletedCount, len(toFetch), len(backfills))
+ return nil
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment