Skip to content

Instantly share code, notes, and snippets.

@aphyr
Created August 20, 2014 22:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aphyr/52ca0aec41abe8921313 to your computer and use it in GitHub Desktop.
Save aphyr/52ca0aec41abe8921313 to your computer and use it in GitHub Desktop.
Macros to make it easier to write serializers for Fressian.
(ns audience.analysis.hadoop.serialization
(:require [clojure.data.fressian :as fress]
[clojure.walk :as walk]
[parkour.wrapper :as wrapper])
(:import (audience.analysis.hadoop FressianWritable)
(java.io DataInput
DataOutput
ByteArrayInputStream
ByteArrayOutputStream)
(java.nio ByteBuffer)
(org.fressian.handlers WriteHandler
ReadHandler)
(com.clearspring.analytics.stream.quantile QDigest)))
(defn ^"[B" byte-buffer->bytes
  "Copies the bytes between a ByteBuffer's position and its limit into a
  newly allocated byte array and returns that array. Reading the bytes
  advances the buffer's position to its limit."
  [^ByteBuffer buffer]
  (let [n   (.remaining buffer)
        out (byte-array n)]
    (.get buffer out)
    out))
(defmacro handler
  "Takes a classname as a symbol, a tag name as a string, and bodies for write
  and read functions. Provides a special syntax for writing the component
  count: (write-tag! some-number), which expands to (.writeTag writer tag
  some-number), where `writer` is the second parameter symbol of the write
  body. Returns a map with two keys: :readers, and :writers, each value
  being a map suitable for use as a Fressian reader or writer, respectively.

  (write-tag! ...) is recognized in any seq form, not only literal lists, so
  write bodies assembled programmatically (e.g. with cons) expand as well.

  (handler QDigest \"q-digest\"
    (write [_ writer digest]
      (write-tag! 1)
      (.writeBytes writer (QDigest/serialize digest)))
    (read [_ reader tag component-count]
      (QDigest/deserialize ^bytes (.readObject reader))))"
  [classname tag write-expr read-expr]
  (let [; write-expr looks like (write [this writer obj] ...); pull out `writer`.
        writer-sym (-> write-expr second second)
        ; Rewrite a (write-tag! n) call into an interop call on the writer.
        ; seq? (rather than list?) also matches Cons/LazySeq forms built by
        ; code, which would otherwise be left as an unresolvable call.
        expand-write-tag (fn [form]
                           (if (and (seq? form)
                                    (= 'write-tag! (first form)))
                             (do
                               (assert
                                 (= 2 (count form))
                                 "write-tag! takes 1 argument: a component count.")
                               `(.writeTag ~writer-sym ~tag ~(second form)))
                             form))
        write-expr (walk/prewalk expand-write-tag write-expr)]
    `{:writers {~classname {~tag (reify WriteHandler ~write-expr)}}
      :readers {~tag (reify ReadHandler ~read-expr)}}))
(defmacro handlers
  "Takes a flat series of handler quartets: class-name, tag, writer, reader, as
  per `handler`. Returns a {:writers {...}, :readers {...}} map, where all
  writers are merged into a unified map, merged with the clojure default
  handlers, and wrapped with inheritance/associative lookups. Does the same for
  the readers map, but without inheritance lookups. :readers and :writers may
  be passed to Fressian.

  Throws IllegalArgumentException at macroexpansion time if the number of
  forms is not a multiple of four, rather than silently dropping a partial
  quartet.

  (handlers
    QDigest \"q-digest\"
    (write [_ writer digest]
      (write-tag! 1)
      (.writeBytes writer (QDigest/serialize digest)))
    (read [_ reader tag component-count]
      (QDigest/deserialize ^bytes (.readObject reader)))
    clojure.lang.PersistentVector \"vector\"
    (write [_ writer v]
      (write-tag! (count v))
      (doseq [e v]
        (.writeObject writer e)))
    (read [_ rdr tag component-count]
      (let [v (transient [])]
        (dotimes [_ component-count]
          (conj! v (.readObject rdr)))
        (persistent! v))))"
  [& quartets]
  ; partition would silently discard a trailing incomplete quartet.
  (when-not (zero? (mod (count quartets) 4))
    (throw (IllegalArgumentException.
             (str "handlers expects class/tag/write/read quartets, but got "
                  (count quartets)
                  " forms (not a multiple of 4)."))))
  (let [handlers (partition 4 quartets)
        names    (repeatedly (count handlers) (partial gensym "handler"))]
    ; Bind each (handler ...) expansion to its own gensym'd local.
    `(let [~@(->> handlers
                  (map (partial cons `handler))
                  (interleave names))
           ; Gather the handler maps into a vector...
           handlers# [~@names]
           ; ...and split out their writer and reader halves.
           writers#  (map :writers handlers#)
           readers#  (map :readers handlers#)]
       ; merge lets later maps win on key collisions: user handlers override
       ; the clojure defaults, and later quartets override earlier ones.
       {:writers (->> writers#
                      (cons fress/clojure-write-handlers)
                      (reduce merge)
                      fress/associative-lookup
                      fress/inheritance-lookup)
        :readers (->> readers#
                      (cons fress/clojure-read-handlers)
                      (reduce merge)
                      fress/associative-lookup)})))
(def fress-handlers
  "Fressian handler maps ({:readers ..., :writers ...}, built by `handlers`)
  for QDigest and the core persistent collections: vectors, hash maps, sorted
  maps, hash sets, and sorted sets.

  Map writers emit two components per entry (key, then value), so the map
  readers consume component-count / 2 entries. Transient-based readers thread
  the return value of conj!/assoc! through each step, because a transient op
  may return a different object than the one it was given.

  NOTE(review): the sorted-map/sorted-set readers rebuild with the default
  comparator; a custom comparator on the written collection is not
  preserved."
  (handlers
    QDigest "q-digest"
    (write [_ writer digest]
      (write-tag! 1)
      (.writeBytes writer (QDigest/serialize digest)))
    (read [_ reader tag component-count]
      (QDigest/deserialize ^bytes (.readObject reader)))

    clojure.lang.PersistentVector "vector"
    (write [_ writer v]
      (write-tag! (count v))
      (doseq [e v]
        (.writeObject writer e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             v (transient [])]
        (if (pos? i)
          (recur (dec i) (conj! v (.readObject rdr)))
          (persistent! v))))

    clojure.lang.PersistentHashMap "hash-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      ; component-count counts keys and values separately (2 per entry).
      ; assoc! can return a new transient (e.g. when an array-map transient
      ; grows into a hash-map), so its result must be carried forward.
      (loop [i (quot component-count 2)
             m (transient {})]
        (if (pos? i)
          (let [k (.readObject rdr)
                v (.readObject rdr)]
            (recur (dec i) (assoc! m k v)))
          (persistent! m))))

    clojure.lang.PersistentTreeMap "sorted-map"
    (write [_ w m]
      (write-tag! (* 2 (count m)))
      (doseq [[k v] m]
        (.writeObject w k)
        (.writeObject w v)))
    (read [_ rdr tag component-count]
      ; Two components (key, value) per entry.
      (loop [i (quot component-count 2)
             m (sorted-map)]
        (if (pos? i)
          (let [k (.readObject rdr)
                v (.readObject rdr)]
            (recur (dec i) (assoc m k v)))
          m)))

    clojure.lang.PersistentHashSet "hash-set"
    (write [_ w set]
      (write-tag! (count set))
      (doseq [e set]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             s (transient #{})]
        (if (pos? i)
          (recur (dec i) (conj! s (.readObject rdr)))
          (persistent! s))))

    clojure.lang.PersistentTreeSet "sorted-set"
    (write [_ w set]
      (write-tag! (count set))
      (doseq [e set]
        (.writeObject w e)))
    (read [_ rdr tag component-count]
      (loop [i component-count
             s (sorted-set)]
        (if (pos? i)
          (recur (dec i) (conj s (.readObject rdr)))
          s)))))
; Serializes values to and from Fressian records, delimited by a 32-bit int
; length header. Lord have mercy on my soul.
(set! FressianWritable/readFieldsFn
      (fn read-fields [^FressianWritable w ^DataInput in]
        ; Record layout: a 32-bit int byte count (DataInput/readInt), then
        ; that many bytes of Fressian-encoded data.
        (let [n       (.readInt in)
              payload (byte-array n)]
          (.readFully in payload)
          ; Decode the payload with this namespace's reader handlers and
          ; store the result as the writable's state.
          (set! (.state w)
                (fress/read-object
                  (fress/create-reader (ByteArrayInputStream. payload)
                                       :handlers (:readers fress-handlers)))))))
(set! FressianWritable/writeFn
      (fn write [^FressianWritable w ^DataOutput out]
        ; Encode the state into an in-memory buffer first, so that its byte
        ; length is known and can be written as a 32-bit int header ahead of
        ; the payload.
        (let [value (.state w)
              buf   (ByteArrayOutputStream.)]
          (-> buf
              (fress/create-writer :handlers (:writers fress-handlers))
              (fress/write-object value))
          (let [payload (.toByteArray buf)]
            (.writeInt out (alength payload))
            (.write out payload)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment