-
-
Save fr33m0nk/efdca7d5e77db9bc0cdcbf6a1ce38497 to your computer and use it in GitHub Desktop.
Somewhat reactive Kafka consumer using core.async :(
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Clojure CLI dependency manifest for the core.async Kafka consumer experiment.
{:paths     ["src"]
 ;; NOTE(review): the Clojure CLI documents :main-opts as an alias-level key;
 ;; confirm it is honoured at the top level. The play.ground namespace is not
 ;; among the sources shown alongside this file.
 :main-opts ["-m" "play.ground"]
 :deps      {org.clojure/clojure            {:mvn/version "1.11.1"}
             org.clojure/core.async         {:mvn/version "1.5.648"}
             org.apache.kafka/kafka-clients {:mvn/version "3.3.1"}
             org.clojure/java.data          {:mvn/version "1.0.95"}}
 ;; tools.build is pulled in only under the :build alias.
 :aliases   {:build {:deps       {io.github.clojure/tools.build {:git/tag "v0.8.1" :git/sha "7d40500"}}
                     :ns-default build}}}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns kafka.consumer | |
(:require | |
[clojure.java.data :as j]) | |
(:import (java.time Duration) | |
(java.util Map Optional) | |
(org.apache.kafka.clients.consumer ConsumerConfig ConsumerRecord ConsumerRecords KafkaConsumer OffsetAndMetadata) | |
(org.apache.kafka.common TopicPartition))) | |
(defn config
  "Builds the configuration map handed to the KafkaConsumer constructor.

  `bootstrap-servers` is used as the value of the bootstrap servers setting.
  Everything else is fixed: a hard-coded group id, String deserializers for
  key and value, auto-commit disabled (offsets are committed manually),
  at most 5 records per poll, and offset reset to earliest.

  Returns a Clojure map keyed by the ConsumerConfig constant strings."
  [bootstrap-servers]
  ;; The original literal ended with a dangling, value-less `(ConsumerConfig/Ma)`
  ;; entry, which left the map with an odd number of forms (a reader error).
  ;; It has been removed; static fields are read with the plain Class/FIELD form.
  {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG        bootstrap-servers
   ConsumerConfig/GROUP_ID_CONFIG                 "test-consumer-123"
   ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG   "org.apache.kafka.common.serialization.StringDeserializer"
   ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
   ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG       "false"
   ConsumerConfig/MAX_POLL_RECORDS_CONFIG         "5"
   ConsumerConfig/AUTO_OFFSET_RESET_CONFIG        "earliest"})
(defn consumer
  "Constructs a KafkaConsumer from the given configuration map."
  [^Map config]
  (new KafkaConsumer config))
(defn subscribe-consumer
  "Subscribes `consumer` to `topics` and returns the same consumer so calls
  can be chained."
  [^KafkaConsumer consumer topics]
  (doto consumer
    (.subscribe topics)))
(defn topic-partitions
  "Returns whatever `.partitions` reports for the given ConsumerRecords batch."
  [^ConsumerRecords consumer-records]
  (-> consumer-records .partitions))
(defn poll-consumer-records!
  "Polls `consumer` once, using `duration-in-millis` (converted to a
  java.time.Duration) as the poll timeout, and returns the ConsumerRecords."
  ^ConsumerRecords
  [^KafkaConsumer consumer duration-in-millis]
  (let [timeout (Duration/ofMillis ^Long duration-in-millis)]
    (.poll consumer timeout)))
(defn pause-consumer
  "Pauses `consumer` for the supplied `topic-partitions` and returns the
  consumer."
  [^KafkaConsumer consumer topic-partitions]
  (doto consumer
    (.pause topic-partitions)))
(defn resume-consumer
  "Resumes every partition that `consumer` currently reports as paused and
  returns the consumer."
  [^KafkaConsumer consumer]
  (let [paused (.paused consumer)]
    (.resume consumer paused)
    consumer))
(defn commit-consumer
  "Synchronously commits offsets on `consumer`, waiting at most
  `timeout-in-millis` milliseconds.

  Arities:
    [consumer timeout-in-millis]
      calls commitSync with only the timeout.
    [consumer topic-partition-offset-metadata-map timeout-in-millis]
      calls commitSync with the supplied offsets map and the timeout.

  Both arities return the consumer. The three-argument arity previously
  returned nil (the result of the void commitSync call), unlike the
  two-argument arity and the other helpers in this namespace."
  ([^KafkaConsumer consumer timeout-in-millis]
   (.commitSync consumer (Duration/ofMillis timeout-in-millis))
   consumer)
  ([^KafkaConsumer consumer topic-partition-offset-metadata-map timeout-in-millis]
   (.commitSync consumer
                topic-partition-offset-metadata-map
                (Duration/ofMillis timeout-in-millis))
   consumer))
(defn close-consumer
  "Closes `consumer`, passing `timeout-in-millis` (as a java.time.Duration)
  to `.close`."
  [^KafkaConsumer consumer timeout-in-millis]
  (let [timeout (Duration/ofMillis timeout-in-millis)]
    (.close consumer timeout)))
;; Converts a single ConsumerRecord into a plain Clojure map by reading each
;; accessor once. Values are stored exactly as the accessors return them
;; (no further conversion of headers, timestamp type or leader epoch).
(defmethod j/from-java ConsumerRecord [consumer-record]
  (let [^ConsumerRecord record consumer-record]
    {:topic         (.topic record)
     :partition     (.partition record)
     :headers       (.headers record)
     :key           (.key record)
     :value         (.value record)
     :offset        (.offset record)
     :timestamp     (.timestamp record)
     :timestampType (.timestampType record)
     :leaderEpoch   (.leaderEpoch record)}))
;; Converts a ConsumerRecords batch into a map holding its emptiness flag,
;; its partitions, and a vector of the contained records, each converted
;; with j/from-java.
(defmethod j/from-java ConsumerRecords [consumer-records]
  (let [^ConsumerRecords batch consumer-records]
    {:empty?     (.isEmpty batch)
     :partitions (.partitions batch)
     :records    (mapv j/from-java (seq batch))}))
;; Builds the OffsetAndMetadata to commit after `consumer-record` has been
;; processed. The first argument (the destination type) is ignored.
;;
;; Kafka expects the committed offset to be the offset of the NEXT record the
;; group should read, i.e. last processed offset + 1. Committing the record's
;; own offset (as this method did before) makes the last processed record be
;; delivered again after a restart or rebalance.
(defmethod j/to-java [OffsetAndMetadata ConsumerRecord]
  [_ consumer-record]
  (let [^ConsumerRecord record consumer-record
        next-offset  (inc (.offset record))
        leader-epoch (.leaderEpoch record)]
    ;; Metadata string is left empty, as before.
    (OffsetAndMetadata. next-offset leader-epoch "")))
;; Derives the TopicPartition a ConsumerRecord belongs to from the record's
;; topic and partition. The first argument (the destination type) is ignored.
(defmethod j/to-java [TopicPartition ConsumerRecord]
  [_ consumer-record]
  (let [^ConsumerRecord record consumer-record]
    (TopicPartition. (.topic record) (.partition record))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns kafka.core-async
  (:require
    [clojure.core.async :as a]
    [clojure.java.data :as j]
    [clojure.pprint]
    [kafka.consumer :as c])
  (:import (java.util.concurrent ExecutorService Executors)
           (org.apache.kafka.clients.consumer ConsumerRecords KafkaConsumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)))
(defn poller
  "Polls for ConsumerRecords, driven by messages on `control-channel`.

  Takes a map with :consumer, :data-channel, :control-channel and
  :commit-channel. Each iteration waits up to 2000 ms for a control message:

    :shutdown  closes the consumer (1000 ms timeout), closes all three
               channels and ends the loop.
    :next      resumes the consumer if it has paused partitions, polls once
               (1000 ms), and when the batch is not empty pauses the batch's
               partitions and puts the batch on `data-channel`.
    otherwise  (timeout or any other value) polls once (1000 ms) and discards
               the result.

  Blocks the calling thread until :shutdown is received."
  [{:keys [^KafkaConsumer consumer data-channel control-channel commit-channel]}]
  (loop []
    (let [[v _c] (a/alts!! [(a/timeout 2000)
                            control-channel])]
      (condp = v
        ;; Shuts consumer and all channels down.
        ;; close! accepts a single channel, so each channel is closed in turn;
        ;; the previous `(apply a/close! [...])` invoked it with three
        ;; arguments and threw an ArityException before closing anything.
        :shutdown (do
                    (c/close-consumer consumer 1000)
                    (run! a/close! [data-channel control-channel commit-channel]))
        ;; Processor signals that it can consume the next batch of records
        :next (do
                ;; resume first so that this poll is able to return records
                (when (seq (.paused consumer))
                  (c/resume-consumer consumer))
                (let [consumer-records (c/poll-consumer-records! consumer 1000)]
                  ;; pause again so the system can process this batch before
                  ;; more records are fetched
                  (when-not (.isEmpty ^ConsumerRecords consumer-records)
                    (c/pause-consumer consumer (c/topic-partitions consumer-records))
                    (a/>!! data-channel consumer-records))
                  (recur)))
        ;; Below poll effectively keeps the consumer group alive with Kafka;
        ;; poll does not fetch any record till the consumer is resumed
        (do
          (c/poll-consumer-records! consumer 1000)
          (recur))))))
(defn init-consumer
  "Creates a subscribed KafkaConsumer plus its channels and launches `poller`
  on the executor.

  Options (all optional):
    :bootstrap-servers  defaults to 127.0.0.1:29092
    :topics             defaults to [hello_world]
    :executor           defaults to a new virtual-thread-per-task executor

  Returns the consumer map with :data-channel, :control-channel,
  :commit-channel, :consumer and :executor."
  [{:keys [bootstrap-servers topics ^ExecutorService executor]
    :or   {bootstrap-servers "127.0.0.1:29092"
           topics            ["hello_world"]
           executor          (Executors/newVirtualThreadPerTaskExecutor)}}]
  (let [consumer     (c/consumer (c/config bootstrap-servers))
        consumer-map {;; polled batches are flattened into individual records
                      :data-channel    (a/chan 10 (mapcat identity))
                      :control-channel (a/chan)
                      ;; processed records are grouped in threes so that
                      ;; commits happen per batch
                      :commit-channel  (a/chan 10 (partition-all 3))
                      :consumer        consumer
                      :executor        executor}
        poll-task    (fn [] (poller consumer-map))]
    (c/subscribe-consumer consumer topics)
    (.submit executor ^Callable poll-task)
    consumer-map))
(defn processor
  "Processes each ConsumerRecord with function `f`.
  `f` takes care of retries and propagates failures.

  Submits a task to `executor` that repeatedly waits up to 5000 ms for a
  value on `data-channel`:
    - a value is passed to `f` and then put on `commit-channel`;
    - otherwise :next is put on `control-channel` to request another batch;
      the loop ends when that put reports a closed channel.

  Any Exception raised inside the loop is logged and :shutdown is sent on
  `control-channel`. Returns the Future produced by `.submit`."
  [{:keys [data-channel control-channel commit-channel ^ExecutorService executor]} f]
  (.submit executor
           ^Callable
           (fn []
             (try
               (loop []
                 (let [[v _c] (a/alts!! [(a/timeout 5000)
                                         data-channel])]
                   (if (some? v)
                     (do
                       (f v)
                       (println "Processed record, publishing to commit channel")
                       (a/>!! commit-channel v)
                       (recur))
                     (when (a/>!! control-channel :next)
                       (println "requested next batch")
                       (recur)))))
               (catch Exception e
                 ;; Previously the failure was swallowed without a trace;
                 ;; report it before asking the poller to shut down.
                 (println "Processor blew up while processing!" e)
                 (a/>!! control-channel :shutdown))))))
(defn committer
  "Commits offsets to Kafka.
  Manual committing provides the ability to recover from error.

  Submits a task to `executor` that takes batches of records from
  `commit-channel` until it is closed. Each batch is turned into a map of
  TopicPartition -> OffsetAndMetadata (via j/to-java) and committed with a
  1000 ms timeout. On any Exception the error is printed and :shutdown is
  sent on `control-channel`. Returns the Future produced by `.submit`.

  NOTE(review): this commit runs on a different thread than the one running
  `poller`; KafkaConsumer is documented as not safe for multi-threaded
  access -- confirm this cannot race with poll."
  [{:keys [^KafkaConsumer consumer control-channel commit-channel ^ExecutorService executor]}]
  (let [->offset-entry (juxt #(j/to-java TopicPartition %)
                             #(j/to-java OffsetAndMetadata %))
        commit-loop    (fn []
                         (try
                           (loop []
                             (when-let [consumer-records (a/<!! commit-channel)]
                               ;; later records in a batch overwrite earlier
                               ;; ones that share a TopicPartition key
                               (let [offset-map (into {} (map ->offset-entry) consumer-records)]
                                 (println "Committing!!")
                                 (c/commit-consumer consumer offset-map 1000))
                               (recur)))
                           (catch Exception e
                             (println "Consumer blew up while committing!")
                             (clojure.pprint/pprint e)
                             (a/>!! control-channel :shutdown))))]
    (.submit executor ^Callable commit-loop)))
(comment
  ;; REPL walkthrough; evaluate the forms one at a time.

  ;; Build the consumer map with default options and start the poller.
  (def consumer (init-consumer {}))

  ;; Start the commit loop, then a processor that just prints each record.
  (committer consumer)
  (processor consumer println)

  ;; Ask the poller to close the consumer and the channels.
  (a/>!! (:control-channel consumer) :shutdown)
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment