@cddr
Last active March 8, 2024 18:42
(ns kafka.test-helpers
"A simpler version of jackdaw's test-machine.
It tries to capture the two essential features of jackdaw via
a simple functional interface, so that you don't have to learn a
whole new data format to write tests.
See `journal`/`with-journal` and `wait-for` for the main
entrypoints to this ns.
See also `with-bg-process` for running tests that require
programs running in the background.
"
(:require
[clojure.stacktrace :as strace]
[clojure.java.io :as io]
[com.brunobonacci.mulog :as u])
(:import
(java.io File)
(java.util UUID Properties)
(java.time Duration)
(org.apache.kafka.common TopicPartition)
(io.confluent.kafka.serializers KafkaAvroSerializer)
(org.apache.kafka.common.serialization Serdes)
(org.apache.kafka.clients.consumer KafkaConsumer ConsumerRebalanceListener)
(org.apache.kafka.clients.producer KafkaProducer ProducerRecord)))
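(defn properties
"Builds a `java.util.Properties` from a properties `File` or a clojure map."
[in]
(cond
(instance? File in)
;; close the stream once the properties have been loaded
(with-open [is (io/input-stream in)]
(doto (Properties.)
(.load is)))
(map? in)
(let [p (Properties.)]
(doseq [[k v] in]
;; `setProperty` is called on `p` and only accepts strings
(.setProperty p (name k) (str v)))
p)))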
(defn lazy-log
"A lazy sequence of the records received by repeatedly polling the supplied
consumer, for as long as `fuse-fn` returns truthy for the latest batch"
[consumer {:keys [poll-duration fuse-fn]}]
(let [r (.poll consumer poll-duration)]
(if (fuse-fn r)
(lazy-cat r (lazy-log consumer {:poll-duration poll-duration
:fuse-fn fuse-fn}))
r)))
(defn clojure-record
"A map representing the key features of a kafka `ConsumerRecord`"
[rec]
{:topic (.topic rec)
:partition (.partition rec)
:key (.key rec)
:value (.value rec)})
(defn kafka-record
"Creates a kafka `ProducerRecord` from a clojure map"
[clj]
(ProducerRecord. (:topic clj)
(:key clj)
(:value clj)))
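;; A small REPL sketch of the two record shapes above, added for illustration.
;; It assumes string keys and values, and builds the `ConsumerRecord` by hand
;; (topic, partition, offset, key, value) instead of polling a broker.
(comment
  ;; clojure map -> ProducerRecord
  (kafka-record {:topic "credits" :key "1" :value "ben"})

  ;; ConsumerRecord -> clojure map; the offset is not carried over
  (clojure-record
   (org.apache.kafka.clients.consumer.ConsumerRecord. "credits" 0 0 "1" "ben"))
  ;; => {:topic "credits", :partition 0, :key "1", :value "ben"}
  )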
(defn wait-for-assignment
"Returns a `ConsumerRebalanceListener` that delivers `:partitions-assigned`
to the provided `start-promise` when partitions have been assigned to
this consumer.
This helps ensure the test consumer is ready to receive messages
before we start sending them in a test."
[start-promise]
(reify ConsumerRebalanceListener
(onPartitionsRevoked [_this _partitions-xs])
(onPartitionsLost [_this _partitions-xs])
(onPartitionsAssigned [_this partitions-xs]
(deliver start-promise :partitions-assigned))))
(defn journal
"Takes an atom `j` and, in another thread, copies the output observed on the
provided `topics` to the atom.
Blocks until partitions have been assigned to the consumer, then returns a
function that must be called to stop the journal.
See `with-journal` for a way to wrap your test-function so that
the kafka output is available to it as an atom.
Data Structures
After a few messages have been written to topic 'credits', for example, `@j`
might look something like this
[
{:topic \"credits\" :partition 0 :key 1 :value \"ben\"}
{:topic \"credits\" :partition 0 :key 1 :value \"ellie\"}
{:topic \"credits\" :partition 0 :key 1 :value \"simon\"}
{:topic \"credits\" :partition 0 :key 1 :value \"andy\"}
{:topic \"credits\" :partition 0 :key 1 :value \"tom\"}
]"
[j {:keys [consumer-fn topics]}]
(let [strt (promise)
stopped? (promise)
done? (promise)
bg-fn (fn []
(try
(with-open [consumer (consumer-fn)]
(.subscribe consumer (or topics #".*") (wait-for-assignment strt))
(doseq [rec (->> (lazy-log consumer
{:poll-duration (Duration/ofMillis 500)
:fuse-fn #(do % (not (realized? stopped?)))})
(map clojure-record))]
(swap! j conj rec))
;; commit all offsets we've read in this consumer
(.commitSync consumer)
(deliver done? :success))
(catch Exception e
(strace/print-cause-trace e)
(deliver done? :fail)
(throw e))))
bg-fut (future (bg-fn))
stop (fn []
(deliver stopped? true)
@bg-fut)]
@strt
stop))
(defn with-journal
"Convenience wrapper for running some test function `f` and providing it with
access to an atom that mirrors one or more kafka topics.
`journal-args` is a map containing :consumer-fn and :topics keys.
The consumer-fn will be invoked with zero parameters
to produce a KafkaConsumer, which will then immediately
subscribe to the topics listed in :topics.
`f` is a function that will be invoked with the journal atom as its single
parameter.
Typically some test will be performed within `f`."
[journal-args f]
(let [j (atom [])
stop-fn (journal j journal-args)]
(try
(f j)
(finally
(stop-fn)))))
(defn wait-for
"Monitors changes to some atom `jrnl` until the new value satisfies predicate `pred`,
then returns the current value of the atom.
The primary use for this is to wait until all expected output has arrived on some
topic before making test assertions."
[jrnl pred]
(let [id (str (UUID/randomUUID))
done? (promise)
done-when-f (fn [k r _old-val new-val]
(let [rslt (pred new-val)]
(when rslt
(remove-watch jrnl k)
(deliver done? :done))))]
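(add-watch jrnl id done-when-f)
;; the journal may already satisfy `pred` before the watch was added,
;; in which case no later change would ever deliver `done?`
(when (pred @jrnl)
(remove-watch jrnl id)
(deliver done? :done))
(u/log ::wait-for :id id :done? @done?)
@jrnl))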
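;; A usage sketch tying `with-journal` and `wait-for` together; it is not part
;; of the original helpers. Assumptions: a broker is reachable at
;; localhost:9092, the topic "credits" exists, and keys/values are plain
;; strings. `consumer-config` and `producer-config` are hypothetical names.
(comment
  (def consumer-config
    (doto (Properties.)
      (.setProperty "bootstrap.servers" "localhost:9092")
      ;; a fresh group reading from the start of the topic, so records
      ;; produced right after assignment are not skipped
      (.setProperty "group.id" (str "test-" (UUID/randomUUID)))
      (.setProperty "auto.offset.reset" "earliest")
      (.setProperty "key.deserializer"
                    "org.apache.kafka.common.serialization.StringDeserializer")
      (.setProperty "value.deserializer"
                    "org.apache.kafka.common.serialization.StringDeserializer")))

  (def producer-config
    (doto (Properties.)
      (.setProperty "bootstrap.servers" "localhost:9092")
      (.setProperty "key.serializer"
                    "org.apache.kafka.common.serialization.StringSerializer")
      (.setProperty "value.serializer"
                    "org.apache.kafka.common.serialization.StringSerializer")))

  (with-journal {:consumer-fn #(KafkaConsumer. ^Properties consumer-config)
                 :topics ["credits"]}
    (fn [j]
      ;; start waiting before producing, and bound the wait with a timeout,
      ;; because `wait-for` itself blocks until `pred` is satisfied
      (let [seen (future
                   (wait-for j (fn [recs] (some #(= "ben" (:value %)) recs))))]
        (with-open [producer (KafkaProducer. ^Properties producer-config)]
          ;; deref the returned future so the send has been acknowledged
          @(.send producer (kafka-record {:topic "credits" :key "1" :value "ben"})))
        ;; returns the journal contents, or :timed-out after 10 seconds
        (deref seen 10000 :timed-out)))))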
(defn call-with-bg-proc
"Invoke `f` after starting the background process represented by `cmd`
and waiting until we've seen a line in the output stream that satisfies
the predicate `started?`"
[cmd f started?]
(let [pb (ProcessBuilder. (into-array String cmd))
proc (.start pb)
br (io/reader (.getInputStream proc))]
(try
;; read process output until we find a line that tells us the app
;; has successfully started
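(loop [line (.readLine br)]
(cond
;; end of stream: the process exited without reporting startup
(nil? line) (throw (ex-info "Process output ended before startup was detected" {:cmd cmd}))
(started? line) nil
:else (recur (.readLine br))))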
;; invoke the test-function
(f)
;; clean-up
(finally
(.destroy proc)))))
(defmacro with-bg-process
"Runs `body` while the background process `cmd` is running. See `call-with-bg-proc`."
[cmd started? & body]
`(call-with-bg-proc ~cmd (fn [] ~@body) ~started?))
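;; A usage sketch for `with-bg-process`, added for illustration. It assumes a
;; Unix-like system with `sh` on the PATH; the command stands in for a real
;; application that prints a recognisable line once it has started.
(comment
  (with-bg-process ["sh" "-c" "echo ready; exec sleep 30"]
    #(= "ready" %)
    ;; the body runs only after a line satisfying the predicate was seen
    (println "background process is up"))
  ;; the process is destroyed once the body returns or throws
  )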