-
-
Save slvrtrn/f1f8506575713dd3f2b68bc559067f07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.bitvavo.debezium.converters; | |
import com.bitvavo.debezium.converters.utils.PropertiesReader; | |
import io.debezium.spi.converter.CustomConverter; | |
import io.debezium.spi.converter.RelationalColumn; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import java.util.Properties; | |
import java.util.Set; | |
@Slf4j | |
public class MysqlStringConverter | |
implements CustomConverter<SchemaBuilder, RelationalColumn> { | |
private Set<String> columns; | |
@Override | |
public void configure(Properties properties) { | |
PropertiesReader propertiesReader = new PropertiesReader(properties); | |
columns = propertiesReader.readStrSet("columns"); | |
} | |
@Override | |
public void converterFor( | |
RelationalColumn column, | |
ConverterRegistration<SchemaBuilder> registration) { | |
String colName = column.name(); | |
if (!columns.contains(colName)) { | |
return; | |
} | |
String sqlType = column.typeName().toUpperCase(); | |
boolean isValidType = sqlType.startsWith("VARCHAR") || sqlType.startsWith("TEXT"); | |
if (isValidType) { | |
SchemaBuilder schemaBuilder = SchemaBuilder.string().optional().defaultValue("test"); | |
registration.register(schemaBuilder, x -> x); | |
log.info( | |
"Registered a simple string converter for table {}, column {}", | |
column.dataCollection(), | |
colName | |
); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.debezium.converters.utils; | |
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j | |
public class PropertiesReader { | |
private final Properties properties; | |
public PropertiesReader(Properties properties) { | |
this.properties = properties; | |
} | |
public Set<String> readStrSet(String key) { | |
String value = (String) this.properties.get(key); | |
checkValueIsNotNull(key, value); | |
return Set.of(value.replaceAll(" ", "").split(",")); | |
} | |
private void checkValueIsNotNull(String key, String value) { | |
if (value == null) { | |
log.error("{} setting should be defined", key); | |
throw new IllegalArgumentException(key + " setting should be defined"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment