Skip to content

Instantly share code, notes, and snippets.

@jakzal
Created January 9, 2023 13:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jakzal/8ae335148e0686ca6875e3de7f2efd90 to your computer and use it in GitHub Desktop.
Save jakzal/8ae335148e0686ca6875e3de7f2efd90 to your computer and use it in GitHub Desktop.
Learn how to run Debezium with PostgreSQL and Kafka in a JUnit 5 test
package com.kaffeinelabs.debezium;
import io.debezium.testing.testcontainers.DebeziumContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;
import java.util.stream.Stream;
/**
 * Groups the PostgreSQL, Kafka and Debezium Connect containers needed for a
 * Debezium test so they can be started and stopped as a single
 * {@link Startable}.
 *
 * <p>All three containers are attached to one shared {@link Network}. The
 * containers are held in static fields, so every instance of this class
 * controls the same underlying containers.
 */
public class DebeziumContainers implements Startable {

    /** Network shared by all containers declared in this class. */
    private static final Network network = Network.newNetwork();

    /**
     * PostgreSQL container based on the {@code debezium/postgres:11} image,
     * declared as a compatible substitute for the stock {@code postgres}
     * image and reachable on the shared network under the alias
     * {@code postgres}.
     */
    public static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("debezium/postgres:11").asCompatibleSubstituteFor("postgres"))
                    .withNetwork(network)
                    .withNetworkAliases("postgres");

    /**
     * Kafka broker container.
     *
     * <p>The image is pinned to an explicit tag: an untagged image name
     * resolves to {@code latest}, which makes test runs depend on whatever
     * happens to be published at the time.
     */
    public static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.1"))
                    .withNetwork(network);

    /** Debezium Connect container wired to {@link #kafka}. */
    public static final DebeziumContainer debezium =
            new DebeziumContainer("debezium/connect:2.1.1.Final")
                    .withNetwork(network)
                    .withKafka(kafka)
                    .dependsOn(kafka);

    /**
     * Starts PostgreSQL and Kafka in parallel, then starts Debezium Connect
     * once both have come up.
     */
    @Override
    public void start() {
        Stream.of(postgres, kafka).parallel().forEach(GenericContainer::start);
        debezium.start();
    }

    /**
     * Stops Debezium Connect first, then PostgreSQL and Kafka in parallel.
     *
     * <p>PostgreSQL and Kafka are stopped even if stopping Debezium Connect
     * throws, so a failure there does not leave the other containers running.
     */
    @Override
    public void stop() {
        try {
            debezium.stop();
        } finally {
            Stream.of(postgres, kafka).parallel().forEach(GenericContainer::stop);
        }
    }
}
package com.kaffeinelabs.debezium;
import com.jayway.jsonpath.JsonPath;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
// @see https://debezium.io/documentation/reference/stable/integrations/testcontainers.html
@Testcontainers
public class LearnDebeziumTest {
@Container
private static final DebeziumContainers debeziumContainers = new DebeziumContainers();
@Test
public void canRegisterPostgreSqlConnector() throws Exception {
try (Connection connection = getConnection();
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer = getConsumer(
)) {
statement.execute("create schema todo");
statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, 'Learn CDC')");
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')");
ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(debeziumContainers.postgres)
.with("topic.prefix", "dbserver1");
debeziumContainers.debezium.registerConnector("my-connector", connector);
consumer.subscribe(List.of("dbserver1.todo.todo"));
List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 2);
assertEquals(1, JsonPath.<Integer>read(changeEvents.get(0).key(), "$.id"));
assertEquals("r", JsonPath.<String>read(changeEvents.get(0).value(), "$.op"));
assertEquals("Learn CDC", JsonPath.<String>read(changeEvents.get(0).value(), "$.after.title"));
assertEquals(2, JsonPath.<Integer>read(changeEvents.get(1).key(), "$.id"));
assertEquals("r", JsonPath.<String>read(changeEvents.get(1).value(), "$.op"));
assertEquals("Learn Debezium", JsonPath.<String>read(changeEvents.get(1).value(), "$.after.title"));
consumer.unsubscribe();
}
}
private Connection getConnection() throws SQLException {
return DriverManager.getConnection(debeziumContainers.postgres.getJdbcUrl(), debeziumContainers.postgres.getUsername(), debeziumContainers.postgres.getPassword());
}
private KafkaConsumer<String, String> getConsumer() {
return new KafkaConsumer<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, debeziumContainers.kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"
),
new StringDeserializer(),
new StringDeserializer());
}
private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String> consumer, int expectedRecordCount) {
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
consumer.poll(Duration.ofMillis(50))
.iterator()
.forEachRemaining(allRecords::add);
return allRecords.size() == expectedRecordCount;
});
return allRecords;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment