Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Macros to make it easier to write serializers for Fressian.
(ns audience.analysis.hadoop.serialization
(:require [clojure.data.fressian :as fress]
[clojure.walk :as walk]
[parkour.wrapper :as wrapper])
(:import (audience.analysis.hadoop FressianWritable)
(java.io DataInput
DataOutput
ByteArrayInputStream
ByteArrayOutputStream)
(java.nio ByteBuffer)
(org.fressian.handlers WriteHandler
ReadHandler)
(com.clearspring.analytics.stream.quantile QDigest)))
(defn ^"[B" byte-buffer->bytes
  "Copies the bytes between the buffer's current position and its limit into a
  freshly allocated byte array and returns that array. As a side effect the
  buffer's position is advanced to its limit."
  [^ByteBuffer buffer]
  (let [n    (.remaining buffer)
        dest (byte-array n)]
    (.get buffer dest 0 n)
    dest))
(defmacro handler
  "Builds a Fressian writer/reader handler pair for a single class.

  Arguments: `classname` (a class symbol), `tag` (a string), then two method
  bodies written in reify syntax: `(write [this writer obj] ...)` and
  `(read [this reader tag component-count] ...)`.

  Inside the write body, `(write-tag! n)` is shorthand for
  `(.writeTag writer tag n)`, where `writer` is whatever symbol the write
  method names as its second parameter. write-tag! must be given exactly one
  argument (the component count); this is checked at expansion time.

  Expands to {:writers {classname {tag <WriteHandler>}}
              :readers {tag <ReadHandler>}}
  i.e. maps suitable for use as Fressian writer / reader handlers.

    (handler QDigest \"q-digest\"
      (write [_ writer digest]
        (write-tag! 1)
        (.writeBytes writer (QDigest/serialize digest)))
      (read [_ reader tag component-count]
        (QDigest/deserialize ^bytes (.readObject reader))))"
  [classname tag write-expr read-expr]
  (let [;; (write [this <out-sym> obj] ...) -> <out-sym>
        [_ [_ out-sym]] write-expr
        ;; Rewrites a (write-tag! n) call into the real .writeTag call;
        ;; every other form passes through untouched.
        expand-write-tag
        (fn [form]
          (if-not (and (list? form)
                       (= 'write-tag! (first form)))
            form
            (do (assert (= 2 (count form))
                        "write-tag! takes 1 argument: a component count.")
                `(.writeTag ~out-sym ~tag ~(second form)))))
        write-method (walk/prewalk expand-write-tag write-expr)]
    `{:writers {~classname {~tag (reify WriteHandler ~write-method)}}
      :readers {~tag (reify ReadHandler ~read-expr)}}))
(defmacro handlers
  "Takes a flat series of handler quartets -- class-name, tag, writer, reader --
  each as per `handler`. Returns a {:writers ..., :readers ...} map:

  - :writers -- every quartet's writer map merged over
    clojure.data.fressian's clojure-write-handlers, wrapped with an
    associative lookup and then an inheritance lookup.
  - :readers -- every quartet's reader map merged over clojure-read-handlers,
    wrapped with an associative lookup only (no inheritance).

  Because maps are merged left to right, a quartet overrides the defaults and
  any earlier quartet registering the same class (writers) or tag (readers).
  Either value may be passed to Fressian as :handlers.

  At macro-expansion time (when *assert* is enabled), throws an AssertionError
  if the number of forms is not a multiple of four, instead of silently
  dropping an incomplete trailing quartet.

    (handlers
      QDigest \"q-digest\"
      (write [_ writer digest]
        (write-tag! 1)
        (.writeBytes writer (QDigest/serialize digest)))
      (read [_ reader tag component-count]
        (QDigest/deserialize ^bytes (.readObject reader)))
      clojure.lang.PersistentVector \"vector\"
      (write [_ writer v]
        (write-tag! (count v))
        (doseq [e v]
          (.writeObject writer e)))
      (read [_ rdr tag component-count]
        (loop [i component-count, v (transient [])]
          (if (pos? i)
            (recur (dec i) (conj! v (.readObject rdr)))
            (persistent! v)))))"
  [& quartets]
  ;; partition would silently discard a partial trailing group, losing a handler.
  (assert (zero? (mod (count quartets) 4))
          "handlers takes a flat series of quartets: class-name, tag, writer, reader.")
  (let [handlers (partition 4 quartets)
        names    (repeatedly (count handlers) (partial gensym "handler"))]
    `(let [;; Bind each (handler ...) expansion to its own gensym
           ~@(->> handlers
                  (map (partial cons `handler))
                  (interleave names))
           handlers# [~@names]
           writers#  (map :writers handlers#)
           readers#  (map :readers handlers#)]
       ;; User handlers are merged after the Clojure defaults so they win.
       {:writers (->> (apply merge fress/clojure-write-handlers writers#)
                      fress/associative-lookup
                      fress/inheritance-lookup)
        :readers (->> (apply merge fress/clojure-read-handlers readers#)
                      fress/associative-lookup)})))
(def fress-handlers
  "Fressian {:writers ..., :readers ...} lookups (built with `handlers`) for
  QDigest and the persistent vector, hash-map, sorted-map, hash-set and
  sorted-set types.

  Component counts: vectors and sets write one component per element; maps
  write one component per key and one per value, i.e. (* 2 (count m)). Map
  readers therefore consume two objects per entry and step the remaining
  count down by 2.

  Readers always thread the value returned by conj!/assoc! instead of
  relying on the transient being mutated in place.

  NOTE(review): sorted collections are rebuilt with the default comparator
  (sorted-map / sorted-set); a custom comparator is not serialized."
  (handlers
    QDigest "q-digest"
    (write [_ writer digest]
      (write-tag! 1)
      (.writeBytes writer (QDigest/serialize digest)))
    (read [_ reader tag component-count]
      (QDigest/deserialize ^bytes (.readObject reader)))

    clojure.lang.PersistentVector "vector"
    (write [_ writer v]
      (write-tag! (count v))
      (doseq [e v]
        (.writeObject writer e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             v (transient [])]
        (if (pos? i)
          (recur (dec i) (conj! v (.readObject rdr)))
          (persistent! v))))

    clojure.lang.PersistentHashMap "hash-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      ;; component-count covers keys and values separately, so each entry
      ;; consumes 2 of it. assoc! may return a different transient (e.g. when
      ;; an array map is promoted to a hash map), so the result is threaded.
      (loop [i component-count
             m (transient {})]
        (if (pos? i)
          (recur (- i 2) (assoc! m (.readObject rdr) (.readObject rdr)))
          (persistent! m))))

    clojure.lang.PersistentTreeMap "sorted-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             m (sorted-map)]
        (if (pos? i)
          (recur (- i 2) (assoc m (.readObject rdr) (.readObject rdr)))
          m)))

    clojure.lang.PersistentHashSet "hash-set"
    (write [_ w s]
      (write-tag! (count s))
      (doseq [e s]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             s (transient #{})]
        (if (pos? i)
          (recur (dec i) (conj! s (.readObject rdr)))
          (persistent! s))))

    clojure.lang.PersistentTreeSet "sorted-set"
    (write [_ w s]
      (write-tag! (count s))
      (doseq [e s]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             s (sorted-set)]
        (if (pos? i)
          (recur (dec i) (conj s (.readObject rdr)))
          s)))))
; Install FressianWritable's (de)serialization hooks: each value is encoded as a
; Fressian record prefixed by a 32-bit int length header. Lord have mercy on my soul.
(set! FressianWritable/readFieldsFn
      (fn read-fields [^FressianWritable w ^DataInput in]
        ;; Wire format: a 32-bit int byte count, then that many bytes of a
        ;; single Fressian-encoded object.
        (let [payload (byte-array (.readInt in))]
          (.readFully in payload)
          ;; Decode the payload with this namespace's reader handlers and
          ;; store the result in w's state field.
          (let [rdr (fress/create-reader (ByteArrayInputStream. payload)
                                         :handlers (:readers fress-handlers))]
            (set! (.state w) (fress/read-object rdr))))))
(set! FressianWritable/writeFn
      (fn write [^FressianWritable w ^DataOutput out]
        (let [state (.state w)
              buf   (ByteArrayOutputStream.)]
          ;; Encode into memory first so the byte length is known up front.
          (-> buf
              (fress/create-writer :handlers (:writers fress-handlers))
              (fress/write-object state))
          ;; Emit the 32-bit length header, then the encoded bytes.
          (.writeInt out (.size buf))
          (.write out (.toByteArray buf)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.