Created
September 24, 2013 20:14
-
-
Save halgari/6690589 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns rabbit-mq-test.core | |
(:require [clojure.core.async :refer :all]) | |
(:import [com.rabbitmq.client ConnectionFactory Connection Channel QueueingConsumer Consumer DefaultConsumer])) | |
(set! *warn-on-reflection* true) | |
(defprotocol Lifecycle | |
(start [service]) | |
(stop [service])) | |
(defprotocol IQueueService | |
(put-msg [service ^String msg]) | |
(get-msg [service]) | |
(commit-msg [service msg])) | |
(defn rabbit-mq->async-chan [^Channel rmq ^String queue-name auto-ack? c] | |
(let [consumer (proxy [DefaultConsumer] | |
[rmq] | |
#_(handleCancel [this tag] | |
(println "closing consumer") | |
(close! c)) | |
#_(handleCancelOk [this tag] | |
(println "closing consumer") | |
(close! c)) | |
#_(handleConsumeOk [this tag] | |
(println "consume")) | |
(handleDelivery [this tag envelope properties body] | |
(println "hey") | |
(put! c {:tag tag | |
:envelope envelope | |
:properties properties | |
:body body})) | |
#_(handleRecoverOk [this tag]) | |
#_(handleShutdownSignal [this tag sig] | |
(println "closing consumer") | |
(close! c)))] | |
(println "connectiing") | |
(.basicConsume rmq queue-name auto-ack? consumer) | |
c)) | |
(defrecord RabbitMQService [^String queue-name | |
^String host | |
^Connection connection | |
^Channel channel | |
^boolean running? | |
^ConnectionFactory factory | |
^Channel consumer-channel | |
^QueueingConsumer consumer | |
get-chan] | |
Lifecycle | |
(start [service] | |
(if running? | |
service | |
(let [factory (ConnectionFactory.) | |
_ (.setHost factory host) | |
connection (.newConnection factory) | |
channel (.createChannel connection) | |
consumer-channel (.createChannel connection) | |
consumer (QueueingConsumer. consumer-channel)] | |
(.queueDeclare channel queue-name false false false nil) | |
(.basicConsume consumer-channel queue-name false consumer) | |
#_(rabbit-mq->async-chan consumer-channel queue-name false get-chan) | |
(assoc service | |
:running? true | |
:factory factory | |
:connection connection | |
:consumer-channel consumer-channel | |
:channel channel | |
:consumer consumer)))) | |
(stop [service] | |
(if-not running? | |
service | |
(do #_(.close channel) | |
#_(.close consumer-channel) | |
(.close connection) | |
(assoc service | |
:running? false)))) | |
IQueueService | |
(put-msg [service msg] | |
(.basicPublish channel "" queue-name false nil (.getBytes msg))) | |
(get-msg [service] | |
(let [delivery (.nextDelivery consumer)] | |
(.basicAck consumer-channel (-> delivery .getEnvelope .getDeliveryTag) false)))) | |
(def service (atom nil)) | |
(defn start-service [service] | |
(reset! service (RabbitMQService. (name (gensym "ha.testing2")) "localhost" nil nil false nil nil nil (chan (dropping-buffer 100)))) | |
(reset! service (start @service))) | |
(defn stop-service [service] | |
(reset! service (stop @service))) | |
(defn get-1 [service] | |
(go | |
(println "listening") | |
(println (<! (:get-chan @service))))) | |
(defn benchmark [service count data] | |
(time | |
(let [sender (future | |
(dotimes [x count] | |
(put-msg service data))) | |
#_getter #_(future | |
(dotimes [x count] | |
(get-msg service)))] | |
@sender | |
#_@getter))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment