Skip to content

Instantly share code, notes, and snippets.

@stevekuznetsov
Last active March 10, 2022 22:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stevekuznetsov/e17d574cade6ce6ea7f914885c432b68 to your computer and use it in GitHub Desktop.
Save stevekuznetsov/e17d574cade6ce6ea7f914885c432b68 to your computer and use it in GitHub Desktop.
Investigating CockroachDB Changefeed Behavior
package main
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach-go/v2/testserver"
"github.com/google/go-cmp/cmp"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/logrusadapter"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
)
func main() {
ts, err := testserver.NewTestServer()
if err != nil {
logrus.WithError(err).Fatal("failed to start crdb")
}
defer func() {
ts.Stop()
}()
ctx := context.Background()
cfg, err := pgxpool.ParseConfig(ts.PGURL().String())
if err != nil {
logrus.WithError(err).Fatal("failed to parse test connection")
}
cfg.ConnConfig.LogLevel = pgx.LogLevelTrace
cfg.ConnConfig.Logger = logrusadapter.NewLogger(logrus.New())
cfg.MaxConns = 128
client, err := pgxpool.ConnectConfig(ctx, cfg)
if err != nil {
logrus.WithError(err).Fatal("failed to connect to crdb")
}
for _, stmt := range []string{
`CREATE TABLE IF NOT EXISTS test
(
key INTEGER PRIMARY KEY,
value INTEGER NOT NULL
);`,
// enable changefeeds
`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
// set the latency floor for events
`SET CLUSTER SETTING changefeed.experimental_poll_interval = '0.2s';`,
// remove throttling
`SET CLUSTER SETTING changefeed.node_throttle_config = '{"MessageRage":0,"MessageBurst":0,"ByteRate":0,"ByteBurst":0,"FlushRate":0,"FlushBurst":0}';`,
} {
if _, err := client.Exec(ctx, stmt); err != nil {
logrus.WithError(err).Fatal("error initializing the database")
}
}
var initialClusterTimestamp apd.Decimal
if err := client.QueryRow(ctx, `SELECT cluster_logical_timestamp();`).Scan(&initialClusterTimestamp); err != nil {
logrus.WithError(err).Fatal("failed to read initial cluster logical timestamp")
}
logrus.Infof("Initial cluster timestamp: %s", initialClusterTimestamp.String())
const (
createEvent = iota
updateEvent
deleteEvent
)
numUpdates := 25
var existing []int
for i := 0; i < numUpdates; i++ {
op := rand.Intn(3)
if len(existing) == 0 {
op = createEvent
}
switch op {
case createEvent:
key := i
if _, err := client.Exec(ctx, `INSERT INTO test (key, value) VALUES ($1, $2);`, key, 0); err != nil {
logrus.WithError(err).Fatal("unexpected error while inserting new row")
}
existing = append(existing, key)
case updateEvent:
key := existing[rand.Intn(len(existing))]
if _, err := client.Exec(ctx, `UPDATE test SET value = value + 1 WHERE key=$1;`, key); err != nil {
logrus.WithError(err).Fatal("unexpected error while updating row")
}
case deleteEvent:
idx := rand.Intn(len(existing))
key := existing[idx]
if _, err := client.Exec(ctx, `DELETE FROM test WHERE key=$1;`, key); err != nil {
logrus.WithError(err).Fatal("unexpected error while removing row")
}
existing = append(existing[:idx], existing[idx+1:]...)
default:
logrus.Fatalf("invalid operation %d", op)
}
}
var finalClusterTimestamp apd.Decimal
if err := client.QueryRow(ctx, `SELECT cluster_logical_timestamp();`).Scan(&finalClusterTimestamp); err != nil {
logrus.WithError(err).Fatal("failed to read initial cluster logical timestamp")
}
logrus.Infof("Final cluster timestamp: %s", finalClusterTimestamp.String())
lock := sync.Mutex{}
var idx int
sink := make([][]event, numUpdates+1)
order := map[string]int{}
wg := sync.WaitGroup{}
into := make([]chan event, numUpdates+1)
for i := 0; i < numUpdates+1; i++ {
into[i] = make(chan event)
}
launch(ctx, client, &initialClusterTimestamp, &finalClusterTimestamp, &idx, idx, &wg, into, &sink, order, &lock)
done := make(chan interface{})
go func() {
wg.Wait()
done <- nil
}()
select {
case <-done:
for i := range into {
close(into[i])
}
case <-time.After(10 * time.Second):
logrus.Error("timed out waiting for changefeeds")
}
lock.Lock()
defer lock.Unlock()
// CRDB does not guarantee ordering between rows, just within them
sort.Slice(sink[0], func(x, y int) bool {
return sink[0][x].timestamp.Cmp(sink[0][y].timestamp) < 0
})
reorderedSink := make([][]event, numUpdates+1)
reorderedSink[0] = sink[0]
for i, item := range sink[0] {
idx := order[item.timestamp.String()]
reorderedSink[i+1] = sink[idx]
}
for i := range reorderedSink {
sort.Slice(reorderedSink[i], func(x, y int) bool {
return reorderedSink[i][x].timestamp.Cmp(reorderedSink[i][y].timestamp) < 0
})
}
cursor := func(i int) string {
if i == 0 {
return initialClusterTimestamp.String()
}
return reorderedSink[0][i-1].timestamp.String()
}
for i := range reorderedSink {
for j := range reorderedSink[i] {
if reorderedSink[i][j].timestamp.Cmp(reorderedSink[0][i].timestamp) < 0 {
logrus.Errorf("changefeed %d (cursor=%s) saw an event at %s, before the cursor", i, cursor(i), reorderedSink[i][j].timestamp.String())
}
}
}
formattedSink := make([][]string, len(reorderedSink))
for i := range reorderedSink {
formattedSink[i] = make([]string, len(reorderedSink[i]))
for j := range reorderedSink[i] {
formattedSink[i][j] = reorderedSink[i][j].String()
}
}
reference := formattedSink[0]
for i, updates := range formattedSink {
id := fmt.Sprintf("changefeed %d (cursor=%s) ", i, cursor(i))
if actual, expected := len(updates), numUpdates-i; actual != expected {
logrus.Errorf("%sgot %d events, expected %d", id, actual, expected)
}
if len(updates) == 0 {
continue
}
if i == 0 {
continue
}
startingIndex := -1
for j, item := range reference {
if item == updates[0] {
startingIndex = j
}
}
if startingIndex == -1 {
logrus.Errorf("%sstarted seeing events at timestamp %q, but the reference watcher never saw that version!", id, updates[0])
continue
}
if startingIndex != i {
logrus.Errorf("%sstarted seeing events at index %d, expected %d", id, startingIndex, i)
}
if diff := cmp.Diff(reference[i:], updates); diff != "" {
logrus.Errorf("%sgot incorrect ordering for events: %v", id, diff)
}
}
}
type event struct {
timestamp *apd.Decimal
action string
}
func (e event) String() string {
return fmt.Sprintf("%s@%s", e.action, e.timestamp)
}
func launch(ctx context.Context, client *pgxpool.Pool, start, end *apd.Decimal, idx *int, i int, wg *sync.WaitGroup, into []chan event, sink *[][]event, order map[string]int, lock *sync.Mutex) {
wg.Add(1)
go func() {
defer wg.Done()
changefeed(ctx, start, end, client, into[i])
logrus.Infof("Changefeed %d finished.", i)
}()
go func() {
for evt := range into[i] {
lock.Lock()
(*sink)[i] = append((*sink)[i], evt)
if i == 0 {
*idx++
order[evt.timestamp.String()] = *idx
launch(ctx, client, evt.timestamp, end, idx, *idx, wg, into, sink, order, lock)
}
lock.Unlock()
}
}()
}
func changefeed(ctx context.Context, start, end *apd.Decimal, client *pgxpool.Pool, into chan<- event) {
options := []string{
"updated",
"diff",
"mvcc_timestamp",
fmt.Sprintf("cursor='%s'", start.String()),
"resolved='1s'",
}
query := fmt.Sprintf(`EXPERIMENTAL CHANGEFEED FOR test WITH %s;`, strings.Join(options, ","))
logrus.WithField("sql", query).Info("Exec")
rows, err := client.Query(ctx, query)
if err != nil {
logrus.WithError(err).Fatal("failed to create changefeed")
}
defer func() {
go func() {
rows.Close()
}()
}()
for rows.Next() {
if err := rows.Err(); err != nil {
logrus.WithError(err).Fatal("failed to read changefeed row")
}
values := rows.RawValues()
if len(values) != 3 {
logrus.Fatalf("expected 3 values in changefeed row, got %d", len(values))
}
// values upacks into (tableName, primaryKey, rowData)
data := values[2]
type row struct {
Key int64 `json:"key,omitempty"`
Value int64 `json:"value,omitempty"`
}
type changefeedEvent struct {
Updated *apd.Decimal `json:"updated,omitempty"`
Resolved *apd.Decimal `json:"resolved,omitempty"`
Before *row `json:"before,omitempty"`
After *row `json:"after,omitempty"`
}
var evt changefeedEvent
if err := json.Unmarshal(data, &evt); err != nil {
logrus.WithError(err).Fatal("failed to deserialize changefeed row")
}
if evt.Resolved != nil {
if evt.Resolved.Cmp(end) == 1 {
// we've seen everything we need to see
logrus.WithField("sql", query).Info("Finished.")
return
}
} else if evt.Updated != nil {
var action string
switch {
case evt.Before == nil && evt.After != nil:
action = fmt.Sprintf("INSERT(%d=%d)", evt.After.Key, evt.After.Value)
case evt.Before != nil && evt.After != nil:
action = fmt.Sprintf("UPDATE(%d=%d->%d)", evt.After.Key, evt.Before.Value, evt.After.Value)
case evt.Before != nil && evt.After == nil:
action = fmt.Sprintf("DELETE(%d)", evt.Before.Key)
}
into <- event{
timestamp: evt.Updated,
action: action,
}
}
}
}
@stevekuznetsov
Copy link
Author

stevekuznetsov commented Mar 9, 2022

Output for me:

$ go run ./changefeeds.go 
2022/03/10 07:53:40 GET https://binaries.cockroachdb.com/cockroach-v21.2.6.linux-amd64.tgz
2022/03/10 07:53:41 Using automatically-downloaded binary: /tmp/cockroach-21-2-6
2022/03/10 07:53:41 executing: /tmp/cockroach-21-2-6 start-single-node --logtostderr --insecure --host=localhost --port=0 --http-port=0 --store=type=mem,size=0.20 --listening-url-file=/tmp/cockroach-testserver828206605/listen-url
2022/03/10 07:53:41 process 1742328 started: /tmp/cockroach-21-2-6 start-single-node --logtostderr --insecure --host=localhost --port=0 --http-port=0 --store=type=mem,size=0.20 --listening-url-file=/tmp/cockroach-testserver828206605/listen-url
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          args="[]" commandTag="CREATE TABLE" fields.time=1.937139ms sql="CREATE TABLE IF NOT EXISTS test\n\t\t\t(\n\t\t\t\tkey INTEGER PRIMARY KEY,\n\t\t\t\tvalue INTEGER NOT NULL\n\t\t\t);"
INFO[0000] Exec                                          args="[]" commandTag="SET CLUSTER SETTING" fields.time=1.555333ms sql="SET CLUSTER SETTING kv.rangefeed.enabled = true;"
INFO[0000] Exec                                          args="[]" commandTag="SET CLUSTER SETTING" fields.time=1.398166ms sql="SET CLUSTER SETTING changefeed.experimental_poll_interval = '0.2s';"
INFO[0000] Exec                                          args="[]" commandTag="SET CLUSTER SETTING" fields.time=1.329505ms sql="SET CLUSTER SETTING changefeed.node_throttle_config = '{\"MessageRage\":0,\"MessageBurst\":0,\"ByteRate\":0,\"ByteBurst\":0,\"FlushRate\":0,\"FlushBurst\":0}';"
INFO[0000] Query                                         args="[]" fields.time="310.179µs" rowCount=1 sql="SELECT cluster_logical_timestamp();"
INFO[0000] Initial cluster timestamp: 1646927621272103662.0000000000 
INFO[0000] Exec                                          args="[0 0]" commandTag="INSERT 0 1" fields.time=3.0943ms sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[1 0]" commandTag="INSERT 0 1" fields.time="234.574µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[1]" commandTag="DELETE 1" fields.time="434.971µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[0]" commandTag="UPDATE 1" fields.time="601.12µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[0]" commandTag="UPDATE 1" fields.time="336.676µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[0]" commandTag="UPDATE 1" fields.time="347.517µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[0]" commandTag="DELETE 1" fields.time="276.256µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[7 0]" commandTag="INSERT 0 1" fields.time="234.897µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[7]" commandTag="DELETE 1" fields.time="238.277µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[9 0]" commandTag="INSERT 0 1" fields.time="219.908µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[10 0]" commandTag="INSERT 0 1" fields.time="208.23µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[10]" commandTag="DELETE 1" fields.time="275.735µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[9]" commandTag="DELETE 1" fields.time="222.776µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[13 0]" commandTag="INSERT 0 1" fields.time="220.052µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[13]" commandTag="DELETE 1" fields.time="231.428µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[15 0]" commandTag="INSERT 0 1" fields.time="211.511µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[15]" commandTag="UPDATE 1" fields.time="334.657µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[15]" commandTag="UPDATE 1" fields.time="334.41µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[18 0]" commandTag="INSERT 0 1" fields.time="221.572µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[15]" commandTag="UPDATE 1" fields.time="328.686µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[18]" commandTag="DELETE 1" fields.time="222.203µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[21 0]" commandTag="INSERT 0 1" fields.time="196.871µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Exec                                          args="[21]" commandTag="DELETE 1" fields.time="228.737µs" sql="DELETE FROM test WHERE key=$1;"
INFO[0000] Exec                                          args="[15]" commandTag="UPDATE 1" fields.time="339.725µs" sql="UPDATE test SET value = value + 1 WHERE key=$1;"
INFO[0000] Exec                                          args="[24 0]" commandTag="INSERT 0 1" fields.time="197.039µs" sql="INSERT INTO test (key, value) VALUES ($1, $2);"
INFO[0000] Query                                         args="[]" fields.time="120.116µs" rowCount=1 sql="SELECT cluster_logical_timestamp();"
INFO[0000] Final cluster timestamp: 1646927621282435306.0000000000 
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621272103662.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275125100.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621276286162.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277105885.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277476130.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621276746804.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275434703.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275822822.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277983926.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277743916.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278240833.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278971791.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278474615.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278680935.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279207412.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279898710.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279675321.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280260445.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279445868.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281627525.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281423049.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280850027.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621282224055.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281872314.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281193701.0000000000',resolved='1s';"
INFO[0000] Exec                                          sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280618608.0000000000',resolved='1s';"
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0000] Dialing PostgreSQL server                     host=localhost
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278971791.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277476130.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281193701.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621276746804.0000000000',resolved='1s';"
INFO[0003] Changefeed 22 finished.                      
INFO[0003] Changefeed 3 finished.                       
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275434703.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281872314.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621276286162.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281627525.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279445868.0000000000',resolved='1s';"
INFO[0003] Changefeed 6 finished.                       
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278240833.0000000000',resolved='1s';"
INFO[0003] Changefeed 10 finished.                      
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621282224055.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275822822.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277105885.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280618608.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277743916.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279898710.0000000000',resolved='1s';"
INFO[0003] Changefeed 21 finished.                      
INFO[0003] Changefeed 8 finished.                       
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280260445.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279675321.0000000000',resolved='1s';"
INFO[0003] Changefeed 7 finished.                       
INFO[0003] Changefeed 4 finished.                       
INFO[0003] Changefeed 16 finished.                      
INFO[0003] Changefeed 11 finished.                      
INFO[0003] Changefeed 5 finished.                       
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621281423049.0000000000',resolved='1s';"
INFO[0003] Changefeed 23 finished.                      
INFO[0003] Changefeed 20 finished.                      
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621277983926.0000000000',resolved='1s';"
INFO[0003] Changefeed 9 finished.                       
INFO[0003] Changefeed 2 finished.                       
INFO[0003] Changefeed 24 finished.                      
INFO[0003] Changefeed 15 finished.                      
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621280850027.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278474615.0000000000',resolved='1s';"
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621275125100.0000000000',resolved='1s';"
INFO[0003] Changefeed 1 finished.                       
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621272103662.0000000000',resolved='1s';"
INFO[0003] Changefeed 25 finished.                      
INFO[0003] Changefeed 17 finished.                      
INFO[0003] Changefeed 18 finished.                      
INFO[0003] Changefeed 19 finished.                      
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621279207412.0000000000',resolved='1s';"
INFO[0003] Changefeed 14 finished.                      
INFO[0003] Changefeed 12 finished.                      
INFO[0003] Finished.                                     sql="EXPERIMENTAL CHANGEFEED FOR test WITH updated,diff,mvcc_timestamp,cursor='1646927621278680935.0000000000',resolved='1s';"
INFO[0003] Changefeed 13 finished.                      
INFO[0003] Changefeed 0 finished.                       
ERRO[0003] changefeed 6 saw an event at 1646927621275822822.0000000000, before the cursor 1646927621277476130.0000000000 
ERRO[0003] changefeed 6 saw an event at 1646927621276286162.0000000000, before the cursor 1646927621277476130.0000000000 
ERRO[0003] changefeed 6 saw an event at 1646927621276746804.0000000000, before the cursor 1646927621277476130.0000000000 
ERRO[0003] changefeed 6 saw an event at 1646927621277105885.0000000000, before the cursor 1646927621277476130.0000000000 
ERRO[0003] changefeed 7 saw an event at 1646927621276286162.0000000000, before the cursor 1646927621277743916.0000000000 
ERRO[0003] changefeed 7 saw an event at 1646927621276746804.0000000000, before the cursor 1646927621277743916.0000000000 
ERRO[0003] changefeed 7 saw an event at 1646927621277105885.0000000000, before the cursor 1646927621277743916.0000000000 
ERRO[0003] changefeed 7 saw an event at 1646927621277476130.0000000000, before the cursor 1646927621277743916.0000000000 
ERRO[0003] changefeed 12 saw an event at 1646927621278680935.0000000000, before the cursor 1646927621278971791.0000000000 
ERRO[0003] changefeed 13 saw an event at 1646927621278971791.0000000000, before the cursor 1646927621279207412.0000000000 
ERRO[0003] changefeed 21 saw an event at 1646927621280850027.0000000000, before the cursor 1646927621281423049.0000000000 
ERRO[0003] changefeed 21 saw an event at 1646927621281193701.0000000000, before the cursor 1646927621281423049.0000000000 
ERRO[0003] changefeed 22 saw an event at 1646927621281423049.0000000000, before the cursor 1646927621281627525.0000000000 
ERRO[0003] changefeed 23 saw an event at 1646927621281627525.0000000000, before the cursor 1646927621281872314.0000000000 
ERRO[0003] changefeed 24 saw an event at 1646927621281872314.0000000000, before the cursor 1646927621282224055.0000000000 
ERRO[0003] changefeed 2 got 21 events, expected 23      
ERRO[0003] changefeed 2 started seeing events at index 4, expected 2 
ERRO[0003] changefeed 2 got incorrect ordering for events:   []string{
- 	"DELETE(1)@1646927621275822822.0000000000",
- 	"UPDATE(0=0->1)@1646927621276286162.0000000000",
  	"UPDATE(0=1->2)@1646927621276746804.0000000000",
  	"UPDATE(0=2->3)@1646927621277105885.0000000000",
  	... // 19 identical elements
  } 
ERRO[0003] changefeed 3 got 20 events, expected 22      
ERRO[0003] changefeed 3 started seeing events at index 5, expected 3 
ERRO[0003] changefeed 3 got incorrect ordering for events:   []string{
- 	"UPDATE(0=0->1)@1646927621276286162.0000000000",
- 	"UPDATE(0=1->2)@1646927621276746804.0000000000",
  	"UPDATE(0=2->3)@1646927621277105885.0000000000",
  	"DELETE(0)@1646927621277476130.0000000000",
  	... // 18 identical elements
  } 
ERRO[0003] changefeed 4 got 19 events, expected 21      
ERRO[0003] changefeed 4 started seeing events at index 6, expected 4 
ERRO[0003] changefeed 4 got incorrect ordering for events:   []string{
- 	"UPDATE(0=1->2)@1646927621276746804.0000000000",
- 	"UPDATE(0=2->3)@1646927621277105885.0000000000",
  	"DELETE(0)@1646927621277476130.0000000000",
  	"INSERT(7=0)@1646927621277743916.0000000000",
  	... // 17 identical elements
  } 
ERRO[0003] changefeed 5 got 18 events, expected 20      
ERRO[0003] changefeed 5 started seeing events at index 7, expected 5 
ERRO[0003] changefeed 5 got incorrect ordering for events:   []string{
- 	"UPDATE(0=2->3)@1646927621277105885.0000000000",
- 	"DELETE(0)@1646927621277476130.0000000000",
  	"INSERT(7=0)@1646927621277743916.0000000000",
  	"DELETE(7)@1646927621277983926.0000000000",
  	... // 16 identical elements
  } 
ERRO[0003] changefeed 6 got 23 events, expected 19      
ERRO[0003] changefeed 6 started seeing events at index 2, expected 6 
ERRO[0003] changefeed 6 got incorrect ordering for events:   []string{
+ 	"DELETE(1)@1646927621275822822.0000000000",
+ 	"UPDATE(0=0->1)@1646927621276286162.0000000000",
+ 	"UPDATE(0=1->2)@1646927621276746804.0000000000",
+ 	"UPDATE(0=2->3)@1646927621277105885.0000000000",
  	"DELETE(0)@1646927621277476130.0000000000",
  	"INSERT(7=0)@1646927621277743916.0000000000",
  	... // 17 identical elements
  } 
ERRO[0003] changefeed 7 got 22 events, expected 18      
ERRO[0003] changefeed 7 started seeing events at index 3, expected 7 
ERRO[0003] changefeed 7 got incorrect ordering for events:   []string{
+ 	"UPDATE(0=0->1)@1646927621276286162.0000000000",
+ 	"UPDATE(0=1->2)@1646927621276746804.0000000000",
+ 	"UPDATE(0=2->3)@1646927621277105885.0000000000",
+ 	"DELETE(0)@1646927621277476130.0000000000",
  	"INSERT(7=0)@1646927621277743916.0000000000",
  	"DELETE(7)@1646927621277983926.0000000000",
  	... // 16 identical elements
  } 
ERRO[0003] changefeed 11 got 12 events, expected 14     
ERRO[0003] changefeed 11 started seeing events at index 13, expected 11 
ERRO[0003] changefeed 11 got incorrect ordering for events:   []string{
- 	"DELETE(10)@1646927621278680935.0000000000",
- 	"DELETE(9)@1646927621278971791.0000000000",
  	"INSERT(13=0)@1646927621279207412.0000000000",
  	"DELETE(13)@1646927621279445868.0000000000",
  	... // 10 identical elements
  } 
ERRO[0003] changefeed 12 got 14 events, expected 13     
ERRO[0003] changefeed 12 started seeing events at index 11, expected 12 
ERRO[0003] changefeed 12 got incorrect ordering for events:   []string{
+ 	"DELETE(10)@1646927621278680935.0000000000",
  	"DELETE(9)@1646927621278971791.0000000000",
  	"INSERT(13=0)@1646927621279207412.0000000000",
  	... // 11 identical elements
  } 
ERRO[0003] changefeed 13 got 13 events, expected 12     
ERRO[0003] changefeed 13 started seeing events at index 12, expected 13 
ERRO[0003] changefeed 13 got incorrect ordering for events:   []string{
+ 	"DELETE(9)@1646927621278971791.0000000000",
  	"INSERT(13=0)@1646927621279207412.0000000000",
  	"DELETE(13)@1646927621279445868.0000000000",
  	... // 10 identical elements
  } 
ERRO[0003] changefeed 19 got 5 events, expected 6       
ERRO[0003] changefeed 19 started seeing events at index 20, expected 19 
ERRO[0003] changefeed 19 got incorrect ordering for events:   []string{
- 	"UPDATE(15=2->3)@1646927621280850027.0000000000",
  	"DELETE(18)@1646927621281193701.0000000000",
  	"INSERT(21=0)@1646927621281423049.0000000000",
  	... // 3 identical elements
  } 
ERRO[0003] changefeed 20 got 1 events, expected 5       
ERRO[0003] changefeed 20 started seeing events at index 24, expected 20 
ERRO[0003] changefeed 20 got incorrect ordering for events:   []string{
- 	"DELETE(18)@1646927621281193701.0000000000",
- 	"INSERT(21=0)@1646927621281423049.0000000000",
- 	"DELETE(21)@1646927621281627525.0000000000",
- 	"UPDATE(15=3->4)@1646927621281872314.0000000000",
  	"INSERT(24=0)@1646927621282224055.0000000000",
  } 
ERRO[0003] changefeed 21 got 6 events, expected 4       
ERRO[0003] changefeed 21 started seeing events at index 19, expected 21 
ERRO[0003] changefeed 21 got incorrect ordering for events:   []string{
+ 	"UPDATE(15=2->3)@1646927621280850027.0000000000",
+ 	"DELETE(18)@1646927621281193701.0000000000",
  	"INSERT(21=0)@1646927621281423049.0000000000",
  	"DELETE(21)@1646927621281627525.0000000000",
  	... // 2 identical elements
  } 
ERRO[0003] changefeed 22 got 4 events, expected 3       
ERRO[0003] changefeed 22 started seeing events at index 21, expected 22 
ERRO[0003] changefeed 22 got incorrect ordering for events:   []string{
+ 	"INSERT(21=0)@1646927621281423049.0000000000",
  	"DELETE(21)@1646927621281627525.0000000000",
  	"UPDATE(15=3->4)@1646927621281872314.0000000000",
  	"INSERT(24=0)@1646927621282224055.0000000000",
  } 
ERRO[0003] changefeed 23 got 3 events, expected 2       
ERRO[0003] changefeed 23 started seeing events at index 22, expected 23 
ERRO[0003] changefeed 23 got incorrect ordering for events:   []string{
+ 	"DELETE(21)@1646927621281627525.0000000000",
  	"UPDATE(15=3->4)@1646927621281872314.0000000000",
  	"INSERT(24=0)@1646927621282224055.0000000000",
  } 
ERRO[0003] changefeed 24 got 2 events, expected 1       
ERRO[0003] changefeed 24 started seeing events at index 23, expected 24 
ERRO[0003] changefeed 24 got incorrect ordering for events:   []string{
+ 	"UPDATE(15=3->4)@1646927621281872314.0000000000",
  	"INSERT(24=0)@1646927621282224055.0000000000",
  } 

@stevekuznetsov
Copy link
Author

With the latest update, this passes fine now :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment