Created
April 16, 2009 14:40
-
-
Save lenst/96442 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns net.cddr.rabbitmq | |
(:import (com.rabbitmq.client Channel Connection ConnectionFactory | |
QueueingConsumer | |
RpcClient StringRpcServer) | |
(com.rabbitmq.tools.jsonrpc JsonRpcServer JsonRpcClient) | |
(com.rabbitmq.examples HelloJsonService))) | |
;; (import '(com.rabbitmq.client Channel Connection ConnectionFactory | |
;; QueueingConsumer | |
;; RpcClient StringRpcServer) | |
;; '(com.rabbitmq.tools.jsonrpc JsonRpcServer)) | |
;; Broker host and the queue names used by the examples in this file.
(def host-name "pentax")
(def queue-name "SimpleQueue")
(def rpc-queue-name "Hello")

;; Shared broker connection, opened when the namespace is first loaded.
;; `defonce` keeps the existing value on reload instead of re-evaluating
;; the init expression.
(defonce connection
  (-> (ConnectionFactory.)
      (.newConnection host-name)))
(defmacro with-channel
  "Evaluate body with a freshly created channel of the shared `connection`
  bound to the symbol given in the binding vector, e.g.

    (with-channel [ch] (.queueDeclare ch \"q\"))

  The channel is bound through `with-open`, so it is closed when body
  exits, whether normally or by exception."
  [[channel-sym] & body]
  `(with-open [~channel-sym (.createChannel connection)]
     ~@body))
(defn runbg
  "Run the zero-argument function `fun` in the background by dispatching it
  with `send-off` to a fresh agent. Returns that agent; once the action has
  run, the agent's state is the value returned by `fun`."
  [fun]
  (let [worker (agent nil)
        job    (fn [_] (fun))]   ; the agent's current state is ignored
    (send-off worker job)))
;;;; Send string to queue | |
(defn send-string
  "Publish the string `message` to the broker.

  The optional second argument is a map of options:
    :exchange  exchange to publish to (default \"\")
    :routing   routing key            (default: the value of `queue-name`)

  When the exchange is \"\", a queue named after the routing key is
  declared before publishing. The message body is encoded as UTF-8 so the
  bytes on the wire do not depend on the JVM's default charset."
  ([message]
     (send-string message {}))
  ([message {exchange :exchange, routing-key :routing,
             :or {exchange "", routing-key queue-name}}]
     (with-channel [ch]
       (when (= exchange "")
         (.queueDeclare ch routing-key))
       ;; Explicit charset: plain (.getBytes message) would use the
       ;; platform default encoding.
       (.basicPublish ch exchange routing-key nil
                      (.getBytes message "UTF-8")))))
;;;; AMQP Queue as a sequence
;; Helper function | |
(defn delivery-seq
  "Return a lazy, unbounded sequence of message bodies (as strings) taken
  from consumer `q` by repeatedly calling `.nextDelivery`.

  Each element is produced only when the sequence is realized that far.
  At that point the delivery is acknowledged on channel `ch`, with the
  `multiple` flag false. Bodies are decoded as UTF-8 rather than with the
  JVM's default charset."
  [ch q]
  (lazy-seq
    (let [d (.nextDelivery q)
          ;; Explicit charset: (String. bytes) alone would use the
          ;; platform default encoding.
          m (String. (.getBody d) "UTF-8")]
      (.basicAck ch (.. d getEnvelope getDeliveryTag) false)
      (cons m (delivery-seq ch q)))))
(defn queue-seq
  "Return a sequence of the messages in the queue named `queue-name`.

  Creates a new channel on `conn`, declares the queue, registers a
  QueueingConsumer on it and returns the lazy sequence built by
  `delivery-seq`. The channel is not closed here, because the returned
  sequence keeps using it."
  [conn queue-name]
  (let [ch (.createChannel conn)]
    (.queueDeclare ch queue-name)
    (let [consumer (QueueingConsumer. ch)]
      (.basicConsume ch queue-name consumer)
      (delivery-seq ch consumer))))
;;(def s (queue-seq connection queue-name)) | |
;;;; Calling an RPC service using a raw string
(defn call-hello
  "Send the string `s` as an RPC request, using the default exchange (\"\")
  with `rpc-queue-name` as the routing key, and return the result of the
  client's `.stringCall`. A private channel is opened for the call."
  [s]
  (with-channel [channel]
    (-> (RpcClient. channel "" rpc-queue-name)
        (.stringCall s))))
;;;; Implementation of RPC service for plain string proto | |
(defn hello-handler
  "Handle one string RPC request on behalf of the server object `this`.

  Prints the request, calls `.terminateMainloop` on `this` when the request
  is exactly \"stop\", and returns the greeting \"Hello, <request>!\"."
  [this request]
  (println "Got request:" request)
  (let [stop-requested? (= "stop" request)]
    (when stop-requested?
      (.terminateMainloop this)))
  (str "Hello, " request "!"))
(defn hello-server
  "Run a string RPC server on the queue `rpc-queue-name`.

  Opens a private channel, declares the queue and runs the main loop of a
  StringRpcServer proxy. The two-argument `handleStringCall` delegates to
  `hello-handler`; the one-argument arity always answers \"1\". The call
  blocks inside `.mainloop`, and the server is closed by `with-open` when
  the loop exits."
  []
  (with-channel [channel]
    (.queueDeclare channel rpc-queue-name)
    (let [rpc-server (proxy [StringRpcServer] [channel rpc-queue-name]
                       (handleStringCall
                         ([request] "1")
                         ([request props] (hello-handler this request))))]
      (with-open [server rpc-server]
        (.mainloop server)))))
;;(runbg hello-server) | |
;;;; Json Rpc Server | |
(defn json-server
  "Run a JSON-RPC server for HelloJsonService on the queue `queue-name`.

  Opens a channel on `conn`, declares the queue and runs the main loop of
  a JsonRpcServer backed by a proxy implementing:
    greeting  returns \"hej \" followed by its argument
    sum       returns the sum of the numbers in its argument

  The call blocks inside `.mainloop`; the channel is closed when it
  returns or throws."
  [#^Connection conn queue-name]
  ;; Open the channel on the connection passed in. The earlier version
  ;; ignored `conn` and always used the global `connection`.
  (with-open [ch (.createChannel conn)]
    (.queueDeclare ch queue-name)
    (let [svc    (proxy [com.rabbitmq.examples.HelloJsonService] []
                   (greeting [s] (str "hej " s))
                   (sum [l] (reduce + l)))
          server (JsonRpcServer. ch queue-name HelloJsonService svc)]
      (.mainloop server))))
;;(runbg #(json-server connection rpc-queue-name)) | |
;;;; Json Rpc Client | |
(defn test-json-client
  "Exercise the JSON-RPC service listening on `rpc-queue-name` and print the
  results.

  First calls `greeting` and `sum` by name through `.call` on a
  JsonRpcClient, then repeats both calls through a HelloJsonService proxy
  obtained from `.createProxy`."
  []
  (with-channel [ch]
    (let [client  (JsonRpcClient. ch "" rpc-queue-name)
          numbers [1 2 3]]
      ;; Calls by method name, with the arguments packed into an array.
      (println "Greeting:"
               (.call client "greeting" (into-array ["Clojure Client"])))
      (println "Sum" numbers ":" (.call client "sum" (into-array [numbers])))
      ;; The same calls through a typed proxy of the service interface.
      (let [service (.createProxy client HelloJsonService)]
        (println "Greeting:" (.greeting service "Clojure Client"))
        (println "Sum" numbers ":" (.sum service numbers))))))
;;; rabbitmq.clj ends here |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment