Skip to content

Instantly share code, notes, and snippets.

@AdrianoJS
Last active September 28, 2022 18:49
Show Gist options
  • Save AdrianoJS/c29bf047ac05ca01755fafae30799045 to your computer and use it in GitHub Desktop.
Save AdrianoJS/c29bf047ac05ca01755fafae30799045 to your computer and use it in GitHub Desktop.
Debezium handler
/**
 * Minimal description of a change-data-capture event: the id of the affected row
 * and the name of the table it belongs to.
 *
 * <p>Setters return {@code this} so calls can be chained. A freshly constructed
 * instance has id {@code 0} and a {@code null} table.
 */
public class CdcEvent {

    /** Id of the affected row. */
    private long id;

    /** Name of the table the row belongs to. */
    private String table;

    /**
     * @return the row id, {@code 0} if none was set
     */
    public long getId() {
        return this.id;
    }

    /**
     * Stores the row id.
     *
     * @param id the row id
     * @return this event, for chaining
     */
    public CdcEvent setId(final long id) {
        this.id = id;
        return this;
    }

    /**
     * @return the table name, {@code null} if none was set
     */
    public String getTable() {
        return this.table;
    }

    /**
     * Stores the table name.
     *
     * @param table the table name
     * @return this event, for chaining
     */
    public CdcEvent setTable(final String table) {
        this.table = table;
        return this;
    }
}
/**
 * Consumer of Debezium change events that routes each event to the
 * {@link DataExtractor} registered for the event's source table and, when the
 * extractor asks for it, forwards the extracted data to the {@link DataExporter}.
 *
 * <p>Events that cannot be attributed to a row are skipped: a {@code null} event,
 * an event without a value, or an event whose {@code /payload/after/id} is absent
 * or JSON {@code null} (e.g. a Debezium delete event, whose {@code after} image is
 * {@code null}).
 */
@Component
public class SingleEventHandler implements Consumer<ChangeEvent<String, String>> {

    /** Location of the source table name inside the event JSON. */
    private static final JsonPointer TABLE = JsonPointer.compile("/payload/source/table");

    /** Location of the row id inside the event's "after" image. */
    private static final JsonPointer ID = JsonPointer.compile("/payload/after/id");

    private final ObjectMapper objectMapper = JacksonUtil.createObjectMapper();
    private DataExporter exporter;

    /** Extractors keyed by the table they declare as their primary table. */
    private Map<String, DataExtractor> dataExtractors;

    /**
     * Handles a single change event.
     *
     * @param event the change event; its value is expected to be the JSON envelope
     * @throws EventParsingException if the event value is not parseable JSON
     */
    @Override
    public void accept(final ChangeEvent<String, String> event) {
        final CdcEvent cdcEvent = parseEvent(event);
        if (cdcEvent == null) {
            // Nothing that identifies a row: do not bother any extractor.
            return;
        }
        extractData(cdcEvent);
    }

    /**
     * Indexes the injected extractors by {@link DataExtractor#getPrimaryTable()}.
     *
     * @param dataExtractors all available extractors
     * @throws IllegalStateException if two extractors report the same primary table
     *         (raised by {@code Collectors.toMap}, which is used without a merge function)
     */
    @Autowired
    public void setDataExtractors(List<DataExtractor> dataExtractors) {
        this.dataExtractors = dataExtractors.stream()
                .collect(Collectors.toMap(DataExtractor::getPrimaryTable, extractor -> extractor));
    }

    /**
     * Injects the exporter that receives extracted data.
     *
     * @param exporter the exporter to use
     * @return this handler, for chaining
     */
    @Autowired
    public SingleEventHandler setExporter(final DataExporter exporter) {
        this.exporter = exporter;
        return this;
    }

    /**
     * Runs the extractor registered for the event's table, if there is one, and
     * exports the result when the extractor requests it.
     */
    private void extractData(final CdcEvent event) {
        final var extractor = dataExtractors.get(event.getTable());
        if (extractor == null) {
            // No extractor is interested in this table.
            return;
        }
        if (extractor.shouldExport()) {
            exporter.export(extractor.extract(event.getId()));
        } else {
            // Extraction still runs even though its result is not exported;
            // NOTE(review): presumably for the extractor's side effects - confirm.
            extractor.extract(event.getId());
        }
    }

    /**
     * Reads the table name and row id out of the event JSON.
     *
     * @return the parsed event, or {@code null} when the event is {@code null}, has
     *         no value, or carries no usable {@code /payload/after/id}
     * @throws EventParsingException if the event value is not parseable JSON
     */
    private CdcEvent parseEvent(final ChangeEvent<String, String> event) {
        if (event == null || event.value() == null) {
            return null;
        }
        try {
            final var json = objectMapper.readTree(event.value());
            if (json == null) {
                return null;
            }
            final var idNode = json.at(ID);
            // asLong() silently yields 0 for a missing or null node, which would make the
            // handler extract a non-existent row 0; treat such events as not processable.
            if (idNode.isMissingNode() || idNode.isNull()) {
                return null;
            }
            return new CdcEvent()
                    .setId(idNode.asLong())
                    .setTable(json.at(TABLE).asText());
        } catch (JsonProcessingException e) {
            throw new EventParsingException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment