Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created October 25, 2018 21:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mccraigmccraig/9268d150c903581cf2538045f1c6683d to your computer and use it in GitHub Desktop.
Save mccraigmccraig/9268d150c903581cf2538045f1c6683d to your computer and use it in GitHub Desktop.
(ns er-model.connectors.kafka.deserializers
(:require
[clojure.edn :as edn])
(:import
[org.apache.kafka.common.serialization
Deserializer
StringDeserializer
LongDeserializer
IntegerDeserializer
ByteArrayDeserializer]
[java.io PushbackReader ByteArrayInputStream]))
;; from https://github.com/ymilky/franzy/blob/master/src/franzy/serialization/deserializers.clj
(defn byte-array-deserializer
"Kafka raw byte array deserializer.
Useful for value deserialization."
^Deserializer []
(ByteArrayDeserializer.))
(defn integer-deserializer
"Kafka integer deserializer.
Useful for key deserialization."
^Deserializer []
(IntegerDeserializer.))
(defn long-deserializer
"Kafka long deserializer.
Useful for key deserialization."
^Deserializer []
(LongDeserializer.))
(defn string-deserializer
"Kafka string deserializer.
Useful for key deserialization."
^Deserializer []
(StringDeserializer.))
(deftype EdnDeserializer [opts]
Deserializer
(configure [_ _ _])
(deserialize [_ _ data]
(when data
(with-open [r (PushbackReader. (clojure.java.io/reader (ByteArrayInputStream. data)))]
;;Can't remember if this binding is needed anymore with safer edn/read, but we like safe(r/ish) via edn/read
;;Hey you're sending raw EDN over the network, you like to live on the wild side, friend!
(binding [*read-eval* false]
(edn/read (or opts {}) r)))))
(close [_]))
(defn edn-deserializer
"An EDN deserializer for Kafka.
Contents of each item serialized must fit in memory.
> Note: Any users of EDN deserializers should note the usual serialization/deserialization attack vectors.
You should always validate any data before it is serialized so that an attack may not be executed on deserialization.
Although EDN facilities try to protect you against this, nothing in this life is ever for sure. Be vigilant."
(^EdnDeserializer [] (edn-deserializer nil))
(^EdnDeserializer [opts]
(EdnDeserializer. opts)))
(deftype SimpleEdnDeserializer [opts]
Deserializer
(configure [_ _ _])
(deserialize [_ _ data]
(edn/read-string (or opts {}) (String. ^bytes data "UTF-8")))
(close [_]))
(defn simple-edn-deserializer
"A Simple EDN deserializer for Kafka.
Useful for value deserialization."
^SimpleEdnDeserializer
([] (simple-edn-deserializer nil))
^SimpleEdnDeserializer
([opts]
(SimpleEdnDeserializer. opts)))
(deftype KeywordDeserializer []
Deserializer
(configure [_ _ _])
(deserialize [_ _ data]
(when data
(keyword (String. ^bytes data "UTF-8"))))
(close [_]))
(defn keyword-deserializer
"A deserializer that deserializes string values as keywords.
Useful for key deserializers."
^Deserializer []
(KeywordDeserializer.))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment