Skip to content

Instantly share code, notes, and snippets.

@halgari
Created September 24, 2013 20:14
Show Gist options
  • Save halgari/6690589 to your computer and use it in GitHub Desktop.
Save halgari/6690589 to your computer and use it in GitHub Desktop.
(ns rabbit-mq-test.core
(:require [clojure.core.async :refer :all])
(:import [com.rabbitmq.client ConnectionFactory Connection Channel QueueingConsumer Consumer DefaultConsumer]))
;; Have the compiler print a warning for every Java interop call in this
;; namespace that it cannot resolve statically and must perform via reflection.
(set! *warn-on-reflection* true)
(defprotocol Lifecycle
  "Start/stop contract for a service value."
  (start
    [service]
    "Start `service`. The RabbitMQService implementation in this file returns
    a service value, which callers store in place of the one passed in.")
  (stop
    [service]
    "Stop `service`. The RabbitMQService implementation in this file returns
    a service value, which callers store in place of the one passed in."))
(defprotocol IQueueService
  "Operations of a message-queue backed service."
  (put-msg
    [service ^String msg]
    "Publish the string `msg` through `service`.")
  (get-msg
    [service]
    "Take the next message from `service`.")
  (commit-msg
    [service msg]
    "NOTE(review): no implementation of this method is visible in this file;
    presumably it acknowledges `msg` - confirm before relying on it."))
(defn rabbit-mq->async-chan
  "Registers a consumer for `queue-name` on the RabbitMQ channel `rmq` and
  forwards every delivery onto the core.async channel `c`.

  Each delivery is put on `c` as a map with the keys :tag, :envelope,
  :properties and :body, taken from the arguments of `handleDelivery`.
  `auto-ack?` is passed straight through to `Channel.basicConsume`.

  Returns `c`."
  [^Channel rmq ^String queue-name auto-ack? c]
  ;; `proxy` binds `this` implicitly and prepends it to every method's
  ;; parameter vector. Listing `this` explicitly therefore produces a fn with
  ;; one parameter too many, and the callback fails with an ArityException
  ;; whenever RabbitMQ invokes it. The parameter vectors below list only the
  ;; Java method's own arguments.
  (let [consumer (proxy [DefaultConsumer] [rmq]
                   ;; The disabled handlers are kept for reference; their
                   ;; parameter vectors carry the same correction.
                   #_(handleCancel [tag]
                       (println "closing consumer")
                       (close! c))
                   #_(handleCancelOk [tag]
                       (println "closing consumer")
                       (close! c))
                   #_(handleConsumeOk [tag]
                       (println "consume"))
                   (handleDelivery [tag envelope properties body]
                     (println "hey")
                     (put! c {:tag tag
                              :envelope envelope
                              :properties properties
                              :body body}))
                   #_(handleRecoverOk [tag])
                   #_(handleShutdownSignal [tag sig]
                       (println "closing consumer")
                       (close! c)))]
    (println "connectiing")
    (.basicConsume rmq queue-name auto-ack? consumer)
    c))
;; RabbitMQ-backed implementation of Lifecycle and IQueueService.
;;
;; Fields:
;;   queue-name        queue that is declared, published to and consumed from
;;   host              host name handed to the ConnectionFactory
;;   connection        connection opened by `start`, closed by `stop`
;;   channel           channel used for queueDeclare and basicPublish
;;   running?          true once `start` has completed
;;   factory           ConnectionFactory created by `start`
;;   consumer-channel  second channel on the same connection, used for
;;                     consuming and acknowledging
;;   consumer          QueueingConsumer registered on consumer-channel
;;   get-chan          core.async channel; only referenced from a disabled
;;                     (#_) form inside `start`
;;
;; NOTE(review): IQueueService/commit-msg is not implemented by this record.
(defrecord RabbitMQService [^String queue-name
                            ^String host
                            ^Connection connection
                            ^Channel channel
                            ^boolean running?
                            ^ConnectionFactory factory
                            ^Channel consumer-channel
                            ^QueueingConsumer consumer
                            get-chan]
  Lifecycle
  ;; Connects to `host`, opens the two channels, declares the queue and
  ;; registers the consumer with auto-ack off. Returns the record with the
  ;; new handles assoc'ed in, or the record unchanged if already running.
  (start [service]
    (if running?
      service
      (let [factory (ConnectionFactory.)
            _ (.setHost factory host)
            connection (.newConnection factory)
            channel (.createChannel connection)
            consumer-channel (.createChannel connection)
            consumer (QueueingConsumer. consumer-channel)]
        (.queueDeclare channel queue-name false false false nil)
        (.basicConsume consumer-channel queue-name false consumer)
        #_(rabbit-mq->async-chan consumer-channel queue-name false get-chan)
        (assoc service
               :running? true
               :factory factory
               :connection connection
               :consumer-channel consumer-channel
               :channel channel
               :consumer consumer))))
  ;; Closes the connection and returns the record with :running? false, or
  ;; the record unchanged if it was not running. The channels are not closed
  ;; individually (those calls are disabled).
  (stop [service]
    (if-not running?
      service
      (do #_(.close channel)
          #_(.close consumer-channel)
          (.close connection)
          (assoc service
                 :running? false))))
  IQueueService
  ;; Publishes the bytes of `msg` to the default exchange ("") with
  ;; `queue-name` as the routing key.
  (put-msg [service msg]
    ;; Use-site hint: protocol parameter hints do not carry over to the
    ;; implementation, so without it `.getBytes` is a reflective call.
    (.basicPublish channel "" queue-name false nil (.getBytes ^String msg)))
  ;; Takes the next delivery from the consumer, acknowledges it, and returns
  ;; it. Previously the void result of `.basicAck` was the last expression,
  ;; so the method always returned nil and the message was lost.
  (get-msg [service]
    (let [delivery (.nextDelivery consumer)]
      (.basicAck consumer-channel (-> delivery .getEnvelope .getDeliveryTag) false)
      delivery)))
(def service
  "Atom holding a service value; nil until something is stored in it."
  (atom nil))
(defn start-service
  "Stores a new, unstarted RabbitMQService in the atom `service`, then
  replaces it with the result of starting it.

  The service targets \"localhost\", uses a queue name generated by gensym
  from the prefix \"ha.testing2\", and gets a core.async channel backed by a
  100-slot dropping buffer as its get-chan. Returns the started service."
  [service]
  (let [queue-name (name (gensym "ha.testing2"))
        inbox      (chan (dropping-buffer 100))
        unstarted  (new RabbitMQService
                        queue-name "localhost" nil nil false nil nil nil inbox)]
    ;; Store the unstarted record first so the atom holds it even when
    ;; `start` throws.
    (reset! service unstarted)
    (reset! service (start (deref service)))))
(defn stop-service
  "Stops the service currently held in the atom `service` and stores the
  value returned by `stop` back into the atom. Returns that value."
  [service]
  (let [current (deref service)
        stopped (stop current)]
    (reset! service stopped)))
(defn get-1
  "Starts a go block that prints \"listening\", takes one value from the
  :get-chan of the service held in the atom `service`, and prints it.
  Returns the channel of the go block."
  [service]
  (go
    (println "listening")
    (let [source   (:get-chan (deref service))
          received (<! source)]
      (println received))))
(defn benchmark
  "Calls `put-msg` on `service` with `data` `count` times inside a future,
  waits for the future to finish, and reports the elapsed time via `time`.
  Returns nil, the result of the `dotimes` loop.

  `service` is passed directly to `put-msg`, so it is the service value
  itself rather than an atom holding one."
  [service count data]
  (time
    (let [publish-all (fn []
                        (dotimes [_ count]
                          (put-msg service data)))
          sender      (future (publish-all))]
      ;; A matching consumer side is currently disabled:
      ;;   getter (future (dotimes [x count] (get-msg service)))
      ;;   ... @getter
      (deref sender))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment