Skip to content

Instantly share code, notes, and snippets.

@fr33m0nk
Last active December 19, 2022 05:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fr33m0nk/efdca7d5e77db9bc0cdcbf6a1ce38497 to your computer and use it in GitHub Desktop.
Somewhat reactive Kafka consumer using core.async :(
;; deps.edn — project paths, dependencies and build alias.
{:paths ["src"]
 ;; NOTE(review): :main-opts is normally only honored inside an alias invoked
 ;; with -M; at the top level it is typically ignored — verify this is used.
 :main-opts ["-m" "play.ground"]
 :deps {org.clojure/clojure {:mvn/version "1.11.1"}
        org.clojure/core.async {:mvn/version "1.5.648"}
        org.apache.kafka/kafka-clients {:mvn/version "3.3.1"}
        org.clojure/java.data {:mvn/version "1.0.95"}}
 ;; `clojure -T:build <fn>` runs build functions from the `build` namespace.
 :aliases {:build {:deps {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
                   :ns-default build}}}
(ns kafka.consumer
(:require
[clojure.java.data :as j])
(:import (java.time Duration)
(java.util Map Optional)
(org.apache.kafka.clients.consumer ConsumerConfig ConsumerRecord ConsumerRecords KafkaConsumer OffsetAndMetadata)
(org.apache.kafka.common TopicPartition)))
(defn config
  "Builds the configuration map for a KafkaConsumer pointed at
  `bootstrap-servers`. Auto-commit is disabled (offsets are committed
  manually downstream) and polls are capped at 5 records so each batch
  stays small.
  NOTE: the original ended with `(ConsumerConfig/Ma)` — a truncated,
  nonexistent member that also left the map literal with an odd number of
  forms (a reader error); it has been removed."
  [bootstrap-servers]
  ;; Static fields are referenced directly (Class/FIELD) rather than wrapped
  ;; in call parens, which reads as a method invocation.
  {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers
   ConsumerConfig/GROUP_ID_CONFIG "test-consumer-123"
   ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
   ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
   ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG "false"
   ConsumerConfig/MAX_POLL_RECORDS_CONFIG "5"
   ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"})
(defn consumer
  "Constructs a KafkaConsumer from the supplied configuration map."
  [^Map config]
  (new KafkaConsumer config))
(defn subscribe-consumer
  "Subscribes `consumer` to the given collection of topic names and
  returns the consumer so calls can be threaded."
  [^KafkaConsumer consumer topics]
  (doto consumer
    (.subscribe topics)))
(defn topic-partitions
  "Returns the set of TopicPartitions present in a polled batch."
  [^ConsumerRecords records]
  (.partitions records))
(defn poll-consumer-records!
  "Polls `consumer` for up to `duration-in-millis` milliseconds and returns
  the resulting ConsumerRecords batch."
  ^ConsumerRecords
  [^KafkaConsumer consumer duration-in-millis]
  (let [timeout (Duration/ofMillis ^Long duration-in-millis)]
    (.poll consumer timeout)))
(defn pause-consumer
  "Pauses fetching on the given TopicPartitions and returns the consumer
  so calls can be threaded."
  [^KafkaConsumer consumer topic-partitions]
  (doto consumer
    (.pause topic-partitions)))
(defn resume-consumer
  "Resumes fetching on every partition the consumer currently has paused.
  Returns the consumer so calls can be threaded."
  [^KafkaConsumer consumer]
  (let [paused-partitions (.paused consumer)]
    (.resume consumer paused-partitions))
  consumer)
(defn commit-consumer
  "Synchronously commits offsets, waiting at most `timeout-in-millis` ms.
  The 2-arity commits the offsets of the last poll; the 3-arity commits the
  explicit {TopicPartition -> OffsetAndMetadata} map. Both arities now
  return the consumer for threading (the original 3-arity returned nil,
  inconsistent with the 2-arity; callers discarded the result, so this is
  backward-compatible)."
  ([^KafkaConsumer consumer timeout-in-millis]
   (.commitSync consumer (Duration/ofMillis timeout-in-millis))
   consumer)
  ([^KafkaConsumer consumer topic-partition-offset-metadata-map timeout-in-millis]
   (.commitSync consumer topic-partition-offset-metadata-map (Duration/ofMillis timeout-in-millis))
   consumer))
(defn close-consumer
  "Closes `consumer`, waiting up to `timeout-in-millis` ms for a clean
  shutdown."
  [^KafkaConsumer consumer timeout-in-millis]
  (let [timeout (Duration/ofMillis timeout-in-millis)]
    (.close consumer timeout)))
(defmethod j/from-java ConsumerRecord
  ;; Converts a single ConsumerRecord into a plain Clojure map.
  [consumer-record]
  (let [^ConsumerRecord record consumer-record]
    {:topic (.topic record)
     :partition (.partition record)
     :headers (.headers record)
     :key (.key record)
     :value (.value record)
     :offset (.offset record)
     :timestamp (.timestamp record)
     :timestampType (.timestampType record)
     :leaderEpoch (.leaderEpoch record)}))
(defmethod j/from-java ConsumerRecords
  ;; Converts a polled batch into a map; each record is converted recursively.
  [consumer-records]
  (let [^ConsumerRecords batch consumer-records]
    {:empty? (.isEmpty batch)
     :partitions (.partitions batch)
     :records (mapv j/from-java (seq batch))}))
(defmethod j/to-java [OffsetAndMetadata ConsumerRecord]
  ;; Builds the OffsetAndMetadata to commit for a processed record.
  ;; Kafka expects the committed offset to be the offset of the NEXT record
  ;; to consume; committing the record's own offset (as the original did)
  ;; makes the consumer reprocess the last committed record after a restart.
  [_ consumer-record]
  (let [next-offset (inc (.offset ^ConsumerRecord consumer-record))
        leader-epoch (.leaderEpoch ^ConsumerRecord consumer-record)]
    (OffsetAndMetadata. next-offset leader-epoch "")))
(defmethod j/to-java [TopicPartition ConsumerRecord]
  ;; Extracts the TopicPartition a record was read from.
  [_ consumer-record]
  (TopicPartition. (.topic ^ConsumerRecord consumer-record)
                   (.partition ^ConsumerRecord consumer-record)))
(ns kafka.core-async
(:require
[clojure.java.data :as j]
[kafka.consumer :as c]
[clojure.core.async :as a])
(:import (java.util.concurrent ExecutorService Executors)
(org.apache.kafka.clients.consumer KafkaConsumer OffsetAndMetadata)
(org.apache.kafka.common TopicPartition)))
(defn poller
  "Polls Kafka in a loop, coordinating with the processor over channels.
  Reads `control-channel` with a 2s timeout each iteration:
    :shutdown — close the consumer and all three channels, then exit.
    :next     — resume the consumer if paused, poll once, pause again and
                hand the batch to `data-channel`.
    timeout   — poll while paused; fetches nothing but keeps the consumer's
                group membership alive."
  [{:keys [^KafkaConsumer consumer data-channel control-channel commit-channel]}]
  (loop []
    (let [[signal _ch] (a/alts!! [(a/timeout 2000) control-channel])]
      (condp = signal
        :shutdown
        (do (c/close-consumer consumer 1000)
            ;; a/close! takes exactly one channel, so close each in turn.
            ;; The original `(apply a/close! [...])` passed all three as
            ;; arguments to a single call — an arity error at runtime.
            (run! a/close! [data-channel control-channel commit-channel]))

        :next
        (do ;; resume the paused consumer so the next poll can fetch
            (when (seq (.paused consumer))
              (c/resume-consumer consumer))
            (let [consumer-records (c/poll-consumer-records! consumer 1000)]
              ;; `seq` checks for a non-empty batch without the original
              ;; ^ConsumerRecords hint, which this ns never imports and
              ;; which therefore failed to resolve at compile time.
              (when (seq consumer-records)
                ;; pause again so processing can catch up before more fetches
                (c/pause-consumer consumer (c/topic-partitions consumer-records))
                (a/>!! data-channel consumer-records)))
            (recur))

        ;; nil (timeout fired, or control channel closed): heartbeat poll.
        (do (c/poll-consumer-records! consumer 1000)
            (recur))))))
(defn init-consumer
  "Creates the KafkaConsumer, wires the control/data/commit channels and
  starts `poller` on `executor`. Returns the consumer-map shared by
  `poller`, `processor` and `committer`.
  Defaults: localhost broker, topic \"hello_world\", a virtual-thread
  executor (requires a JDK providing newVirtualThreadPerTaskExecutor)."
  [{:keys [bootstrap-servers topics ^ExecutorService executor]
    :or {bootstrap-servers "127.0.0.1:29092"
         topics ["hello_world"]
         executor (Executors/newVirtualThreadPerTaskExecutor)}}]
  (let [consumer (->> bootstrap-servers c/config c/consumer)
        control-channel (a/chan)
        ;; unrolls each polled ConsumerRecords batch into single records
        data-channel (a/chan 10 (mapcat identity))
        ;; commit-channel batches processed records to make commit a batch
        commit-channel (a/chan 10 (partition-all 3))
        consumer-map {:data-channel data-channel
                      :control-channel control-channel
                      :commit-channel commit-channel
                      :consumer consumer
                      :executor executor}]
    (c/subscribe-consumer consumer topics)
    ;; Fully qualified hint: java.util.concurrent.Callable is not imported
    ;; by this ns, so the original bare ^Callable failed to resolve.
    (.submit executor ^java.util.concurrent.Callable (fn [] (poller consumer-map)))
    consumer-map))
(defn processor
  "Processes individual ConsumerRecords from `data-channel` with function `f`
  on `executor`.
  `f` takes care of retries and propagates failures.
  Every processed record is forwarded to `commit-channel`. When the data
  channel yields nothing for 5s (or is closed), the poller is asked for the
  next batch via `control-channel`; the loop ends once that channel is
  closed. Any exception from `f` triggers a :shutdown."
  [{:keys [data-channel control-channel commit-channel ^ExecutorService executor]} f]
  (.submit executor
           ;; Fully qualified hint: Callable is not imported by this ns, so
           ;; the original bare ^Callable failed to resolve.
           ^java.util.concurrent.Callable
           (fn []
             (try
               (loop []
                 (let [[record _ch] (a/alts!! [(a/timeout 5000) data-channel])]
                   (if (some? record)
                     (do (f record)
                         (println "Processed record, publishing to commit channel")
                         (a/>!! commit-channel record)
                         (recur))
                     ;; nil: timed out or channel closed — request more work.
                     ;; >!! returns false on a closed control channel, which
                     ;; ends the loop.
                     (when (a/>!! control-channel :next)
                       (println "requested next batch")
                       (recur)))))
               (catch Exception e
                 (a/>!! control-channel :shutdown))))))
(defn committer
  "Commits offsets to Kafka for each batch arriving on `commit-channel`.
  Manual committing provides the ability to recover from error.
  Runs on `executor` until the commit channel is closed. A failed commit
  logs the exception and signals :shutdown on `control-channel`."
  [{:keys [^KafkaConsumer consumer control-channel commit-channel ^ExecutorService executor]}]
  (.submit executor
           ;; Fully qualified hint: Callable is not imported by this ns, so
           ;; the original bare ^Callable failed to resolve.
           ^java.util.concurrent.Callable
           (fn []
             (try
               (loop []
                 (when-let [consumer-records (a/<!! commit-channel)]
                   ;; build {TopicPartition -> OffsetAndMetadata} for the batch
                   (let [offset-map (into {}
                                          (map (fn [consumer-record]
                                                 [(j/to-java TopicPartition consumer-record)
                                                  (j/to-java OffsetAndMetadata consumer-record)]))
                                          consumer-records)]
                     (println "Committing!!")
                     (c/commit-consumer consumer offset-map 1000))
                   (recur)))
               (catch Exception e
                 (println "Consumer blew up while committing!")
                 ;; clojure.pprint is not :require'd by this ns; load it before
                 ;; using the fully qualified var, or the original call would
                 ;; itself throw here.
                 (require 'clojure.pprint)
                 (clojure.pprint/pprint e)
                 (a/>!! control-channel :shutdown))))))
;; REPL session: start the pipeline, attach the committer and a println
;; processor, then shut everything down via the control channel.
(comment
(def consumer (init-consumer {}))
(committer consumer)
(processor consumer println)
(a/>!! (:control-channel consumer) :shutdown)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment