Skip to content

Instantly share code, notes, and snippets.

@drstevens
Created January 23, 2012 19:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drstevens/1665118 to your computer and use it in GitHub Desktop.
Save drstevens/1665118 to your computer and use it in GitHub Desktop.
Simple implementation of AmqpAdmin which maintains all declared AMQP objects in memory, allowing automatic redeclaration of them on reconnect
package com.bostontechnologies.cscore.amqp
import com.weiglewilczek.slf4s.Logging
import collection.JavaConversions._
import collection.mutable.{SynchronizedSet, HashSet, ConcurrentMap}
import java.lang.String
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import org.springframework.amqp.core.{Queue, Binding, AmqpAdmin, Exchange}
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.connection.{ChannelListener, Connection, ConnectionListener}
import com.rabbitmq.client.Channel
/**
 * AmqpAdmin decorator that remembers every exchange, queue and binding declared through it
 * and declares them again on the wrapped RabbitAdmin whenever a channel is created.
 *
 * Objects deleted/removed through this admin are forgotten and are not re-declared.
 *
 * @param wrappedRabbitAdmin    the admin that performs the actual broker operations
 * @param addConnectionListener called once during construction with a ConnectionListener
 *                              (its callbacks are currently no-ops)
 * @param addChannelListener    called once during construction with a ChannelListener whose
 *                              onCreate triggers the re-declaration; defaults to ignoring it
 */
class ReDeclaringRabbitAdmin(wrappedRabbitAdmin: RabbitAdmin,
                             addConnectionListener: ConnectionListener => Unit,
                             addChannelListener: ChannelListener => Unit = cl => {})
  extends AmqpAdmin with Logging {

  /** Exchanges to re-declare, keyed by exchange name. */
  val exchanges: ConcurrentMap[String, Exchange] = new ConcurrentHashMap[String, Exchange]

  /** Queues to re-declare, keyed by queue name. */
  val queues: ConcurrentMap[String, Queue] = new ConcurrentHashMap[String, Queue]

  /** Bindings to re-declare. */
  val bindings = new HashSet[Binding] with SynchronizedSet[Binding]

  /** Declares the exchange and, once that succeeded, remembers it for re-declaration. */
  def declareExchange(exchange: Exchange): Unit = {
    logger.info("Declaring Exchange: " + exchange.getName)
    wrappedRabbitAdmin.declareExchange(exchange)
    exchanges += exchange.getName -> exchange
  }

  /**
   * Forgets the exchange and deletes it.
   *
   * @return whatever the wrapped admin's deleteExchange returns
   */
  def deleteExchange(exchangeName: String): Boolean = {
    logger.info("Deleting Exchange: " + exchangeName + ". It will not be redeclared on reconnect")
    exchanges -= exchangeName
    wrappedRabbitAdmin.deleteExchange(exchangeName)
  }

  /**
   * Lets the wrapped admin declare a queue and remembers the returned queue under its name.
   *
   * @return the queue returned by the wrapped admin
   */
  def declareQueue(): Queue = {
    val declared = wrappedRabbitAdmin.declareQueue()
    logger.info("Declared Queue: " + declared.getName)
    queues += declared.getName -> declared
    declared
  }

  /** Declares the queue and, once that succeeded, remembers it for re-declaration. */
  def declareQueue(queue: Queue): Unit = {
    logger.info("Declaring Queue: " + queue.getName)
    wrappedRabbitAdmin.declareQueue(queue)
    queues += queue.getName -> queue
  }

  /**
   * Forgets the queue and deletes it.
   *
   * @return whatever the wrapped admin's deleteQueue returns
   */
  def deleteQueue(queueName: String): Boolean = {
    logger.info("Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName)
  }

  /** Forgets the queue and asks the wrapped admin to delete it subject to `unused`/`empty`. */
  def deleteQueue(queueName: String, unused: Boolean, empty: Boolean): Unit = {
    logger.info("Conditionally Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    //Assume queue is deleted and prevent it from being declared on reconnect
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName, unused, empty)
  }

  /** Pure delegation; purging does not affect what is re-declared. */
  def purgeQueue(queueName: String, noWait: Boolean): Unit = {
    wrappedRabbitAdmin.purgeQueue(queueName, noWait)
  }

  /** Declares the binding and, once that succeeded, remembers it for re-declaration. */
  def declareBinding(binding: Binding): Unit = {
    logger.info(bindingDeclarationInfo(binding))
    wrappedRabbitAdmin.declareBinding(binding)
    // Track only after a successful declaration, like exchanges and queues, so a binding the
    // wrapped admin rejected is not retried on every re-declaration.
    bindings += binding //Don't care if it already exists
  }

  /** Forgets the binding and removes it. */
  def removeBinding(binding: Binding): Unit = {
    bindings -= binding
    wrappedRabbitAdmin.removeBinding(binding)
  }

  /**
   * Re-declares everything currently remembered: exchanges first, then queues, then bindings.
   * Works on snapshots taken up front, and does nothing if a re-declaration is already running.
   */
  private val reDeclareAmqpObjects: () => Unit = noOpIfInProgress(() => {
    logger.debug("Redeclaring Amqp Objects")
    val exchangeSnapshot = exchanges.values.toList
    val queueSnapshot = queues.values.toList
    val bindingSnapshot = bindings.clone()

    exchangeSnapshot.foreach { exchange =>
      logger.info("Redeclaring Exchange: " + exchange.getName)
      if (!exchange.isDurable) {
        logger.warn("Auto-declaring a non-durable Exchange (" + exchange.getName +
          "). It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.")
      }
      if (exchange.isAutoDelete) {
        logger.warn("Auto-declaring an auto-delete Exchange (" + exchange.getName +
          "). It will be deleted by the broker if not in use (if all bindings are deleted), but will only be redeclared if the connection is closed and reopened.")
      }
      wrappedRabbitAdmin.declareExchange(exchange)
    }

    queueSnapshot.foreach { queue =>
      logger.info("Redeclaring Queue: " + queue.getName)
      if (!queue.isDurable) {
        logger.warn("Redeclaring a non-durable Queue (" + queue.getName +
          "). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.")
      }
      if (queue.isAutoDelete) {
        logger.warn("Redeclaring an auto-delete Queue (" + queue.getName +
          "). It will be deleted by the broker if not in use, and all messages will be lost. Redeclared when the connection is closed and reopened.")
      }
      if (queue.isExclusive) {
        logger.warn("Redeclaring an exclusive Queue (" + queue.getName +
          "). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.")
      }
      wrappedRabbitAdmin.declareQueue(queue)
    }

    bindingSnapshot.foreach { binding =>
      logger.info(bindingDeclarationInfo(binding))
      wrappedRabbitAdmin.declareBinding(binding)
    }
    logger.debug("ReDeclarations finished")
  })

  /** Builds the log line used when a binding is declared or re-declared. */
  private def bindingDeclarationInfo(b: Binding) =
    "Declaring Binding [" +
      b.getDestination + " (" + b.getDestinationType + ")] to exchange [" +
      b.getExchange + "] with routing key [" + b.getRoutingKey + "]"

  /**
   * This wraps the action in a function which will do nothing
   * if called while another is in progress
   */
  private def noOpIfInProgress(action: () => Unit): () => Unit = {
    val inProgress: AtomicBoolean = new AtomicBoolean(false)
    () => {
      if (inProgress.compareAndSet(false, true)) {
        try {
          action()
        }
        finally {
          // Only the caller that won the compareAndSet above reaches this point.
          inProgress.set(false)
        }
      }
    }
  }

  // Listener registration is deliberately the LAST thing the constructor does. The channel
  // listener calls reDeclareAmqpObjects, which is a val: had a listener been invoked before
  // that val was assigned (for example synchronously by the registration function), the call
  // would have hit a null function value.

  //Add listener to re-declare the AMQP objects on reconnect
  addConnectionListener(new ConnectionListener {
    def onCreate(connection: Connection) {
      //reDeclareAmqpObjects()
    }

    def onClose(connection: Connection) {}
  })

  addChannelListener(new ChannelListener {
    def onCreate(channel: Channel, transactional: Boolean) {
      reDeclareAmqpObjects()
    }
  })
}
/* Ignore these tests for now!!!!!!!!!!!!!!!!!!
* I am experimenting with getting this to redeclare things after consumer reconsumes
package com.bostontechnologies.cscore.amqp
import org.specs.SpecificationWithJUnit
import org.specs.mock.Mockito
import org.springframework.amqp.rabbit.connection.ConnectionListener
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.core.{BindingBuilder, Queue, Exchange, TopicExchange}
// Specification for ReDeclaringRabbitAdmin: checks that every AmqpAdmin call is delegated to
// the wrapped RabbitAdmin and that declared-but-not-deleted objects are declared a second time
// when the registered listener fires.
class ReDeclaringRabbitAdminSpec extends SpecificationWithJUnit with Mockito {
  "A ReDeclaringRabbitAdminSpec" should {
    val queue = new Queue("QueueName")
    val exchange = new TopicExchange("ExchangeName")
    val binding = BindingBuilder bind (queue) to (exchange) `with` ("Binding")
    val mockAdmin = mock[RabbitAdmin]
    val mockAddListener = mock[ConnectionListener => Unit]
    "Add a connection listener" in {
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener)
      there was one(mockAddListener).apply(any[ConnectionListener])
    }
    "declareExchanges" in {
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareExchange(exchange)
      there was one(mockAdmin).declareExchange(exchange)
    }
    "deleteExchanges" in {
      mockAdmin.deleteExchange(exchange.getName) returns false
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteExchange(exchange.getName) must beFalse
      there was one(mockAdmin).deleteExchange(exchange.getName)
    }
    "declareQueue with no params" in {
      mockAdmin.declareQueue() returns queue
      val actualQueue = new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue()
      actualQueue must be equalTo (queue)
    }
    "declare specific queue" in {
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue(queue)
      there was one(mockAdmin).declareQueue(queue)
    }
    "delete a queue" in {
      mockAdmin.deleteQueue(queue.getName) returns false
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName) must beFalse
      there was one(mockAdmin).deleteQueue(queue.getName)
    }
    "optionally delete a queue" in {
      val unused = true
      val empty = false
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName, unused, empty)
      there was one(mockAdmin).deleteQueue(queue.getName, unused, empty)
    }
    "purgeQueue" in {
      val noWait = true
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).purgeQueue(queue.getName, noWait)
      there was one(mockAdmin).purgeQueue(queue.getName, noWait)
    }
    "declareBinding" in {
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding)
      there was one(mockAdmin).declareBinding(binding)
    }
    "removeBinding" in {
      // Exercise and verify removeBinding itself; this example previously repeated the
      // declareBinding check and never touched removeBinding.
      new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).removeBinding(binding)
      there was one(mockAdmin).removeBinding(binding)
    }
    // NOTE(review): the three re-declare examples below fire the stored ConnectionListener's
    // onCreate, but the reDeclareAmqpObjects() call inside that callback is commented out in
    // ReDeclaringRabbitAdmin (re-declaration is wired to the ChannelListener instead), so the
    // "two" expectations cannot hold as written - presumably why this spec is disabled.
    // Confirm which listener should trigger re-declaration before re-enabling.
    "re-declare Exchanges and not re-declare Exchanges which were deleted" in {
      //arrange
      val deletedExchange: Exchange = new TopicExchange("DeletedExchange", true, false)
      val listener = newStoringAddListener
      val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
      //Act
      reDeclaringAdmin.declareExchange(exchange)
      reDeclaringAdmin.declareExchange(deletedExchange)
      reDeclaringAdmin.deleteExchange(deletedExchange.getName)
      listener.value.onCreate(null)
      //Assert
      there was two(mockAdmin).declareExchange(exchange)
      there was one(mockAdmin).declareExchange(deletedExchange)
    }
    "re-declare Queues and not re-declare Queues which were deleted" in {
      //arrange
      val deletedQueue = new Queue("DeletedQueue")
      val optionallyDeletedQueue = new Queue("OptionallyDeletedQueue")
      val listener = newStoringAddListener
      val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
      //Act
      reDeclaringAdmin.declareQueue(queue)
      reDeclaringAdmin.declareQueue(deletedQueue)
      reDeclaringAdmin.declareQueue(optionallyDeletedQueue)
      reDeclaringAdmin.deleteQueue(deletedQueue.getName)
      reDeclaringAdmin.deleteQueue(optionallyDeletedQueue.getName, true, true)
      listener.value.onCreate(null)
      //Assert
      there was two(mockAdmin).declareQueue(queue)
      there was one(mockAdmin).declareQueue(deletedQueue)
      there was one(mockAdmin).declareQueue(optionallyDeletedQueue)
    }
    "re-declare Bindings and not re-declare Bindings which were deleted" in {
      //arrange
      val deletedBinding = BindingBuilder bind(queue) to(exchange) `with`("DeletedBinding")
      val listener = newStoringAddListener
      val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener)
      //Act
      reDeclaringAdmin.declareBinding(binding)
      reDeclaringAdmin.declareBinding(deletedBinding)
      reDeclaringAdmin.removeBinding(deletedBinding)
      listener.value.onCreate(null)
      //Assert
      there was two(mockAdmin).declareBinding(binding)
      there was one(mockAdmin).declareBinding(deletedBinding)
    }
  }
  // Creates a fresh listener-capturing stub for each example that needs to fire the listener.
  def newStoringAddListener = new StoringAddListener()
  // Stand-in for the "add listener" function: keeps the last ConnectionListener handed to it
  // in `value` so an example can invoke its callbacks directly.
  class StoringAddListener() {
    var value: ConnectionListener = null
    def apply(l: ConnectionListener) {
      value = l
    }
  }
  object StoringAddListener {
    // Lets a StoringAddListener be passed where a ConnectionListener => Unit is expected.
    implicit def convertToAction(addListener: StoringAddListener): ConnectionListener => Unit = addListener(_)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment