@tolitius
Last active April 8, 2023 17:36
Event listener with multiple Kafka consumer threads

Starting multiple Kafka consumer threads

  • The example below is based on gregor, but other Kafka bindings would work as well.
  • Long CamelCased names (ThreadFactory, AtomicInteger, Executors) come from Java.
  • defstate comes from mount.
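;; Requires and imports assumed by the code below. The logging namespace is a
;; guess: the gist does not show where `info` and `error` come from.
(require '[gregor.core :as gregor]
         '[mount.core :refer [defstate]]
         '[clojure.tools.logging :refer [info error]])
(import '(java.util.concurrent Executors ThreadFactory)
        '(java.util.concurrent.atomic AtomicInteger))

;; Names each pool thread "<name>-<n>", marks it as a daemon, and logs uncaught exceptions.
(deftype BlahBlahThreadFactory [name ^AtomicInteger thread-counter]
  ThreadFactory
  (newThread [_ r]
    (doto
      (Thread. r)
      (.setName (format "%s-%d" name (.getAndIncrement thread-counter)))
      (.setDaemon true)
      (.setUncaughtExceptionHandler
        (reify Thread$UncaughtExceptionHandler
          (uncaughtException [_ thread ex]
            (error (format "Error in thread id: %s name: %s" (.getId thread) (.getName thread)) ex)))))))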

(defn new-executor
  ([] (new-executor 1))
  ([num-threads]
   (Executors/newFixedThreadPool num-threads
                                 (BlahBlahThreadFactory. "blah-blah-runner"
                                                         (AtomicInteger. 0)))))

(defn consume [consumer process running? ms n]
  (info "starting" (inc n) "consumer")
  (while @running?
    (try
      (let [consumer-records (gregor/poll consumer ms)]
        (process consumer consumer-records)
        (gregor/commit-offsets! consumer))
      (catch Throwable t
        (error "kafka: could not consume a message" t))))
  (gregor/close consumer))
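
;; edn-to-consumer is not defined in this gist; it presumably turns the
;; config map into the argument list for gregor/consumer.
(defn consumer [conf]
  (->> (edn-to-consumer conf)
       (apply gregor/consumer)))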

(defn run-consumers [process {:keys [threads poll-ms] :as conf}]
  (let [running? (atom true)
        ;; resolve the thread count once so the pool size and the number of consumers agree
        n (if (number? threads)
            threads
            42)
        pool (new-executor n)]
    (dotimes [t n]
      (let [c (consumer (dissoc conf :threads :poll-ms))]
        (info "subscribing to:" (gregor/subscription c))
        (.submit pool #(consume c process running? poll-ms t))))
    (info "started" n "consumers ->" conf)
    {:pool pool :running? running?}))

(defn stop-consumers [{:keys [pool running?]}]
  (reset! running? false)
  (.shutdownNow pool))

(defn process [c batch]
  (let [bsize (count batch)]
    (when (pos? bsize)
      (info "received" bsize "events")
      (doseq [event batch]
        (->> (:value event)
             ;; your logic here <<<<<<
             )))))

(defstate event-listener :start (run-consumers process
                                               {... your config})
                         :stop (stop-consumers event-listener))
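
The start/stop pattern above (a shared `running?` atom checked by every loop, plus a fixed thread pool) can be tried without a Kafka broker. The following is a minimal sketch using only JDK classes. `run-workers`, `stop-workers`, and `work` are hypothetical names that are not part of the gist, and the sketch uses `.shutdown` with `.awaitTermination` instead of the gist's `.shutdownNow`, so each loop finishes its current iteration before exiting.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; Hypothetical helper (not from the gist): starts n loops on a fixed pool.
;; Each loop calls (work i) until the shared running? atom is set to false.
(defn run-workers [n work]
  (let [running? (atom true)
        ^ExecutorService pool (Executors/newFixedThreadPool n)]
    (dotimes [i n]
      ;; bind the fn as a Runnable so .submit is not ambiguous with Callable
      (let [^Runnable task (fn [] (while @running? (work i)))]
        (.submit pool task)))
    {:pool pool :running? running?}))

;; Flips the flag, then waits up to 5 seconds for the loops to exit.
;; Returns true when every loop stopped in time.
(defn stop-workers [{:keys [^ExecutorService pool running?]}]
  (reset! running? false)
  (.shutdown pool)
  (.awaitTermination pool 5 TimeUnit/SECONDS))

(comment
  ;; usage: two workers bump a counter until they are stopped
  (def counter (atom 0))
  (def workers (run-workers 2 (fn [_] (swap! counter inc) (Thread/sleep 1))))
  (stop-workers workers))
```

In the gist, the equivalent of `work` is one poll/process/commit cycle, and the returned map is what `defstate` hands to the stop function.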
@nbomberger

nbomberger commented Dec 30, 2016

I am trying to run a single-threaded Kafka consumer and am having issues with it. I have tried the following:

```clojure
(def run-kafka (atom true))
(def consumer (atom nil))

(defn start-customer-history-consumer []
  (reset! consumer (kafka/consumer consumer-url
                                   (:consumer-group consumer-config)
                                   [(:topic consumer-config)]
                                   consumer-props))
  (future (while @run-kafka
            (let [consumer-records (kafka/poll @consumer)
                  values (process-records consumer-records)]
              (doseq [v values]
                (model/save-history-event v))
              (kafka/commit-offsets! @consumer)))))
```

It seems to die or stop running. Any thoughts?
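
One possible cause, offered as a guess since the error itself is not shown: an exception thrown inside a `future` is captured and only rethrown when the future is dereferenced, so a failure while polling, processing, or saving would end the loop without any visible output. A minimal sketch of a loop that reports errors and keeps going, with hypothetical names (`poll-loop`, `step`, `on-error`) standing in for the Kafka calls:

```clojure
;; Hypothetical helper (not from the gist or the snippet above).
;; Runs (step) repeatedly on a background thread until running? is false.
;; Any Throwable from (step) is passed to on-error instead of ending the loop.
(defn poll-loop [running? step on-error]
  (future
    (while @running?
      (try
        (step)
        (catch Throwable t
          (on-error t))))))
```

Here `step` would wrap the poll, save, and commit calls, and `on-error` would log the exception so the failure becomes visible.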
