@Bean
public io.debezium.config.Configuration studentConnector() {
    return io.debezium.config.Configuration.create()
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", "/path/cdc/offset/student-offset.dat")
            .with("offset.flush.interval.ms", 60000)
            .with("name", "student-postgres-connector")
            .with("database.server.name", studentDBHost + "-" + studentDBName)
            .with("database.hostname", studentDBHost)
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-postgres</artifactId>
    <version>${debezium.version}</version>
</dependency>
version: "3.5"
services:
  # Install Postgres and set up the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres
    ports:
      - 5432:5432
    environment:
/**
 * Lists customers by first name, using the named native query
 * {@code customerEntity.searchCustomerByFirstName}.
 *
 * @param firstName the first name to search for
 * @return the matching customers as a {@code List<CustomerDetailsDTO>}
 */
@Query(name = "customerEntity.searchCustomerByFirstName", nativeQuery = true)
List<CustomerDetailsDTO> searchCustomerByFirstName(@Param("firstName") String firstName);
<!-- Search Customer By First Name -->
<named-native-query name="customerEntity.searchCustomerByFirstName">
    <query><![CDATA[
        SELECT
            cust.id as customerId,
            cust.FIRST_NAME as firstName,
            cust.LAST_NAME as lastName,
            cust.CITY as city,
            cust.COUNTRY as country,
            corder.ORDER_NUMBER as orderNumber,
public class CustomTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
    /**
     * This method is invoked when a change is made on the outbox schema.
     *
     * @param sourceRecord the change record received from the connector
     * @return the record to pass downstream
     */
    @Override
    public R apply(R sourceRecord) {
FROM debezium/connect
ENV DEBEZIUM_DIR=$KAFKA_CONNECT_PLUGINS_DIR/debezium-transformer
RUN mkdir $DEBEZIUM_DIR
COPY target/custom-debezium-transformer-0.0.1.jar $DEBEZIUM_DIR
public void sink(Map<String, Object> polledMessage) {
    log.info("Sending to message broker: {}", polledMessage);
    String topic = ((String) polledMessage.get("event_type")).toLowerCase();
    String payload = (String) polledMessage.get("payload");
    kafkaTemplate.send(topic, payload);
}
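
The `sink` method above derives the topic name by lower-casing the `event_type` column and would throw a `NullPointerException` if that column were missing. The following is a minimal sketch of a more defensive derivation, not the author's code: the class `OutboxRouting`, the method `topicFor`, and the sample value `STUDENT_CREATED` are hypothetical names introduced only for illustration. It also passes `Locale.ROOT` so the result does not depend on the JVM's default locale.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class OutboxRouting {

    // Hypothetical helper (not part of the original snippet): derives the topic
    // name from the "event_type" entry of a polled outbox row. Returns an empty
    // Optional when the entry is missing, not a String, or blank.
    public static Optional<String> topicFor(Map<String, Object> polledMessage) {
        Object eventType = polledMessage.get("event_type");
        if (!(eventType instanceof String) || ((String) eventType).isBlank()) {
            return Optional.empty();
        }
        // Locale.ROOT keeps the lower-casing independent of the default locale.
        return Optional.of(((String) eventType).toLowerCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        // Sample row with made-up values, shaped like the map passed to sink().
        Map<String, Object> row = Map.of("event_type", "STUDENT_CREATED", "payload", "{\"id\":1}");
        System.out.println(topicFor(row).orElse("<no topic>"));
    }
}
```

A caller such as `sink` could then skip or log rows for which `topicFor` returns an empty `Optional` instead of failing on the cast.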
...
@PostConstruct
void start() {
    executor.execute(engine);
}
...
/*
 * Connect to the PG database and invoke the method `handleEvent()` when an
 * event has been polled.
...
@Bean
public io.debezium.config.Configuration debeziumDataBaseConfig() {
    return io.debezium.config.Configuration.create()
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("name", "outbox-postgres-connector")
            .with("database.server.name", "pg-outbox-server")
            .with("database.hostname", databaseHostName)
            .with("database.port", databasePort)
            .with("database.user", databaseUserName)