Last active
December 5, 2016 11:57
-
-
Save lasiltan/f6741f7e871a43c44fe1e37780b1b08e to your computer and use it in GitHub Desktop.
Spring AMQP RabbitMQ clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns packaging-api.rabbitmq.main | |
(:require [clojure.tools.logging :refer [debug info]] | |
[packaging-api.properties :refer [parse-property]] | |
[cheshire.core :as json] | |
[packaging-api.rabbitmq.consumers.package :as package] | |
[rabid.core :as rabid] | |
[ovp.schema.core :refer [schema-version]] | |
[clojure.walk :refer [keywordize-keys]]) | |
(:import (org.springframework.amqp.rabbit.core RabbitTemplate ChannelAwareMessageListener RabbitAdmin) | |
(org.springframework.amqp.rabbit.connection CachingConnectionFactory) | |
(org.springframework.amqp.rabbit.listener SimpleMessageListenerContainer) | |
(org.springframework.amqp.core AcknowledgeMode Message MessageProperties Queue Exchange TopicExchange DirectExchange Binding Binding$DestinationType))) | |
;; Module-level state. All four atoms start out empty and are filled in by
;; start! (connection, rabbit, admin) and consume! (consumers).

(def connection
  "Atom holding the connection factory created by start!; nil before that.
  Read by consume! and stop!."
  (atom nil))

(def rabbit
  "Atom holding the RabbitTemplate created by start!; nil before that.
  Used by publish! and consume-all-from-queue!."
  (atom nil))

(def admin
  "Atom holding the RabbitAdmin created by start!; nil before that.
  Used by the declare-*! functions."
  (atom nil))

(def consumers
  "Atom holding a list of the listener containers started by consume!."
  (atom '()))
(defn start!
  "Creates the RabbitMQ connection factory from configuration and publishes it,
  together with a RabbitTemplate and a RabbitAdmin built on top of it, into the
  `connection`, `rabbit` and `admin` atoms.

  Configuration is read through `parse-property` using the keys :mq.hosts,
  :mq.port, :mq.vhost, :mq.user and :mq.pass.

  Returns the RabbitAdmin instance (the value stored last)."
  []
  ;; All three objects are constructed before any atom is touched, so a failure
  ;; while building them leaves the previous state intact.
  (let [cf (doto (CachingConnectionFactory. (parse-property :mq.hosts)
                                            (parse-property :mq.port))
             (.setVirtualHost (parse-property :mq.vhost))
             (.setUsername (parse-property :mq.user))
             (.setPassword (parse-property :mq.pass)))
        rabbit-template (RabbitTemplate. cf)
        rabbit-admin (RabbitAdmin. cf)]
    ;; reset! replaces the old value unconditionally, which is what the former
    ;; (swap! a (fn [x] v)) calls were doing with an ignored argument.
    (reset! connection cf)
    (reset! rabbit rabbit-template)
    (reset! admin rabbit-admin)))
(defn publish!
  "Serializes `payload-map` to JSON and sends it to `exchange` with
  `routing-key` using the RabbitTemplate held in the `rabbit` atom.

  Parameters:
    exchange     - name of the exchange to publish to
    routing-key  - routing key for the message
    payload-map  - data passed to cheshire's generate-string
    headers-map  - map of header name -> value; each entry is set as a message
                   header. Keys may be strings, or keywords/symbols (their name
                   is used as the header name).

  The JSON body is always encoded as UTF-8, independent of the JVM's default
  charset. start! must have been called first so that `rabbit` is populated."
  [exchange routing-key payload-map headers-map]
  (let [^String json-text (json/generate-string payload-map)
        ;; Explicit charset: a bare .getBytes uses the platform default
        ;; encoding, which corrupts non-ASCII payloads on non-UTF-8 hosts.
        body-bytes (.getBytes json-text "UTF-8")
        message-properties (MessageProperties.)]
    (doseq [[k v] headers-map]
      ;; setHeader requires a String key; keywords and symbols are converted,
      ;; anything else is passed through unchanged.
      (.setHeader message-properties
                  (if (instance? clojure.lang.Named k) (name k) k)
                  v))
    (.send @rabbit exchange routing-key (Message. body-bytes message-properties))))
(defn consumer
  "Wraps `handler-fn` into a function of [msg channel] that acknowledges or
  rejects the delivery based on the handler's result.

  `handler-fn` is called with the message headers (as returned by the
  message properties' getHeaders) and the message body decoded as a UTF-8
  string. A truthy result acks the delivery; a falsey result rejects it
  without requeueing.

  NOTE(review): if `handler-fn` throws, neither basicAck nor basicReject is
  called from here - confirm that is the intended behaviour for manually
  acknowledged consumers."
  [handler-fn]
  (fn [msg channel]
    (let [props (.getMessageProperties msg)
          delivery-tag (.getDeliveryTag props)
          headers (.getHeaders props)
          ^bytes raw (.getBody msg)
          ;; Decode explicitly as UTF-8 instead of relying on the JVM default
          ;; charset, which may differ between hosts.
          body (String. raw "UTF-8")]
      (if (handler-fn headers body)
        (.basicAck channel delivery-tag false)        ; false = this delivery only
        (.basicReject channel delivery-tag false))))) ; false = do not requeue
(defn json-handler
  "Adapts `handler-fn` so it receives parsed data instead of raw values.

  Returns a function of [headers body] that parses `body` as JSON with
  keywordized keys, copies `headers` into a Clojure map with keywordized keys,
  and then calls `(handler-fn header-map payload)`, returning its result."
  [handler-fn]
  (fn [headers body]
    (let [payload (json/parse-string body true)
          header-map (keywordize-keys (into {} headers))]
      (handler-fn header-map payload))))
(defn consume!
  "Starts a listener container on `queue` that feeds every delivery to
  `handler-fn` through `consumer`, and records the container in the
  `consumers` atom.

  Parameters:
    queue      - name of the queue to listen on
    handler-fn - function of [headers body]; see `consumer` for how its
                 result maps to ack/reject

  The container uses manual acknowledgement and the connection factory held
  in the `connection` atom, so start! must have been called first.
  Returns the updated value of `consumers`."
  [queue handler-fn]
  (let [;; Build the ack/reject wrapper once rather than on every message.
        on-message (consumer handler-fn)
        listener (reify ChannelAwareMessageListener
                   (onMessage [this msg channel]
                     (on-message msg channel)))
        container (doto (SimpleMessageListenerContainer. @connection)
                    (.setMessageListener listener)
                    (.setQueueNames (into-array String [queue]))
                    (.setAcknowledgeMode AcknowledgeMode/MANUAL)
                    (.start))]
    (swap! consumers conj container)))
(defn consume-all-from-queue!
  "Repeatedly receives messages from `queue` through the RabbitTemplate in the
  `rabbit` atom until a receive returns nothing, accumulating them in the
  `messages` atom.

  Parameters:
    queue    - name of the queue to read from
    messages - atom holding a collection; each received message is conj'ed
               onto it as a [headers body] pair
    timeout  - passed to the template's receive call for every attempt

  Headers are copied into a Clojure map with keywordized keys. The body is
  decoded as UTF-8 and parsed as JSON with keywordized keys.
  Returns the final contents of `messages`."
  [queue messages timeout]
  (if-let [msg (.receive @rabbit queue timeout)]
    (let [headers (->> msg
                       .getMessageProperties
                       .getHeaders
                       (into {})
                       keywordize-keys)
          ^bytes raw (.getBody msg)
          ;; Decode explicitly as UTF-8; the single-argument String
          ;; constructor would use the JVM's platform default charset.
          body (json/parse-string (String. raw "UTF-8") true)]
      (swap! messages conj [headers body])
      (recur queue messages timeout))
    @messages))
(defn consume-queued!
  "Drains `queue` via consume-all-from-queue! into a fresh vector and returns
  the collected [headers body] pairs.

  `timeout` is forwarded to consume-all-from-queue! and defaults to 200 when
  not supplied."
  ([queue]
   (consume-queued! queue 200))
  ([queue timeout]
   (let [collected (atom [])]
     (consume-all-from-queue! queue collected timeout))))
(defn declare-queue!
  "Declares a queue called `name` through the RabbitAdmin held in the `admin`
  atom.

  The second argument is an options map whose :durable, :exclusive and
  :auto-delete entries are handed to the Queue constructor.
  Returns whatever the admin's declareQueue call returns."
  [name {:keys [exclusive auto-delete durable]}]
  (->> (Queue. name durable exclusive auto-delete)
       (.declareQueue @admin)))
(defn declare-exchange!
  "Declares an exchange called `name` through the RabbitAdmin held in the
  `admin` atom.

  Parameters:
    name - exchange name
    type - \"topic\" or \"direct\"; the keywords :topic and :direct are
           accepted as well
    opts - map whose :durable and :auto-delete entries are handed to the
           exchange constructor

  Throws IllegalArgumentException when `type` is not a supported exchange
  type."
  [name type {:keys [auto-delete durable]}]
  (let [;; `name` is shadowed by the parameter, so core's name must be
        ;; referenced fully qualified.
        type-name (if (keyword? type) (clojure.core/name type) type)
        exchange (case type-name
                   "topic" (TopicExchange. name durable auto-delete)
                   "direct" (DirectExchange. name durable auto-delete)
                   ;; Same exception class a default-less `case` would throw,
                   ;; but with a message that says what is accepted.
                   (throw (IllegalArgumentException.
                           (str "Unsupported exchange type: " (pr-str type)
                                " (expected \"topic\" or \"direct\")"))))]
    (.declareExchange @admin exchange)))
(defn declare-binding!
  "Binds `queue` to `exchange` under `routing-key` through the RabbitAdmin held
  in the `admin` atom. The binding has a queue destination type and an empty
  arguments map."
  [queue exchange routing-key]
  (let [no-arguments {}
        queue-binding (Binding. queue
                                Binding$DestinationType/QUEUE
                                exchange
                                routing-key
                                no-arguments)]
    (.declareBinding @admin queue-binding)))
(defn stop!
  "Stops every listener container recorded in the `consumers` atom, empties
  that atom, and then stops the connection factory in the `connection` atom if
  one is present and reports itself as running.

  Each stop is logged at info level. Returns nil."
  []
  (doseq [c @consumers]
    (info "Stopping " c)
    (.stop c))
  ;; Forget the stopped containers so that repeated start/consume/stop cycles
  ;; do not accumulate stale entries.
  (reset! consumers ())
  ;; Dereference once so the nil check, the running check and the stop call
  ;; all act on the same object.
  (when-let [cf @connection]
    (when (.isRunning cf)
      (info "Stopping " cf)
      (.stop cf))))
(defn start-consumers!
  "Starts the application's queue consumers. Currently registers a single
  JSON consumer on \"my.super.queue\" handled by package/handle."
  []
  ;; Example of starting a consumer
  (consume! "my.super.queue" (json-handler package/handle)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment