Skip to content

Instantly share code, notes, and snippets.

@stevekuznetsov
Last active March 10, 2022 22:37
Show Gist options
  • Save stevekuznetsov/e17d574cade6ce6ea7f914885c432b68 to your computer and use it in GitHub Desktop.
Save stevekuznetsov/e17d574cade6ce6ea7f914885c432b68 to your computer and use it in GitHub Desktop.
Investigating CockroachDB Changefeed Behavior
package main
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach-go/v2/testserver"
"github.com/google/go-cmp/cmp"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/log/logrusadapter"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/sirupsen/logrus"
)
// main runs an end-to-end experiment against a CockroachDB test server to
// investigate how changefeed cursors behave.
//
// It performs the following steps:
//  1. Starts a test server, creates the "test" table and enables rangefeeds.
//  2. Records the cluster logical timestamp, applies numUpdates random
//     INSERT/UPDATE/DELETE statements, then records the timestamp again.
//  3. Launches a reference changefeed (index 0) whose cursor is the initial
//     timestamp. Every event the reference feed observes causes launch to
//     start another changefeed whose cursor is that event's timestamp.
//  4. Waits (up to 10 seconds) for all feeds to finish, then verifies that
//     feed i saw exactly the suffix of the reference event stream that
//     follows its cursor.
//
// Problems are reported through logrus at error level; setup failures are
// fatal.
func main() {
	ts, err := testserver.NewTestServer()
	if err != nil {
		logrus.WithError(err).Fatal("failed to start crdb")
	}
	// NOTE: logrus' Fatal exits the process, so this deferred Stop only runs
	// when main returns normally.
	defer func() {
		ts.Stop()
	}()

	ctx := context.Background()
	cfg, err := pgxpool.ParseConfig(ts.PGURL().String())
	if err != nil {
		logrus.WithError(err).Fatal("failed to parse test connection")
	}
	cfg.ConnConfig.LogLevel = pgx.LogLevelTrace
	cfg.ConnConfig.Logger = logrusadapter.NewLogger(logrus.New())
	// One query is issued per changefeed, and numUpdates+1 feeds run at once.
	cfg.MaxConns = 128
	client, err := pgxpool.ConnectConfig(ctx, cfg)
	if err != nil {
		logrus.WithError(err).Fatal("failed to connect to crdb")
	}

	for _, stmt := range []string{
		`CREATE TABLE IF NOT EXISTS test
(
key INTEGER PRIMARY KEY,
value INTEGER NOT NULL
);`,
		// enable changefeeds
		`SET CLUSTER SETTING kv.rangefeed.enabled = true;`,
		// set the latency floor for events
		`SET CLUSTER SETTING changefeed.experimental_poll_interval = '0.2s';`,
		// remove throttling
		`SET CLUSTER SETTING changefeed.node_throttle_config = '{"MessageRate":0,"MessageBurst":0,"ByteRate":0,"ByteBurst":0,"FlushRate":0,"FlushBurst":0}';`,
	} {
		if _, err := client.Exec(ctx, stmt); err != nil {
			logrus.WithError(err).Fatal("error initializing the database")
		}
	}

	var initialClusterTimestamp apd.Decimal
	if err := client.QueryRow(ctx, `SELECT cluster_logical_timestamp();`).Scan(&initialClusterTimestamp); err != nil {
		logrus.WithError(err).Fatal("failed to read initial cluster logical timestamp")
	}
	logrus.Infof("Initial cluster timestamp: %s", initialClusterTimestamp.String())

	const (
		createEvent = iota
		updateEvent
		deleteEvent
	)
	numUpdates := 25
	// existing tracks the keys currently present in the table so that updates
	// and deletes always target a live row.
	var existing []int
	for i := 0; i < numUpdates; i++ {
		op := rand.Intn(3)
		if len(existing) == 0 {
			// Nothing to update or delete yet, so force an insert.
			op = createEvent
		}
		switch op {
		case createEvent:
			// The loop counter is unique per iteration, so it is a safe primary key.
			key := i
			if _, err := client.Exec(ctx, `INSERT INTO test (key, value) VALUES ($1, $2);`, key, 0); err != nil {
				logrus.WithError(err).Fatal("unexpected error while inserting new row")
			}
			existing = append(existing, key)
		case updateEvent:
			key := existing[rand.Intn(len(existing))]
			if _, err := client.Exec(ctx, `UPDATE test SET value = value + 1 WHERE key=$1;`, key); err != nil {
				logrus.WithError(err).Fatal("unexpected error while updating row")
			}
		case deleteEvent:
			victim := rand.Intn(len(existing))
			key := existing[victim]
			if _, err := client.Exec(ctx, `DELETE FROM test WHERE key=$1;`, key); err != nil {
				logrus.WithError(err).Fatal("unexpected error while removing row")
			}
			existing = append(existing[:victim], existing[victim+1:]...)
		default:
			logrus.Fatalf("invalid operation %d", op)
		}
	}

	var finalClusterTimestamp apd.Decimal
	if err := client.QueryRow(ctx, `SELECT cluster_logical_timestamp();`).Scan(&finalClusterTimestamp); err != nil {
		logrus.WithError(err).Fatal("failed to read final cluster logical timestamp")
	}
	logrus.Infof("Final cluster timestamp: %s", finalClusterTimestamp.String())

	// Shared state mutated by the goroutines started in launch; lock guards
	// idx, sink and order.
	lock := sync.Mutex{}
	var idx int
	sink := make([][]event, numUpdates+1)
	order := map[string]int{}
	wg := sync.WaitGroup{}
	into := make([]chan event, numUpdates+1)
	for i := range into {
		into[i] = make(chan event)
	}
	launch(ctx, client, &initialClusterTimestamp, &finalClusterTimestamp, &idx, idx, &wg, into, &sink, order, &lock)

	// Closing done (instead of sending on it) lets the waiter goroutine exit
	// even when the timeout branch below wins the select.
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		for i := range into {
			close(into[i])
		}
	case <-time.After(10 * time.Second):
		logrus.Error("timed out waiting for changefeeds")
	}

	// Hold the lock for the remainder of main so that any consumer goroutine
	// still running cannot mutate sink or order during the analysis.
	lock.Lock()
	defer lock.Unlock()

	// CRDB does not guarantee ordering between rows, just within them
	sort.Slice(sink[0], func(x, y int) bool {
		return sink[0][x].timestamp.Cmp(sink[0][y].timestamp) < 0
	})

	// Feeds were numbered in the order the reference events arrived; renumber
	// them so that feed i+1 is the one whose cursor is the i-th reference
	// event in timestamp order.
	reorderedSink := make([][]event, numUpdates+1)
	reorderedSink[0] = sink[0]
	for i, item := range sink[0] {
		feedIdx := order[item.timestamp.String()]
		reorderedSink[i+1] = sink[feedIdx]
	}
	for i := range reorderedSink {
		sort.Slice(reorderedSink[i], func(x, y int) bool {
			return reorderedSink[i][x].timestamp.Cmp(reorderedSink[i][y].timestamp) < 0
		})
	}

	// cursorTimestamp returns the cursor changefeed i was started with, or nil
	// when that feed was never launched (the reference feed saw fewer than i
	// events, e.g. after a timeout).
	cursorTimestamp := func(i int) *apd.Decimal {
		switch {
		case i == 0:
			return &initialClusterTimestamp
		case i <= len(reorderedSink[0]):
			return reorderedSink[0][i-1].timestamp
		default:
			return nil
		}
	}
	// cursor is the printable form of cursorTimestamp.
	cursor := func(i int) string {
		if at := cursorTimestamp(i); at != nil {
			return at.String()
		}
		return "unknown"
	}

	// A changefeed must only emit events strictly after its cursor.
	for i := range reorderedSink {
		start := cursorTimestamp(i)
		if start == nil {
			continue
		}
		for _, seen := range reorderedSink[i] {
			if seen.timestamp.Cmp(start) <= 0 {
				logrus.Errorf("changefeed %d (cursor=%s) saw an event at %s, at or before the cursor", i, cursor(i), seen.timestamp.String())
			}
		}
	}

	formattedSink := make([][]string, len(reorderedSink))
	for i := range reorderedSink {
		formattedSink[i] = make([]string, len(reorderedSink[i]))
		for j := range reorderedSink[i] {
			formattedSink[i][j] = reorderedSink[i][j].String()
		}
	}

	// Feed i is expected to have seen exactly reference[i:].
	reference := formattedSink[0]
	for i, updates := range formattedSink {
		id := fmt.Sprintf("changefeed %d (cursor=%s) ", i, cursor(i))
		if actual, expected := len(updates), numUpdates-i; actual != expected {
			logrus.Errorf("%sgot %d events, expected %d", id, actual, expected)
		}
		// The reference feed has nothing to be compared against, and an empty
		// feed has no first event to locate.
		if i == 0 || len(updates) == 0 {
			continue
		}
		startingIndex := -1
		for j, item := range reference {
			if item == updates[0] {
				startingIndex = j
			}
		}
		if startingIndex == -1 {
			logrus.Errorf("%sstarted seeing events at timestamp %q, but the reference watcher never saw that version!", id, updates[0])
			continue
		}
		if startingIndex != i {
			logrus.Errorf("%sstarted seeing events at index %d, expected %d", id, startingIndex, i)
		}
		if diff := cmp.Diff(reference[i:], updates); diff != "" {
			logrus.Errorf("%sgot incorrect ordering for events: %v", id, diff)
		}
	}
}
// event is a single row change observed on a changefeed.
type event struct {
// timestamp is the value of the changefeed message's "updated" field.
timestamp *apd.Decimal
// action is a printable description of the change, built by changefeed in
// one of the forms "INSERT(k=v)", "UPDATE(k=old->new)" or "DELETE(k)".
action string
}
// String renders the event as "<action>@<timestamp>", the form used when
// comparing the event streams of different changefeeds.
func (e event) String() string {
	description, at := e.action, e.timestamp
	return fmt.Sprintf("%s@%s", description, at)
}
// launch starts changefeed number i with the cursor start and wires up the
// goroutine that collects its events.
//
// Two goroutines are started:
//   - a producer, tracked by wg, that runs changefeed and pushes events onto
//     into[i] until the feed has passed end;
//   - a consumer that drains into[i] until the channel is closed, appending
//     every event to (*sink)[i] while holding lock.
//
// For the reference feed (i == 0) the consumer additionally bumps *idx,
// records the new feed number in order under the event's timestamp, and
// launches that feed with the event's timestamp as its cursor.
func launch(ctx context.Context, client *pgxpool.Pool, start, end *apd.Decimal, idx *int, i int, wg *sync.WaitGroup, into []chan event, sink *[][]event, order map[string]int, lock *sync.Mutex) {
	// record stores one observed event and, for the reference feed, spawns
	// the follow-up changefeed. Everything happens under lock.
	record := func(observed event) {
		lock.Lock()
		defer lock.Unlock()

		(*sink)[i] = append((*sink)[i], observed)
		if i != 0 {
			return
		}
		*idx++
		order[observed.timestamp.String()] = *idx
		launch(ctx, client, observed.timestamp, end, idx, *idx, wg, into, sink, order, lock)
	}

	// Producer: counted in wg so the caller can wait for every feed to finish.
	wg.Add(1)
	go func() {
		defer wg.Done()
		changefeed(ctx, start, end, client, into[i])
		logrus.Infof("Changefeed %d finished.", i)
	}()

	// Consumer: not counted in wg; it ends when into[i] is closed.
	go func() {
		for observed := range into[i] {
			record(observed)
		}
	}()
}
// changefeed opens an EXPERIMENTAL CHANGEFEED on the "test" table with the
// cursor start and forwards every row change it reads to into.
//
// Messages carrying a "resolved" timestamp are not forwarded; the function
// returns once a resolved timestamp greater than end is seen. Messages
// carrying an "updated" timestamp are turned into an event whose action is
// derived from which of the "before"/"after" row images are present.
//
// Failing to create the feed, a row without exactly three columns, or an
// undecodable payload is fatal. If the row stream stops with an error, that
// error is logged before returning.
func changefeed(ctx context.Context, start, end *apd.Decimal, client *pgxpool.Pool, into chan<- event) {
	// row is the JSON shape of the "before"/"after" images of a table row.
	type row struct {
		Key   int64 `json:"key,omitempty"`
		Value int64 `json:"value,omitempty"`
	}
	// changefeedEvent is the JSON payload of one changefeed message.
	type changefeedEvent struct {
		Updated  *apd.Decimal `json:"updated,omitempty"`
		Resolved *apd.Decimal `json:"resolved,omitempty"`
		Before   *row         `json:"before,omitempty"`
		After    *row         `json:"after,omitempty"`
	}

	options := []string{
		"updated",
		"diff",
		"mvcc_timestamp",
		fmt.Sprintf("cursor='%s'", start.String()),
		"resolved='1s'",
	}
	query := fmt.Sprintf(`EXPERIMENTAL CHANGEFEED FOR test WITH %s;`, strings.Join(options, ","))
	logrus.WithField("sql", query).Info("Exec")
	rows, err := client.Query(ctx, query)
	if err != nil {
		logrus.WithError(err).Fatal("failed to create changefeed")
	}
	// Close is issued from its own goroutine so that returning from this
	// function does not wait on it.
	defer func() {
		go func() {
			rows.Close()
		}()
	}()

	for rows.Next() {
		values := rows.RawValues()
		if len(values) != 3 {
			logrus.Fatalf("expected 3 values in changefeed row, got %d", len(values))
		}
		// values unpacks into (tableName, primaryKey, rowData)
		data := values[2]

		var evt changefeedEvent
		if err := json.Unmarshal(data, &evt); err != nil {
			logrus.WithError(err).Fatal("failed to deserialize changefeed row")
		}

		if evt.Resolved != nil {
			if evt.Resolved.Cmp(end) > 0 {
				// we've seen everything we need to see
				logrus.WithField("sql", query).Info("Finished.")
				return
			}
		} else if evt.Updated != nil {
			// action stays empty when neither row image is present.
			var action string
			switch {
			case evt.Before == nil && evt.After != nil:
				action = fmt.Sprintf("INSERT(%d=%d)", evt.After.Key, evt.After.Value)
			case evt.Before != nil && evt.After != nil:
				action = fmt.Sprintf("UPDATE(%d=%d->%d)", evt.After.Key, evt.Before.Value, evt.After.Value)
			case evt.Before != nil && evt.After == nil:
				action = fmt.Sprintf("DELETE(%d)", evt.Before.Key)
			}
			into <- event{
				timestamp: evt.Updated,
				action:    action,
			}
		}
	}

	// Next returning false means the stream ended; the error (if any) is only
	// observable here, after the loop. It is logged rather than made fatal so
	// that a feed cut off during shutdown does not abort the process.
	if err := rows.Err(); err != nil {
		logrus.WithError(err).WithField("sql", query).Error("failed to read changefeed row")
	}
}
@stevekuznetsov
Copy link
Author

With the latest update, this passes fine now :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment