Skip to content

Instantly share code, notes, and snippets.

@kmyokoyama
Created July 10, 2020 00:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmyokoyama/588511c9f2a0c48b8bf7bb15df2e769a to your computer and use it in GitHub Desktop.
Save kmyokoyama/588511c9f2a0c48b8bf7bb15df2e769a to your computer and use it in GitHub Desktop.
Simple example of a Kafka consumer with Component for development
(ns my.example.kafka.consumer
(:require [com.stuartsierra.component :as component]
[taoensso.timbre :as log]
[clojure.core.async :refer [thread]])
(:import [org.apache.kafka.clients.consumer KafkaConsumer ConsumerRecord]
[java.time Duration]
[java.util Properties List]
[org.apache.kafka.common.errors WakeupException]))
(declare loop-consumer!)
(declare print-record)
(defrecord KafkaHandler [consumer bootstrap-server group-id topic]
component/Lifecycle
(start [this]
(log/info "Starting Kafka consumer - bootstrap-servers:" bootstrap-server ", group-id:" group-id ", topic:" topic)
(let [props (doto (Properties.)
(.put "bootstrap.servers" bootstrap-server)
(.put "group.id" group-id)
(.put "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer")
(.put "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"))
consumer (KafkaConsumer. props)]
(.subscribe consumer ^List [topic])
(loop-consumer! consumer)
(assoc this :consumer consumer)))
(stop [this]
(log/info "Stopping Kafka consumer")
(.wakeup consumer)
this))
(defn make-kafka-handler
[bootstrap-server group-id topic]
(map->KafkaHandler {:bootstrap-server bootstrap-server
:group-id group-id
:topic topic}))
(defn loop-consumer!
[^KafkaConsumer consumer]
(thread (try (loop []
(let [records (.poll consumer (Duration/ofMillis 5000))]
(log/info "Has poll returned any message?" (not (.isEmpty records)))
(doseq [record records]
(print-record record)))
(recur))
(catch WakeupException _e (log/info "Received WakeupException"))
(finally (log/info "Closing KafkaConsumer")
(.close consumer)))))
(def sys (atom nil))
(defn- dev-system-map
[]
(component/system-map
:kafka-consumer (make-kafka-handler "localhost:9092" "my-group-id" "my-topic")))
(defn- start!
[]
(reset! sys (component/start (dev-system-map))))
(defn- stop!
[]
(component/stop @sys))
(defn- print-record
[^ConsumerRecord record]
(let [topic (.topic record)
partition (.partition record)
offset (.offset record)
key (.key record)
value (.value record)]
(println (str "topic: " topic
", partition: " partition
", offset: " offset
", key: " key
", value: " value))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment