@pjagielski
Last active August 29, 2015 14:06
(ns kafka.core
  (:require
    [clojure.core.async :as async]
    [clj-kafka.core :refer [with-resource]]
    [clj-kafka.producer :refer [producer send-message message]]
    [clj-kafka.consumer.zk :refer [consumer messages shutdown]])
  (:gen-class))
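
;; Producer that sends raw byte arrays to the local broker.
(def p (producer {"metadata.broker.list" "localhost:9092"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

;; Consumer settings: with no committed offset, start from the newest
;; messages ("largest"); offsets are not committed automatically.
(def config {"zookeeper.connect" "localhost:2181"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "largest"
             "auto.commit.enable" "false"})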

;; Send one fixed message to the "test" topic.
(defn kafka-send []
  (println "sending")
  (send-message p (message "test" (.getBytes "this is my message"))))
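
;; Consume the "test" topic and put every message on chn.
;; Blocks the calling thread; with-resource calls shutdown on exit.
(defn kafka-receive [chn]
  (with-resource [c (consumer config)]
    shutdown
    ;; Bound as msg so the referred `message` function is not shadowed.
    (doseq [msg (messages c "test")]
      (println "received")
      (async/>!! chn msg))))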
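
(defn -main
  [& args]
  (let [chn (async/chan)]
    ;; The consumer blocks, so it runs on its own thread.
    (async/thread (kafka-receive chn))
    ;; For each received message, wait 100 ms, then send another one.
    (async/go (while true
                (async/<! chn)
                (async/<! (async/timeout 100))
                (kafka-send)))
    ;; Independently send a message every 500 ms.
    (async/go (while true
                (async/<! (async/timeout 500))
                (kafka-send)))
    ;; Keep the main thread alive for 50 seconds while the loops run.
    (Thread/sleep 50000)))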