Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benesch/53203264cf72eb6a1efe9137a306a26f to your computer and use it in GitHub Desktop.
Save benesch/53203264cf72eb6a1efe9137a306a26f to your computer and use it in GitHub Desktop.
// TestInternalExecutorTxnAbortNotSwallowed reproduces a rare bug where the
// internal executor could swallow transaction aborted errors. Specifically, an
// optimizer code path was not propagating errors, violating the contract of our
// transaction API, and causing partial split transactions that resulted in
// replica corruption errors (#32784).
//
// Note that a fix to our transaction API to eliminate this class of errors is
// proposed in #22615.
func TestInternalExecutorTxnAbortNotSwallowed(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Notify a channel whenever a HeartbeatTxn request notices that a txn has
	// been aborted. Each notification is sent from its own goroutine so the
	// response filter itself never blocks.
	heartbeatSawAbortedTxn := make(chan uuid.UUID)

	// testDone is closed when the test returns. The test stops receiving from
	// heartbeatSawAbortedTxn as soon as it sees the txn it cares about, so any
	// notification goroutine still waiting to send must be able to give up;
	// otherwise it would block forever and be flagged by leaktest. Because
	// defers run LIFO, this close runs after the server stopper (deferred
	// below) and before the leaktest check (deferred above).
	testDone := make(chan struct{})
	defer close(testDone)

	params, _ := tests.CreateTestServerParams()
	params.Knobs.Store = &storage.StoreTestingKnobs{
		TestingResponseFilter: func(ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
			for i, r := range ba.Requests {
				if r.GetHeartbeatTxn() != nil && br.Responses[i].GetHeartbeatTxn().Txn.Status == roachpb.ABORTED {
					// ba.Txn is a pointer; read the ID now rather than from
					// the goroutine, which may run after the filter returns.
					txnID := ba.Txn.ID
					go func() {
						select {
						case heartbeatSawAbortedTxn <- txnID:
						case <-testDone:
						}
					}()
				}
			}
			return nil
		},
	}

	ctx := context.Background()
	s, sqlDB, kvDB := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	ie := s.InternalExecutor().(*sql.InternalExecutor)

	if _, err := sqlDB.Exec("CREATE TABLE t (a INT)"); err != nil {
		t.Fatal(err)
	}

	// Create a new txn, and perform a write inside of it so that its txn record
	// is written and the heartbeat loop is started. The particular key doesn't
	// matter.
	txn := client.NewTxn(ctx, kvDB, s.NodeID(), client.RootTxn)
	origTxnID := txn.ID()
	if err := txn.Put(ctx, "key-foo", []byte("bar")); err != nil {
		t.Fatal(err)
	}

	// Abort the txn directly with a PushTxnRequest. This happens in practice
	// when, e.g., deadlock between two txns is detected.
	txnProto := txn.GetTxnCoordMeta(ctx).Txn
	if _, pErr := client.SendWrapped(ctx, kvDB.NonTransactionalSender(), &roachpb.PushTxnRequest{
		RequestHeader: roachpb.RequestHeader{Key: txnProto.Key},
		PusheeTxn:     txnProto.TxnMeta,
		PusherTxn:     roachpb.Transaction{TxnMeta: enginepb.TxnMeta{Priority: roachpb.MaxTxnPriority}},
		PushType:      roachpb.PUSH_ABORT,
	}); pErr != nil {
		t.Fatal(pErr)
	}

	// Wait for one of the original txn's heartbeats to observe the abort.
	// Notifications about other txns are discarded.
	for txnID := range heartbeatSawAbortedTxn {
		if txnID == origTxnID {
			break
		}
	}

	// Execute a SQL statement in the txn using an internal executor. Importantly,
	// we're accessing a system table, which bypasses the descriptor cache and
	// forces the optimizer to perform raw KV lookups during name resolution.
	_, err := ie.Exec(ctx, t.Name(), txn, "INSERT INTO system.zones VALUES ($1, $2)", 50, "")

	// Double-check that the client.Txn has "helpfully" given us a brand new
	// KV txn to replace our aborted txn. (#22615)
	if origTxnID == txn.ID() {
		t.Fatal("test bug: txn ID did not change after executing SQL statement on aborted txn")
	}

	// We now have proof that the client.Txn saw the aborted error; the internal
	// executor had better have bubbled this error up so that we know to retry our
	// txn from the beginning.
	if !testutils.IsError(err, "TransactionAbortedError") {
		t.Fatalf("expected query execution on aborted txn to fail, but got %+v", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment