Skip to content

Instantly share code, notes, and snippets.

@slvrtrn
Last active September 23, 2021 07:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slvrtrn/f1f8506575713dd3f2b68bc559067f07 to your computer and use it in GitHub Desktop.
Save slvrtrn/f1f8506575713dd3f2b68bc559067f07 to your computer and use it in GitHub Desktop.
package com.bitvavo.debezium.converters;
import com.bitvavo.debezium.converters.utils.PropertiesReader;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.SchemaBuilder;
import java.util.Properties;
import java.util.Set;
@Slf4j
public class MysqlStringConverter
implements CustomConverter<SchemaBuilder, RelationalColumn> {
private Set<String> columns;
@Override
public void configure(Properties properties) {
PropertiesReader propertiesReader = new PropertiesReader(properties);
columns = propertiesReader.readStrSet("columns");
}
@Override
public void converterFor(
RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
String colName = column.name();
if (!columns.contains(colName)) {
return;
}
String sqlType = column.typeName().toUpperCase();
boolean isValidType = sqlType.startsWith("VARCHAR") || sqlType.startsWith("TEXT");
if (isValidType) {
SchemaBuilder schemaBuilder = SchemaBuilder.string().optional().defaultValue("test");
registration.register(schemaBuilder, x -> x);
log.info(
"Registered a simple string converter for table {}, column {}",
column.dataCollection(),
colName
);
}
}
}
package com.example.debezium.converters.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.Properties;
import java.util.Set;
@Slf4j
public class PropertiesReader {
private final Properties properties;
public PropertiesReader(Properties properties) {
this.properties = properties;
}
public Set<String> readStrSet(String key) {
String value = (String) this.properties.get(key);
checkValueIsNotNull(key, value);
return Set.of(value.replaceAll(" ", "").split(","));
}
private void checkValueIsNotNull(String key, String value) {
if (value == null) {
log.error("{} setting should be defined", key);
throw new IllegalArgumentException(key + " setting should be defined");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment