Last active
June 12, 2024 02:51
-
-
Save mattlord/54fcc8de7b98e23292d69258e29d26f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go | |
index ac184fe7be..ad698520b2 100644 | |
--- a/go/vt/throttler/max_replication_lag_module.go | |
+++ b/go/vt/throttler/max_replication_lag_module.go | |
@@ -399,7 +399,7 @@ func (m *MaxReplicationLagModule) clearReplicaUnderTest(now time.Time, testedSta | |
} | |
// Verify that the current replica under test is not in an error state. | |
- lr := lagRecordNow | |
+ lr := &lagRecordNow | |
if m.replicaUnderTest.key != discovery.TabletToMapKey(lr.Tablet) { | |
lr = m.lagCacheByType(m.replicaUnderTest.tabletType).latest(m.replicaUnderTest.key) | |
} | |
@@ -567,7 +567,8 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, | |
if lagRecordBefore.isZero() { | |
// We should see at least "lagRecordNow" here because we did just insert it | |
// in processRecord(). | |
- panic(fmt.Sprintf("BUG: replicationLagCache did not return the lagRecord for current replica: %v or a previous record of it. lastRateChange: %v replicationLagCache size: %v entries: %v", lagRecordNow, m.lastRateChange, len(m.lagCache(lagRecordNow).entries), m.lagCache(lagRecordNow).entries)) | |
+ //panic(fmt.Sprintf("BUG: replicationLagCache did not return the lagRecord for current replica: %v or a previous record of it. lastRateChange: %v replicationLagCache size: %v entries: %v", lagRecordNow, m.lastRateChange, len(m.lagCache(lagRecordNow).entries), m.lagCache(lagRecordNow).entries)) | |
+ panic(fmt.Sprintf("BUG: replicationLagCache did not return the lagRecord for current replica: %v or a previous record of it.", lagRecordNow)) | |
} | |
// Store the record in the result. | |
r.LagRecordBefore = lagRecordBefore | |
diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go | |
index c9c2e94f11..2393498d64 100644 | |
--- a/go/vt/throttler/replication_lag_cache.go | |
+++ b/go/vt/throttler/replication_lag_cache.go | |
@@ -18,6 +18,8 @@ package throttler | |
import ( | |
"sort" | |
+ "sync" | |
+ "sync/atomic" | |
"time" | |
"vitess.io/vitess/go/vt/discovery" | |
@@ -28,7 +30,7 @@ import ( | |
type replicationLagCache struct { | |
// entries maps from the replica to its history. | |
// The map key is replicationLagRecord.LegacyTabletStats.Key. | |
- entries map[string]*replicationLagHistory | |
+ entries sync.Map | |
// slowReplicas is a set of slow replicas. | |
// The map key is replicationLagRecord.LegacyTabletStats.Key. | |
@@ -52,7 +54,7 @@ type replicationLagCache struct { | |
func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache { | |
return &replicationLagCache{ | |
- entries: make(map[string]*replicationLagHistory), | |
+ entries: sync.Map{}, | |
ignoredSlowReplicasInARow: make(map[string]bool), | |
historyCapacityPerReplica: historyCapacityPerReplica, | |
} | |
@@ -60,29 +62,27 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache | |
// add inserts or updates "r" in the cache for the replica with the key "r.Key". | |
func (c *replicationLagCache) add(r replicationLagRecord) { | |
+ key := discovery.TabletToMapKey(r.Tablet) | |
if !r.Serving { | |
// Tablet is down. Do no longer track it. | |
- delete(c.entries, discovery.TabletToMapKey(r.Tablet)) | |
- delete(c.ignoredSlowReplicasInARow, discovery.TabletToMapKey(r.Tablet)) | |
+ c.entries.LoadAndDelete(key) | |
+ delete(c.ignoredSlowReplicasInARow, key) | |
return | |
} | |
- entry, ok := c.entries[discovery.TabletToMapKey(r.Tablet)] | |
- if !ok { | |
- entry = newReplicationLagHistory(c.historyCapacityPerReplica) | |
- c.entries[discovery.TabletToMapKey(r.Tablet)] = entry | |
- } | |
- | |
+ e, _ := c.entries.LoadOrStore(discovery.TabletToMapKey(r.Tablet), newReplicationLagHistory(c.historyCapacityPerReplica)) | |
+ entry := e.(*replicationLagHistory) | |
entry.add(r) | |
} | |
// latest returns the current lag record for the given LegacyTabletStats.Key string. | |
// A zero record is returned if there is no latest entry. | |
-func (c *replicationLagCache) latest(key string) replicationLagRecord { | |
- entry, ok := c.entries[key] | |
+func (c *replicationLagCache) latest(key string) *replicationLagRecord { | |
+ e, ok := c.entries.Load(key) | |
if !ok { | |
- return replicationLagRecord{} | |
+ return &replicationLagRecord{} | |
} | |
+ entry := e.(*replicationLagHistory) | |
return entry.latest() | |
} | |
@@ -90,10 +90,11 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { | |
// or just after it. | |
// If there is no such record, a zero record is returned. | |
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { | |
- entry, ok := c.entries[key] | |
+ e, ok := c.entries.Load(key) | |
if !ok { | |
return replicationLagRecord{} | |
} | |
+ entry := e.(*replicationLagHistory) | |
return entry.atOrAfter(at) | |
} | |
@@ -103,21 +104,30 @@ func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumRepli | |
// Reset the current list of ignored replicas. | |
c.slowReplicas = make(map[string]bool) | |
- if ignoreNSlowestReplicas >= len(c.entries) { | |
- // Do not ignore slow replicas if all would get ignored. | |
- return | |
+ if ignoreNSlowestReplicas > 0 { | |
+ count := 0 | |
+ c.entries.Range(func(k, v any) bool { | |
+ count++ | |
+ return true | |
+ }) | |
+ if ignoreNSlowestReplicas >= count { | |
+ // Do not ignore slow replicas if all would get ignored. | |
+ return | |
+ } | |
} | |
// Turn the map of replicas into a list and then sort it. | |
var list byLagAndTabletUID | |
i := 0 | |
- for _, v := range c.entries { | |
- record := v.latest() | |
+ c.entries.Range(func(k, v any) bool { | |
+ val := v.(*replicationLagHistory) | |
+ record := val.latest() | |
if int64(record.Stats.ReplicationLagSeconds) >= minimumReplicationLag { | |
list = append(list, record.TabletHealth) | |
i++ | |
} | |
- } | |
+ return true | |
+ }) | |
sort.Sort(list) | |
// Now remember the N slowest replicas. | |
@@ -151,8 +161,12 @@ func (c *replicationLagCache) ignoreSlowReplica(key string) bool { | |
if slow { | |
// Record that we're ignoring this replica. | |
c.ignoredSlowReplicasInARow[key] = true | |
- | |
- if len(c.ignoredSlowReplicasInARow) == len(c.entries) { | |
+ count := 0 | |
+ c.entries.Range(func(k, v any) bool { | |
+ count++ | |
+ return true | |
+ }) | |
+ if len(c.ignoredSlowReplicasInARow) == count { | |
// All but this replica have been ignored in a row. Break this cycle now. | |
slow = false | |
} | |
@@ -180,25 +194,31 @@ func (c *replicationLagCache) isIgnored(key string) bool { | |
// replicationLagHistory stores the most recent replicationLagRecord entries | |
// in a ring buffer for a single replica. | |
type replicationLagHistory struct { | |
- records []replicationLagRecord | |
+ records []atomic.Pointer[replicationLagRecord] | |
// current has the index in "records" of the last element added by add(). | |
- current int | |
+ current atomic.Int32 | |
} | |
func newReplicationLagHistory(capacity int) *replicationLagHistory { | |
- return &replicationLagHistory{ | |
- records: make([]replicationLagRecord, capacity), | |
- current: -1, | |
+ rlh := &replicationLagHistory{ | |
+ records: make([]atomic.Pointer[replicationLagRecord], capacity), | |
} | |
+ rlh.current.Store(-1) | |
+ return rlh | |
} | |
func (h *replicationLagHistory) add(r replicationLagRecord) { | |
h.advanceCurrent() | |
- h.records[h.current] = r | |
+ h.records[h.current.Load()].Store(&r) | |
} | |
-func (h *replicationLagHistory) latest() replicationLagRecord { | |
- return h.records[h.current] | |
+func (h *replicationLagHistory) latest() *replicationLagRecord { | |
+ c := h.current.Load() - 1 | |
+ if h.records == nil || c < 0 { | |
+ return &replicationLagRecord{} | |
+ } | |
+ h.records[c].CompareAndSwap(nil, &replicationLagRecord{}) | |
+ return h.records[c].Load() | |
} | |
// atOrAfter returns the oldest replicationLagRecord which happened at "at" | |
@@ -206,24 +226,24 @@ func (h *replicationLagHistory) latest() replicationLagRecord { | |
// If there is no such record, a zero record is returned. | |
func (h *replicationLagHistory) atOrAfter(at time.Time) replicationLagRecord { | |
wrapped := false | |
- i := h.current | |
+ i := h.current.Load() | |
for { | |
// Look at the previous (older) entry to decide if we should return the | |
// current entry. | |
prev := i - 1 | |
if prev < 0 { | |
wrapped = true | |
- prev = len(h.records) - 1 | |
+ prev = int32(len(h.records) - 1) | |
} | |
- if h.records[prev].isZero() || h.records[prev].time.Before(at) { | |
+ if h.records[prev].Load().isZero() || h.records[prev].Load().time.Before(at) { | |
// Return this entry because the previous one does not exist or | |
// it happened before the time we're interested in. | |
- return h.records[i] | |
+ return *h.records[i].Load() | |
} | |
- if wrapped && prev == h.current { | |
+ if wrapped && prev == h.current.Load() { | |
// We scanned the whole list and all entries match. Return the oldest. | |
- return h.records[i] | |
+ return *h.records[i].Load() | |
} | |
i = prev | |
@@ -231,8 +251,8 @@ func (h *replicationLagHistory) atOrAfter(at time.Time) replicationLagRecord { | |
} | |
func (h *replicationLagHistory) advanceCurrent() { | |
- h.current++ | |
- if h.current == len(h.records) { | |
- h.current = 0 | |
+ cur := h.current.Add(1) | |
+ if cur == int32(len(h.records)) { | |
+ h.current.Store(0) | |
} | |
} | |
diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go | |
index 909888bd0d..4f6a7f4d8f 100644 | |
--- a/go/vt/throttler/throttler.go | |
+++ b/go/vt/throttler/throttler.go | |
@@ -228,21 +228,27 @@ func (t *Throttler) Throttle(threadID int) time.Duration { | |
// MaxLag returns the max of all the last replication lag values seen across all tablets of | |
// the provided type, excluding ignored tablets. | |
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { | |
- cache := t.maxReplicationLagModule.lagCacheByType(tabletType) | |
- | |
var maxLag uint32 | |
- cacheEntries := cache.entries | |
- | |
- for key := range cacheEntries { | |
+ cache := t.maxReplicationLagModule.lagCacheByType(tabletType) | |
+ if cache == nil { | |
+ return 0 | |
+ } | |
+ cache.entries.Range(func(k, v any) bool { | |
+ key := k.(string) | |
+ val := v.(*replicationLagHistory) | |
if cache.isIgnored(key) { | |
- continue | |
+ return true | |
} | |
- | |
- lag := cache.latest(key).Stats.ReplicationLagSeconds | |
+ l := val.latest() | |
+ if l == nil || l.Stats == nil { | |
+ return true | |
+ } | |
+ lag := l.Stats.ReplicationLagSeconds | |
if lag > maxLag { | |
maxLag = lag | |
} | |
- } | |
+ return true | |
+ }) | |
return maxLag | |
} | |
diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go | |
index b33bb2ca25..80ea65fe25 100644 | |
--- a/go/vt/throttler/throttler_test.go | |
+++ b/go/vt/throttler/throttler_test.go | |
@@ -17,11 +17,16 @@ limitations under the License. | |
package throttler | |
import ( | |
+ "context" | |
"runtime" | |
+ "sync" | |
"testing" | |
"time" | |
"github.com/stretchr/testify/require" | |
+ "vitess.io/vitess/go/vt/discovery" | |
+ "vitess.io/vitess/go/vt/proto/query" | |
+ "vitess.io/vitess/go/vt/proto/topodata" | |
) | |
// The main purpose of the benchmarks below is to demonstrate the functionality | |
@@ -398,3 +403,76 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { | |
}() | |
throttler.ThreadFinished(0) | |
} | |
+ | |
+func TestThrottlerMaxLag(t *testing.T) { | |
+ fc := &fakeClock{} | |
+ throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) | |
+ require.NoError(t, err) | |
+ defer throttler.Close() | |
+ | |
+ require.NotNil(t, throttler) | |
+ require.NotNil(t, throttler.maxReplicationLagModule) | |
+ | |
+ ctx, cancel := context.WithCancel(context.Background()) | |
+ var wg sync.WaitGroup | |
+ | |
+ // run .add() and .MaxLag() concurrently to detect races | |
+ for _, tabletType := range []topodata.TabletType{ | |
+ topodata.TabletType_REPLICA, | |
+ topodata.TabletType_RDONLY, | |
+ } { | |
+ wg.Add(1) | |
+ go func() { | |
+ defer wg.Done() | |
+ for { | |
+ select { | |
+ case <-ctx.Done(): | |
+ return | |
+ default: | |
+ throttler.MaxLag(tabletType) | |
+ } | |
+ } | |
+ }() | |
+ | |
+ wg.Add(1) | |
+ go func() { | |
+ defer wg.Done() | |
+ for { | |
+ select { | |
+ case <-ctx.Done(): | |
+ return | |
+ default: | |
+ cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) | |
+ require.NotNil(t, cache) | |
+ cache.add(replicationLagRecord{ | |
+ time: time.Now(), | |
+ TabletHealth: discovery.TabletHealth{ | |
+ Serving: true, | |
+ Stats: &query.RealtimeStats{ | |
+ ReplicationLagSeconds: 5, | |
+ }, | |
+ Tablet: &topodata.Tablet{ | |
+ Hostname: t.Name(), | |
+ Type: tabletType, | |
+ PortMap: map[string]int32{ | |
+ "test": 15999, | |
+ }, | |
+ }, | |
+ }, | |
+ }) | |
+ } | |
+ } | |
+ }() | |
+ } | |
+ time.Sleep(time.Second) | |
+ cancel() | |
+ wg.Wait() | |
+ | |
+ // check .MaxLag() | |
+ for _, tabletType := range []topodata.TabletType{ | |
+ topodata.TabletType_REPLICA, | |
+ topodata.TabletType_RDONLY, | |
+ } { | |
+ require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) | |
+ } | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment