Skip to content

Instantly share code, notes, and snippets.

@lasiltan
Last active December 5, 2016 11:57
Show Gist options
  • Save lasiltan/f6741f7e871a43c44fe1e37780b1b08e to your computer and use it in GitHub Desktop.
Save lasiltan/f6741f7e871a43c44fe1e37780b1b08e to your computer and use it in GitHub Desktop.
Spring AMQP RabbitMQ clojure
(ns packaging-api.rabbitmq.main
(:require [clojure.tools.logging :refer [debug info]]
[packaging-api.properties :refer [parse-property]]
[cheshire.core :as json]
[packaging-api.rabbitmq.consumers.package :as package]
[rabid.core :as rabid]
[ovp.schema.core :refer [schema-version]]
[clojure.walk :refer [keywordize-keys]])
(:import (org.springframework.amqp.rabbit.core RabbitTemplate ChannelAwareMessageListener RabbitAdmin)
(org.springframework.amqp.rabbit.connection CachingConnectionFactory)
(org.springframework.amqp.rabbit.listener SimpleMessageListenerContainer)
(org.springframework.amqp.core AcknowledgeMode Message MessageProperties Queue Exchange TopicExchange DirectExchange Binding Binding$DestinationType)))
(def connection (atom nil))
(def rabbit (atom nil))
(def admin (atom nil))
(def consumers (atom ()))
(defn start! []
(let [cf (doto (CachingConnectionFactory. (parse-property :mq.hosts) (parse-property :mq.port))
(.setVirtualHost (parse-property :mq.vhost))
(.setUsername (parse-property :mq.user))
(.setPassword (parse-property :mq.pass)))
rabbit-template (RabbitTemplate. cf)
rabbit-admin (RabbitAdmin. cf)]
(swap! connection (fn [x] cf))
(swap! rabbit (fn [x] rabbit-template))
(swap! admin (fn [x] rabbit-admin))))
(defn publish! [exchange routing-key payload-map headers-map]
(let [body-bytes (-> payload-map
(json/generate-string)
(.getBytes))
message-properties (MessageProperties.)
_ (doseq [[k v] headers-map]
(.setHeader message-properties k v))
message (Message. body-bytes message-properties)]
(.send @rabbit exchange routing-key message)))
(defn consumer [handler-fn]
(fn [msg channel]
(let [delivery-tag (-> msg .getMessageProperties .getDeliveryTag)
headers (-> msg .getMessageProperties .getHeaders)
body (String. (.getBody msg))]
(if (handler-fn headers body)
(.basicAck channel delivery-tag false)
(.basicReject channel delivery-tag false)))))
(defn json-handler [handler-fn]
(fn [headers body]
(let [msg (json/parse-string body true)
headers (-> (into {} headers)
keywordize-keys)]
(handler-fn headers msg))))
(defn consume! [queue handler-fn]
(let [listener (reify ChannelAwareMessageListener
(onMessage [this msg channel] ((consumer handler-fn) msg channel)))
container (doto (SimpleMessageListenerContainer. @connection)
(.setMessageListener listener)
(.setQueueNames (into-array String [queue]))
(.setAcknowledgeMode AcknowledgeMode/MANUAL)
(.start))]
(swap! consumers conj container)))
(defn consume-all-from-queue! [queue messages timeout]
(let [msg (.receive @rabbit queue timeout)]
(if msg
(let [headers (->> msg
.getMessageProperties
.getHeaders
(into {})
keywordize-keys)
body (-> msg
(.getBody)
(String.)
(json/parse-string true))]
(swap! messages conj [headers body])
(recur queue messages timeout))
@messages)))
(defn consume-queued!
([queue]
(consume-all-from-queue! queue (atom []) 200))
([queue timeout]
(consume-all-from-queue! queue (atom []) timeout)))
(defn declare-queue! [name {:keys [exclusive auto-delete durable]}]
(let [queue (Queue. name durable exclusive auto-delete)]
(.declareQueue @admin queue)))
(defn declare-exchange! [name type {:keys [auto-delete durable]}]
(let [exchange (case type
"topic" (TopicExchange. name durable auto-delete)
"direct" (DirectExchange. name durable auto-delete))]
(.declareExchange @admin exchange)))
(defn declare-binding! [queue exchange routing-key]
(.declareBinding @admin (Binding. queue Binding$DestinationType/QUEUE exchange routing-key {})))
(defn stop! []
(doseq [c @consumers]
(do
(info "Stopping " c)
(.stop c)))
(when (and @connection (.isRunning @connection))
(do
(info "Stopping " @connection)
(.stop @connection))))
(defn start-consumers! []
: Example of starting a consumer
(consume! "my.super.queue" (json-handler package/handle)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment