Skip to content

Instantly share code, notes, and snippets.

@kdrakon
Created December 21, 2017 01:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kdrakon/23f9ac7bb28812c82573bb9a3d8fae36 to your computer and use it in GitHub Desktop.
Save kdrakon/23f9ac7bb28812c82573bb9a3d8fae36 to your computer and use it in GitHub Desktop.
A Kafka Serde for turning Avro-generated classes into JSON objects (only does serialization)
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.nio.charset.Charset;
public class JsonSerdeForAvroClasses<T> {
public static <T> JsonSerdeForAvroClasses<T> gen() {
return new JsonSerdeForAvroClasses<>();
}
public JsonSerdeForAvroClasses() {}
final static private ObjectMapper mapper = new ObjectMapper();
static {
mapper.setVisibility(mapper.getVisibilityChecker()
.withFieldVisibility(JsonAutoDetect.Visibility.PUBLIC_ONLY)
.withGetterVisibility(JsonAutoDetect.Visibility.NONE)
.withSetterVisibility(JsonAutoDetect.Visibility.NONE)
.withCreatorVisibility(JsonAutoDetect.Visibility.NONE));
}
final private ValueMapper<T, ObjectNode> jsonMapper = (v -> mapper.convertValue(v, ObjectNode.class));
public SerdeFunction<T> serdeFunction() {
return new SerdeFunction<>(t -> {
try {
return jsonMapper.apply(t).toString().getBytes(Charset.defaultCharset());
} catch (Throwable e) {
throw new RuntimeException("Failed to serialize", e);
}
}, b -> {
throw new UnsupportedOperationException();
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment