Created
September 20, 2017 01:15
-
-
Save tristanstraub/34f6056908fb310844672083abd4393d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[org.apache.kafka/kafka-clients "0.11.0.0"] | |
[spootnik/kinsky "0.1.16"] | |
(ns space.kafka | |
(:require [kinsky.client :as client]) | |
(:import [java.util ArrayList Properties] | |
org.apache.kafka.clients.consumer.KafkaConsumer | |
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord] | |
[org.apache.kafka.common.serialization StringDeserializer StringSerializer] | |
org.apache.kafka.common.TopicPartition | |
org.apache.kafka.common.serialization.Serializer | |
org.apache.kafka.common.serialization.Deserializer)) | |
;; Kafka bootstrap address in host:port form (despite the "-ip" name); it is
;; supplied as "bootstrap.servers" to both the consumer and the producer
;; built in this namespace.
(def kafka-ip "172.19.0.3:9092") | |
(defn consumer
  "Creates a KafkaConsumer connected to `kafka-ip` and manually assigns it
  every partition that the broker reports for `topic`.

  Keys are decoded with kinsky's keyword deserializer and values with its
  EDN deserializer. Auto-commit is disabled and the offset reset policy is
  \"earliest\".

  Returns the open consumer; the caller is responsible for closing it.
  If looking up or assigning the partitions fails, the consumer is closed
  before the exception is rethrown so that it does not leak."
  [topic]
  (let [props (doto (Properties.)
                (.put "bootstrap.servers" kafka-ip)
                (.put "group.id" "consumer-a-b-c-d-e")
                (.put "auto.offset.reset" "earliest")
                (.put "enable.auto.commit" "false"))
        ;; Deserializers are passed as instances, so no
        ;; key.deserializer / value.deserializer properties are needed.
        c     (KafkaConsumer. ^Properties props
                              ^Deserializer (client/keyword-deserializer)
                              ^Deserializer (client/edn-deserializer))]
    (try
      (let [topic-partitions (map (fn [info]
                                    (TopicPartition. topic (.partition info)))
                                  (.partitionsFor c topic))
            partitions       (ArrayList.)]
        (println :partitions topic-partitions)
        (doseq [tp topic-partitions]
          (println :p tp)
          (.add partitions tp))
        ;; Manual assignment (rather than subscribe) of all partitions.
        (.assign c partitions)
        c)
      (catch Throwable t
        ;; Nobody else holds a reference yet, so release the client here.
        (.close c)
        (throw t)))))
(defn consume
  "Opens a fresh consumer for `topic`, performs a single `.poll` with
  `timeout`, and returns the polled records as a fully realized seq.

  The seq is realized with `doall` before the consumer is closed, so the
  result never depends on a resource that has already been released. The
  consumer is always closed, whether the poll succeeds or throws."
  [topic timeout]
  (with-open [c (consumer topic)]
    (doall (iterator-seq (.iterator (.poll c timeout))))))
(defn send!
  "Sends `message` to `topic` under the fixed key \"0\" using a short-lived
  KafkaProducer connected to `kafka-ip`.

  Both key and value go through StringSerializer, so `message` must be a
  String. The producer is flushed after the send and always closed.
  Returns nil.

  Only producer settings are configured: consumer-only options such as
  group.id, auto.offset.reset and enable.auto.commit have no meaning for a
  producer and are therefore not set."
  [topic message]
  (let [props    (doto (Properties.)
                   (.put "bootstrap.servers" kafka-ip)
                   (.put "key.serializer" (.getName StringSerializer))
                   (.put "value.serializer" (.getName StringSerializer)))
        producer (KafkaProducer. ^Properties props)]
    (try
      (.send producer (ProducerRecord. topic "0" message))
      ;; Block until the buffered record has actually been transmitted.
      (.flush producer)
      (finally
        (.close producer)))))
(defn read!
  "Polls the consumer `c` once with a timeout of 100 and returns the result
  of `.poll` as-is, without any transformation."
  [c]
  (.poll c 100))
;; Scratch expression, evaluated whenever this file is loaded: calls
;; `consume` on the "points" topic with a timeout of 100 and lazily maps
;; each returned record to its `.value`.
(map (fn [record] (.value record))
     (consume "points" 100))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment