-
-
Save creese/063193bd3b6fce1ae480 to your computer and use it in GitHub Desktop.
code snippet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn- in-que | |
"Returns a message queue to consume." | |
[conn [type exchange-name] routing-keys binding-queue] | |
(let [ch (chan 1) | |
amqp-chan (amqp.chan/open conn) | |
handler (fn [amqp-chan | |
{content-type :content-type | |
message-type :type | |
routing-key :routing-key | |
message-id :message-id | |
timestamp :timestamp} | |
data] | |
(put! ch {:type (keyword message-type) | |
:routing-key routing-key | |
:message-id message-id | |
:timestamp timestamp | |
:data (read-data {:data data | |
:content-type content-type})}))] | |
(amqp.exchange/declare amqp-chan | |
exchange-name | |
"topic" | |
{:durable true :auto-delete false}) | |
(amqp.queue/declare amqp-chan binding-queue {:auto-delete false}) | |
(doseq [rk routing-keys] | |
(println (format ";; [bind] %s" {:channel amqp-chan | |
:queue binding-queue | |
:exchange exchange-name | |
:routing-key rk})) | |
(amqp.queue/bind amqp-chan binding-queue exchange-name {:routing-key rk})) | |
(amqp.consumer/subscribe amqp-chan binding-queue handler {:auto-ack true}) | |
(println (format ";; [subscribe] %s" {:channel amqp-chan | |
:queue binding-queue})) | |
[type ch])) | |
(defn consume! | |
"Takes a queue and consumes messages until the queue is closed." | |
[queue f] | |
(let [[_ ch] queue] | |
(go | |
(while true | |
(let [{:keys [type routing-key message-id timestamp data]} (<! ch)] | |
(f type routing-key message-id timestamp data)))))) | |
(defmacro defconsumer | |
"Returns a consumer that passes messages to handler. Handler is a sum type | |
(e.g., [:command keypr.thumper.processor/handler])." | |
[consumer [type handler]] | |
`(defn ~consumer [system#] | |
(let [conn# (amqp/connect (:rmq @system#)) | |
exchange-name# (->> (:queue @system#) | |
(filter #(= (first %) ~type)) | |
(first) | |
(last)) | |
queue# (que conn# | |
[~type exchange-name#] | |
(:routing-keys @system#) | |
(:name @system#))] | |
(do | |
(println ";; [consumer] READY") | |
(consume! queue# (~handler system#)))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment