Skip to content

Instantly share code, notes, and snippets.

@PatrickGotthard
Last active January 18, 2023 11:36
Show Gist options
  • Save PatrickGotthard/818a33246b7dc7478bde183e1fe53328 to your computer and use it in GitHub Desktop.
Save PatrickGotthard/818a33246b7dc7478bde183e1fe53328 to your computer and use it in GitHub Desktop.
Debezium Error Handling
package debezium;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Component;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.storage.file.history.FileSchemaHistory;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
/**
 * Spring Boot entry point of the Debezium demo application.
 */
@SpringBootApplication
public class Application {

    /**
     * Creates the Spring application with this class as its primary source and runs it.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(final String[] args) {
        final SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}
/**
 * Runs an embedded Debezium engine with the MySQL connector and prints the JSON value of
 * every change event to standard output.
 *
 * <p>The engine is executed on a dedicated single-thread executor after the bean has been
 * constructed; both the engine and the executor are released when the bean is destroyed.
 */
@Component
class ChangeListener {

    private final DebeziumEngine<ChangeEvent<String, String>> engine;

    /** Owns the one thread the engine runs on; held as a service so it can be shut down. */
    private final ExecutorService executor;

    /**
     * Builds the engine configuration (file-based offset and schema-history storage, MySQL
     * connector pointed at localhost:3306) and creates the engine. Nothing is started here.
     */
    public ChangeListener() {
        // @formatter:off
        final Properties config = Configuration.create()
            .with(EmbeddedEngine.ENGINE_NAME, "debezium")
            .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName())
            .with(EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME, "debezium/offset.dat")
            .with(EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, Duration.ofSeconds(60).toMillis())
            .with(CommonConnectorConfig.TOPIC_PREFIX, "debezium")
            .with(MySqlConnectorConfig.SCHEMA_HISTORY, FileSchemaHistory.class.getName())
            .with(FileSchemaHistory.FILE_PATH, "debezium/schema.dat")
            .with(EmbeddedEngine.CONNECTOR_CLASS, MySqlConnector.class.getName())
            .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER)
            .with(MySqlConnectorConfig.SERVER_ID, 98765)
            .with(MySqlConnectorConfig.HOSTNAME, "localhost")
            .with(MySqlConnectorConfig.PORT, 3306)
            .with(MySqlConnectorConfig.USER, "root")
            .with(MySqlConnectorConfig.PASSWORD, "root")
            .build()
            .asProperties();
        // @formatter:on
        this.engine = DebeziumEngine.create(Json.class).using(config).notifying(this::process).build();
        this.executor = Executors.newSingleThreadExecutor();
    }

    /**
     * Prints the value of a single change event.
     *
     * <p>Events without a value are skipped: after a row is deleted the connector emits a
     * tombstone event whose value is {@code null}, and dereferencing it used to fail with a
     * {@link NullPointerException} inside the engine's handler.
     *
     * @param event the change event delivered by the engine
     */
    private void process(final ChangeEvent<String, String> event) {
        final String value = event.value();
        if (value == null) {
            // Tombstone (key only, no payload) - there is nothing to print.
            return;
        }
        System.out.println(value);
    }

    /** Submits the engine to the executor so it runs on the background thread. */
    @PostConstruct
    public void start() {
        this.executor.execute(this.engine);
    }

    /**
     * Closes the engine and then shuts the executor down so its worker thread can terminate
     * instead of lingering after the bean is destroyed.
     *
     * @throws IOException if closing the engine fails
     */
    @PreDestroy
    public void stop() throws IOException {
        try {
            this.engine.close();
        } finally {
            // Runs even when close() throws; otherwise the non-daemon worker thread would leak.
            this.executor.shutdown();
        }
    }
}
# Local MariaDB instance used as the change-data-capture source for the Debezium demo.
services:
  mariadb:
    container_name: mariadb
    image: mariadb:latest
    # Turn on the binary log in ROW format; the Debezium MySQL connector reads row-level binlog events.
    command: --log-bin=/var/log/mysql/mysql-bin.log --binlog-format=ROW
    environment:
      MYSQL_ROOT_PASSWORD: root
    volumes:
      - lib:/var/lib/mysql
      - log:/var/log/mysql
    ports:
      # Quoted so the HOST:CONTAINER mapping is always read as a string.
      - "3306:3306"
# Named volumes keep the data directory and the binlog directory across container restarts.
volumes:
  lib:
  log:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.1</version>
    </parent>
    <groupId>debezium</groupId>
    <artifactId>debezium</artifactId>
    <version>0.0.0-SNAPSHOT</version>
    <properties>
        <java.version>17</java.version>
        <!-- One version for every Debezium artifact so the engine and the connector cannot drift apart. -->
        <debezium.version>2.1.1.Final</debezium.version>
    </properties>
    <dependencies>
        <!-- Spring Boot -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Debezium -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>
    </dependencies>
</project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment