Last active
November 8, 2022 00:26
-
-
Save jseparovic/ce67cdf6684c3843fa238de8edf638b8 to your computer and use it in GitHub Desktop.
Kafka Connect transform: epoch microseconds to MySQL DATETIME(6) (microsecond precision)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.raasify.kafka.connect.transform.datetime; | |
import org.apache.kafka.common.cache.Cache; | |
import org.apache.kafka.common.cache.LRUCache; | |
import org.apache.kafka.common.cache.SynchronizedCache; | |
import org.apache.kafka.common.config.ConfigDef; | |
import org.apache.kafka.connect.connector.ConnectRecord; | |
import org.apache.kafka.connect.data.Field; | |
import org.apache.kafka.connect.data.Schema; | |
import org.apache.kafka.connect.data.SchemaBuilder; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.transforms.Transformation; | |
import org.apache.kafka.connect.transforms.util.SchemaUtil; | |
import org.apache.kafka.connect.transforms.util.SimpleConfig; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.time.Instant; | |
import java.util.HashMap; | |
import java.util.Map; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; | |
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; | |
public abstract class TransformDateTime<R extends ConnectRecord<R>> implements Transformation<R> | |
{ | |
private static final Logger LOG = LoggerFactory.getLogger(TransformDateTime.class); | |
public static final String OVERVIEW_DOC = | |
"convert epoch microseconds to mysql datetime(6) compatible string"; | |
private interface ConfigName | |
{ | |
String FIELDS = "fields"; | |
} | |
public static final ConfigDef CONFIG_DEF = new ConfigDef() | |
.define(ConfigName.FIELDS, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, | |
"Comma separated list of fields"); | |
private static final String PURPOSE = "Converting DateTime"; | |
private String[] fields; | |
private Cache<Schema, Schema> schemaUpdateCache; | |
@Override | |
public void configure(Map<String, ?> props) | |
{ | |
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); | |
fields = config.getString(ConfigName.FIELDS).split(","); | |
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16)); | |
} | |
private String convert(Object epochMicroSeconds) | |
{ | |
if(epochMicroSeconds != null) { | |
long _epochMicroSeconds = (Long) epochMicroSeconds; | |
long epochSeconds = _epochMicroSeconds / 1_000_000L; | |
long nanoOffset = (_epochMicroSeconds % 1_000_000L) * 1_000L; | |
Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset); | |
return instant.toString().replace("T", " ").replace("Z", ""); | |
} | |
else { | |
return null; | |
} | |
} | |
@Override | |
public R apply(R record) | |
{ | |
if (operatingSchema(record) == null) | |
{ | |
return applySchemaless(record); | |
} | |
else | |
{ | |
return applyWithSchema(record); | |
} | |
} | |
public R applyWithSchema(R record) | |
{ | |
LOG.debug("[TransformDateTime] applyWithSchema record=[{}]", record); | |
final Struct value = requireStruct(operatingValue(record), PURPOSE); | |
LOG.debug("[TransformDateTime] value=[{}]", value); | |
Schema updatedSchema = schemaUpdateCache.get(value.schema()); | |
if (updatedSchema == null) | |
{ | |
updatedSchema = makeUpdatedSchema(value.schema()); | |
schemaUpdateCache.put(value.schema(), updatedSchema); | |
} | |
final Struct updatedValue = new Struct(updatedSchema); | |
LOG.debug("[TransformDateTime] updatedValue=[{}]", updatedValue); | |
LOG.debug("[TransformDateTime] updatedSchema.fields()=[{}]", updatedSchema.fields()); | |
for (Field field : updatedSchema.fields()) | |
{ | |
LOG.debug("[TransformDateTime] field=[{}]", field); | |
if(isInFields(field.name())) { | |
// Convert field from epoch microseconds to mysql datetime string with microsecond precision | |
Object newValue = convert(value.get(field)); | |
LOG.debug("[TransformDateTime] updatedValue.put=[{}][{}]", field.name(), newValue); | |
updatedValue.put(field.name(), newValue); | |
} | |
else { | |
updatedValue.put(field.name(), value.get(field)); | |
} | |
} | |
LOG.debug("[TransformDateTime] updatedSchema=[{}]", updatedSchema); | |
LOG.debug("[TransformDateTime] updatedValue=[{}]", updatedValue); | |
return newRecord(record, updatedSchema, updatedValue); | |
} | |
private R applySchemaless(R record) | |
{ | |
LOG.info("[TransformDateTime] applySchemaless record=[{}]", record); | |
Object rawValue = operatingValue(record); | |
if(rawValue == null) { | |
LOG.info("[TransformDateTime] applySchemaless returning=[null]"); | |
return newRecord(record, null, null); | |
} | |
LOG.info("[TransformDateTime] rawValue=[{}]", rawValue); | |
final Map<String, Object> value = requireMap(rawValue, PURPOSE); | |
LOG.info("[TransformDateTime] value=[{}]", value); | |
final HashMap<String, Object> updatedValue = new HashMap<>(value); | |
LOG.info("[TransformDateTime] updatedValue=[{}]", updatedValue); | |
for (Map.Entry<String, Object> entry : value.entrySet()) | |
{ | |
Field field = (Field) entry.getValue(); | |
LOG.info("[TransformDateTime] field=[{}]", field); | |
if(isInFields(field.name())) { | |
// Convert field from epoch microseconds to mysql datetime string with microsecond precision | |
updatedValue.put(field.name(), convert(value.get(field))); | |
} | |
else { | |
updatedValue.put(field.name(), value.get(field)); | |
} | |
} | |
LOG.info("[TransformDateTime] updatedValue=[{}]", updatedValue); | |
return newRecord(record, null, updatedValue); | |
} | |
private Schema makeUpdatedSchema(Schema schema) | |
{ | |
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); | |
for (Field field : schema.fields()) | |
{ | |
if(isInFields(field.name())) { | |
builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA); | |
} | |
else { | |
builder.field(field.name(), field.schema()); | |
} | |
} | |
return builder.build(); | |
} | |
private boolean isInFields(String fieldName) | |
{ | |
for (String f : fields) | |
{ | |
if (f.equals(fieldName)) | |
{ | |
return true; | |
} | |
} | |
return false; | |
} | |
@Override | |
public ConfigDef config() | |
{ | |
return CONFIG_DEF; | |
} | |
@Override | |
public void close() | |
{ | |
schemaUpdateCache = null; | |
} | |
protected abstract Schema operatingSchema(R record); | |
protected abstract Object operatingValue(R record); | |
protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); | |
public static class Key<R extends ConnectRecord<R>> extends TransformDateTime<R> | |
{ | |
@Override | |
protected Schema operatingSchema(R record) | |
{ | |
return record.keySchema(); | |
} | |
@Override | |
protected Object operatingValue(R record) | |
{ | |
return record.key(); | |
} | |
@Override | |
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) | |
{ | |
return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, | |
record.valueSchema(), record.value(), record.timestamp()); | |
} | |
} | |
public static class Value<R extends ConnectRecord<R>> extends TransformDateTime<R> | |
{ | |
@Override | |
protected Schema operatingSchema(R record) | |
{ | |
return record.valueSchema(); | |
} | |
@Override | |
protected Object operatingValue(R record) | |
{ | |
return record.value(); | |
} | |
@Override | |
protected R newRecord(R record, Schema updatedSchema, Object updatedValue) | |
{ | |
return record | |
.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, | |
updatedValue, record.timestamp()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've tested INSERTs with the `applyWithSchema` method, sinking to MySQL `DATETIME(6)` columns from Debezium's microsecond-precision long format. `applySchemaless` works for DELETEs. I'm not sure about other configurations.

Sample config: