Skip to content

Instantly share code, notes, and snippets.

@tristanstraub
Created September 20, 2017 01:15
Show Gist options
  • Save tristanstraub/34f6056908fb310844672083abd4393d to your computer and use it in GitHub Desktop.
Save tristanstraub/34f6056908fb310844672083abd4393d to your computer and use it in GitHub Desktop.
[org.apache.kafka/kafka-clients "0.11.0.0"]
[spootnik/kinsky "0.1.16"]
(ns space.kafka
(:require [kinsky.client :as client])
(:import [java.util ArrayList Properties]
org.apache.kafka.clients.consumer.KafkaConsumer
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord]
[org.apache.kafka.common.serialization StringDeserializer StringSerializer]
org.apache.kafka.common.TopicPartition
org.apache.kafka.common.serialization.Serializer
org.apache.kafka.common.serialization.Deserializer))
(def kafka-ip "172.19.0.3:9092")
(defn consumer [topic]
(let [p (Properties.)]
(.put p "bootstrap.servers" kafka-ip)
(.put p "group.id" "consumer-a-b-c-d-e")
;; (.put p "key.deserializer" (.getName StringDeserializer))
;; (.put p "value.deserializer" (.getName StringDeserializer))
(.put p "auto.offset.reset" "earliest")
(.put p "enable.auto.commit" "false")
(let [c (KafkaConsumer. p
^Deserializer (client/keyword-deserializer)
^Deserializer (client/edn-deserializer))]
(let [partitions (ArrayList.)]
(let [topic-partitions
(->> (.partitionsFor c topic)
(map (fn [partition]
(TopicPartition. topic (.partition partition)))))]
(println :partitions topic-partitions)
(doseq [p topic-partitions]
(println :p p)
(.add partitions p))
(.assign c partitions)))
c)))
(defn consume
[topic timeout]
(let [c (consumer topic)]
(try
(iterator-seq (.iterator (.poll c timeout)))
(finally
(.close c)))))
(defn send! [topic message]
(let [p (Properties.)]
client/edn-serializer
(.put p "bootstrap.servers" kafka-ip)
(.put p "group.id" "producer-a-b-c-d")
(.put p "key.serializer" (.getName StringSerializer))
(.put p "value.serializer" (.getName StringSerializer))
(.put p "auto.offset.reset" "earliest")
(.put p "enable.auto.commit" "false")
(let [p (KafkaProducer. p)]
(try
(.send p (ProducerRecord. topic "0" message))
(.flush p)
(finally
(.close p))))))
(defn read! [c]
(->> (.poll c 100)
;; (map (fn [value]
;; [(.offset value) (.value value)]))
))
(-> (map #(.value %) (consume "points" 100)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment