@leonoel
Created December 30, 2022 10:30
Missionary implementation of a Kafka consumer with backpressure and manual offset commits
{:deps {org.clojure/clojure            {:mvn/version "1.11.1"}
        org.apache.kafka/kafka-clients {:mvn/version "3.3.1"}
        missionary/missionary          {:mvn/version "b.26"}}}

(ns kafka-consumer
  (:require [missionary.core :as m])
  (:import (org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer ConsumerRecords ConsumerRecord OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)
           (java.util Map Collection)
           (java.time Duration)))

(defn init-consumer
  "Return a new `KafkaConsumer` instance for configuration map `params`, subscribed to `topic` and any additional `topics`."
  [params topic & topics]
  (doto (KafkaConsumer. ^Map params) (.subscribe ^Collection (cons topic topics))))
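
;; A minimal REPL sketch of `init-consumer`. The broker address, group id and
;; topic names below are placeholder assumptions, not part of the original gist.
(comment
  (def example-consumer
    (init-consumer {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092"
                    ConsumerConfig/GROUP_ID_CONFIG "example-group"
                    ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
                    ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"}
                   "example-topic" "another-topic")))
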
(defn poll-consumer
  "Return a task polling records from `KafkaConsumer` instance `consumer` on executor `exec`, with `millis` timeout."
  [consumer millis exec]
  (m/via exec (.poll ^KafkaConsumer consumer (Duration/ofMillis (long millis)))))

(defn close-consumer
  "Return a task closing `KafkaConsumer` instance `consumer` on executor `exec`, with `millis` timeout."
  [consumer millis exec]
  (m/via exec (.close ^KafkaConsumer consumer (Duration/ofMillis (long millis)))))

(defn commit-consumer
  "Return a task committing `offsets` on `KafkaConsumer` instance `consumer` on executor `exec`, with `millis` timeout."
  [consumer offsets millis exec]
  (m/via exec (.commitSync ^KafkaConsumer consumer ^Map offsets (Duration/ofMillis (long millis)))))
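
;; A hedged sketch of how the three tasks above compose: poll once, commit an
;; empty offset map, then close. `m/blk` is missionary's blocking executor;
;; `example-consumer` is the placeholder consumer from the sketch further up.
(comment
  (def one-poll
    (m/sp
      (let [records (m/? (poll-consumer example-consumer 1000 m/blk))]
        (m/? (commit-consumer example-consumer {} 1000 m/blk))
        (m/? (close-consumer example-consumer 1000 m/blk))
        (.count ^ConsumerRecords records))))
  ;; a task is a function of success and failure callbacks; calling it starts it
  (one-poll prn prn))
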
(defn pause-consumer!
  "Pause fetching from `TopicPartition` collection `topic-partitions` on `KafkaConsumer` instance `consumer`."
  [consumer topic-partitions]
  (.pause ^KafkaConsumer consumer ^Collection topic-partitions))

(defn resume-consumer!
  "Resume fetching from `TopicPartition` collection `topic-partitions` on `KafkaConsumer` instance `consumer`."
  [consumer topic-partitions]
  (.resume ^KafkaConsumer consumer ^Collection topic-partitions))
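
;; Illustration only: pausing then resuming a single partition by hand, assuming
;; that partition is currently assigned to the placeholder `example-consumer`.
;; The topic name and partition number are made up; `poll-records` below drives
;; pause/resume automatically from downstream demand.
(comment
  (let [tp (TopicPartition. "example-topic" 0)]
    (pause-consumer! example-consumer [tp])
    (resume-consumer! example-consumer [tp])))
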
(defn get-and-set!
  "Atomically assign value `x` to atom `a` and return the previously assigned value."
  [a x]
  (loop []
    (let [y @a]
      (if (compare-and-set! a y x)
        y
        (recur)))))
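
;; `get-and-set!` mirrors java.util.concurrent.atomic's getAndSet: the atom
;; takes the new value and the caller receives the value it replaced.
(comment
  (let [a (atom {:k 1})]
    [(get-and-set! a {}) @a]))   ;; => [{:k 1} {}]
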
(defn commit-channel
  "Return a new channel instance for record committing."
  [] (atom {}))

(defn request-commit!
  "Push `ConsumerRecord` `record` with optional string metadata `metadata` on commit channel `chan`."
  ([chan record] (request-commit! chan record ""))
  ([chan ^ConsumerRecord record ^String metadata]
   (swap! chan assoc
     (TopicPartition. (.topic record) (.partition record))
     (OffsetAndMetadata. (inc (.offset record)) metadata))))
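
;; A sketch of the commit-channel contract: `request-commit!` stores, per topic
;; partition, the offset *after* the given record, which is what `commitSync`
;; expects. The `ConsumerRecord` here is built by hand purely for illustration.
(comment
  (let [chan (commit-channel)]
    (request-commit! chan (ConsumerRecord. "example-topic" 0 41 "k" "v"))
    ;; @chan now maps that TopicPartition to an OffsetAndMetadata with offset 42
    @chan))
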
(defn poll-records
  "Return a flow producing `ConsumerRecord`s by polling from `KafkaConsumer` instance `consumer`, pausing and resuming
  each topic partition according to downstream backpressure, and committing records pushed on commit channel `chan`.
  Blocking operations are performed on executor `exec`."
  [chan ^KafkaConsumer consumer exec]
  (m/ap
    (let [resume (atom #{})] ;; partitions whose records have all been emitted downstream
      (loop []
        ;; commit whatever offsets were requested since the previous iteration
        (m/? (commit-consumer consumer (get-and-set! chan {}) 1000 exec))
        (let [^ConsumerRecords consumer-records (m/? (poll-consumer consumer 1000 exec))
              topic-partitions (.partitions consumer-records)]
          ;; pause every partition that returned records so subsequent polls
          ;; don't fetch more until downstream has consumed the current batch
          (pause-consumer! consumer topic-partitions)
          (m/amb=
            ;; branch 1: emit each partition's records, respecting downstream
            ;; backpressure, then mark the partition as ready to be resumed
            (loop [topic-partitions (seq topic-partitions)]
              (if-some [[^TopicPartition topic-partition & topic-partitions] topic-partitions]
                (m/amb> (m/?> (m/seed (.records consumer-records topic-partition)))
                        (do (swap! resume conj topic-partition)
                            (recur topic-partitions)))
                (m/amb>)))
            ;; branch 2: resume the partitions consumed so far and poll again
            (do (resume-consumer! consumer (get-and-set! resume #{}))
                (recur))))))))
(comment
  (defn process-and-commit [process consumer exec]
    ;; drain the flow: run `process` on each record, then request its commit
    (m/reduce {} nil
      (m/ap (let [commits (commit-channel)
                  record (m/?> (poll-records commits consumer exec))]
              (m/? (process record))
              (request-commit! commits record)))))

  (defn process-record [record]
    (m/sp
      (println "Processing" record)
      (m/? (m/sleep (rand-int 2000)))
      (println "Completed" record)))

  (defn process-topics [& args]
    (m/sp
      (let [consumer (apply init-consumer args)]
        (try (m/? (process-and-commit process-record consumer m/blk))
             (finally (m/? (m/compel (close-consumer consumer 1000 m/blk))))))))

  (def config
    {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "localhost:9092"
     ConsumerConfig/GROUP_ID_CONFIG "test-consumer-123"
     ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
     ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG "org.apache.kafka.common.serialization.StringDeserializer"
     ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG "false"
     ConsumerConfig/MAX_POLL_RECORDS_CONFIG "5"
     ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest"})

  ;; running the task returns a cancellation thunk
  (def cancel
    ((process-topics config "quickstart-events")
     (fn [_] (println "Done."))
     (fn [^Throwable e] (.printStackTrace e))))

  (cancel)
  )