Skip to content

Instantly share code, notes, and snippets.

@helpermethod
Created September 2, 2019 15:29
Show Gist options
  • Save helpermethod/4f90d8ede425ef369e45218d8a96017b to your computer and use it in GitHub Desktop.
Save helpermethod/4f90d8ede425ef369e45218d8a96017b to your computer and use it in GitHub Desktop.
JSON serializer that infers a Kafka Connect schema from the value's class
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Timestamp;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
public class JsonSchemaSerializer<T> implements Serializer<T> {
/** Mapper used to write both the payload and the inferred schema; uses snake_case property names. */
private final ObjectMapper objectMapper;

/**
 * Creates a serializer backed by a copy of the given mapper.
 *
 * <p>The mapper is copied before {@link PropertyNamingStrategy#SNAKE_CASE} is applied, so the
 * caller's (possibly shared) instance is not reconfigured as a side effect. Modules and features
 * already registered on {@code objectMapper} are carried over by the copy.
 * NOTE(review): Jackson's {@code ObjectMapper.copy()} requires subclasses of ObjectMapper to
 * override {@code copy()}; plain ObjectMapper instances are fine.
 *
 * @param objectMapper base mapper whose configuration is inherited
 * @throws NullPointerException if {@code objectMapper} is {@code null}
 */
public JsonSchemaSerializer(ObjectMapper objectMapper) {
    this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper")
            .copy()
            .setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
}

/** Creates a serializer backed by a fresh default {@link ObjectMapper}. */
public JsonSchemaSerializer() {
    this(new ObjectMapper());
}
/**
 * Reads no configuration; the mapper and its naming strategy are fixed when the
 * serializer is constructed.
 *
 * @param configs ignored
 * @param isKey   ignored
 */
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    // Intentionally empty.
}
/**
 * Serializes {@code data} into a JSON envelope {@code {"payload": ..., "schema": ...}} whose
 * schema is inferred from the runtime class of {@code data}.
 *
 * @param topic topic the record is sent to; also used as the name of the struct schema
 * @param data  record value; {@code null} is passed through as a {@code null} byte array
 * @return the JSON-encoded envelope, or {@code null} when {@code data} is {@code null}
 * @throws SerializationException if Jackson fails to write the envelope
 */
@Override
public byte[] serialize(String topic, T data) {
    if (isTombstone(data)) {
        // Kafka treats a record as a tombstone (e.g. for log compaction) only when its value is
        // null; an envelope with null fields would be an ordinary, non-deleting record.
        return null;
    }
    try {
        return objectMapper.writeValueAsBytes(new Envelope<>(data, generateSchema(topic, data.getClass())));
    } catch (JsonProcessingException e) {
        throw new SerializationException("Can't serialize data [" + data + "] for topic [" + topic + "]", e);
    }
}
/**
 * Reports whether the record value is absent.
 *
 * @param data the value handed to {@code serialize}
 * @return {@code true} exactly when {@code data} is {@code null}
 */
private boolean isTombstone(T data) {
    return null == data;
}
/**
 * Builds the top-level struct schema describing instances of {@code clazz}.
 *
 * @param name  value written to the schema's {@code "name"} key (the caller passes the topic)
 * @param clazz class whose properties become the struct's {@code "fields"}
 * @return an object node of the form {@code {"type":"struct","name":...,"fields":[...]}}
 */
private JsonNode generateSchema(String name, Class<?> clazz) {
    ArrayNode fieldSchemas = generateFieldsSchema(clazz);
    return objectMapper.createObjectNode()
            .put("type", "struct")
            .put("name", name)
            .set("fields", fieldSchemas);
}
/**
 * Builds the {@code "fields"} array of the struct schema for {@code clazz}.
 *
 * <p>Properties come from Jackson's own serialization introspection rather than raw reflection.
 * The schema therefore lists the same properties the payload contains, under the same names
 * (including the mapper's snake_case naming strategy and any {@code @JsonProperty} renames).
 * Static fields and members Jackson does not serialize are excluded, and inherited properties
 * are included.
 *
 * @param clazz runtime class of the value being serialized
 * @return one field schema per serialized property whose type {@code inferSchema} supports.
 *         Properties with unsupported types are left out, rather than added as JSON
 *         {@code null} entries that describe no field and would leave the schema malformed.
 */
private ArrayNode generateFieldsSchema(Class<?> clazz) {
    BeanDescription description = objectMapper.getSerializationConfig()
            .introspect(objectMapper.constructType(clazz));
    ArrayNode fields = objectMapper.createArrayNode();
    for (BeanPropertyDefinition property : description.findProperties()) {
        JsonNode fieldSchema = inferSchema(property.getName(), property.getRawPrimaryType());
        if (fieldSchema != null) {
            fields.add(fieldSchema);
        }
    }
    return fields;
}
/**
 * Maps a Java property type to a Kafka Connect field schema.
 *
 * <p>Types are matched by {@link Class} identity rather than simple name, so an unrelated class
 * that happens to be called {@code String} or {@code Date} is not mistaken for a supported type.
 * Primitive types are emitted as required fields. Reference types can hold {@code null}, so they
 * are marked {@code "optional": true} to keep a {@code null} payload value valid against the schema.
 *
 * @param key   field name as it appears in the JSON payload
 * @param clazz declared type of the property
 * @return the field schema, or {@code null} if {@code clazz} has no supported mapping
 */
private JsonNode inferSchema(String key, Class<?> clazz) {
    if (clazz == boolean.class || clazz == Boolean.class) {
        return fieldSchema(key, "boolean", clazz);
    }
    if (clazz == String.class) {
        return fieldSchema(key, "string", clazz);
    }
    if (clazz == byte.class || clazz == Byte.class) {
        return fieldSchema(key, "int8", clazz);
    }
    if (clazz == short.class || clazz == Short.class) {
        return fieldSchema(key, "int16", clazz);
    }
    if (clazz == int.class || clazz == Integer.class) {
        return fieldSchema(key, "int32", clazz);
    }
    if (clazz == long.class || clazz == Long.class) {
        return fieldSchema(key, "int64", clazz);
    }
    if (clazz == float.class || clazz == Float.class) {
        return fieldSchema(key, "float32", clazz);
    }
    if (clazz == double.class || clazz == Double.class) {
        return fieldSchema(key, "float64", clazz);
    }
    if (clazz == byte[].class) {
        // Jackson writes byte[] as a Base64 string, which is the JSON form of Connect's bytes type.
        return fieldSchema(key, "bytes", clazz);
    }
    if (java.util.Date.class.isAssignableFrom(clazz)) {
        // Connect's Timestamp logical type is int64 epoch millis. This assumes the mapper keeps
        // Jackson's default of writing dates as numeric timestamps (TODO confirm for custom mappers).
        return fieldSchema(key, "int64", clazz).put("name", Timestamp.LOGICAL_NAME);
    }
    return null;
}

/**
 * Creates a field schema node with {@code "field"} and {@code "type"} keys. Adds
 * {@code "optional": true} when {@code clazz} is not primitive, i.e. when it can be {@code null}.
 */
private ObjectNode fieldSchema(String key, String type, Class<?> clazz) {
    ObjectNode schema = objectMapper.createObjectNode()
            .put("field", key)
            .put("type", type);
    if (!clazz.isPrimitive()) {
        schema.put("optional", true);
    }
    return schema;
}
/** Releases nothing: the only state held is the {@link ObjectMapper}, which is not closeable. */
@Override
public void close() {
    // Intentionally empty.
}
/**
 * JSON envelope pairing a record value with its inferred schema.
 * The public field names are what Jackson writes as JSON keys, so they must remain
 * {@code payload} and {@code schema}.
 *
 * @param <T> type of the wrapped value
 */
private static class Envelope<T> {
    /** The record value, written under the {@code "payload"} key. */
    public final T payload;
    /** The inferred schema, written under the {@code "schema"} key. */
    public final JsonNode schema;

    private Envelope(T value, JsonNode valueSchema) {
        schema = valueSchema;
        payload = value;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment