Created
January 3, 2019 04:47
-
-
Save benesch/53203264cf72eb6a1efe9137a306a26f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// TestInternalExecutorTxnAbortNotSwallowed reproduces a rare bug where the | |
// internal executor could swallow transaction aborted errors. Specifically, an | |
// optimizer code path was not propagating errors, violating the contract of our | |
// transaction API, and causing partial split transactions that resulted in | |
// replica corruption errors (#32784). | |
// | |
// Note that a fix to our transaction API to eliminate this class of errors is | |
// proposed in #22615. | |
func TestInternalExecutorTxnAbortNotSwallowed(t *testing.T) { | |
defer leaktest.AfterTest(t)() | |
// Notify a channel whenever a HeartbeatTxn request notices that a txn has | |
// been aborted. | |
heartbeatSawAbortedTxn := make(chan uuid.UUID) | |
params, _ := tests.CreateTestServerParams() | |
params.Knobs.Store = &storage.StoreTestingKnobs{ | |
TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { | |
for i, r := range ba.Requests { | |
if r.GetHeartbeatTxn() != nil && br.Responses[i].GetHeartbeatTxn().Txn.Status == roachpb.ABORTED { | |
go func() { | |
heartbeatSawAbortedTxn <- ba.Txn.ID | |
}() | |
} | |
} | |
return nil | |
}, | |
} | |
ctx := context.Background() | |
s, sqlDB, kvDB := serverutils.StartServer(t, params) | |
defer s.Stopper().Stop(ctx) | |
ie := s.InternalExecutor().(*sql.InternalExecutor) | |
if _, err := sqlDB.Exec("CREATE TABLE t (a INT)"); err != nil { | |
t.Fatal(err) | |
} | |
// Create a new txn, and perform a write inside of it so that its txn record | |
// is written and the heartbeat loop is started. The particular key doesn't | |
// matter. | |
txn := client.NewTxn(ctx, kvDB, s.NodeID(), client.RootTxn) | |
origTxnID := txn.ID() | |
if err := txn.Put(ctx, "key-foo", []byte("bar")); err != nil { | |
t.Fatal(err) | |
} | |
// Abort the txn directly with a PushTxnRequest. This happens in practice | |
// when, e.g., deadlock between two txns is detected. | |
txnProto := txn.GetTxnCoordMeta(ctx).Txn | |
if _, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), &roachpb.PushTxnRequest{ | |
RequestHeader: roachpb.RequestHeader{Key: txnProto.Key}, | |
PusheeTxn: txnProto.TxnMeta, | |
PusherTxn: roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority}}, | |
PushType: roachpb.PUSH_ABORT, | |
}); pErr != nil { | |
t.Fatal(pErr) | |
} | |
// Wait for one of the txn's heartbeats to notice that the heartbeat has | |
// failed. | |
for txnID := range heartbeatSawAbortedTxn { | |
if txnID == origTxnID { | |
break | |
} | |
} | |
// Execute a SQL statement in the txn using an internal executor. Importantly, | |
// we're accessing a system table, which bypasses the descriptor cache and | |
// forces the optimizer to perform raw KV lookups during name resolution. | |
_, err := ie.Exec(ctx, t.Name(), txn, "INSERT INTO system.zones VALUES ($1, $2)", 50, "") | |
// Double-check that the client.Txn has "helpfully" given us a brand new | |
// KV txn to replace our aborted txn. (#22615) | |
if origTxnID == txn.ID() { | |
t.Fatal("test bug: txn ID did not change after executing SQL statement on aborted txn") | |
} | |
// We now have proof that the client.Txn saw the aborted error; the internal | |
// executor had better have bubbled this error up so that we know to retry our | |
// txn from the beginning. | |
if !testutils.IsError(err, "TransactionAbortedError") { | |
t.Fatalf("expected query execution on aborted txn to fail, but got %+v", err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment