Created September 2, 2019, 15:29
-
-
Save helpermethod/4f90d8ede425ef369e45218d8a96017b to your computer and use it in GitHub Desktop.
JSON serializer that infers the record schema
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.introspect.AnnotatedMember;
import com.fasterxml.jackson.databind.introspect.BeanPropertyDefinition;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Timestamp;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
public class JsonSchemaSerializer<T> implements Serializer<T> { | |
private final ObjectMapper objectMapper; | |
public JsonSchemaSerializer(ObjectMapper objectMapper) { | |
this.objectMapper = objectMapper | |
.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); | |
} | |
public JsonSchemaSerializer() { | |
this(new ObjectMapper()); | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
// NOP | |
} | |
@Override | |
public byte[] serialize(String topic, T data) { | |
try { | |
if (isTombstone(data)) { | |
return objectMapper.writeValueAsBytes(new Envelope<>(null, null)); | |
} | |
return objectMapper.writeValueAsBytes(new Envelope<>(data, generateSchema(topic, data.getClass()))); | |
} catch (JsonProcessingException e) { | |
throw new SerializationException("Can't serialize data [" + data + "] for topic [" + topic + "]", e); | |
} | |
} | |
private boolean isTombstone(T data) { | |
return data == null; | |
} | |
private JsonNode generateSchema(String name, Class<?> clazz) { | |
return objectMapper | |
.createObjectNode() | |
.put("type", "struct") | |
.put("name", name) | |
.set("fields", generateFieldsSchema(clazz)); | |
} | |
private ArrayNode generateFieldsSchema(Class<?> clazz) { | |
ArrayNode fields = objectMapper.createArrayNode(); | |
for (Field f : clazz.getDeclaredFields()) { | |
fields.add(inferSchema(f.getName(), f.getType())); | |
} | |
return fields; | |
} | |
private JsonNode inferSchema(String key, Class<?> clazz) { | |
switch (clazz.getSimpleName()) { | |
case "boolean": | |
case "Boolean": | |
return objectMapper | |
.createObjectNode() | |
.put("field", key) | |
.put("type", "boolean"); | |
case "String": | |
return objectMapper | |
.createObjectNode() | |
.put("field", key) | |
.put("type", "string"); | |
case "long": | |
case "Long": | |
return objectMapper | |
.createObjectNode() | |
.put("field", key) | |
.put("type", "int64"); | |
case "int": | |
case "Integer": | |
return objectMapper | |
.createObjectNode() | |
.put("field", key) | |
.put("type", "int32"); | |
case "Date": | |
return objectMapper | |
.createObjectNode() | |
.put("field", key) | |
.put("name", Timestamp.LOGICAL_NAME) | |
.put("type", "int64"); | |
default: | |
return null; | |
} | |
} | |
@Override | |
public void close() { | |
// NOP | |
} | |
private static class Envelope<T> { | |
public final T payload; | |
public final JsonNode schema; | |
private Envelope(T payload, JsonNode schema) { | |
this.payload = payload; | |
this.schema = schema; | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.