-
-
Save AdrianoJS/9ddda828d8ad4d2ae37607e4384959af to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
public class DebeziumConnector { | |
private static final String SCHEMA_PLACEHOLDER = "<schema>"; | |
private final Logger log = LoggerFactory.getLogger(this.getClass()); | |
private ExecutorService executor = Executors.newSingleThreadExecutor(); | |
private DebeziumEngine<ChangeEvent<String, String>> engine; | |
private SingleEventHandler eventHandler; | |
private String databaseHostname; | |
private String databaseSchema; | |
private String offsetFlushIntervalMs; | |
private String heartBeatMs; | |
private String pollIntervalMs; | |
// Fields truncated for brevity | |
@Autowired | |
public DebeziumConnector setDatabaseHostname(@Value("${database.hostname}") final String databaseHostname) { | |
this.databaseHostname = databaseHostname; | |
return this; | |
} | |
@Autowired | |
public DebeziumConnector setDatabaseSchema(@Value("${database.schema}") final String databaseSchema) { | |
this.databaseSchema = databaseSchema; | |
return this; | |
} | |
@Autowired | |
public DebeziumConnector setEventHandler(final SingleEventHandler eventHandler) { | |
this.eventHandler = eventHandler; | |
return this; | |
} | |
@Autowired | |
public DebeziumConnector setHeartBeatMs(@Value("${heartbeat.frequency.ms:}") final String heartBeatMs) { | |
this.heartBeatMs = heartBeatMs; | |
return this; | |
} | |
@Autowired | |
public DebeziumConnector setOffsetFlushIntervalMs(@Value("${offset.flush.interval.ms}") final String offsetFlushIntervalMs) { | |
this.offsetFlushIntervalMs = offsetFlushIntervalMs; | |
return this; | |
} | |
@Autowired | |
public DebeziumConnector setPollIntervalMs(@Value("${poll.interval.ms:}") final String pollIntervalMs) { | |
this.pollIntervalMs = pollIntervalMs; | |
return this; | |
} | |
@PreDestroy | |
public void shutdown() throws IOException { | |
log.info("Shutting down the CDC engine"); | |
log.info("Please wait while the current position in the offset log is stored!"); | |
engine.close(); | |
executor.shutdown(); | |
} | |
// Setters truncated for brevity | |
@PostConstruct | |
public void start() { | |
log.info("Starting the CDC engine"); | |
var properties = getDebeziumProperties(); | |
var offsetPolicy = offsetFlushIntervalMs.equals("0") ? OffsetCommitPolicy.always() : OffsetCommitPolicy.periodic(properties); | |
this.engine = DebeziumEngine.create(Json.class) | |
.using(properties) | |
.using(offsetPolicy) | |
.notifying(eventHandler) | |
.build(); | |
executor.execute(engine); | |
} | |
private Properties getDebeziumProperties() { | |
var props = new Properties(); | |
var snapshotTables = "<schema>.table_one,<schema>.table_two,<schema>.table_three"; | |
var allTables = snapshotTables + ",<schema>.table_without_snapshot_need"; | |
if (!heartBeatMs.isBlank() && !heartBeatMs.equals("0")) { | |
allTables = allTables + ",<schema>.cdc_heartbeat"; | |
props.setProperty("heartbeat.interval.ms", heartBeatMs); | |
props.setProperty( | |
"heartbeat.action.query", | |
"insert into <schema>.cdc_heartbeat VALUES (B'1')".replace(SCHEMA_PLACEHOLDER, databaseSchema) | |
); | |
} | |
props.setProperty("name", "FooToBarCDC"); | |
props.setProperty("plugin.name", "pgoutput"); | |
props.setProperty("publication.autocreate.mode", "filtered"); | |
setOptionalProperty(s -> props.setProperty("offset.storage", s), offsetStorage); | |
setOptionalProperty(s -> props.setProperty("offset.flush.interval.ms", s), offsetFlushIntervalMs); | |
setOptionalProperty(s -> props.setProperty("snapshot.mode", s), snapshotMode); | |
setOptionalProperty(s -> props.setProperty("poll.interval.ms", s), pollIntervalMs); | |
if (FileOffsetBackingStore.class.getName().equals(offsetStorage)) { | |
props.setProperty("offset.storage.file.filename", offsetStorageFileFilename); | |
} | |
props.setProperty("database.hostname", databaseHostname); | |
props.setProperty("database.port", databasePort); | |
props.setProperty("database.user", databaseUser); | |
props.setProperty("database.password", databasePassword); | |
props.setProperty("database.dbname", databaseDbname); | |
props.setProperty("database.server.name", "foo"); | |
props.setProperty("table.include.list", allTables.replace(SCHEMA_PLACEHOLDER, databaseSchema)); | |
props.setProperty("snapshot.include.collection.list", snapshotTables.replace(SCHEMA_PLACEHOLDER, databaseSchema)); | |
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector"); | |
return props; | |
} | |
private void setOptionalProperty(final Consumer<String> consumer, final String property) { | |
if (property != null && !property.isBlank()) { | |
consumer.accept(property); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@SpringBootApplication | |
public class Launcher { | |
public static void main(String[] args) { | |
var configFile = System.getProperty("configFile", "classpath:cdc.properties"); | |
new SpringApplicationBuilder().properties("spring.config.location=" + configFile) | |
.sources(Launcher.class) | |
.registerShutdownHook(true) | |
.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment