Skip to content

Instantly share code, notes, and snippets.

@mccraigmccraig
Created December 19, 2019 11:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mccraigmccraig/e77ae00c08a4700e3f73ab27e528977e to your computer and use it in GitHub Desktop.
Save mccraigmccraig/e77ae00c08a4700e3f73ab27e528977e to your computer and use it in GitHub Desktop.
(defrecord JavaMessagingProducer [kafka-producer producer-opts]
c.kafka/MessagingProducer
(send [_ topic message]
(let [r (deferred/deferred)
pr (producer-record
topic
nil ;; 0 ;; don't specify a partition !
(routable-item-key message)
message)
cb (reify Callback
(onCompletion [_ record-metadata error]
(if record-metadata
(deferred/success!
r
{:topic (.topic record-metadata)
:partition (.partition record-metadata)
:offset (.offset record-metadata)})
(deferred/error! r error))))]
(.send
kafka-producer
pr
cb)
r))
(close [this]
(info "closing java-messaging-producer" this)
(.close kafka-producer)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment