Skip to content

Instantly share code, notes, and snippets.

@kjkrol
Last active April 12, 2018 15:28
Show Gist options
  • Save kjkrol/619c4043c2d9d14423c83b8df88abc64 to your computer and use it in GitHub Desktop.
Save kjkrol/619c4043c2d9d14423c83b8df88abc64 to your computer and use it in GitHub Desktop.
EmbeddedAMQPBroker
package kjkrol.amqpbroker
import org.apache.qpid.server.SystemLauncher
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.core.RabbitTemplate
class EmbeddedAMQPBroker {
private static final String INITIAL_CONFIGURATION = "test-initial-config.json"
private SystemLauncher systemLauncher
EmbeddedAMQPBroker() {
systemLauncher = new SystemLauncher()
}
void start() throws Exception {
systemLauncher.startup(createSystemConfig())
}
void stop() {
systemLauncher.shutdown()
}
TopicExchange createExchange(String exchangeName) {
CachingConnectionFactory cf = new CachingConnectionFactory()
try {
RabbitAdmin admin = new RabbitAdmin(cf)
TopicExchange exchange = new TopicExchange(exchangeName)
admin.declareExchange(exchange)
return exchange
} finally {
cf.destroy()
}
}
org.springframework.amqp.core.Queue createQueue(String queueName) {
CachingConnectionFactory cf = new CachingConnectionFactory()
try {
RabbitAdmin admin = new RabbitAdmin(cf)
org.springframework.amqp.core.Queue queue = new org.springframework.amqp.core.Queue(queueName, false)
admin.declareQueue(queue)
return queue
} finally {
cf.destroy()
}
}
void createBinding(TopicExchange exchange, org.springframework.amqp.core.Queue queue, String routingQueue) {
CachingConnectionFactory cf = new CachingConnectionFactory()
try {
RabbitAdmin admin = new RabbitAdmin(cf)
admin.declareBinding(BindingBuilder
.bind(queue)
.to(exchange)
.with(routingQueue))
} finally {
cf.destroy()
}
}
void deleteExchange(String exchangeName) {
CachingConnectionFactory cf = new CachingConnectionFactory()
RabbitAdmin admin = new RabbitAdmin(cf)
admin.deleteExchange(exchangeName)
cf.destroy()
}
void deleteQueue(String queueName) {
CachingConnectionFactory cf = new CachingConnectionFactory()
RabbitAdmin admin = new RabbitAdmin(cf)
admin.deleteQueue(queueName)
cf.destroy()
}
void sendMessage(String exchangeName, String message) throws Exception {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory()
RabbitTemplate template = new RabbitTemplate(connectionFactory)
template.convertAndSend(exchangeName, "#", message)
connectionFactory.destroy()
// waitForMessageBeConsumed();
}
private static Map<String, Object> createSystemConfig() {
Map<String, Object> attributes = [] as Map
URL initialConfig = EmbeddedAMQPBroker.class.getClassLoader().getResource(INITIAL_CONFIGURATION)
attributes.put("type", "Memory")
attributes.put("initialConfigurationLocation", initialConfig.toExternalForm())
attributes.put("startupLoggedToSystemOut", true)
return attributes
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment