Last active
February 9, 2021 00:20
-
-
Save PeterCorless/7e86210bae8231e2ecfa059245e97191 to your computer and use it in GitHub Desktop.
Reading CDC with Java and Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
try (Cluster cluster = Cluster.builder().addContactPoint(source).build(); | |
Session session = cluster.connect()) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create the set of tables we want to watch. | |
Set<TableName> tables = Collections.singleton(new TableName(keyspace, table)); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
RawChangeConsumer changeConsumer = change -> { | |
// Print the change. | |
printChange(change); | |
return CompletableFuture.completedFuture(null); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build a shared consumer of changes. | |
RawChangeConsumer sharedChangeConsumer = change -> { | |
// Print the change. | |
printChange(change); | |
return CompletableFuture.completedFuture(null); | |
}; | |
// Build a provider of this shared consumer. | |
RawChangeConsumerProvider changeConsumerProvider = threadId -> { | |
return sharedChangeConsumer; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build a provider of consumers. | |
RawChangeConsumerProvider changeConsumerProvider = threadId -> { | |
// Build a consumer of changes. | |
RawChangeConsumer changeConsumer = change -> { | |
// Print the change. | |
printChange(change); | |
return CompletableFuture.completedFuture(null); | |
}; | |
return changeConsumer; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Build a CDCConsumer instance. We start it in a single-thread
// configuration: workersCount(1).
CDCConsumer consumer = CDCConsumerBuilder.builder(session, changeConsumerProvider, tables).workersCount(1).build();
consumer.start();
try {
    // Consume changes for 10 seconds.
    Thread.sleep(10000);
} finally {
    // Stop the consumer even when the sleep above is interrupted, so a
    // started consumer is never left behind without a matching stop().
    consumer.stop();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static void printChange(RawChange change) { | |
ChangeId changeId = change.getId(); | |
StreamId streamId = changeId.getStreamId(); | |
ChangeTime changeTime = changeId.getChangeTime(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Get the operation type, for example: ROW_UPDATE, POST_IMAGE. | |
RawChange.OperationType operationType = change.getOperationType(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ChangeSchema changeSchema = change.getSchema(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
List<ChangeSchema.ColumnDefinition> nonCdcColumnDefinitions = changeSchema.getNonCdcColumnDefinitions(); | |
for (ChangeSchema.ColumnDefinition columnDefinition : nonCdcColumnDefinitions) { | |
String columnName = columnDefinition.getColumnName(); | |
// We can check whether this column was part of the primary key
// in the base table. Note that in the CDC log table a different set of
// columns forms the primary key (cdc$stream_id, cdc$time, cdc$batch_seq_no).
ChangeSchema.ColumnType baseTableColumnType = columnDefinition.getBaseTableColumnType(); | |
// Get the information about the data type (as present in CDC log). | |
ChangeSchema.DataType logDataType = columnDefinition.getCdcLogDataType(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Cell cell = change.getCell(columnName); | |
Object cellValue = cell.getAsObject(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Integer intCellValue = cell.getInt(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
git clone https://github.com/scylladb/scylla-cdc-java.git | |
cd scylla-cdc-java | |
mvn clean install | |
cd scylla-cdc-printer | |
./scylla-cdc-printer -k KEYSPACE -t TABLE -s SOURCE |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# `go get` expects an import path, not a URL, so no https:// scheme here.
go get github.com/scylladb/scylla-cdc-go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cluster := gocql.NewCluster(source) | |
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) | |
session, err := cluster.CreateSession() | |
if err != nil { | |
log.Fatal(err) | |
} | |
defer session.Close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cfg := &scyllacdc.ReaderConfig{ | |
Session: session, | |
ChangeConsumerFactory: changeConsumerFactory, | |
TableNames: []string{keyspace + "." + table}, | |
Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile), | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error { | |
// Your logic goes here | |
return nil | |
} | |
var changeConsumerFactory = scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
type myConsumer struct { /* ... */ } | |
func (mc *myConsumer) Consume(ctx context.Context, change scyllacdc.Change) error { | |
// Your logic goes here | |
return nil | |
} | |
func (mc *myConsumer) End() error { | |
return nil | |
} | |
type myFactory struct { /* ... */ } | |
func (f *myFactory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (ChangeConsumer, error) | |
return &myConsumer{ | |
/* ... */ | |
}, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
reader, err := scyllacdc.NewReader(context.Background(), cfg) | |
if err != nil { | |
log.Fatal(err) | |
} | |
// Stop the consumer after 10 seconds | |
time.AfterFunc(10*time.Second, func() { | |
reader.Stop() | |
}) | |
if err := reader.Run(context.Background()); err != nil { | |
log.Fatal(err) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fmt.Printf("[%s %s]:\n", c.StreamID, c.Time.String()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Print the name of every column, skipping those whose name starts
// with the "cdc$" prefix.
for _, col := range c.Columns() {
	if strings.HasPrefix(col.Name, "cdc$") {
		continue
	}
	fmt.Println(col.Name)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Read the raw "pk" and "ck" values and the atomic change of column "v".
// The second result of GetValue is discarded.
pkRaw, _ := changeRow.GetValue("pk")
ckRaw, _ := changeRow.GetValue("ck")
v := changeRow.GetAtomicChange("v")

// Both key values are asserted to be *int; the assertion panics otherwise.
pk, ck := pkRaw.(*int), ckRaw.(*int)

fmt.Printf("Operation: %s, pk: %s, ck: %s\n",
	changeRow.GetOperation(), nullableIntToStr(pk), nullableIntToStr(ck))

// Report what happened to column v: deleted, set to a value, or untouched
// (a nil *int is treated as "not changed").
if v.IsDeleted {
	fmt.Printf(" Column v was set to null/deleted\n")
} else if vInt := v.Value.(*int); vInt != nil {
	fmt.Printf(" Column v was set to %d\n", *vInt)
} else {
	fmt.Print(" Column v was not changed\n")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
for _, r := range c.PreImage { | |
fmt.Printf(" %s\n", r) | |
} | |
for _, r := range c.Delta { | |
fmt.Printf(" %s\n", r) | |
} | |
for _, r := range c.PostImage { | |
fmt.Printf(" %s\n", r) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
git clone https://github.com/scylladb/scylla-cdc-go | |
cd scylla-cdc-go | |
go run ./examples/printer/ -keyspace KEYSPACE -table TABLE -source SOURCE |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment