Skip to content

Instantly share code, notes, and snippets.

@creese
Last active August 29, 2015 14:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save creese/063193bd3b6fce1ae480 to your computer and use it in GitHub Desktop.
Save creese/063193bd3b6fce1ae480 to your computer and use it in GitHub Desktop.
code snippet
(defn- in-que
"Returns a message queue to consume."
[conn [type exchange-name] routing-keys binding-queue]
(let [ch (chan 1)
amqp-chan (amqp.chan/open conn)
handler (fn [amqp-chan
{content-type :content-type
message-type :type
routing-key :routing-key
message-id :message-id
timestamp :timestamp}
data]
(put! ch {:type (keyword message-type)
:routing-key routing-key
:message-id message-id
:timestamp timestamp
:data (read-data {:data data
:content-type content-type})}))]
(amqp.exchange/declare amqp-chan
exchange-name
"topic"
{:durable true :auto-delete false})
(amqp.queue/declare amqp-chan binding-queue {:auto-delete false})
(doseq [rk routing-keys]
(println (format ";; [bind] %s" {:channel amqp-chan
:queue binding-queue
:exchange exchange-name
:routing-key rk}))
(amqp.queue/bind amqp-chan binding-queue exchange-name {:routing-key rk}))
(amqp.consumer/subscribe amqp-chan binding-queue handler {:auto-ack true})
(println (format ";; [subscribe] %s" {:channel amqp-chan
:queue binding-queue}))
[type ch]))
(defn consume!
"Takes a queue and consumes messages until the queue is closed."
[queue f]
(let [[_ ch] queue]
(go
(while true
(let [{:keys [type routing-key message-id timestamp data]} (<! ch)]
(f type routing-key message-id timestamp data))))))
(defmacro defconsumer
"Returns a consumer that passes messages to handler. Handler is a sum type
(e.g., [:command keypr.thumper.processor/handler])."
[consumer [type handler]]
`(defn ~consumer [system#]
(let [conn# (amqp/connect (:rmq @system#))
exchange-name# (->> (:queue @system#)
(filter #(= (first %) ~type))
(first)
(last))
queue# (que conn#
[~type exchange-name#]
(:routing-keys @system#)
(:name @system#))]
(do
(println ";; [consumer] READY")
(consume! queue# (~handler system#))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment