Skip to content

Instantly share code, notes, and snippets.

@fr33m0nk
Forked from favila/kafka-workbench.clj
Created November 18, 2022 18:14
Show Gist options
  • Save fr33m0nk/f22a0ac511daeb14dfa9369082d617a6 to your computer and use it in GitHub Desktop.
Save fr33m0nk/f22a0ac511daeb14dfa9369082d617a6 to your computer and use it in GitHub Desktop.
Code to fiddle with Kafka behaviors
(ns kafka-workbench
(:require [franzy.clients.consumer.protocols :as c]
[franzy.clients.producer.protocols :as p]
[franzy.serialization.serializers :as serializers]
[franzy.serialization.deserializers :as deserializers]
[franzy.clients.consumer.client :as consumer]
[franzy.clients.producer.client :as producer]))
(def kafka-brokers
  "Broker list used as :bootstrap.servers by both `producer-config` and
  `consumer-config`. Empty in this file; fill in before use."
  [])
(def producer-config
  "Producer settings map; :bootstrap.servers is taken from `kafka-brokers`.
  Presumably meant to be passed as `pc` to `kafka-put` (no call site is
  visible in this file)."
  {:bootstrap.servers kafka-brokers
   :acks              "all"
   :retries           0
   :batch.size        16384      ; 16 * 1024
   :linger.ms         10
   :buffer.memory     33554432}) ; 32 * 1024 * 1024
(def consumer-config
  "Consumer settings map; :bootstrap.servers is taken from `kafka-brokers`.
  Auto-commit is disabled and :auto.offset.reset is :none. Presumably meant
  to be passed as `cc` to the kafka-get-* functions (no call site is visible
  in this file)."
  {:bootstrap.servers  kafka-brokers
   :group.id           "test-retries"
   :enable.auto.commit false
   :auto.offset.reset  :none})
(defn kafka-put
  "Opens a producer configured by `pc` (string serializers for key and
  value), sends `value` to partition 0 of `topic` under the fixed key
  \"key\" via `send-sync!` with an empty options map, and closes the
  producer when done. Returns whatever `send-sync!` returns."
  [pc topic value]
  (let [key-serializer   (serializers/string-serializer)
        value-serializer (serializers/string-serializer)]
    (with-open [client (producer/make-producer pc key-serializer value-serializer)]
      (p/send-sync! client topic 0 "key" value {}))))
(defn kafka-get-via-assign
  "Opens a consumer configured by `cc`, manually assigns it partition 0 of
  `topic`, performs a single poll, and prints one line per record in the
  form \"topic partition offset value\". The consumer is closed on exit.
  Returns nil."
  [cc topic]
  ;; A consumer reads bytes off the wire, so it needs *deserializers* for key
  ;; and value (the original passed string serializers here by mistake).
  (with-open [c (consumer/make-consumer cc
                                        (deserializers/string-deserializer)
                                        (deserializers/string-deserializer))]
    (c/assign-partitions! c [{:topic topic :partition 0}])
    (run! (fn [{:keys [topic partition offset value]}]
            (println (format "%s %s %s %s"
                             topic partition offset value)))
          (c/poll! c))))
(defn kafka-get-via-subscribe
  "Opens a consumer configured by `cc` (string deserializers for key and
  value), subscribes it to `topic`, performs a single poll, and prints one
  line per record in the form \"topic partition offset value\". The consumer
  is closed on exit. Returns nil."
  [cc topic]
  (with-open [c (consumer/make-consumer cc
                                        (deserializers/string-deserializer)
                                        (deserializers/string-deserializer))]
    ;; Subscription takes topic names; partitions are assigned by the group
    ;; coordinator. Topic-partition maps are the argument shape for
    ;; `assign-partitions!`, not for subscribing.
    (c/subscribe-to-partitions! c [topic])
    (run! (fn [{:keys [topic partition offset value]}]
            (println (format "%s %s %s %s"
                             topic partition offset value)))
          (c/poll! c))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment