@jseparovic
Last active November 8, 2022 00:26
Kafka Connect transforms mysql datetime(6) - microsecond precision
package io.raasify.kafka.connect.transform.datetime;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
public abstract class TransformDateTime<R extends ConnectRecord<R>> implements Transformation<R>
{
    private static final Logger LOG = LoggerFactory.getLogger(TransformDateTime.class);

    public static final String OVERVIEW_DOC =
            "convert epoch microseconds to mysql datetime(6) compatible string";

    private interface ConfigName
    {
        String FIELDS = "fields";
    }

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.FIELDS, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH,
                    "Comma separated list of fields");

    private static final String PURPOSE = "Converting DateTime";

    private String[] fields;
    private Cache<Schema, Schema> schemaUpdateCache;

    @Override
    public void configure(Map<String, ?> props)
    {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        fields = config.getString(ConfigName.FIELDS).split(",");
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
    }
    /**
     * Converts an epoch-microseconds Long into a MySQL datetime(6) compatible string,
     * e.g. 1667779200123456L -> "2022-11-07 00:00:00.123456".
     */
    private String convert(Object epochMicroSeconds)
    {
        if (epochMicroSeconds != null) {
            long _epochMicroSeconds = (Long) epochMicroSeconds;
            long epochSeconds = _epochMicroSeconds / 1_000_000L;
            long nanoOffset = (_epochMicroSeconds % 1_000_000L) * 1_000L;
            Instant instant = Instant.ofEpochSecond(epochSeconds, nanoOffset);
            // Instant.toString() yields e.g. "2022-11-07T00:00:00.123456Z"; strip the ISO markers.
            return instant.toString().replace("T", " ").replace("Z", "");
        }
        else {
            return null;
        }
    }
    @Override
    public R apply(R record)
    {
        if (operatingSchema(record) == null)
        {
            return applySchemaless(record);
        }
        else
        {
            return applyWithSchema(record);
        }
    }
    public R applyWithSchema(R record)
    {
        LOG.debug("[TransformDateTime] applyWithSchema record=[{}]", record);
        final Struct value = requireStruct(operatingValue(record), PURPOSE);
        LOG.debug("[TransformDateTime] value=[{}]", value);

        Schema updatedSchema = schemaUpdateCache.get(value.schema());
        if (updatedSchema == null)
        {
            updatedSchema = makeUpdatedSchema(value.schema());
            schemaUpdateCache.put(value.schema(), updatedSchema);
        }

        final Struct updatedValue = new Struct(updatedSchema);
        LOG.debug("[TransformDateTime] updatedValue=[{}]", updatedValue);
        LOG.debug("[TransformDateTime] updatedSchema.fields()=[{}]", updatedSchema.fields());

        for (Field field : updatedSchema.fields())
        {
            LOG.debug("[TransformDateTime] field=[{}]", field);
            if (isInFields(field.name())) {
                // Convert field from epoch microseconds to mysql datetime string with microsecond precision
                Object newValue = convert(value.get(field));
                LOG.debug("[TransformDateTime] updatedValue.put=[{}][{}]", field.name(), newValue);
                updatedValue.put(field.name(), newValue);
            }
            else {
                updatedValue.put(field.name(), value.get(field));
            }
        }

        LOG.debug("[TransformDateTime] updatedSchema=[{}]", updatedSchema);
        LOG.debug("[TransformDateTime] updatedValue=[{}]", updatedValue);
        return newRecord(record, updatedSchema, updatedValue);
    }
    private R applySchemaless(R record)
    {
        LOG.info("[TransformDateTime] applySchemaless record=[{}]", record);
        Object rawValue = operatingValue(record);
        if (rawValue == null) {
            LOG.info("[TransformDateTime] applySchemaless returning=[null]");
            return newRecord(record, null, null);
        }
        LOG.info("[TransformDateTime] rawValue=[{}]", rawValue);

        final Map<String, Object> value = requireMap(rawValue, PURPOSE);
        LOG.info("[TransformDateTime] value=[{}]", value);
        final HashMap<String, Object> updatedValue = new HashMap<>(value);
        LOG.info("[TransformDateTime] updatedValue=[{}]", updatedValue);

        // In the schemaless case the value is a plain Map, so the entry keys are the field
        // names and the entry values are the raw field values (not Connect Field objects).
        for (Map.Entry<String, Object> entry : value.entrySet())
        {
            String fieldName = entry.getKey();
            LOG.info("[TransformDateTime] field=[{}]", fieldName);
            if (isInFields(fieldName)) {
                // Convert field from epoch microseconds to mysql datetime string with microsecond precision
                updatedValue.put(fieldName, convert(entry.getValue()));
            }
            else {
                updatedValue.put(fieldName, entry.getValue());
            }
        }

        LOG.info("[TransformDateTime] updatedValue=[{}]", updatedValue);
        return newRecord(record, null, updatedValue);
    }
    private Schema makeUpdatedSchema(Schema schema)
    {
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        for (Field field : schema.fields())
        {
            if (isInFields(field.name())) {
                builder.field(field.name(), Schema.OPTIONAL_STRING_SCHEMA);
            }
            else {
                builder.field(field.name(), field.schema());
            }
        }
        return builder.build();
    }

    private boolean isInFields(String fieldName)
    {
        for (String f : fields)
        {
            if (f.equals(fieldName))
            {
                return true;
            }
        }
        return false;
    }
    @Override
    public ConfigDef config()
    {
        return CONFIG_DEF;
    }

    @Override
    public void close()
    {
        schemaUpdateCache = null;
    }

    protected abstract Schema operatingSchema(R record);

    protected abstract Object operatingValue(R record);

    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue);
    public static class Key<R extends ConnectRecord<R>> extends TransformDateTime<R>
    {
        @Override
        protected Schema operatingSchema(R record)
        {
            return record.keySchema();
        }

        @Override
        protected Object operatingValue(R record)
        {
            return record.key();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue)
        {
            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
                    record.valueSchema(), record.value(), record.timestamp());
        }
    }

    public static class Value<R extends ConnectRecord<R>> extends TransformDateTime<R>
    {
        @Override
        protected Schema operatingSchema(R record)
        {
            return record.valueSchema();
        }

        @Override
        protected Object operatingValue(R record)
        {
            return record.value();
        }

        @Override
        protected R newRecord(R record, Schema updatedSchema, Object updatedValue)
        {
            return record
                    .newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema,
                            updatedValue, record.timestamp());
        }
    }
}
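
For reference, a minimal standalone sketch (not part of the gist) of the Value variant applied to a record that carries a schema, which is the path described in the comment below. It assumes the class above and connect-api are on the classpath; the topic name, field names, and values are invented for illustration.

import io.raasify.kafka.connect.transform.datetime.TransformDateTime;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collections;

public class TransformDateTimeExample
{
    public static void main(String[] args)
    {
        // Configure the transform to convert the "created" field (illustrative name).
        TransformDateTime.Value<SinkRecord> transform = new TransformDateTime.Value<>();
        transform.configure(Collections.singletonMap("fields", "created"));

        Schema valueSchema = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("created", Schema.OPTIONAL_INT64_SCHEMA)
                .build();

        Struct value = new Struct(valueSchema)
                .put("id", 42L)
                .put("created", 1667779200123456L); // epoch microseconds

        SinkRecord record = new SinkRecord("example-topic", 0, null, null, valueSchema, value, 0);
        SinkRecord transformed = transform.apply(record);

        // Prints "2022-11-07 00:00:00.123456"
        System.out.println(((Struct) transformed.value()).get("created"));

        transform.close();
    }
}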
jseparovic commented Nov 7, 2022

I've tested INSERTs with the applyWithSchema method, sinking to MySQL datetime(6) columns sourced from Debezium's microsecond-precision long format. applySchemaless works for DELETEs. I'm not sure about other configurations.

Sample config:

"transforms.datetime.type": "io.raasify.kafka.connect.transform.datetime.TransformDateTime$Value",
"transforms.datetime.fields": "created,updated,startAtDateTime,endAtDateTime,startDateTime,endDateTime",
