Skip to content

Instantly share code, notes, and snippets.

@bartosz25
Created May 28, 2019 12:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bartosz25/4c6060af18f254f7116ed9f31ff8964a to your computer and use it in GitHub Desktop.
Save bartosz25/4c6060af18f254f7116ed9f31ff8964a to your computer and use it in GitHub Desktop.
Cassandra CDC idempotent consumer
package com.waitingforcode.cassandra_cdc;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class CdcCommitLogReadHandler implements CommitLogReadHandler {
public boolean shouldSkipSegmentOnError(CommitLogReadException e) throws IOException {
System.out.println("Should skip segment error ?");
e.printStackTrace();
return false;
}
public void handleUnrecoverableError(CommitLogReadException e) throws IOException {
e.printStackTrace();
}
public void handleMutation(Mutation mutation, int size, int entryLocation, CommitLogDescriptor commitLogDescriptor) {
for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) {
if (partitionUpdate.metadata().ksName.equals("cdc_test")) {
String rowKey = partitionUpdate.metadata().getKeyValidator().getString(partitionUpdate.partitionKey().getKey());
List<String> values = new ArrayList<>();
values.add(partitionUpdate.metadata().partitionKeyColumns().toString()+"="+rowKey);
partitionUpdate.unfilteredIterator().forEachRemaining(partitionRow -> {
Row row = partitionUpdate.getRow((Clustering) partitionRow.clustering());
Iterator<Cell> cells = row.cells().iterator();
Iterator<ColumnDefinition> columns = row.columns().iterator();
while (cells.hasNext() && columns.hasNext()) {
ColumnDefinition definition = columns.next();
Cell cell = cells.next();
values.add(definition.name.toString()+"="+definition.type.getString(cell.value()));
}
System.out.println("Got new row="+values);
});
}
}
}
}

Cluster setup:

ccm remove cdc_cluster
ccm create cdc_cluster -v 3.11.3
ccm populate -n 1
ccm node1 start

Init data:

ccm node1 cqlsh
 CREATE KEYSPACE cdc_test  WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1};
 USE cdc_test;
 CREATE TABLE orders (id int, amount double, first_order boolean, PRIMARY KEY(id)) WITH cdc=true;
 INSERT INTO  orders (id, amount, first_order) VALUES (1, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (2, 100, false) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (3, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (4, 100, false) IF NOT EXISTS;  
 INSERT INTO  orders (id, amount, first_order) VALUES (5, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (6, 100, false) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (7, 100, true) IF NOT EXISTS;
 INSERT INTO orders (id, amount, first_order) VALUES (8, 100, false) IF NOT EXISTS;  
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Cassandra CDC consumer example (Java 8, Cassandra 3.11.3). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.waitingforcode</groupId>
    <artifactId>cassandra-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- <properties> must be a direct child of the root <project>; a nested <project> element is not part of the POM schema. -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Provides the commit log reader and the storage engine classes used by the handler. -->
        <dependency>
            <groupId>org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>3.11.3</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>
</project>
// Inspired from https://github.com/smartcat-labs/cassandra-kafka-connector/blob/master/cassandra-cdc/src/main/java/io/smartcat/cassandra/cdc/Reader.java
/**
 * Watches a Cassandra {@code cdc_raw} directory and feeds every newly created
 * commit log segment to a {@link CdcCommitLogReadHandler}.
 *
 * <p>The instance owns a {@link WatchService}; call {@link #close()} (or use
 * try-with-resources) to release it.
 */
public class Reader implements AutoCloseable {

    /** Directory watched by the no-argument constructor. */
    private static final String DEFAULT_CDC_RAW_DIR = "/home/bartosz/.ccm/cdc_cluster/node1/cdc_raw";

    /** Value of the {@code cassandra.config} system property set by the no-argument constructor. */
    private static final String DEFAULT_CASSANDRA_CONFIG =
            "file:///home/bartosz/.ccm/cdc_cluster/node1/conf/cassandra.yaml";

    private final WatchService watcher;
    private final Path dir;
    private final WatchKey key;
    private final CommitLogReader commitLogReader;
    private final CdcCommitLogReadHandler commitLogReadHandler;

    /**
     * Creates a reader for the default ccm node layout.
     *
     * @throws IOException if the watch service cannot be created or the directory cannot be registered
     */
    public Reader() throws IOException {
        this(Paths.get(DEFAULT_CDC_RAW_DIR), DEFAULT_CASSANDRA_CONFIG);
    }

    /**
     * Creates a reader for an arbitrary CDC directory and Cassandra configuration.
     *
     * @param cdcRawDir          directory in which commit log segments appear
     * @param cassandraConfigUrl value assigned to the {@code cassandra.config} system property
     * @throws IOException if the watch service cannot be created or the directory cannot be registered
     */
    public Reader(Path cdcRawDir, String cassandraConfigUrl) throws IOException {
        this.dir = cdcRawDir;
        watcher = FileSystems.getDefault().newWatchService();
        // ENTRY_MODIFY is registered as well, but processEvents() only acts on ENTRY_CREATE.
        key = dir.register(watcher, ENTRY_CREATE, ENTRY_MODIFY);
        commitLogReader = new CommitLogReader();
        commitLogReadHandler = new CdcCommitLogReadHandler();
        System.setProperty("cassandra.config", cassandraConfigUrl);
        DatabaseDescriptor.toolInitialization();
        Schema.instance.loadFromDisk(false);
    }

    /**
     * Blocks waiting for file system events and processes every created file as a
     * commit log segment. Returns only when the watched directory stops being valid.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for events
     * @throws IOException          if reading a segment fails
     */
    public void processEvents() throws InterruptedException, IOException {
        System.out.println("Processing events");
        while (true) {
            WatchKey aKey = watcher.take();
            if (!key.equals(aKey)) {
                System.out.println("WatchKey not recognized.");
                continue;
            }
            for (WatchEvent<?> event : key.pollEvents()) {
                WatchEvent.Kind<?> kind = event.kind();
                if (kind != ENTRY_CREATE) {
                    continue;
                }
                // Context for directory entry event is the file name of entry.
                // Safe cast: ENTRY_CREATE is declared as WatchEvent.Kind<Path>.
                @SuppressWarnings("unchecked")
                WatchEvent<Path> ev = (WatchEvent<Path>) event;
                Path relativePath = ev.context();
                Path absolutePath = dir.resolve(relativePath);
                processCommitLogSegment(absolutePath);
                //Files.delete(absolutePath);
                // print out event
                System.out.println(event.kind().name() + " for " + absolutePath);
            }
            // reset() returns false once the key is no longer valid (e.g. the directory
            // was removed); no further events can arrive, so waiting again would hang forever.
            if (!key.reset()) {
                System.out.println("Watched directory is no longer valid, stopping: " + dir);
                return;
            }
        }
    }

    /** Releases the underlying watch service. */
    @Override
    public void close() throws IOException {
        watcher.close();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        try (Reader reader = new Reader()) {
            reader.processEvents();
        }
    }

    /**
     * Reads one commit log segment, dispatching its mutations to the handler.
     *
     * @param path location of the segment file
     * @throws IOException if the segment cannot be read
     */
    private void processCommitLogSegment(Path path) throws IOException {
        // Log the real location; getFileName().toAbsolutePath() would resolve the bare
        // file name against the working directory and print a path that does not exist.
        System.out.println("Processing commitlog segment..." + path.toAbsolutePath());
        commitLogReader.readCommitLogSegment(commitLogReadHandler, path.toFile(), false);
        System.out.println("Commitlog segment processed.");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment