Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@sohangp
sohangp / DebeziumConnectorConfig.java
Last active November 19, 2019 19:12
Student Connector
@Bean
public io.debezium.config.Configuration studentConnector() {
return io.debezium.config.Configuration.create()
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
.with("offset.flush.interval.ms", 60000)
.with("name", "student-postgres-connector")
.with("database.server.name", studentDBHost+"-"+studentDBName)
.with("database.hostname", studentDBHost)
@sohangp
sohangp / pom.xml
Last active November 19, 2019 19:12
Embedded Debezium Maven Dependency
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${debezium.version}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
@sohangp
sohangp / docker-compose.yml
Created November 19, 2019 18:30
docker-compose
version: "3.5"
services:
# Install postgres and setup the student database.
postgres:
container_name: postgres
image: debezium/postgres
ports:
- 5432:5432
environment:
@sohangp
sohangp / CustomerRepository.java
Created October 7, 2019 13:24
Named Parameter for FirstName
/**
* This will list all customers based on first name.
*
* @return List<CustomerDetailsDTO>
*/
@Query(name = "customerEntity.searchCustomerByFirstName", nativeQuery = true)
List<CustomerDetailsDTO> searchCustomerByFirstName(@Param("firstName") String firstName);
@sohangp
sohangp / orm.xml
Created October 7, 2019 13:22
Named Parameter
<!-- Search Customer By First Name -->
<named-native-query name="customerEntity.searchCustomerByFirstName">
<query><![CDATA[
SELECT
cust.id as customerId,
cust.FIRST_NAME as firstName,
cust.LAST_NAME as lastName,
cust.CITY as city,
cust.COUNTRY as country,
corder.ORDER_NUMBER as orderNumber,
@sohangp
sohangp / CustomTransformation.java
Created October 2, 2019 17:57
Custom Transformation on Debezium event
public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
/**
* This method is invoked when a change is made on the outbox schema.
*
* @param sourceRecord
* @return
*/
public R apply(R sourceRecord) {
@sohangp
sohangp / Dockerfile
Created October 2, 2019 17:54
Custom Debezium Transformer image
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer
RUN mkdir $DEBEZIUM_DIR
COPY target/custom-debezium-transformer-0.0.1.jar $DEBEZIUM_DIR
@sohangp
sohangp / KafkaSink.java
Created October 2, 2019 01:28
Publish Kafka Message
public void sink(Map<String, Object> polledMessage) {
log.info("Sending to message broker: {}", polledMessage);
String topic = ((String) polledMessage.get("event_type")).toLowerCase();
String payload = (String) polledMessage.get("payload");
kafkaTemplate.send(topic, payload);
}
@sohangp
sohangp / OutboxPoller.java
Last active October 2, 2019 12:45
Start Debezium and Handle Events
...
@PostConstruct
void start() {
executor.execute(engine);
}
...
/*
* Connect to the PG database and invoke the method `handleEvent()` when an
* event has been polled.
@sohangp
sohangp / DebeziumConfig.java
Last active October 2, 2019 14:18
Debezium Configuration
...
@Bean
public io.debezium.config.Configuration debeziumDataBaseConfig() {
return io.debezium.config.Configuration.create()
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("name", "outbox-postgres-connector")
.with("database.server.name", "pg-outbox-server")
.with("database.hostname", databaseHostName)
.with("database.port", databasePort)
.with("database.user",databaseUserName)