Skip to content

Instantly share code, notes, and snippets.

@mauricioszabo
Created June 13, 2021 01:29
Show Gist options
  • Save mauricioszabo/4aab32c7898a8b126da5eaf3bb314037 to your computer and use it in GitHub Desktop.
Parallel Consumer Example
[io.confluent.parallelconsumer/parallel-consumer-core "0.3.0.2"]
[fundingcircle/jackdaw "0.8.0"]
(ns parallel-test
(:require [jackdaw.client :as jack-client]
[jackdaw.data :as jack-data])
(:import [java.util.function Consumer]
[io.confluent.parallelconsumer
ParallelStreamProcessor
ParallelConsumerOptions
ParallelConsumerOptions$CommitMode
ParallelConsumerOptions$ProcessingOrder]))
(defn- ^Consumer as-consumer
  "Adapts a Clojure function into a java.util.function.Consumer so it can be
  passed to the parallel-consumer Java API. The callback's return value is
  discarded, since Consumer/accept returns void."
  [callback]
  (reify Consumer
    (accept [_ value]
      (callback value))))
(defn- prepare-handler
  "Returns a per-record callback for the parallel consumer.

  On success: datafies the Kafka record, deserializes its :value, and hands
  the resulting map to orig-handler. On any Throwable: logs the failure and
  republishes the original (raw) record to dlx-topic, blocking on the
  producer acknowledgement before logging completion."
  [logger producer orig-handler dlx-topic]
  (fn [record]
    (try
      (let [datafied (jack-data/datafy record)
            ;; NOTE(review): `serdes` is not required in the ns form above —
            ;; confirm it resolves at load time.
            msg      (update datafied :value serdes/deserialize)]
        (logger :info "Received message")
        (orig-handler msg)
        (logger :info "Message processed successfully"))
      (catch Throwable _cause
        (logger :error "Failed to process message")
        ;; Deref blocks until the broker acks the deadletter publish.
        @(jack-client/produce! producer
                               dlx-topic
                               nil
                               (.timestamp record)
                               (.key record)
                               (.value record))
        (logger :info "Message sent to deadletter")))))
(defn- prepare-kafka-parallel
  "Wires a plain Kafka consumer into Confluent's parallel-consumer.

  Builds a consumer from `config` (:group-id, :bootstrap-servers), configures
  a ParallelStreamProcessor with unordered processing and a concurrency of
  :thread-pool-size, subscribes to the topic at [:topics :name], and starts
  polling with the handler produced by `prepare-handler` (failures go to the
  DLX topic at [:topics :dlx]).

  Returns a java.io.Closeable that shuts down the processor and the producer."
  [config logger producer]
(let [consumer (jack-client/consumer ; or any consumer, really
{"group.id" (:group-id config)
"bootstrap.servers" (:bootstrap-servers config)
"auto.offset.reset" "earliest"
;; Commits are owned by the parallel consumer, so Kafka's auto-commit is off.
"enable.auto.commit" false
"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
"value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
;; NOTE(review): UNORDERED maximizes parallelism but gives up per-key
;; ordering — confirm that is acceptable for this topic.
options (.. (ParallelConsumerOptions/builder)
(ordering ParallelConsumerOptions$ProcessingOrder/UNORDERED)
(defaultMessageRetryDelay (java.time.Duration/ofMillis 500))
(maxConcurrency (:thread-pool-size config))
;; Offsets are committed asynchronously via the consumer, in the background.
(commitMode ParallelConsumerOptions$CommitMode/PERIODIC_CONSUMER_ASYNCHRONOUS)
(consumer consumer)
build)
;; poll registers the record callback and starts the processing pool.
pool (doto (ParallelStreamProcessor/createEosStreamProcessor options)
(.subscribe [(-> config :topics :name)])
(.poll (as-consumer (prepare-handler logger
producer
(:handler config)
(-> config :topics :dlx)))))]
;; Closing the pool stops processing; the wrapped Kafka consumer is
;; presumably closed by the pool itself — TODO confirm against the
;; parallel-consumer docs.
(reify java.io.Closeable
(close [_]
(.close pool)
(.close producer)))))
(def config
  "Example configuration for `prepare-kafka-parallel`:
  :topics           — source topic name and its deadletter (DLX) topic
  :group-id         — Kafka consumer group id
  :bootstrap-servers — broker address
  :thread-pool-size — max concurrency for the parallel consumer
  :handler          — fn of one datafied message map"
  {:topics {:name "some.topic" :dlx "some.topic.mygroup.dlx"}
   :group-id "mygroup"
   :bootstrap-servers "localhost:9092"
   :thread-pool-size 10
   ;; Simulated work: sleep a random amount (< 10s) so parallelism is visible.
   ;; Fixed: original called `random-int`, which does not exist in
   ;; clojure.core — the correct function is `rand-int`.
   :handler #(do
               (Thread/sleep (rand-int 10000))
               (prn :MESSAGE %))})
;; Placeholder: replace with a real Kafka producer (e.g. jack-client/producer).
(def producer
...instantiate-a-producer...)
;; Placeholder: a fn of [level message] — see calls like (logger :info "...").
(def logger
...instantiate-a-logger-fn...)
;; Example run: start the parallel consumer, publish enough messages to
;; observe concurrent handling, then keep the process alive long enough
;; for the randomly-delayed handlers to finish before with-open closes it.
(with-open [consumer (prepare-kafka-parallel config logger producer)]
...produce-some-msgs...
...at-least-20-to-see-parallel...
(Thread/sleep 50000))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment