Created
January 23, 2012 19:34
-
-
Save drstevens/1665118 to your computer and use it in GitHub Desktop.
Simple implementation of AmqpAdmin which keeps all declared AMQP objects in memory, allowing them to be redeclared automatically on reconnect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.bostontechnologies.cscore.amqp | |
import com.weiglewilczek.slf4s.Logging | |
import collection.JavaConversions._ | |
import collection.mutable.{SynchronizedSet, HashSet, ConcurrentMap} | |
import java.lang.String | |
import java.util.concurrent.ConcurrentHashMap | |
import java.util.concurrent.atomic.AtomicBoolean | |
import org.springframework.amqp.core.{Queue, Binding, AmqpAdmin, Exchange} | |
import org.springframework.amqp.rabbit.core.RabbitAdmin | |
import org.springframework.amqp.rabbit.connection.{ChannelListener, Connection, ConnectionListener} | |
import com.rabbitmq.client.Channel | |
/**
 * An AmqpAdmin that delegates every operation to a wrapped RabbitAdmin while keeping an
 * in-memory record of the exchanges, queues and bindings declared through it.
 *
 * Whenever the registered ChannelListener is notified that a channel was created, every
 * recorded object is declared again on the wrapped admin. Objects deleted or removed through
 * this admin are dropped from the record and are therefore not re-declared.
 *
 * @param wrappedRabbitAdmin    admin that performs the actual declarations and deletions
 * @param addConnectionListener called once during construction with a ConnectionListener
 *                              (currently a no-op listener, see the registration below)
 * @param addChannelListener    called once during construction with the ChannelListener that
 *                              triggers re-declaration; defaults to ignoring the listener
 */
class ReDeclaringRabbitAdmin(wrappedRabbitAdmin: RabbitAdmin,
                             addConnectionListener: ConnectionListener => Unit,
                             addChannelListener: ChannelListener => Unit = cl => {})
  extends AmqpAdmin with Logging {

  /** Exchanges declared through this admin and not deleted since, keyed by exchange name. */
  val exchanges: ConcurrentMap[String, Exchange] = new ConcurrentHashMap[String, Exchange]

  /** Queues declared through this admin and not deleted since, keyed by queue name. */
  val queues: ConcurrentMap[String, Queue] = new ConcurrentHashMap[String, Queue]

  /** Bindings declared through this admin and not removed since. */
  val bindings = new HashSet[Binding] with SynchronizedSet[Binding]

  /** Declares the exchange on the wrapped admin, then records it for re-declaration. */
  def declareExchange(exchange: Exchange): Unit = {
    logger.info("Declaring Exchange: " + exchange.getName)
    wrappedRabbitAdmin.declareExchange(exchange)
    exchanges += exchange.getName -> exchange
  }

  /**
   * Forgets the exchange, then deletes it on the wrapped admin.
   *
   * @return whatever the wrapped admin's deleteExchange returns
   */
  def deleteExchange(exchangeName: String): Boolean = {
    logger.info("Deleting Exchange: " + exchangeName + ". It will not be redeclared on reconnect")
    exchanges -= exchangeName
    wrappedRabbitAdmin.deleteExchange(exchangeName)
  }

  /**
   * Asks the wrapped admin to declare a queue of its own choosing and records the result
   * under the name the wrapped admin reports.
   *
   * @return the queue returned by the wrapped admin
   */
  def declareQueue(): Queue = {
    val declared = wrappedRabbitAdmin.declareQueue()
    logger.info("Declared Queue: " + declared.getName)
    queues += declared.getName -> declared
    declared
  }

  /** Declares the queue on the wrapped admin, then records it for re-declaration. */
  def declareQueue(queue: Queue): Unit = {
    logger.info("Declaring Queue: " + queue.getName)
    wrappedRabbitAdmin.declareQueue(queue)
    queues += queue.getName -> queue
  }

  /**
   * Forgets the queue, then deletes it on the wrapped admin.
   *
   * @return whatever the wrapped admin's deleteQueue returns
   */
  def deleteQueue(queueName: String): Boolean = {
    logger.info("Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName)
  }

  /**
   * Forgets the queue, then forwards the conditional delete to the wrapped admin.
   * The queue is dropped from the record regardless of what the wrapped admin does with
   * the `unused` / `empty` conditions.
   */
  def deleteQueue(queueName: String, unused: Boolean, empty: Boolean): Unit = {
    logger.info("Conditionally Deleting Queue: " + queueName + ". It will not be redeclared on reconnect")
    // Assume the queue is deleted and prevent it from being declared on reconnect.
    queues -= queueName
    wrappedRabbitAdmin.deleteQueue(queueName, unused, empty)
  }

  /** Pure pass-through to the wrapped admin; the record of declared objects is untouched. */
  def purgeQueue(queueName: String, noWait: Boolean): Unit = {
    wrappedRabbitAdmin.purgeQueue(queueName, noWait)
  }

  /** Records the binding (a set, so duplicates collapse), then declares it on the wrapped admin. */
  def declareBinding(binding: Binding): Unit = {
    logger.info(bindingDeclarationInfo(binding))
    bindings += binding // Don't care if it already exists
    wrappedRabbitAdmin.declareBinding(binding)
  }

  /** Forgets the binding, then removes it on the wrapped admin. */
  def removeBinding(binding: Binding): Unit = {
    bindings -= binding
    wrappedRabbitAdmin.removeBinding(binding)
  }

  /**
   * Declares every recorded exchange, then every recorded queue, then every recorded binding
   * on the wrapped admin. Wrapped by [[noOpIfInProgress]], so a call made while another
   * re-declaration is still running does nothing.
   */
  private val reDeclareAmqpObjects: () => Unit = noOpIfInProgress(() => {
    logger.debug("Redeclaring Amqp Objects")

    // Work on snapshots so the loops below do not iterate the live collections.
    val localExchanges = exchanges.values.toArray
    val localQueues = queues.values.toArray
    val localBindings = bindings.clone()

    for (exchange <- localExchanges) {
      logger.info("Redeclaring Exchange: " + exchange.getName)
      if (!exchange.isDurable) {
        logger.warn("Auto-declaring a non-durable Exchange (" + exchange.getName +
          "). It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.")
      }
      if (exchange.isAutoDelete) {
        logger.warn("Auto-declaring an auto-delete Exchange (" + exchange.getName +
          "). It will be deleted by the broker if not in use (if all bindings are deleted), but will only be redeclared if the connection is closed and reopened.")
      }
      wrappedRabbitAdmin.declareExchange(exchange)
    }

    for (queue <- localQueues) {
      logger.info("Redeclaring Queue: " + queue.getName)
      if (!queue.isDurable) {
        logger.warn("Redeclaring a non-durable Queue (" + queue.getName +
          "). It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.")
      }
      if (queue.isAutoDelete) {
        logger.warn("Redeclaring an auto-delete Queue (" + queue.getName +
          "). It will be deleted by the broker if not in use, and all messages will be lost. Redeclared when the connection is closed and reopened.")
      }
      if (queue.isExclusive) {
        logger.warn("Redeclaring an exclusive Queue (" + queue.getName +
          "). It cannot be accessed by consumers on another connection, and will be redeclared if the connection is reopened.")
      }
      wrappedRabbitAdmin.declareQueue(queue)
    }

    for (binding <- localBindings) {
      logger.info(bindingDeclarationInfo(binding))
      wrappedRabbitAdmin.declareBinding(binding)
    }

    logger.debug("ReDeclarations finished")
  })

  /** Builds the log line used both when a binding is first declared and when it is re-declared. */
  private def bindingDeclarationInfo(b: Binding) =
    "Declaring Binding [" +
      b.getDestination + " (" + b.getDestinationType + ")] to exchange [" +
      b.getExchange + "] with routing key [" + b.getRoutingKey + "]"

  /**
   * This wraps the action in a function which will do nothing
   * if called while another is in progress
   *
   * @param action the work to guard
   * @return a function that runs `action` unless a previous invocation has not finished yet
   */
  private def noOpIfInProgress(action: () => Unit): () => Unit = {
    val inProgress: AtomicBoolean = new AtomicBoolean(false)
    () => {
      // Only the caller that flips the flag from false to true runs the action.
      if (inProgress.compareAndSet(false, true)) {
        try {
          action()
        }
        finally {
          // Only the caller that won the flag reaches this point, so a plain reset is enough.
          inProgress.set(false)
        }
      }
    }
  }

  // Listener registration is deliberately the LAST thing the constructor does. Class-body vals
  // are initialised in declaration order, so registering earlier would let a callback that fires
  // during or right after registration (from the add*Listener function itself or from another
  // thread) call reDeclareAmqpObjects while it is still null.
  addConnectionListener(new ConnectionListener {
    def onCreate(connection: Connection): Unit = {
      // Re-declaration on connection creation is disabled; the channel listener drives it.
      //reDeclareAmqpObjects()
    }

    def onClose(connection: Connection): Unit = {}
  })

  // Add listener to re-declare the AMQP objects whenever a channel is created.
  addChannelListener(new ChannelListener {
    def onCreate(channel: Channel, transactional: Boolean): Unit = {
      reDeclareAmqpObjects()
    }
  })
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* Ignore these tests for now!
 * I am experimenting with getting this to redeclare things after a consumer re-consumes.
package com.bostontechnologies.cscore.amqp | |
import org.specs.SpecificationWithJUnit | |
import org.specs.mock.Mockito | |
import org.springframework.amqp.rabbit.connection.ConnectionListener | |
import org.springframework.amqp.rabbit.core.RabbitAdmin | |
import org.springframework.amqp.core.{BindingBuilder, Queue, Exchange, TopicExchange} | |
class ReDeclaringRabbitAdminSpec extends SpecificationWithJUnit with Mockito { | |
"A ReDeclaringRabbitAdminSpec" should { | |
val queue = new Queue("QueueName") | |
val exchange = new TopicExchange("ExchangeName") | |
val binding = BindingBuilder bind (queue) to (exchange) `with` ("Binding") | |
val mockAdmin = mock[RabbitAdmin] | |
val mockAddListener = mock[ConnectionListener => Unit] | |
"Add a connection listener" in { | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener) | |
there was one(mockAddListener).apply(any[ConnectionListener]) | |
} | |
"declareExchanges" in { | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareExchange(exchange) | |
there was one(mockAdmin).declareExchange(exchange) | |
} | |
"deleteExchanges" in { | |
mockAdmin.deleteExchange(exchange.getName) returns false | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteExchange(exchange.getName) must beFalse | |
there was one(mockAdmin).deleteExchange(exchange.getName) | |
} | |
"declareQueue with no params" in { | |
mockAdmin.declareQueue() returns queue | |
val actualQueue = new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue() | |
actualQueue must be equalTo (queue) | |
} | |
"declare specific queue" in { | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareQueue(queue) | |
there was one(mockAdmin).declareQueue(queue) | |
} | |
"delete a queue" in { | |
mockAdmin.deleteQueue(queue.getName) returns false | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName) must beFalse | |
there was one(mockAdmin).deleteQueue(queue.getName) | |
} | |
"optionally delete a queue" in { | |
val unused = true | |
val empty = false | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).deleteQueue(queue.getName, unused, empty) | |
there was one(mockAdmin).deleteQueue(queue.getName, unused, empty) | |
} | |
"purgeQueue" in { | |
val noWait = true | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).purgeQueue(queue.getName, noWait) | |
there was one(mockAdmin).purgeQueue(queue.getName, noWait) | |
} | |
"declareBinding" in { | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding) | |
there was one(mockAdmin).declareBinding(binding) | |
} | |
"removeBinding" in { | |
new ReDeclaringRabbitAdmin(mockAdmin, mockAddListener).declareBinding(binding) | |
there was one(mockAdmin).declareBinding(binding) | |
} | |
"re-declare Exchanges and not re-declare Exchanges which were deleted" in { | |
//arrange | |
val deletedExchange: Exchange = new TopicExchange("DeletedExchange", true, false) | |
val listener = newStoringAddListener | |
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener) | |
//Act | |
reDeclaringAdmin.declareExchange(exchange) | |
reDeclaringAdmin.declareExchange(deletedExchange) | |
reDeclaringAdmin.deleteExchange(deletedExchange.getName) | |
listener.value.onCreate(null) | |
//Assert | |
there was two(mockAdmin).declareExchange(exchange) | |
there was one(mockAdmin).declareExchange(deletedExchange) | |
} | |
"re-declare Queues and not re-declare Queues which were deleted" in { | |
//arrange | |
val deletedQueue = new Queue("DeletedQueue") | |
val optionallyDeletedQueue = new Queue("OptionallyDeletedQueue") | |
val listener = newStoringAddListener | |
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener) | |
//Act | |
reDeclaringAdmin.declareQueue(queue) | |
reDeclaringAdmin.declareQueue(deletedQueue) | |
reDeclaringAdmin.declareQueue(optionallyDeletedQueue) | |
reDeclaringAdmin.deleteQueue(deletedQueue.getName) | |
reDeclaringAdmin.deleteQueue(optionallyDeletedQueue.getName, true, true) | |
listener.value.onCreate(null) | |
//Assert | |
there was two(mockAdmin).declareQueue(queue) | |
there was one(mockAdmin).declareQueue(deletedQueue) | |
there was one(mockAdmin).declareQueue(optionallyDeletedQueue) | |
} | |
"re-declare Bindings and not re-declare Bindings which were deleted" in { | |
//arrange | |
val deletedBinding = BindingBuilder bind(queue) to(exchange) `with`("DeletedBinding") | |
val listener = newStoringAddListener | |
val reDeclaringAdmin = new ReDeclaringRabbitAdmin(mockAdmin, listener) | |
//Act | |
reDeclaringAdmin.declareBinding(binding) | |
reDeclaringAdmin.declareBinding(deletedBinding) | |
reDeclaringAdmin.removeBinding(deletedBinding) | |
listener.value.onCreate(null) | |
//Assert | |
there was two(mockAdmin).declareBinding(binding) | |
there was one(mockAdmin).declareBinding(deletedBinding) | |
} | |
} | |
def newStoringAddListener = new StoringAddListener() | |
class StoringAddListener() { | |
var value: ConnectionListener = null | |
def apply(l: ConnectionListener) { | |
value = l | |
} | |
} | |
object StoringAddListener { | |
implicit def convertToAction(addListener: StoringAddListener): ConnectionListener => Unit = addListener(_) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment