Skip to content

Instantly share code, notes, and snippets.

@mattlord
Last active June 12, 2024 02:51
Show Gist options
  • Save mattlord/54fcc8de7b98e23292d69258e29d26f7 to your computer and use it in GitHub Desktop.
diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go
index ac184fe7be..ad698520b2 100644
--- a/go/vt/throttler/max_replication_lag_module.go
+++ b/go/vt/throttler/max_replication_lag_module.go
@@ -399,7 +399,7 @@ func (m *MaxReplicationLagModule) clearReplicaUnderTest(now time.Time, testedSta
}
// Verify that the current replica under test is not in an error state.
- lr := lagRecordNow
+ // latest() returns *replicationLagRecord after this change, so hold the
+ // local record by pointer as well to keep the two branches type-compatible.
+ lr := &lagRecordNow
if m.replicaUnderTest.key != discovery.TabletToMapKey(lr.Tablet) {
lr = m.lagCacheByType(m.replicaUnderTest.tabletType).latest(m.replicaUnderTest.key)
}
@@ -567,7 +567,9 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
if lagRecordBefore.isZero() {
// We should see at least "lagRecordNow" here because we did just insert it
// in processRecord().
- panic(fmt.Sprintf("BUG: replicationLagCache did not return the lagRecord for current replica: %v or a previous record of it. lastRateChange: %v replicationLagCache size: %v entries: %v", lagRecordNow, m.lastRateChange, len(m.lagCache(lagRecordNow).entries), m.lagCache(lagRecordNow).entries))
+ // The cache size/entries details were dropped from the message: sync.Map
+ // has no len() and dumping it would need an extra Range pass just to panic.
+ panic(fmt.Sprintf("BUG: replicationLagCache did not return the lagRecord for current replica: %v or a previous record of it.", lagRecordNow))
}
// Store the record in the result.
r.LagRecordBefore = lagRecordBefore
diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go
index c9c2e94f11..2393498d64 100644
--- a/go/vt/throttler/replication_lag_cache.go
+++ b/go/vt/throttler/replication_lag_cache.go
@@ -18,6 +18,8 @@ package throttler
import (
"sort"
+ "sync"
+ "sync/atomic"
"time"
"vitess.io/vitess/go/vt/discovery"
@@ -28,7 +30,7 @@ import (
type replicationLagCache struct {
// entries maps from the replica to its history.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
- entries map[string]*replicationLagHistory
+ // A sync.Map is used so MaxLag() can iterate entries concurrently with
+ // add() mutating them. Values are always *replicationLagHistory.
+ entries sync.Map
// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
@@ -52,7 +54,6 @@ type replicationLagCache struct {
func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache {
return &replicationLagCache{
- entries: make(map[string]*replicationLagHistory),
+ // entries is a sync.Map whose zero value is ready to use, so no
+ // explicit initialization is needed here.
ignoredSlowReplicasInARow: make(map[string]bool),
historyCapacityPerReplica: historyCapacityPerReplica,
}
@@ -60,29 +62,28 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache
// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
+	// Compute the map key once and reuse it everywhere below.
+	key := discovery.TabletToMapKey(r.Tablet)
if !r.Serving {
// Tablet is down. Do no longer track it.
- delete(c.entries, discovery.TabletToMapKey(r.Tablet))
- delete(c.ignoredSlowReplicasInARow, discovery.TabletToMapKey(r.Tablet))
+	// The previous value is not needed, so a plain Delete is sufficient
+	// (LoadAndDelete with a discarded result is misleading).
+	c.entries.Delete(key)
+	// NOTE(review): ignoredSlowReplicasInARow is still a plain map — confirm
+	// add() is only ever called from a single goroutine.
+	delete(c.ignoredSlowReplicasInARow, key)
return
}
- entry, ok := c.entries[discovery.TabletToMapKey(r.Tablet)]
- if !ok {
- entry = newReplicationLagHistory(c.historyCapacityPerReplica)
- c.entries[discovery.TabletToMapKey(r.Tablet)] = entry
- }
-
+	// LoadOrStore atomically creates the history the first time a replica is
+	// seen; reuse the key computed above instead of recomputing it.
+	e, _ := c.entries.LoadOrStore(key, newReplicationLagHistory(c.historyCapacityPerReplica))
+	entry := e.(*replicationLagHistory)
entry.add(r)
}
// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
-func (c *replicationLagCache) latest(key string) replicationLagRecord {
- entry, ok := c.entries[key]
+// The record is now returned by pointer; callers must not assume it is a
+// private copy.
+func (c *replicationLagCache) latest(key string) *replicationLagRecord {
+ e, ok := c.entries.Load(key)
if !ok {
- return replicationLagRecord{}
+ // Unknown key: return a pointer to a fresh zero record.
+ return &replicationLagRecord{}
}
+ // Values stored in the sync.Map are always *replicationLagHistory.
+ entry := e.(*replicationLagHistory)
return entry.latest()
}
@@ -90,10 +90,11 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord {
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
- entry, ok := c.entries[key]
+ e, ok := c.entries.Load(key)
if !ok {
return replicationLagRecord{}
}
+ // Values stored in the sync.Map are always *replicationLagHistory.
+ entry := e.(*replicationLagHistory)
return entry.atOrAfter(at)
}
@@ -103,21 +104,34 @@ func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumRepli
// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)
- if ignoreNSlowestReplicas >= len(c.entries) {
- // Do not ignore slow replicas if all would get ignored.
- return
+ if ignoreNSlowestReplicas > 0 {
+	// sync.Map has no Len(); count the entries with a Range pass.
+	count := 0
+	c.entries.Range(func(k, v any) bool {
+		count++
+		return true
+	})
+	if ignoreNSlowestReplicas >= count {
+		// Do not ignore slow replicas if all would get ignored.
+		return
+	}
}
// Turn the map of replicas into a list and then sort it.
var list byLagAndTabletUID
i := 0
- for _, v := range c.entries {
- record := v.latest()
+ c.entries.Range(func(k, v any) bool {
+	record := v.(*replicationLagHistory).latest()
+	// latest() can return a zero record whose Stats is nil; skip those to
+	// avoid a nil-pointer dereference below.
+	if record.Stats == nil {
+		return true
+	}
if int64(record.Stats.ReplicationLagSeconds) >= minimumReplicationLag {
list = append(list, record.TabletHealth)
i++
}
- }
+	return true
+ })
sort.Sort(list)
// Now remember the N slowest replicas.
@@ -151,8 +161,12 @@ func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
if slow {
// Record that we're ignoring this replica.
c.ignoredSlowReplicasInARow[key] = true
-
- if len(c.ignoredSlowReplicasInARow) == len(c.entries) {
+ // sync.Map has no Len(); count the tracked replicas with a Range pass.
+ // The count can change concurrently, so this comparison is best-effort.
+ count := 0
+ c.entries.Range(func(k, v any) bool {
+ count++
+ return true
+ })
+ if len(c.ignoredSlowReplicasInARow) == count {
// All but this replica have been ignored in a row. Break this cycle now.
slow = false
}
@@ -180,25 +194,31 @@ func (c *replicationLagCache) isIgnored(key string) bool {
// replicationLagHistory stores the most recent replicationLagRecord entries
// in a ring buffer for a single replica.
type replicationLagHistory struct {
- records []replicationLagRecord
+ // Each ring-buffer slot is an atomic pointer so the writer and concurrent
+ // readers can access it without a lock. A nil slot has never been written.
+ records []atomic.Pointer[replicationLagRecord]
// current has the index in "records" of the last element added by add().
- current int
+ // It is -1 until the first add(); see newReplicationLagHistory.
+ current atomic.Int32
}
func newReplicationLagHistory(capacity int) *replicationLagHistory {
- return &replicationLagHistory{
- records: make([]replicationLagRecord, capacity),
- current: -1,
+ rlh := &replicationLagHistory{
+ records: make([]atomic.Pointer[replicationLagRecord], capacity),
}
+ // atomic.Int32 cannot be set in a composite literal; store the -1
+ // "no records yet" sentinel explicitly.
+ rlh.current.Store(-1)
+ return rlh
}
func (h *replicationLagHistory) add(r replicationLagRecord) {
h.advanceCurrent()
- h.records[h.current] = r
+ // NOTE(review): current is advanced before the new record is stored, so a
+ // concurrent reader may briefly observe the slot's previous (stale) value
+ // or nil on the first lap — confirm callers tolerate that window.
+ h.records[h.current.Load()].Store(&r)
}
-func (h *replicationLagHistory) latest() replicationLagRecord {
- return h.records[h.current]
+func (h *replicationLagHistory) latest() *replicationLagRecord {
+	// add() stores the newest record at index current (after advancing), so
+	// the latest entry lives at current, NOT current-1 — loading at
+	// current-1 would return the second-newest record. current is -1 until
+	// the first add().
+	c := h.current.Load()
+	if c < 0 {
+		return &replicationLagRecord{}
+	}
+	// The slot can still be nil if a concurrent add() has advanced current
+	// but not stored the record yet; report a zero record in that window.
+	if r := h.records[c].Load(); r != nil {
+		return r
+	}
+	return &replicationLagRecord{}
}
// atOrAfter returns the oldest replicationLagRecord which happened at "at"
@@ -206,24 +226,31 @@ func (h *replicationLagHistory) latest() replicationLagRecord {
// If there is no such record, a zero record is returned.
func (h *replicationLagHistory) atOrAfter(at time.Time) replicationLagRecord {
wrapped := false
- i := h.current
+ i := h.current.Load()
for {
// Look at the previous (older) entry to decide if we should return the
// current entry.
prev := i - 1
if prev < 0 {
wrapped = true
- prev = len(h.records) - 1
+ prev = int32(len(h.records) - 1)
}
- if h.records[prev].isZero() || h.records[prev].time.Before(at) {
+	// A never-written slot holds a nil pointer; treat it like a zero
+	// record instead of dereferencing nil. Load the slot only once.
+	p := h.records[prev].Load()
+	if p == nil || p.isZero() || p.time.Before(at) {
// Return this entry because the previous one does not exist or
// it happened before the time we're interested in.
- return h.records[i]
+	if r := h.records[i].Load(); r != nil {
+		return *r
+	}
+	return replicationLagRecord{}
}
- if wrapped && prev == h.current {
+ if wrapped && prev == h.current.Load() {
// We scanned the whole list and all entries match. Return the oldest.
- return h.records[i]
+	if r := h.records[i].Load(); r != nil {
+		return *r
+	}
+	return replicationLagRecord{}
}
i = prev
@@ -231,8 +251,14 @@ func (h *replicationLagHistory) atOrAfter(at time.Time) replicationLagRecord {
}
func (h *replicationLagHistory) advanceCurrent() {
- h.current++
- if h.current == len(h.records) {
- h.current = 0
+	// Add(1) followed by Store(0) lets a concurrent reader observe the
+	// transient value len(h.records) and index out of range. Compute the
+	// wrapped index first and publish it with a single CAS instead.
+	for {
+		cur := h.current.Load()
+		next := cur + 1
+		if next == int32(len(h.records)) {
+			next = 0
+		}
+		if h.current.CompareAndSwap(cur, next) {
+			return
+		}
	}
}
diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go
index 909888bd0d..4f6a7f4d8f 100644
--- a/go/vt/throttler/throttler.go
+++ b/go/vt/throttler/throttler.go
@@ -228,21 +228,27 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
- cache := t.maxReplicationLagModule.lagCacheByType(tabletType)
-
var maxLag uint32
- cacheEntries := cache.entries
-
- for key := range cacheEntries {
+ cache := t.maxReplicationLagModule.lagCacheByType(tabletType)
+ if cache == nil {
+ // Unknown tablet type: nothing is tracked, so report zero lag.
+ return 0
+ }
+ // Range iterates the sync.Map safely while add() may be writing.
+ cache.entries.Range(func(k, v any) bool {
+ key := k.(string)
+ val := v.(*replicationLagHistory)
if cache.isIgnored(key) {
- continue
+ // Returning true continues the Range iteration (i.e. "continue").
+ return true
}
-
- lag := cache.latest(key).Stats.ReplicationLagSeconds
+ l := val.latest()
+ if l == nil || l.Stats == nil {
+ // Zero/unpopulated record: there are no stats to read yet.
+ return true
+ }
+ lag := l.Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
- }
+ return true
+ })
return maxLag
}
diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go
index b33bb2ca25..80ea65fe25 100644
--- a/go/vt/throttler/throttler_test.go
+++ b/go/vt/throttler/throttler_test.go
@@ -17,11 +17,16 @@ limitations under the License.
package throttler
import (
+ "context"
"runtime"
+ "sync"
"testing"
"time"
"github.com/stretchr/testify/require"
+ "vitess.io/vitess/go/vt/discovery"
+ "vitess.io/vitess/go/vt/proto/query"
+ "vitess.io/vitess/go/vt/proto/topodata"
)
// The main purpose of the benchmarks below is to demonstrate the functionality
@@ -398,3 +403,76 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) {
}()
throttler.ThreadFinished(0)
}
+
+func TestThrottlerMaxLag(t *testing.T) {
+	fc := &fakeClock{}
+	throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
+	require.NoError(t, err)
+	defer throttler.Close()
+
+	require.NotNil(t, throttler)
+	require.NotNil(t, throttler.maxReplicationLagModule)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+
+	// run .add() and .MaxLag() concurrently to detect races
+	for _, tabletType := range []topodata.TabletType{
+		topodata.TabletType_REPLICA,
+		topodata.TabletType_RDONLY,
+	} {
+		// Shadow the loop variable so the goroutines below capture this
+		// iteration's value on Go versions before per-iteration loop vars.
+		tabletType := tabletType
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					throttler.MaxLag(tabletType)
+				}
+			}
+		}()
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
+					// require (t.FailNow) must not be called from a
+					// non-test goroutine; report and bail out instead.
+					if cache == nil {
+						t.Errorf("nil lag cache for tablet type %v", tabletType)
+						return
+					}
+					cache.add(replicationLagRecord{
+						time: time.Now(),
+						TabletHealth: discovery.TabletHealth{
+							Serving: true,
+							Stats: &query.RealtimeStats{
+								ReplicationLagSeconds: 5,
+							},
+							Tablet: &topodata.Tablet{
+								Hostname: t.Name(),
+								Type:     tabletType,
+								PortMap: map[string]int32{
+									"test": 15999,
+								},
+							},
+						},
+					})
+				}
+			}
+		}()
+	}
+	time.Sleep(time.Second)
+	cancel()
+	wg.Wait()
+
+	// check .MaxLag()
+	for _, tabletType := range []topodata.TabletType{
+		topodata.TabletType_REPLICA,
+		topodata.TabletType_RDONLY,
+	} {
+		require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
+	}
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment