Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Simple implementation of AmqpAdmin which maintains all declared AMQP objects in memory, allowing automatic redeclaration of them on reconnect
package com.bostontechnologies.cscore.amqp
import com.weiglewilczek.slf4s.Logging
import collection.JavaConversions._
import collection.mutable.{SynchronizedSet, HashSet, ConcurrentMap}
import java.lang.String
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import org.springframework.amqp.core.{Queue, Binding, AmqpAdmin, Exchange}
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.connection.{ChannelListener, Connection, ConnectionListener}
import com.rabbitmq.client.Channel
/**
 * An `AmqpAdmin` that delegates every operation to a wrapped `RabbitAdmin`
 * while remembering, in memory, each exchange, queue and binding that was
 * declared through it. The remembered objects are declared again whenever the
 * registered channel listener reports a newly created channel.
 *
 * Objects deleted or removed through this admin are forgotten and are not
 * redeclared.
 *
 * @param wrappedRabbitAdmin    the admin that actually talks to the broker
 * @param addConnectionListener callback used to register a connection listener
 *                              (the listener registered here currently does nothing)
 * @param addChannelListener    callback used to register the channel listener that
 *                              triggers redeclaration; defaults to a no-op, in
 *                              which case nothing is ever redeclared automatically
 */
class ReDeclaringRabbitAdmin(wrappedRabbitAdmin: RabbitAdmin,
                             addConnectionListener: ConnectionListener => Unit,
                             addChannelListener: ChannelListener => Unit = cl => {})
  extends AmqpAdmin with Logging {

  /** Declared exchanges, keyed by exchange name. */
  val exchanges: ConcurrentMap[String, Exchange] = new ConcurrentHashMap[String, Exchange]

  /** Declared queues, keyed by queue name. */
  val queues: ConcurrentMap[String, Queue] = new ConcurrentHashMap[String, Queue]

  /** Declared bindings. */
  val bindings = new HashSet[Binding] with SynchronizedSet[Binding]

  /**
   * Declares every remembered exchange, then every queue, then every binding
   * on the wrapped admin, working from snapshots of the collections.
   *
   * This val MUST be initialised before the listeners below are registered:
   * class-body statements run in textual order, so a listener firing while the
   * constructor is still running would otherwise invoke a null function value.
   *
   * The call is a no-op while another redeclaration is in progress
   * (presumably because declaring through the wrapped admin can itself create
   * channels and re-enter the channel listener -- confirm against RabbitAdmin).
   */
  private val reDeclareAmqpObjects = noOpIfInProgress(() => {
    logger.debug("Redeclaring Amqp Objects")

    // Snapshot the collections so concurrent declare/delete calls do not
    // interfere with the iteration below.
    val localExchanges = exchanges.values.toArray
    val localQueues = queues.values.toArray
    val localBindings = bindings.clone()

    for (exchange <- localExchanges) {
      logger.info("Redeclaring Exchange: " + exchange.getName)
      if (!exchange.isDurable) {
        logger.warn("Auto-declaring a non-durable Exchange (" + exchange.getName +
          "). It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.")
      }
      if (exchange.isAutoDelete) {
        logger.warn("Auto-declaring an auto-delete Exchange (" + exchange.getName +
          "). It will be deleted by the broker if not in use (if all bindings are deleted), but will only be redeclared if the connection is closed and reopened.")
      }
      wrappedRabbitAdmin.declareExchange(exchange)
    }

    for (queue <- localQueues) {
      logger.info("Redeclaring Queue: " + queue.getName)
      if (!queue.isDurable) {
        logger.warn("Redeclaring a non-durable Queue (" + queue.getName +
          "). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.")
      }
      if (queue.isAutoDelete) {
        logger.warn("Redeclaring an auto-delete Queue (" + queue.getName +
          "). It will be deleted by the broker if not in use, and all messages will be lost. Redeclared when the connection is closed and reopened.")
      }
      if (queue.isExclusive) {
        logger.warn("Redeclaring an exclusive Queue (" + queue.getName +
          "). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.")
      }
      wrappedRabbitAdmin.declareQueue(queue)
    }

    for (binding <- localBindings) {
      logger.info(bindingDeclarationInfo(binding))
      wrappedRabbitAdmin.declareBinding(binding)
    }

    logger.debug("ReDeclarations finished")
  })

  // Connection listener is registered but intentionally inert: redeclaration
  // on connection creation is disabled in favour of the channel listener.
  addConnectionListener(new ConnectionListener {
    def onCreate(connection: Connection) {
      //reDeclareAmqpObjects()
    }

    def onClose(connection: Connection) {}
  })

  // Add listener to re-declare the AMQP objects whenever a channel is created.
  addChannelListener(new ChannelListener {
    def onCreate(channel: Channel, transactional: Boolean) {
      reDeclareAmqpObjects()
    }
  })

  /** Declares the exchange on the wrapped admin and, once that succeeds, remembers it. */
  def declareExchange(exchange: Exchange) {
    logger.info("Declaring Exchange: " + exchange.getName)
    wrappedRabbitAdmin.declareExchange(exchange)
    exchanges += exchange.getName -> exchange
  }

  /**
   * Forgets the exchange and deletes it on the wrapped admin.
   *
   * @return whatever the wrapped admin returns
   */
  def deleteExchange(exchangeName: String) = {
    logger.info("Deleting Exchange: " + exchangeName + ". It will not be redeclared on reconnect")
    exchanges -= exchangeName
    wrappedRabbitAdmin.deleteExchange(exchangeName)
  }

  /**
   * Asks the wrapped admin to declare a queue with no arguments and remembers
   * the queue it returns under that queue's name.
   *
   * @return the queue returned by the wrapped admin
   */
  def declareQueue(): Queue = {
    val q = wrappedRabbitAdmin.declareQueue()
    logger.info("Declared Queue: " + q.getName)
    queues += q.getName -> q
    q
  }

  /** Declares the queue on the wrapped admin and, once that succeeds, remembers it. */
  def declareQueue(queue: Queue) {
    logger.info("Declaring Queue: " + queue.getName)
    wrappedRabbitAdmin.declareQueue(queue)
    queues += queue.getName -> queue
  }

  /**
   * Forgets the queue and deletes it on the wrapped admin.
   *
   * @return whatever the wrapped admin returns
   */
  def deleteQueue(queueName: String) = {
    logger.info("Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName)
  }

  /**
   * Forgets the queue and requests a conditional delete on the wrapped admin.
   * The queue is forgotten regardless of whether the broker actually deletes it.
   */
  def deleteQueue(queueName: String, unused: Boolean, empty: Boolean) {
    logger.info("Conditionally Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    //Assume queue is deleted and prevent it from being declared on reconnect
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName, unused, empty)
  }

  /** Pure delegation; purging does not affect what is remembered. */
  def purgeQueue(queueName: String, noWait: Boolean) {
    wrappedRabbitAdmin.purgeQueue(queueName, noWait)
  }

  /**
   * Remembers the binding and then declares it on the wrapped admin. Note that
   * the binding is recorded before the delegate call, so it stays remembered
   * even if that call throws.
   */
  def declareBinding(binding: Binding) {
    logger.info(bindingDeclarationInfo(binding))
    bindings += binding //Don't care if it already exists
    wrappedRabbitAdmin.declareBinding(binding)
  }

  /** Forgets the binding and removes it on the wrapped admin. */
  def removeBinding(binding: Binding) {
    bindings -= binding
    wrappedRabbitAdmin.removeBinding(binding)
  }

  /** Builds the log line used both for first-time and repeated binding declarations. */
  private def bindingDeclarationInfo(b: Binding) =
    "Declaring Binding [" +
      b.getDestination + " (" + b.getDestinationType + ")] to exchange [" +
      b.getExchange + "] with routing key [" + b.getRoutingKey + "]"

  /**
   * This wraps the action in a function which will do nothing
   * if called while another is in progress
   *
   * @param action the work to guard
   * @return a function that runs `action` unless a previous invocation of the
   *         returned function has not finished yet, in which case it returns
   *         immediately without running anything
   */
  private def noOpIfInProgress(action: () => Unit): () => Unit = {
    val inProgress: AtomicBoolean = new AtomicBoolean(false)
    () => {
      // Only the caller that flips the flag from false to true runs the action.
      if (inProgress.compareAndSet(false, true)) {
        try {
          action()
        }
        finally {
          // Always release the flag, even when the action throws.
          inProgress.compareAndSet(true, false)
        }
      }
    }
  }
}
/* Ignore these tests for now!!!!!!!!!!!!!!!!!!
* I am experimenting with getting this to redeclare things after consumer reconsumes
package com.bostontechnologies.cscore.amqp
import org.specs.SpecificationWithJUnit
import org.specs.mock.Mockito
import org.springframework.amqp.rabbit.connection.ConnectionListener
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.core.{BindingBuilder, Queue, Exchange, TopicExchange}
class ReDeclaringRabbitAdminSpec extends SpecificationWithJUnit with Mockito {
"A ReDeclaringRabbitAdminSpec" should {
val queue = new Queue("QueueName")
val exchange = new TopicExchange("ExchangeName")
val binding = BindingBuilder bind (queue) to (exchange) `with` ("Binding")
val mockAdmin = mock[RabbitAdmin]
val mockAddListener = mock[ConnectionListener => Unit]
"Add a connection listener" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener)
there was one(mockAddListener).apply(any[ConnectionListener])
}
"declareExchanges" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareExchange(exchange)
there was one(mockAdmin).declareExchange(exchange)
}
"deleteExchanges" in {
mockAdmin.deleteExchange(exchange.getName) returns false
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteExchange(exchange.getName) must beFalse
there was one(mockAdmin).deleteExchange(exchange.getName)
}
"declareQueue with no params" in {
mockAdmin.declareQueue() returns queue
val actualQueue = new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue()
actualQueue must be equalTo (queue)
}
"declare specific queue" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue(queue)
there was one(mockAdmin).declareQueue(queue)
}
"delete a queue" in {
mockAdmin.deleteQueue(queue.getName) returns false
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName) must beFalse
there was one(mockAdmin).deleteQueue(queue.getName)
}
"optionally delete a queue" in {
val unused = true
val empty = false
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName, unused, empty)
there was one(mockAdmin).deleteQueue(queue.getName, unused, empty)
}
"purgeQueue" in {
val noWait = true
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).purgeQueue(queue.getName, noWait)
there was one(mockAdmin).purgeQueue(queue.getName, noWait)
}
"declareBinding" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding)
there was one(mockAdmin).declareBinding(binding)
}
"removeBinding" in {
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding)
there was one(mockAdmin).declareBinding(binding)
}
"re-declare Exchanges and not re-declare Exchanges which were deleted" in {
//arrange
val deletedExchange: Exchange = new TopicExchange("DeletedExchange", true, false)
val listener = newStoringAddListener
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
//Act
reDeclaringAdmin.declareExchange(exchange)
reDeclaringAdmin.declareExchange(deletedExchange)
reDeclaringAdmin.deleteExchange(deletedExchange.getName)
listener.value.onCreate(null)
//Assert
there was two(mockAdmin).declareExchange(exchange)
there was one(mockAdmin).declareExchange(deletedExchange)
}
"re-declare Queues and not re-declare Queues which were deleted" in {
//arrange
val deletedQueue = new Queue("DeletedQueue")
val optionallyDeletedQueue = new Queue("OptionallyDeletedQueue")
val listener = newStoringAddListener
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
//Act
reDeclaringAdmin.declareQueue(queue)
reDeclaringAdmin.declareQueue(deletedQueue)
reDeclaringAdmin.declareQueue(optionallyDeletedQueue)
reDeclaringAdmin.deleteQueue(deletedQueue.getName)
reDeclaringAdmin.deleteQueue(optionallyDeletedQueue.getName, true, true)
listener.value.onCreate(null)
//Assert
there was two(mockAdmin).declareQueue(queue)
there was one(mockAdmin).declareQueue(deletedQueue)
there was one(mockAdmin).declareQueue(optionallyDeletedQueue)
}
"re-declare Bindings and not re-declare Bindings which were deleted" in {
//arrange
val deletedBinding = BindingBuilder bind(queue) to(exchange) `with`("DeletedBinding")
val listener = newStoringAddListener
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
//Act
reDeclaringAdmin.declareBinding(binding)
reDeclaringAdmin.declareBinding(deletedBinding)
reDeclaringAdmin.removeBinding(deletedBinding)
listener.value.onCreate(null)
//Assert
there was two(mockAdmin).declareBinding(binding)
there was one(mockAdmin).declareBinding(deletedBinding)
}
}
def newStoringAddListener = new StoringAddListener()
class StoringAddListener() {
var value: ConnectionListener = null
def apply(l: ConnectionListener) {
value = l
}
}
object StoringAddListener {
implicit def convertToAction(addListener: StoringAddListener): ConnectionListener => Unit = addListener(_)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.