Skip to content

Instantly share code, notes, and snippets.

@ekhall
Created December 11, 2015 20:41
Show Gist options
  • Save ekhall/6ffb6f8ebdbf90a0ae21 to your computer and use it in GitHub Desktop.
Save ekhall/6ffb6f8ebdbf90a0ae21 to your computer and use it in GitHub Desktop.
(ns queuing.web
(:gen-class)
(:require
[langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.consumers :as lc]
[langohr.basic :as lb]
[compojure.core :refer [defroutes GET]]
[compojure.handler :refer [site]]
[compojure.route :as route]
[ring.adapter.jetty :as jetty]
[environ.core :refer [env]]))
;; Exchange name passed to lb/publish; the empty string selects the broker's
;; default exchange.
(def ^:const default-exchange-name "")

;; Name of the queue that is both published to and consumed from.
(def qname "langohr.examples.hello-world")

;; Broker URI: taken from the CLOUDAMQP_URL environment variable when it is
;; set, otherwise a local broker with the guest account.
(def amqp-url
  (or (System/getenv "CLOUDAMQP_URL")
      "amqp://guest:guest@localhost:5672"))
(defn message-handler
  "Consumer callback: prints the payload (decoded as UTF-8) together with
  the delivery tag and content type found in the message metadata.
  The channel argument is accepted but not used."
  [ch {:keys [content-type delivery-tag] :as meta} ^bytes payload]
  (let [body (String. payload "UTF-8")
        line (format "[consumer] Received a message: %s, delivery tag: %d, content type: %s"
                     body
                     delivery-tag
                     content-type)]
    (println line)))
;; Catch-all route: every GET request opens a fresh connection and channel,
;; publishes one "Hello!" message to `qname` through the default exchange,
;; and replies with a plain-text confirmation.
(defroutes app
  (GET "*" []
    (let [conn (rmq/connect {:uri amqp-url})]
      ;; Nested try/finally so the channel and the connection are released
      ;; even when opening the channel or publishing throws; otherwise each
      ;; failing request would leak a broker connection.
      (try
        (let [ch (lch/open conn)]
          (try
            (lb/publish ch default-exchange-name qname
                        "Hello!" {:content-type "text/plain" :type "greetings.hi"})
            (finally
              (rmq/close ch))))
        (finally
          (rmq/close conn)))
      {:status 200
       :headers {"Content-Type" "text/plain"}
       :body "Published message!"})))
(defn -main
  "Entry point. Connects to the broker at `amqp-url`, declares `qname`,
  subscribes `message-handler` to it with automatic acknowledgement, and
  starts Jetty serving `app` without blocking the calling thread.

  `port` is optional; when absent the :port value from environ is used,
  and 5000 when that is absent too.

  Returns the value of jetty/run-jetty so the server can be stopped from
  a REPL."
  [& [port]]
  (let [;; Resolve the port first so a malformed value fails before any
        ;; broker connection has been opened.
        port (Integer. (or port (env :port) 5000))
        conn (rmq/connect {:uri amqp-url})
        ch   (lch/open conn)]
    ;; Register cleanup right away so the channel and connection are closed
    ;; on JVM shutdown even if a later startup step throws.
    (.addShutdownHook (Runtime/getRuntime)
                      (Thread. #(do (rmq/close ch) (rmq/close conn))))
    (println (format "[main] Connected. Channel id: %d" (.getChannelNumber ch)))
    ;; The queue must exist before a consumer is attached to it.
    (lq/declare ch qname {:exclusive false :auto-delete true})
    (lc/subscribe ch qname message-handler {:auto-ack true})
    ;; #'app (the var) lets route redefinitions take effect without a
    ;; restart; :join? false keeps this call from blocking.
    (jetty/run-jetty (site #'app) {:port port :join? false})))
;; For interactive development:
;; (.stop server)
;; (def server (-main))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment