Skip to content

Instantly share code, notes, and snippets.

@mattlord
Last active May 13, 2024 02:32
Show Gist options
  • Save mattlord/68e8636fe65cf72baab31e8f6b7b8604 to your computer and use it in GitHub Desktop.
Save mattlord/68e8636fe65cf72baab31e8f6b7b8604 to your computer and use it in GitHub Desktop.
KeyspaceRoutingRules diff
diff --git a/go/test/endtoend/vreplication/multi_tenant_test.go b/go/test/endtoend/vreplication/multi_tenant_test.go
index 0c08126c83..7cd6bb3036 100644
--- a/go/test/endtoend/vreplication/multi_tenant_test.go
+++ b/go/test/endtoend/vreplication/multi_tenant_test.go
@@ -71,6 +71,8 @@ var (
chNotSetup, chNotCreated, chInProgress, chSwitched, chCompleted chan int64
// counters to keep track of the number of tenants in each state
numSetup, numInProgress, numSwitched, numCompleted atomic.Int64
+
+ emptyKeyspaceRoutingRules = &vschemapb.KeyspaceRoutingRules{}
)
// multiTenantMigration manages the migration of multiple tenants to a single target keyspace.
@@ -226,7 +228,7 @@ func TestMultiTenantSimple(t *testing.T) {
t.Run("Test ApplyKeyspaceRoutingRules", func(t *testing.T) {
// First set of rules
- applyKeyspaceRoutingRules(t, nil, initialRules)
+ applyKeyspaceRoutingRules(t, emptyKeyspaceRoutingRules, initialRules)
updatedRules := &vschemapb.KeyspaceRoutingRules{
Rules: []*vschemapb.KeyspaceRoutingRule{
@@ -240,12 +242,9 @@ func TestMultiTenantSimple(t *testing.T) {
// Update with the same rules
applyKeyspaceRoutingRules(t, updatedRules, updatedRules)
// Remove the rules
- emptyRules := &vschemapb.KeyspaceRoutingRules{
- Rules: []*vschemapb.KeyspaceRoutingRule{},
- }
- applyKeyspaceRoutingRules(t, updatedRules, emptyRules)
+ applyKeyspaceRoutingRules(t, updatedRules, emptyKeyspaceRoutingRules)
// Test setting empty rules again
- applyKeyspaceRoutingRules(t, emptyRules, emptyRules)
+ applyKeyspaceRoutingRules(t, emptyKeyspaceRoutingRules, emptyKeyspaceRoutingRules)
})
}
@@ -265,12 +264,12 @@ func applyKeyspaceRoutingRules(t *testing.T, oldRules, newRules *vschemapb.Keysp
err = json.Unmarshal([]byte(output), response)
require.NoError(t, err)
if oldRules == nil || len(oldRules.Rules) == 0 {
- require.Nil(t, response.GetOldKeyspaceRoutingRules())
+ require.Empty(t, response.GetOldKeyspaceRoutingRules().String())
} else {
require.ElementsMatch(t, oldRules.Rules, response.GetOldKeyspaceRoutingRules().Rules)
}
if newRules == nil || len(newRules.Rules) == 0 {
- require.Nil(t, response.GetNewKeyspaceRoutingRules())
+ require.Empty(t, response.GetNewKeyspaceRoutingRules().String())
} else {
require.ElementsMatch(t, newRules.Rules, response.GetNewKeyspaceRoutingRules().Rules)
}
diff --git a/go/vt/topo/routing_rules_lock.go b/go/vt/topo/routing_rules_lock.go
index a714d5e52a..db4fa63bc9 100644
--- a/go/vt/topo/routing_rules_lock.go
+++ b/go/vt/topo/routing_rules_lock.go
@@ -19,8 +19,6 @@ package topo
import (
"context"
"fmt"
-
- "vitess.io/vitess/go/vt/log"
)
// RoutingRulesLock is a wrapper over TopoLock, to serialize updates to routing rules.
@@ -29,11 +27,6 @@ type RoutingRulesLock struct {
}
func NewRoutingRulesLock(ctx context.Context, ts *Server, name string) (*RoutingRulesLock, error) {
- if err := ts.EnsureTopoPathExists(ctx, "Routing Rules", RoutingRulesPath); err != nil {
- log.Errorf("Failed to create routing rules lock file: %v", err)
- return nil, err
- }
-
return &RoutingRulesLock{
TopoLock: &TopoLock{
Path: RoutingRulesPath,
diff --git a/go/vt/topo/routing_rules_lock_test.go b/go/vt/topo/routing_rules_lock_test.go
index 5a6152532b..2302751701 100644
--- a/go/vt/topo/routing_rules_lock_test.go
+++ b/go/vt/topo/routing_rules_lock_test.go
@@ -24,6 +24,8 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
+
+ vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
// TestKeyspaceRoutingRulesLock tests that the lock is acquired and released correctly.
@@ -39,6 +41,9 @@ func TestKeyspaceRoutingRulesLock(t *testing.T) {
topo.LockTimeout = currentTopoLockTimeout
}()
+ err := ts.CreateKeyspaceRoutingRules(ctx, &vschemapb.KeyspaceRoutingRules{})
+ require.NoError(t, err)
+
lock, err := topo.NewRoutingRulesLock(ctx, ts, "ks1")
require.NoError(t, err)
_, unlock, err := lock.Lock(ctx)
diff --git a/go/vt/topo/topo_lock.go b/go/vt/topo/topo_lock.go
index 1c5248c849..ffd732fff3 100644
--- a/go/vt/topo/topo_lock.go
+++ b/go/vt/topo/topo_lock.go
@@ -19,7 +19,6 @@ package topo
import (
"context"
"fmt"
- "path"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@@ -168,27 +167,3 @@ func CheckLocked(ctx context.Context, keyPath string) error {
// and we're good for now.
return nil
}
-
-/*
-EnsureTopoPathExists creates the specified key by creating a sentinel (dummy) child key under it.
-Vitess expects a key to exist, and to have a child key (it imposes a directory-like structure), before locking it.
-Without this we get an error when trying to lock :node doesn't exist: /vitess/global/<keyPath>/.
-*/
-func (ts *Server) EnsureTopoPathExists(ctx context.Context, name, keyPath string) error {
- sentinelPath := path.Join(keyPath, "sentinel")
- _, _, err := ts.globalCell.Get(ctx, sentinelPath)
- if IsErrType(err, NoNode) {
- _, err = ts.globalCell.Create(ctx, sentinelPath,
- []byte(fmt.Sprintf("ensure creation of %s root key: %s", name, keyPath)))
- if IsErrType(err, NodeExists) {
- // Another process created the file, which is fine.
- return nil
- }
- if err != nil {
- log.Errorf(fmt.Sprintf("Failed to create sentinel file %s: %v", sentinelPath, err))
- } else {
- log.Infof(fmt.Sprintf("Successfully created sentinel file %s", sentinelPath))
- }
- }
- return err
-}
diff --git a/go/vt/topo/topo_lock_test.go b/go/vt/topo/topo_lock_test.go
index 203968ddaa..c378c05a9f 100644
--- a/go/vt/topo/topo_lock_test.go
+++ b/go/vt/topo/topo_lock_test.go
@@ -25,6 +25,8 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
+
+ vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
// lower the lock timeout for testing
@@ -36,7 +38,10 @@ func TestTopoLockTimeout(t *testing.T) {
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()
- err := ts.EnsureTopoPathExists(ctx, "test", "root/key1")
+
+ err := ts.CreateKeyspaceRoutingRules(ctx, &vschemapb.KeyspaceRoutingRules{})
+ require.NoError(t, err)
+ lock, err := topo.NewRoutingRulesLock(ctx, ts, "ks1")
require.NoError(t, err)
currentTopoLockTimeout := topo.LockTimeout
@@ -47,13 +52,12 @@ func TestTopoLockTimeout(t *testing.T) {
// acquire the lock
origCtx := ctx
- tl1 := ts.NewTopoLock("root", "name")
- _, unlock, err := tl1.Lock(origCtx)
+ _, unlock, err := lock.Lock(origCtx)
require.NoError(t, err)
defer unlock(&err)
// re-acquiring the lock should fail
- _, _, err2 := tl1.Lock(origCtx)
+ _, _, err2 := lock.Lock(origCtx)
require.Errorf(t, err2, "deadline exceeded")
}
@@ -63,29 +67,23 @@ func TestTopoLockBasic(t *testing.T) {
defer cancel()
ts := memorytopo.NewServer(ctx, "zone1")
defer ts.Close()
- err := ts.EnsureTopoPathExists(ctx, "test", "root/key1")
+
+ err := ts.CreateKeyspaceRoutingRules(ctx, &vschemapb.KeyspaceRoutingRules{})
+ require.NoError(t, err)
+ lock, err := topo.NewRoutingRulesLock(ctx, ts, "ks1")
require.NoError(t, err)
origCtx := ctx
- tl1 := ts.NewTopoLock("root/key1", "name")
- ctx, unlock, err := tl1.Lock(origCtx)
+ ctx, unlock, err := lock.Lock(origCtx)
require.NoError(t, err)
// locking the same key again, without unlocking, should return an error
- _, _, err2 := tl1.Lock(ctx)
+ _, _, err2 := lock.Lock(ctx)
require.ErrorContains(t, err2, "already held")
// confirm that the lock can be re-acquired after unlocking
unlock(&err)
- ctx, unlock, err = tl1.Lock(origCtx)
+ _, unlock, err = lock.Lock(origCtx)
require.NoError(t, err)
defer unlock(&err)
-
- // locking another key should work
- err = ts.EnsureTopoPathExists(ctx, "test", "root/key2")
- require.NoError(t, err)
- tl2 := ts.NewTopoLock("root/key2", "name")
- _, unlock2, err := tl2.Lock(ctx)
- require.NoError(t, err)
- defer unlock2(&err)
}
diff --git a/go/vt/topo/vschema.go b/go/vt/topo/vschema.go
index 8da217ad26..e6b7bcb24c 100644
--- a/go/vt/topo/vschema.go
+++ b/go/vt/topo/vschema.go
@@ -159,18 +159,23 @@ func (ts *Server) GetShardRoutingRules(ctx context.Context) (*vschemapb.ShardRou
return srr, nil
}
+// CreateKeyspaceRoutingRules wraps the underlying Conn.Create.
+func (ts *Server) CreateKeyspaceRoutingRules(ctx context.Context, value *vschemapb.KeyspaceRoutingRules) error {
+ data, err := value.MarshalVT()
+ if err != nil {
+ return err
+ }
+ if _, err := ts.globalCell.Create(ctx, ts.GetKeyspaceRoutingRulesPath(), data); err != nil {
+ return err
+ }
+ return nil
+}
+
func (ts *Server) SaveKeyspaceRoutingRules(ctx context.Context, rules *vschemapb.KeyspaceRoutingRules) error {
data, err := rules.MarshalVT()
if err != nil {
return err
}
- if len(data) == 0 {
- // No rules, remove it.
- if err := ts.globalCell.Delete(ctx, ts.GetKeyspaceRoutingRulesPath(), nil); err != nil && !IsErrType(err, NoNode) {
- return err
- }
- return nil
- }
_, err = ts.globalCell.Update(ctx, ts.GetKeyspaceRoutingRulesPath(), data, nil)
return err
}
diff --git a/go/vt/topotools/routing_rules.go b/go/vt/topotools/routing_rules.go
index d8eb1ba9f4..a3bc5a8a95 100644
--- a/go/vt/topotools/routing_rules.go
+++ b/go/vt/topotools/routing_rules.go
@@ -150,26 +150,34 @@ func GetKeyspaceRoutingRules(ctx context.Context, ts *topo.Server) (map[string]s
return rules, nil
}
+// buildKeyspaceRoutingRules builds a vschemapb.KeyspaceRoutingRules struct from a map of
+// fromKeyspace=>toKeyspace values.
+func buildKeyspaceRoutingRules(rules *map[string]string) *vschemapb.KeyspaceRoutingRules {
+ keyspaceRoutingRules := &vschemapb.KeyspaceRoutingRules{Rules: make([]*vschemapb.KeyspaceRoutingRule, 0, len(*rules))}
+ for from, to := range *rules {
+ keyspaceRoutingRules.Rules = append(keyspaceRoutingRules.Rules, &vschemapb.KeyspaceRoutingRule{
+ FromKeyspace: from,
+ ToKeyspace: to,
+ })
+ }
+ return keyspaceRoutingRules
+}
+
// saveKeyspaceRoutingRulesLocked saves the keyspace routing rules in the topo server. It expects the caller to
// have acquired a RoutingRulesLock.
func saveKeyspaceRoutingRulesLocked(ctx context.Context, ts *topo.Server, rules map[string]string) error {
if err := topo.CheckLocked(ctx, topo.RoutingRulesPath); err != nil {
return err
}
- keyspaceRoutingRules := &vschemapb.KeyspaceRoutingRules{Rules: make([]*vschemapb.KeyspaceRoutingRule, 0, len(rules))}
- for from, to := range rules {
- keyspaceRoutingRules.Rules = append(keyspaceRoutingRules.Rules, &vschemapb.KeyspaceRoutingRule{
- FromKeyspace: from,
- ToKeyspace: to,
- })
- }
- return ts.SaveKeyspaceRoutingRules(ctx, keyspaceRoutingRules)
+ return ts.SaveKeyspaceRoutingRules(ctx, buildKeyspaceRoutingRules(&rules))
}
-// UpdateKeyspaceRoutingRules updates the keyspace routing rules in the topo server. It initially acquires a
-// RoutingRulesLock and then calls the update function to modify the rules in-place.
-// If the update function returns an error, the rules are not saved and the lock is released.
-// If the update function is successful, the rules are saved to the topo and the lock is released.
+// UpdateKeyspaceRoutingRules updates the keyspace routing rules in the topo server.
+// If the keyspace routing rules do not yet exist, it will create them. If multiple callers
+// are racing to create the initial keyspace routing rules then the first writer will win
+// and the other callers can immediately retry when getting the resulting topo.NodeExists
+// error. When the routing rules already exist, it will acquire a RoutingRulesLock and
+// then modify the keyspace routing rules in-place.
func UpdateKeyspaceRoutingRules(ctx context.Context, ts *topo.Server, reason string,
update func(ctx context.Context, rules *map[string]string) error) (err error) {
var lock *topo.RoutingRulesLock
@@ -179,12 +187,26 @@ func UpdateKeyspaceRoutingRules(ctx context.Context, ts *topo.Server, reason str
}
lockCtx, unlock, lockErr := lock.Lock(ctx)
if lockErr != nil {
- return lockErr
+ // If the key does not yet exist then let's create it.
+ if !topo.IsErrType(lockErr, topo.NoNode) {
+ return lockErr
+ }
+ rules := make(map[string]string)
+ if err := update(ctx, &rules); err != nil {
+ return err
+ }
+ // This will fail if the key already exists and thus avoids any races here. The first
+ // writer will win and the others will have to retry. This situation should be very
+ // rare as we are typically only updating the rules from here on out.
+ if err := ts.CreateKeyspaceRoutingRules(ctx, buildKeyspaceRoutingRules(&rules)); err != nil {
+ return err
+ }
+ return nil
}
defer unlock(&err)
- rules, _ := GetKeyspaceRoutingRules(lockCtx, ts)
- if rules == nil {
- rules = make(map[string]string)
+ rules, err := GetKeyspaceRoutingRules(lockCtx, ts)
+ if err != nil {
+ return err
}
if err := update(lockCtx, &rules); err != nil {
return err
diff --git a/go/vt/topotools/routing_rules_test.go b/go/vt/topotools/routing_rules_test.go
index 0238970c53..2d4d9feacd 100644
--- a/go/vt/topotools/routing_rules_test.go
+++ b/go/vt/topotools/routing_rules_test.go
@@ -128,6 +128,22 @@ func TestSaveKeyspaceRoutingRulesLocked(t *testing.T) {
"ks1": "ks2",
"ks4": "ks5",
}
+
+ t.Run("unlocked, doesn't exist", func(t *testing.T) {
+ err := saveKeyspaceRoutingRulesLocked(ctx, ts, rulesMap)
+ require.Errorf(t, err, "node doesn't exist: routing_rules")
+ })
+
+ t.Run("create", func(t *testing.T) {
+ err := ts.CreateKeyspaceRoutingRules(ctx, buildKeyspaceRoutingRules(&rulesMap))
+ require.NoError(t, err)
+ })
+
+ t.Run("create again", func(t *testing.T) {
+ err := ts.CreateKeyspaceRoutingRules(ctx, buildKeyspaceRoutingRules(&rulesMap))
+ require.True(t, topo.IsErrType(err, topo.NodeExists))
+ })
+
t.Run("unlocked", func(t *testing.T) {
err := saveKeyspaceRoutingRulesLocked(ctx, ts, rulesMap)
require.Errorf(t, err, "routing_rules is not locked (no locksInfo)")
@@ -140,12 +156,12 @@ func TestSaveKeyspaceRoutingRulesLocked(t *testing.T) {
require.NoError(t, err)
defer unlock(&err)
- t.Run("locked, unlocked ctx", func(t *testing.T) {
- err = saveKeyspaceRoutingRulesLocked(ctx, ts, rulesMap)
- require.Errorf(t, err, "routing_rules is not locked (no locksInfo)")
- })
t.Run("locked, locked ctx", func(t *testing.T) {
err = saveKeyspaceRoutingRulesLocked(lockCtx, ts, rulesMap)
require.NoError(t, err)
})
+ t.Run("locked, unlocked ctx", func(t *testing.T) {
+ err = saveKeyspaceRoutingRulesLocked(ctx, ts, rulesMap)
+ require.Errorf(t, err, "routing_rules is not locked (no locksInfo)")
+ })
}
diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go
index df2123ee9e..5e89bab406 100644
--- a/go/vt/vtctl/grpcvtctldserver/server.go
+++ b/go/vt/vtctl/grpcvtctldserver/server.go
@@ -5183,15 +5183,26 @@ func (s *VtctldServer) ApplyKeyspaceRoutingRules(ctx context.Context, req *vtctl
}
resp.OldKeyspaceRoutingRules = currentRules
- if err := topotools.UpdateKeyspaceRoutingRules(ctx, s.ts, "ApplyKeyspaceRoutingRules",
- func(ctx context.Context, rules *map[string]string) error {
- clear(*rules)
- for _, rule := range req.GetKeyspaceRoutingRules().Rules {
- (*rules)[rule.FromKeyspace] = rule.ToKeyspace
- }
- return nil
- }); err != nil {
- return nil, err
+ update := func() error {
+ return topotools.UpdateKeyspaceRoutingRules(ctx, s.ts, "ApplyKeyspaceRoutingRules",
+ func(ctx context.Context, rules *map[string]string) error {
+ clear(*rules)
+ for _, rule := range req.GetKeyspaceRoutingRules().Rules {
+ (*rules)[rule.FromKeyspace] = rule.ToKeyspace
+ }
+ return nil
+ })
+ }
+ err = update()
+ if err != nil {
+ // If we were racing with another caller to create the initial routing rules, then
+ // we can immediately retry the operation.
+ if !topo.IsErrType(err, topo.NodeExists) {
+ return nil, err
+ }
+ if err = update(); err != nil {
+ return nil, err
+ }
}
newRules, err := s.ts.GetKeyspaceRoutingRules(ctx)
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index ff35e85f74..9ea1c8b609 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -444,8 +444,8 @@ func (ts *trafficSwitcher) deleteKeyspaceRoutingRules(ctx context.Context) error
return nil
}
log.Infof("deleteKeyspaceRoutingRules: workflow %s.%s", ts.targetKeyspace, ts.workflow)
- name := fmt.Sprintf("Deleting %s", ts.SourceKeyspaceName())
- return topotools.UpdateKeyspaceRoutingRules(ctx, ts.TopoServer(), name,
+ reason := fmt.Sprintf("Deleting rules for %s", ts.SourceKeyspaceName())
+ return topotools.UpdateKeyspaceRoutingRules(ctx, ts.TopoServer(), reason,
func(ctx context.Context, rules *map[string]string) error {
for _, suffix := range tabletTypeSuffixes {
delete(*rules, ts.SourceKeyspaceName()+suffix)
diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go
index 79e5481b93..bc7fa7f704 100644
--- a/go/vt/vtctl/workflow/utils.go
+++ b/go/vt/vtctl/workflow/utils.go
@@ -863,19 +863,28 @@ func changeKeyspaceRouting(ctx context.Context, ts *topo.Server, tabletTypes []t
return ts.RebuildSrvVSchema(ctx, nil)
}
-// updateKeyspaceRoutingRules updates the keyspace routing rule for the (effective) source keyspace to the target keyspace.
+// updateKeyspaceRoutingRules updates the keyspace routing rules for the (effective) source
+// keyspace to the target keyspace.
func updateKeyspaceRoutingRules(ctx context.Context, ts *topo.Server, sourceKeyspace, reason string, routes map[string]string) error {
- err := topotools.UpdateKeyspaceRoutingRules(ctx, ts, reason,
- func(ctx context.Context, rules *map[string]string) error {
- for fromKeyspace, toKeyspace := range routes {
- (*rules)[fromKeyspace] = toKeyspace
- }
- return nil
- })
- if err != nil {
- log.Errorf("Failed to update keyspace routing rules for keyspace %s: %v", sourceKeyspace, err)
+ update := func() error {
+ return topotools.UpdateKeyspaceRoutingRules(ctx, ts, reason,
+ func(ctx context.Context, rules *map[string]string) error {
+ for fromKeyspace, toKeyspace := range routes {
+ (*rules)[fromKeyspace] = toKeyspace
+ }
+ return nil
+ })
+ }
+ err := update()
+ if err == nil {
+ return nil
+ }
+ // If we were racing with another caller to create the initial routing rules, then
+ // we can immediately retry the operation.
+ if !topo.IsErrType(err, topo.NodeExists) {
+ return err
}
- return err
+ return update()
}
func validateTenantId(dataType querypb.Type, value string) error {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment