Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created October 25, 2018 22:08
Show Gist options
  • Save mccraigmccraig/d7c0f2c9f400ff575ad27c083980e02e to your computer and use it in GitHub Desktop.
Save mccraigmccraig/d7c0f2c9f400ff575ad27c083980e02e to your computer and use it in GitHub Desktop.
(ns er-model.connectors.kafka.java-producer
(:require
[taoensso.timbre :refer [debug info warn]]
[er-model.connectors.kafka.zk :as kafka.zk]
[er-model.connectors.kafka :as c.kafka]
[er-model.connectors.kafka.serializers :as kafka.ser]
[er-model.connectors.kafka.deserializers :as kafka.deser]
[er-model.connectors.kafka.config :as cfg]
[clj-uuid :as uuid]
[manifold.deferred :as deferred])
(:import
[java.io Closeable]
[java.util Properties]
[org.apache.kafka.clients.producer KafkaProducer ProducerRecord Callback]))
(defn producer-record
  "Builds a Kafka `ProducerRecord` for `topic` carrying `key` and `value`.

  When `partition` is nil the three-argument constructor
  (topic, key, value) is used; otherwise `partition` is coerced with
  `int` and passed to the four-argument constructor
  (topic, partition, key, value)."
  [topic partition key value]
  (if-some [p partition]
    (ProducerRecord. topic (int p) key value)
    (ProducerRecord. topic key value)))
;; Implementation of the c.kafka/MessagingProducer protocol on top of a
;; Kafka Java producer.
;;
;; Fields:
;;   kafka-producer - the underlying producer object; `.send` and `.close`
;;                    are invoked on it.
;;   producer-opts  - the options the producer was created with; stored on
;;                    the record but not read by any method here.
(defrecord JavaMessagingProducer [kafka-producer producer-opts]
  c.kafka/MessagingProducer

  ;; Sends `message` to `topic` and returns a manifold deferred.
  ;;
  ;; The deferred is realised from the producer callback:
  ;;   - success: a map {:topic .. :partition .. :offset ..} read from the
  ;;     record metadata;
  ;;   - failure: put into the error state with the exception handed to the
  ;;     callback.
  ;;
  ;; Every record is built with partition 0 and a freshly generated v1 UUID
  ;; string as its key.
  ;; NOTE(review): the hard-coded partition 0 looks deliberate (single
  ;; partition ordering?) - confirm before changing.
  ;;
  ;; Anything `.send` throws synchronously propagates to the caller; it is
  ;; not routed through the deferred.
  (send [_ topic message]
    (let [r  (deferred/deferred)
          pr (producer-record
              topic
              0
              (str (uuid/v1))
              message)
          cb (reify Callback
               (onCompletion [_ record-metadata error]
                 ;; Decide on `error` first: newer Kafka clients pass a
                 ;; non-null placeholder metadata (fields set to -1) when the
                 ;; send failed, so a non-nil `record-metadata` alone does not
                 ;; mean success. A nil metadata is still treated as failure,
                 ;; which matches the older callback contract.
                 (if (or error (nil? record-metadata))
                   (deferred/error! r error)
                   (deferred/success!
                    r
                    {:topic     (.topic record-metadata)
                     :partition (.partition record-metadata)
                     :offset    (.offset record-metadata)}))))]
      (.send kafka-producer pr cb)
      r))

  ;; Logs the shutdown and closes the underlying producer.
  (close [this]
    (info "closing java-messaging-producer" this)
    (.close kafka-producer)))
(defn create-java-messaging-producer
  "Creates a `JavaMessagingProducer` from an options map.

  `opts` is a map from which `:zookeeper-opts` and `:producer-opts` are
  read. Both are handed to `cfg/config-properties-with-brokers` to obtain
  the properties for a new `KafkaProducer`, which is constructed with
  `kafka.ser/string-serializer` and `kafka.ser/edn-serializer` as its
  second and third constructor arguments.

  Returns a two-element vector `[producer close-fn]`, where `close-fn` is
  a zero-argument function that calls `c.kafka/close` on the producer."
  [{:keys [zookeeper-opts producer-opts] :as opts}]
  (info "create-java-messaging-producer" opts)
  (let [kafka-props        (cfg/config-properties-with-brokers
                            zookeeper-opts
                            producer-opts)
        kafka-producer     (KafkaProducer.
                            kafka-props
                            (kafka.ser/string-serializer)
                            (kafka.ser/edn-serializer))
        messaging-producer (map->JavaMessagingProducer
                            {:kafka-producer kafka-producer
                             :producer-opts  producer-opts})
        close-fn           (fn []
                             (c.kafka/close messaging-producer))]
    [messaging-producer close-fn]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment