Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created October 25, 2018 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mccraigmccraig/7ea67bccea87da1510f07b99ca45b2d1 to your computer and use it in GitHub Desktop.
Save mccraigmccraig/7ea67bccea87da1510f07b99ca45b2d1 to your computer and use it in GitHub Desktop.
(ns er-model.connectors.kafka.serializers
(:import (org.apache.kafka.common.serialization
LongSerializer
Serializer
IntegerSerializer
StringSerializer
ByteArraySerializer)
(java.io ByteArrayOutputStream)))
;; from https://github.com/ymilky/franzy/blob/master/src/franzy/serialization/serializers.clj
(defn byte-array-serializer
"Kafka raw byte array serializer.
Useful for value serialization."
^Serializer []
(ByteArraySerializer.))
(defn string-serializer
"Kafka string serializer.
This serializer allows serializing values without a key."
^Serializer []
(StringSerializer.))
(defn integer-serializer
"Kafka integer serializer.
Useful for key serialization."
^Serializer []
(IntegerSerializer.))
(defn long-serializer
"Kafka long serializer.
Useful for key serialization."
^Serializer []
(LongSerializer.))
(deftype EdnSerializer [opts]
Serializer
(configure [_ _ _])
(serialize [_ _ data]
;;TODO: process + inject more options? better defaults via configure or opts?
;;no reason to close bos, but we do so to keep clean
(with-open [bos (ByteArrayOutputStream. 1024)]
(with-open [w (if opts (clojure.java.io/writer bos opts) (clojure.java.io/writer bos))]
(binding [*print-length* false
*out* w]
(pr data)))
;;death to efficiency, but easiest way without writing something low-level to encode a stream directly into Kafka
(.toByteArray bos)))
(close [_]))
(defn edn-serializer
(^EdnSerializer [] (edn-serializer nil))
(^EdnSerializer [opts]
(EdnSerializer. opts)))
(deftype SimpleEdnSerializer []
Serializer
(configure [_ _ _])
(serialize [_ _ data]
(some-> data pr-str .getBytes))
(close [_]))
(defn simple-edn-serializer
"A simple EDN deserializer for small amounts of data for Kafka.
Useful for value serialization."
^SimpleEdnSerializer []
(SimpleEdnSerializer.))
(deftype KeywordSerializer []
Serializer
(configure [_ _ _])
(serialize [_ _ data]
(some-> data name .getBytes))
(close [_]))
(defn keyword-serializer
"A serializer that serializers string values as keywords.
Useful for key serializers."
^KeywordSerializer []
(KeywordSerializer.))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment