Skip to content

Instantly share code, notes, and snippets.

@AdrianoJS
Last active October 4, 2022 07:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AdrianoJS/9ddda828d8ad4d2ae37607e4384959af to your computer and use it in GitHub Desktop.
Save AdrianoJS/9ddda828d8ad4d2ae37607e4384959af to your computer and use it in GitHub Desktop.
@Component
public class DebeziumConnector {
private static final String SCHEMA_PLACEHOLDER = "<schema>";
private final Logger log = LoggerFactory.getLogger(this.getClass());
private ExecutorService executor = Executors.newSingleThreadExecutor();
private DebeziumEngine<ChangeEvent<String, String>> engine;
private SingleEventHandler eventHandler;
private String databaseHostname;
private String databaseSchema;
private String offsetFlushIntervalMs;
private String heartBeatMs;
private String pollIntervalMs;
// Fields truncated for brevity
@Autowired
public DebeziumConnector setDatabaseHostname(@Value("${database.hostname}") final String databaseHostname) {
this.databaseHostname = databaseHostname;
return this;
}
@Autowired
public DebeziumConnector setDatabaseSchema(@Value("${database.schema}") final String databaseSchema) {
this.databaseSchema = databaseSchema;
return this;
}
@Autowired
public DebeziumConnector setEventHandler(final SingleEventHandler eventHandler) {
this.eventHandler = eventHandler;
return this;
}
@Autowired
public DebeziumConnector setHeartBeatMs(@Value("${heartbeat.frequency.ms:}") final String heartBeatMs) {
this.heartBeatMs = heartBeatMs;
return this;
}
@Autowired
public DebeziumConnector setOffsetFlushIntervalMs(@Value("${offset.flush.interval.ms}") final String offsetFlushIntervalMs) {
this.offsetFlushIntervalMs = offsetFlushIntervalMs;
return this;
}
@Autowired
public DebeziumConnector setPollIntervalMs(@Value("${poll.interval.ms:}") final String pollIntervalMs) {
this.pollIntervalMs = pollIntervalMs;
return this;
}
@PreDestroy
public void shutdown() throws IOException {
log.info("Shutting down the CDC engine");
log.info("Please wait while the current position in the offset log is stored!");
engine.close();
executor.shutdown();
}
// Setters truncated for brevity
@PostConstruct
public void start() {
log.info("Starting the CDC engine");
var properties = getDebeziumProperties();
var offsetPolicy = offsetFlushIntervalMs.equals("0") ? OffsetCommitPolicy.always() : OffsetCommitPolicy.periodic(properties);
this.engine = DebeziumEngine.create(Json.class)
.using(properties)
.using(offsetPolicy)
.notifying(eventHandler)
.build();
executor.execute(engine);
}
private Properties getDebeziumProperties() {
var props = new Properties();
var snapshotTables = "<schema>.table_one,<schema>.table_two,<schema>.table_three";
var allTables = snapshotTables + ",<schema>.table_without_snapshot_need";
if (!heartBeatMs.isBlank() && !heartBeatMs.equals("0")) {
allTables = allTables + ",<schema>.cdc_heartbeat";
props.setProperty("heartbeat.interval.ms", heartBeatMs);
props.setProperty(
"heartbeat.action.query",
"insert into <schema>.cdc_heartbeat VALUES (B'1')".replace(SCHEMA_PLACEHOLDER, databaseSchema)
);
}
props.setProperty("name", "FooToBarCDC");
props.setProperty("plugin.name", "pgoutput");
props.setProperty("publication.autocreate.mode", "filtered");
setOptionalProperty(s -> props.setProperty("offset.storage", s), offsetStorage);
setOptionalProperty(s -> props.setProperty("offset.flush.interval.ms", s), offsetFlushIntervalMs);
setOptionalProperty(s -> props.setProperty("snapshot.mode", s), snapshotMode);
setOptionalProperty(s -> props.setProperty("poll.interval.ms", s), pollIntervalMs);
if (FileOffsetBackingStore.class.getName().equals(offsetStorage)) {
props.setProperty("offset.storage.file.filename", offsetStorageFileFilename);
}
props.setProperty("database.hostname", databaseHostname);
props.setProperty("database.port", databasePort);
props.setProperty("database.user", databaseUser);
props.setProperty("database.password", databasePassword);
props.setProperty("database.dbname", databaseDbname);
props.setProperty("database.server.name", "foo");
props.setProperty("table.include.list", allTables.replace(SCHEMA_PLACEHOLDER, databaseSchema));
props.setProperty("snapshot.include.collection.list", snapshotTables.replace(SCHEMA_PLACEHOLDER, databaseSchema));
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
return props;
}
private void setOptionalProperty(final Consumer<String> consumer, final String property) {
if (property != null && !property.isBlank()) {
consumer.accept(property);
}
}
}
@SpringBootApplication
public class Launcher {
public static void main(String[] args) {
var configFile = System.getProperty("configFile", "classpath:cdc.properties");
new SpringApplicationBuilder().properties("spring.config.location=" + configFile)
.sources(Launcher.class)
.registerShutdownHook(true)
.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment