Skip to content

Instantly share code, notes, and snippets.

@lenst
Created April 16, 2009 14:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lenst/96442 to your computer and use it in GitHub Desktop.
Save lenst/96442 to your computer and use it in GitHub Desktop.
(ns net.cddr.rabbitmq
(:import (com.rabbitmq.client Channel Connection ConnectionFactory
QueueingConsumer
RpcClient StringRpcServer)
(com.rabbitmq.tools.jsonrpc JsonRpcServer JsonRpcClient)
(com.rabbitmq.examples HelloJsonService)))
;; (import '(com.rabbitmq.client Channel Connection ConnectionFactory
;; QueueingConsumer
;; RpcClient StringRpcServer)
;; '(com.rabbitmq.tools.jsonrpc JsonRpcServer))
;; Host name handed to ConnectionFactory.newConnection below.
(def host-name "pentax")
;; Default routing key (and queue) used by send-string.
(def queue-name "SimpleQueue")
;; Queue name used by the RPC client/server examples in this file.
(def rpc-queue-name "Hello")
;; Connection used by with-channel.  defonce keeps an already-bound
;; connection when this file is reloaded instead of opening a new one.
(defonce connection (.newConnection (ConnectionFactory.) host-name ))
(defmacro with-channel
  "Usage: (with-channel [name] forms...)

  Creates a fresh channel on `connection`, binds it to the local `name`,
  evaluates forms, and closes the channel afterwards (via with-open), even
  when forms throw.  Returns the value of the last form."
  [[channel-sym] & forms]
  `(with-open [~channel-sym (.createChannel connection)]
     ~@forms))
(defn runbg
  "Call the zero-argument function fun on a send-off thread of a fresh
  agent.  Returns that agent; its state becomes the value returned by fun."
  [fun]
  (let [worker (agent nil)]
    ;; The agent's current state is irrelevant, only the thread is wanted.
    (send-off worker (fn [_state] (fun)))))
;;;; Send string to queue
(defn send-string
  "Publish the string message as a UTF-8 encoded message body.

  (send-string message) publishes to the default exchange \"\" with
  queue-name as routing key.
  (send-string message opts) takes a map with optional keys :exchange and
  :routing that override those defaults.

  When the exchange is \"\" the queue named by the routing key is declared
  before publishing.  A private channel is opened for the call and closed
  afterwards."
  ([message]
     (send-string message {}))
  ([message {exchange :exchange, routing-key :routing,
             :or {exchange "", routing-key queue-name}}]
     (with-channel [ch]
       (when (= exchange "")
         (.queueDeclare ch routing-key))
       ;; Encode explicitly: a bare (.getBytes message) uses the JVM's
       ;; platform default charset, which varies from host to host.
       (.basicPublish ch exchange routing-key nil
                      (.getBytes message "UTF-8")))))

;;;; AMQP Queue as a sequence
;; Helper function
(defn delivery-seq
  "Return a lazy, unbounded sequence of message bodies taken from the
  consumer q, each decoded as a UTF-8 string (the encoding send-string
  writes).  Realizing an element calls .nextDelivery on q and then
  acknowledges that single delivery on channel ch."
  [ch q]
  (lazy-seq
    (let [d (.nextDelivery q)
          ;; Decode explicitly rather than with the platform default
          ;; charset, so text survives between differently configured hosts.
          m (String. (.getBody d) "UTF-8")]
      ;; false = acknowledge only this delivery tag.
      (.basicAck ch (.. d getEnvelope getDeliveryTag) false)
      (cons m (delivery-seq ch q)))))
(defn queue-seq
  "Return a sequence of the messages in the queue named queue-name.

  Opens a new channel on conn, declares the queue, registers a
  QueueingConsumer on it and returns (delivery-seq channel consumer).
  The channel is not closed by this function; the returned sequence keeps
  using it."
  [conn queue-name]
  (let [ch (.createChannel conn)]
    (.queueDeclare ch queue-name)
    (let [consumer (QueueingConsumer. ch)]
      (.basicConsume ch queue-name consumer)
      (delivery-seq ch consumer))))
;;(def s (queue-seq connection queue-name))
;;;; Calling an RPC service using a raw string
(defn call-hello
  "Send the string s as an RPC request through the default exchange \"\"
  with rpc-queue-name as routing key, and return the result of the
  client's stringCall.  Uses a private channel that is closed afterwards."
  [s]
  (with-channel [rpc-channel]
    (-> (RpcClient. rpc-channel "" rpc-queue-name)
        (.stringCall s))))
;;;; Implementation of RPC service for plain string proto
(defn hello-handler
  "Handle one string RPC request on behalf of the server object `this`.

  Prints the request, asks `this` to terminate its mainloop when the
  request is exactly \"stop\", and returns the reply \"Hello, <request>!\"."
  [this request]
  (println "Got request:" request)
  ;; `this` is only touched for the stop request.
  (if (= "stop" request)
    (.terminateMainloop this))
  (str "Hello, " request "!"))
(defn hello-server
  "Run a string RPC server on rpc-queue-name.

  Declares the queue on a private channel, builds a StringRpcServer proxy
  whose two-argument handleStringCall delegates to hello-handler, and runs
  its mainloop.  The server is closed when the mainloop returns or throws,
  and the channel is closed after that.  Returns what .mainloop returns."
  []
  (with-channel [chan]
    (.queueDeclare chan rpc-queue-name)
    (let [rpc-server (proxy [StringRpcServer] [chan rpc-queue-name]
                       (handleStringCall
                         ;; One-argument arity answers with a fixed "1".
                         ([req] "1")
                         ([req reply-props] (hello-handler this req))))]
      (try
        (.mainloop rpc-server)
        (finally
          (.close rpc-server))))))
;;(runbg hello-server)
;;;; Json Rpc Server
(defn json-server
  "Run a JSON-RPC server for HelloJsonService on the queue queue-name.

  conn is the Connection on which the server's channel is created.  The
  queue is declared, a HelloJsonService proxy is built (greeting prefixes
  its argument with \"hej \", sum adds up the given collection) and the
  JsonRpcServer mainloop is run.  The channel is closed when the mainloop
  returns or throws.  Returns what .mainloop returns."
  [#^Connection conn queue-name]
  ;; Open the channel on the connection that was passed in, not on the
  ;; global `connection` that with-channel would use.
  (with-open [ch (.createChannel conn)]
    (.queueDeclare ch queue-name)
    (let [svc (proxy [com.rabbitmq.examples.HelloJsonService] []
                (greeting [s] (str "hej " s))
                (sum [l] (reduce + l)))
          server (JsonRpcServer. ch queue-name HelloJsonService svc)]
      (.mainloop server))))
;;(runbg #(json-server connection rpc-queue-name))
;;;; Json Rpc Client
(defn test-json-client
  "Exercise the JSON-RPC service listening on rpc-queue-name and print the
  results.

  The greeting and sum procedures are called twice each: first through
  JsonRpcClient.call with an argument array, then through a
  HelloJsonService proxy created by the client.  Returns nil."
  []
  (with-channel [ch]
    (let [rpc-client (JsonRpcClient. ch "" rpc-queue-name)
          who "Clojure Client"
          numbers [1 2 3]]
      ;; Untyped calls: procedure name plus an array of arguments.
      (println "Greeting:"
               (.call rpc-client "greeting" (into-array [who])))
      (println "Sum" numbers ":"
               (.call rpc-client "sum" (into-array [numbers])))
      ;; Same calls through a proxy implementing the service interface.
      (let [hello (.createProxy rpc-client HelloJsonService)]
        (println "Greeting:" (.greeting hello who))
        (println "Sum" numbers ":" (.sum hello numbers))))))
;;; rabbitmq.clj ends here
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment