Skip to content

Instantly share code, notes, and snippets.

@PeterCorless
Last active February 9, 2021 00:20
Show Gist options
Save PeterCorless/7e86210bae8231e2ecfa059245e97191 to your computer and use it in GitHub Desktop.
Reading CDC with Java and Go
// NOTE(review): this snippet opens a try-with-resources block whose closing
// brace is not part of the snippet; Cluster and Session are closed
// automatically when that block exits.
try (Cluster cluster = Cluster.builder().addContactPoint(source).build();
Session session = cluster.connect()) {
// Create the set of tables we want to watch.
// Here that is a single base table identified by keyspace and table name.
Set<TableName> tables = Collections.singleton(new TableName(keyspace, table));
// A consumer that prints each change it receives and signals completion
// right away by returning an already-completed future.
RawChangeConsumer changeConsumer = (RawChange change) -> {
    printChange(change);
    return CompletableFuture.completedFuture(null);
};
// Build a shared consumer of changes: one instance reused for every worker.
RawChangeConsumer sharedChangeConsumer = change -> {
    // Print the change.
    printChange(change);
    return CompletableFuture.completedFuture(null);
};
// Build a provider of this shared consumer. It ignores threadId and always
// returns the same instance. It is named distinctly from the per-thread
// changeConsumerProvider declared below, because two locals with the same
// name in one scope do not compile.
// NOTE(review): with workersCount > 1 the shared consumer may be invoked from
// several threads at once, so printChange would need to be thread-safe.
RawChangeConsumerProvider sharedChangeConsumerProvider = threadId -> sharedChangeConsumer;
// Build a provider of consumers: each call creates a fresh consumer, so
// every worker thread gets its own instance.
RawChangeConsumerProvider changeConsumerProvider = threadId -> {
    // The local must not be called changeConsumer: a lambda body cannot
    // redeclare a local variable of the enclosing scope (changeConsumer is
    // already declared earlier in this method).
    RawChangeConsumer perThreadConsumer = change -> {
        // Print the change.
        printChange(change);
        return CompletableFuture.completedFuture(null);
    };
    return perThreadConsumer;
};
// Build a CDCConsumer instance. We start it in a single-thread
// configuration: workersCount(1).
CDCConsumer consumer = CDCConsumerBuilder.builder(session, changeConsumerProvider, tables).workersCount(1).build();
consumer.start();
try {
    // Consume changes for 10 seconds.
    Thread.sleep(10000);
} finally {
    // Stop the consumer even if the sleep is interrupted, so it is not
    // left running after this block exits abnormally.
    consumer.stop();
}
/**
 * Prints details of a single CDC change.
 *
 * NOTE(review): this snippet is truncated. The method body continues past
 * the last line shown and is not closed here.
 *
 * @param change the raw change handed to a RawChangeConsumer
 */
private static void printChange(RawChange change) {
// Identify the change: the CDC stream it belongs to and its change time.
ChangeId changeId = change.getId();
StreamId streamId = changeId.getStreamId();
ChangeTime changeTime = changeId.getChangeTime();
// Get the operation type, for example: ROW_UPDATE, POST_IMAGE.
RawChange.OperationType operationType = change.getOperationType();
// Schema of this change; used below to enumerate its columns.
ChangeSchema changeSchema = change.getSchema();
// Columns returned by getNonCdcColumnDefinitions(), i.e. presumably the
// base-table columns rather than CDC metadata columns.
List<ChangeSchema.ColumnDefinition> nonCdcColumnDefinitions = changeSchema.getNonCdcColumnDefinitions();
for (ChangeSchema.ColumnDefinition columnDefinition : nonCdcColumnDefinitions) {
String columnName = columnDefinition.getColumnName();
// We can get information if this column was a part of primary key
// in the base table. Note that in CDC log table different columns
// are part of a primary key (cdc$stream_id, cdc$time, batch_seq_no).
ChangeSchema.ColumnType baseTableColumnType = columnDefinition.getBaseTableColumnType();
// Get the information about the data type (as present in CDC log).
ChangeSchema.DataType logDataType = columnDefinition.getCdcLogDataType();
Cell cell = change.getCell(columnName);
// Generic accessor: the cell's value as an Object.
Object cellValue = cell.getAsObject();
// NOTE(review): typed accessor, presumably only meaningful for int columns.
// Confirm what it does for other types (exception vs. null) before calling
// it for every column in this loop.
Integer intCellValue = cell.getInt();
# Fetch scylla-cdc-java, build it, and run the bundled printer example.
git clone https://github.com/scylladb/scylla-cdc-java.git
cd scylla-cdc-java
# Build all modules and install them into the local Maven repository.
mvn clean install
cd scylla-cdc-printer
# Replace KEYSPACE and TABLE with the table to watch, and SOURCE with the
# address of a Scylla node to connect to (presumably a contact point).
./scylla-cdc-printer -k KEYSPACE -t TABLE -s SOURCE
# Add scylla-cdc-go to the current module. `go get` expects a module path;
# a URL with an https:// scheme is rejected as a malformed module path.
go get github.com/scylladb/scylla-cdc-go
// Connect to the cluster, routing requests token-aware on top of DC-aware
// round robin. "local-dc" is presumably a placeholder for the local DC name.
cluster := gocql.NewCluster(source)
dcAware := gocql.DCAwareRoundRobinPolicy("local-dc")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(dcAware)

session, err := cluster.CreateSession()
if err != nil {
	log.Fatal(err)
}
defer session.Close()
// Reader configuration: the session to read through, the fully qualified
// table(s) to follow, the factory that creates change consumers, and a
// logger writing to stderr with date, microsecond time and file:line.
cfg := &scyllacdc.ReaderConfig{
	Session:    session,
	TableNames: []string{keyspace + "." + table},

	ChangeConsumerFactory: changeConsumerFactory,

	Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile),
}
// changeConsumerFactory wraps printerConsumer so it can be passed to the
// reader as its ChangeConsumerFactory.
var changeConsumerFactory = scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer)

// printerConsumer handles one change c read from tableName. It is currently
// a placeholder that does nothing and reports success.
func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error {
	// Your logic goes here
	return nil
}
// myConsumer is a skeleton change consumer; add whatever state it needs.
type myConsumer struct { /* ... */ }

// End is a cleanup hook; there is nothing to release yet, so it returns nil.
func (c *myConsumer) End() error {
	return nil
}

// Consume processes a single change. It is currently a stub returning nil.
func (c *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error {
	// Your logic goes here
	return nil
}
type myFactory struct { /* ... */ }
func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (ChangeConsumer, error)
return &myConsumer{
/* ... */
}, nil
}
// Create the reader from the configuration; abort on failure.
reader, err := scyllacdc.NewReader(context.Background(), cfg)
if err != nil {
	log.Fatal(err)
}

// Schedule a stop request once 10 seconds have elapsed.
time.AfterFunc(10*time.Second, func() { reader.Stop() })

// Run blocks until the reader stops or fails.
if runErr := reader.Run(context.Background()); runErr != nil {
	log.Fatal(runErr)
}
// Header line: the stream and time of this change.
fmt.Printf("[%s %s]:\n", c.StreamID, c.Time.String())
// Print column names, skipping those with the "cdc$" prefix.
for _, column := range c.Columns() {
	if strings.HasPrefix(column.Name, "cdc$") {
		continue
	}
	fmt.Println(column.Name)
}
// Read the pk and ck columns and the atomic change to column v from this row.
// NOTE(review): the errors from GetValue are discarded. If a column were
// missing, pkRaw/ckRaw would presumably be nil, and the unchecked type
// assertions below would panic. Consider the comma-ok form or handling err.
pkRaw, _ := changeRow.GetValue("pk")
ckRaw, _ := changeRow.GetValue("ck")
v := changeRow.GetAtomicChange("v")
// Assumes pk and ck are int columns delivered as *int; TODO confirm schema.
pk := pkRaw.(*int)
ck := ckRaw.(*int)
fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(),
nullableIntToStr(pk), nullableIntToStr(ck))
// IsDeleted marks an explicit delete/null write of v.
if v.IsDeleted {
fmt.Printf(" Column v was set to null/deleted\n")
} else {
// Otherwise a non-nil *int is the new value; nil is reported as "not changed".
vInt := v.Value.(*int)
if vInt != nil {
fmt.Printf(" Column v was set to %d\n", *vInt)
} else {
fmt.Print(" Column v was not changed\n")
}
}
// Dump every row of the change: pre-image rows first, then delta rows,
// then post-image rows, one per line.
for _, row := range c.PreImage {
	fmt.Printf(" %s\n", row)
}

for _, row := range c.Delta {
	fmt.Printf(" %s\n", row)
}

for _, row := range c.PostImage {
	fmt.Printf(" %s\n", row)
}
# Fetch scylla-cdc-go and run its printer example directly with `go run`.
git clone https://github.com/scylladb/scylla-cdc-go
cd scylla-cdc-go
# Replace KEYSPACE and TABLE with the table to watch, and SOURCE with the
# address of a Scylla node to connect to.
go run ./examples/printer/ -keyspace KEYSPACE -table TABLE -source SOURCE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.