public

Simple Implementenation of AmqpAdmin which maintains all declared Amqp objects in memory allowing automatic redeclaration of them on reconnect

  • Download Gist
ReDeclaringRabbitAdmin.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
package com.bostontechnologies.cscore.amqp
 
import com.weiglewilczek.slf4s.Logging
import collection.JavaConversions._
import collection.mutable.{SynchronizedSet, HashSet, ConcurrentMap}
import java.lang.String
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import org.springframework.amqp.core.{Queue, Binding, AmqpAdmin, Exchange}
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.connection.{ChannelListener, Connection, ConnectionListener}
import com.rabbitmq.client.Channel
 
class ReDeclaringRabbitAdmin(wrappedRabbitAdmin: RabbitAdmin,
addConnectionListener: ConnectionListener => Unit,
addChannelListener: ChannelListener => Unit = cl => {})
extends AmqpAdmin with Logging {
 
val exchanges: ConcurrentMap[String, Exchange] = new ConcurrentHashMap[String, Exchange]
val queues: ConcurrentMap[String, Queue] = new ConcurrentHashMap[String, Queue]
val bindings = new HashSet[Binding] with SynchronizedSet[Binding]
 
//Add listener to re-declare the AMQP objects on reconnect
addConnectionListener(new ConnectionListener {
 
def onCreate(connection: Connection) {
//reDeclareAmqpObjects()
}
 
def onClose(connection: Connection) {}
})
 
addChannelListener(new ChannelListener {
def onCreate(channel: Channel, transactional: Boolean) {
reDeclareAmqpObjects()
}
})
 
def declareExchange(exchange: Exchange) {
logger.info("Declaring Exchange: " + exchange.getName)
wrappedRabbitAdmin.declareExchange(exchange)
exchanges += exchange.getName -> exchange
}
 
def deleteExchange(exchangeName: String) = {
logger.info("Deleting Exchange: " + exchangeName + ". It will not be redeclared on reconnect")
exchanges -= exchangeName
wrappedRabbitAdmin.deleteExchange(exchangeName)
}
 
def declareQueue(): Queue = {
val q = wrappedRabbitAdmin.declareQueue()
logger.info("Declared Queue: " + q.getName)
queues += q.getName -> q
q
}
 
def declareQueue(queue: Queue) {
logger.info("Declaring Queue: " + queue.getName)
wrappedRabbitAdmin.declareQueue(queue)
queues += queue.getName -> queue
}
 
def deleteQueue(queueName: String) = {
logger.info("Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
queues -= queueName
wrappedRabbitAdmin.deleteQueue(queueName)
}
 
def deleteQueue(queueName: String, unused: Boolean, empty: Boolean) {
logger.info("Conditionally Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
//Assume queue is deleted and prevent it from being declared on reconnect
queues -= queueName
wrappedRabbitAdmin.deleteQueue(queueName, unused, empty)
}
 
def purgeQueue(queueName: String, noWait: Boolean) {
wrappedRabbitAdmin.purgeQueue(queueName, noWait)
}
 
def declareBinding(binding: Binding) {
logger.info(bindingDeclarationInfo(binding))
bindings += binding //Don't care if it already exists
wrappedRabbitAdmin.declareBinding(binding)
}
 
def removeBinding(binding: Binding) {
bindings -= binding
wrappedRabbitAdmin.removeBinding(binding)
}
 
private val reDeclareAmqpObjects = noOpIfInProgress(() => {
logger.debug("Redeclaring Amqp Objects")
val localExchanges = exchanges.values.toArray
val localQueues = queues.values.toArray
val localBindings = bindings.clone()
 
for (exchange <- localExchanges) {
logger.info("Redeclaring Exchange: " + exchange.getName)
if (!exchange.isDurable) {
logger.warn("Auto-declaring a non-durable Exchange (" + exchange.getName +
"). It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.")
}
if (exchange.isAutoDelete) {
logger.warn("Auto-declaring an auto-delete Exchange (" + exchange.getName +
"). It will be deleted by the broker if not in use (if all bindings are deleted), but will only be redeclared if the connection is closed and reopened.")
}
wrappedRabbitAdmin.declareExchange(exchange)
}
 
for (queue <- localQueues) {
logger.info("Redeclaring Queue: " + queue.getName)
if (!queue.isDurable) {
logger.warn("Redeclaring a non-durable Queue (" + queue.getName +
"). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.")
}
if (queue.isAutoDelete) {
logger.warn("Redeclaring an auto-delete Queue (" + queue.getName +
"). It will be deleted by the broker if not in use, and all messages will be lost. Redeclared when the connection is closed and reopened.")
}
if (queue.isExclusive) {
logger.warn("Redeclaring an exclusive Queue (" + queue.getName +
"). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.")
}
wrappedRabbitAdmin.declareQueue(queue)
}
 
for (binding <- localBindings) {
logger.info(bindingDeclarationInfo(binding))
wrappedRabbitAdmin.declareBinding(binding)
}
 
logger.debug("ReDeclarations finished")
})
 
private def bindingDeclarationInfo(b: Binding) =
"Declaring Binding [" +
b.getDestination + " (" + b.getDestinationType + ")] to exchange [" +
b.getExchange + "] with routing key [" + b.getRoutingKey + "]"
 
/**
* This wraps the action in a function which will do nothing
* if called while another is in progress
*/
private def noOpIfInProgress(action: () => Unit): () => Unit = {
val inProgress: AtomicBoolean = new AtomicBoolean(false)
 
() => {
if (inProgress.compareAndSet(false, true)) {
try {
action()
}
finally {
inProgress.compareAndSet(true, false)
}
}
}
}
}
ReDeclaringRabbitAdminSpec.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
/* Ignore these tests for now!!!!!!!!!!!!!!!!!!
* I am experimenting with getting this to redeclare things after consumer reconsumes
 
package com.bostontechnologies.cscore.amqp
 
import org.specs.SpecificationWithJUnit
import org.specs.mock.Mockito
import org.springframework.amqp.rabbit.connection.ConnectionListener
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.core.{BindingBuilder, Queue, Exchange, TopicExchange}
 
class ReDeclaringRabbitAdminSpec extends SpecificationWithJUnit with Mockito {
 
"A ReDeclaringRabbitAdminSpec" should {
 
val queue = new Queue("QueueName")
val exchange = new TopicExchange("ExchangeName")
val binding = BindingBuilder bind (queue) to (exchange) `with` ("Binding")
 
val mockAdmin = mock[RabbitAdmin]
val mockAddListener = mock[ConnectionListener => Unit]
 
"Add a connection listener" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener)
 
there was one(mockAddListener).apply(any[ConnectionListener])
}
 
"declareExchanges" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareExchange(exchange)
 
there was one(mockAdmin).declareExchange(exchange)
}
 
"deleteExchanges" in {
mockAdmin.deleteExchange(exchange.getName) returns false
 
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteExchange(exchange.getName) must beFalse
there was one(mockAdmin).deleteExchange(exchange.getName)
}
 
"declareQueue with no params" in {
mockAdmin.declareQueue() returns queue
 
val actualQueue = new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue()
 
actualQueue must be equalTo (queue)
}
 
"declare specific queue" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue(queue)
 
there was one(mockAdmin).declareQueue(queue)
}
 
"delete a queue" in {
mockAdmin.deleteQueue(queue.getName) returns false
 
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName) must beFalse
there was one(mockAdmin).deleteQueue(queue.getName)
}
 
"optionally delete a queue" in {
val unused = true
val empty = false
 
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName, unused, empty)
 
there was one(mockAdmin).deleteQueue(queue.getName, unused, empty)
}
 
"purgeQueue" in {
val noWait = true
 
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).purgeQueue(queue.getName, noWait)
 
there was one(mockAdmin).purgeQueue(queue.getName, noWait)
}
 
"declareBinding" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding)
 
there was one(mockAdmin).declareBinding(binding)
}
 
"removeBinding" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding)
 
there was one(mockAdmin).declareBinding(binding)
}
 
"re-declare Exchanges and not re-declare Exchanges which were deleted" in {
//arrange
val deletedExchange: Exchange = new TopicExchange("DeletedExchange", true, false)
val listener = newStoringAddListener
 
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
 
//Act
reDeclaringAdmin.declareExchange(exchange)
reDeclaringAdmin.declareExchange(deletedExchange)
reDeclaringAdmin.deleteExchange(deletedExchange.getName)
listener.value.onCreate(null)
 
//Assert
there was two(mockAdmin).declareExchange(exchange)
there was one(mockAdmin).declareExchange(deletedExchange)
}
 
"re-declare Queues and not re-declare Queues which were deleted" in {
//arrange
val deletedQueue = new Queue("DeletedQueue")
val optionallyDeletedQueue = new Queue("OptionallyDeletedQueue")
val listener = newStoringAddListener
 
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
 
//Act
reDeclaringAdmin.declareQueue(queue)
reDeclaringAdmin.declareQueue(deletedQueue)
reDeclaringAdmin.declareQueue(optionallyDeletedQueue)
reDeclaringAdmin.deleteQueue(deletedQueue.getName)
reDeclaringAdmin.deleteQueue(optionallyDeletedQueue.getName, true, true)
listener.value.onCreate(null)
 
//Assert
there was two(mockAdmin).declareQueue(queue)
there was one(mockAdmin).declareQueue(deletedQueue)
there was one(mockAdmin).declareQueue(optionallyDeletedQueue)
}
 
"re-declare Bindings and not re-declare Bindings which were deleted" in {
//arrange
val deletedBinding = BindingBuilder bind(queue) to(exchange) `with`("DeletedBinding")
val listener = newStoringAddListener
 
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
 
//Act
reDeclaringAdmin.declareBinding(binding)
reDeclaringAdmin.declareBinding(deletedBinding)
reDeclaringAdmin.removeBinding(deletedBinding)
listener.value.onCreate(null)
 
//Assert
there was two(mockAdmin).declareBinding(binding)
there was one(mockAdmin).declareBinding(deletedBinding)
}
}
 
def newStoringAddListener = new StoringAddListener()
 
class StoringAddListener() {
var value: ConnectionListener = null
 
def apply(l: ConnectionListener) {
value = l
}
}
 
object StoringAddListener {
implicit def convertToAction(addListener: StoringAddListener): ConnectionListener => Unit = addListener(_)
}
 
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.