@also
Created November 8, 2013 04:58
riemann kafka transport

(ns riemann.transport.kafka
  (:import java.util.concurrent.Executors
           java.util.Properties
           kafka.consumer.Consumer
           kafka.consumer.ConsumerConfig
           kafka.javaapi.consumer.ConsumerConnector)
  (:use [riemann.service :only [Service ServiceEquiv]]))
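
(defn- consumer-properties
  "Builds the java.util.Properties for the Kafka consumer from opts.
  :zk-connect is the ZooKeeper connection string and :group-id is the
  consumer group id."
  [opts]
  (let [props (Properties.)]
    (.putAll props {"zookeeper.connect" (:zk-connect opts)
                    "group.id"          (:group-id opts)})
    props))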
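
(defrecord KafkaConsumer [consumer-properties
                          topic
                          threads
                          handler
                          killer
                          core]
  ServiceEquiv
  (equiv? [this other]
    (instance? KafkaConsumer other)) ; LIES: ignores topic and consumer properties

  Service
  (conflict? [this other]
    (instance? KafkaConsumer other)) ; LIES: any two KafkaConsumers count as conflicting

  (reload! [this new-core]
    (reset! core new-core))

  (start! [this]
    (locking this
      (when-not @killer
        (let [config    (ConsumerConfig. consumer-properties)
              connector (Consumer/createJavaConsumerConnector config)
              ;; Ask for one stream per worker thread so the whole pool is used.
              streams   (get (.createMessageStreams connector {topic (int threads)}) topic)
              pool      (Executors/newFixedThreadPool threads)]
          (doseq [stream streams]
            ;; Iterating a stream blocks, so each stream gets its own thread.
            (.submit pool ^Runnable (fn [] (doseq [msg stream] (handler @core msg)))))
          ;; Store the shutdown fn; a non-nil killer also marks the service as started.
          (reset! killer (fn []
                           (.shutdown connector)
                           (.shutdown pool)))))))

  (stop! [this]
    (locking this
      (when-let [kill @killer]
        (kill)
        (reset! killer nil)))))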
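
(defn kafka-consumer
  "Creates a KafkaConsumer service for topic. opts should also carry
  :zk-connect and :group-id. handler is called as (handler core msg) for
  every message received. The worker thread count is fixed at 9."
  [{:keys [topic handler] :as opts}]
  (KafkaConsumer. (consumer-properties opts) topic 9 handler (atom nil) (atom nil)))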