Skip to content

Instantly share code, notes, and snippets.

@charliek
Created October 22, 2011 05:13
Show Gist options
  • Save charliek/1305660 to your computer and use it in GitHub Desktop.
Save charliek/1305660 to your computer and use it in GitHub Desktop.
Simple RabbitMQ JSON listener
#!/usr/bin/env groovy
@Grapes([
@Grab(group = 'com.rabbitmq', module = 'amqp-client', version = '2.6.1'),
@Grab(group = 'org.codehaus.jackson', module = 'jackson-mapper-asl', version = '1.8.3')
])
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer
import org.codehaus.jackson.map.ObjectMapper;
/**
 * Command-line RabbitMQ listener.
 *
 * Declares a server-named queue on the broker at localhost, binds it to the requested
 * exchange with the requested routing key, and prints every delivery (routing key,
 * exchange and body) to stdout until the process is interrupted.
 */
class Test {
    /** Shared Jackson mapper used to parse and pretty-print message bodies. */
    static ObjectMapper mapper = new ObjectMapper()

    /**
     * Script entry point: parses the command line and starts listening.
     *
     * @param args raw command-line arguments
     */
    def go(args) {
        def o = parseArgs(args)
        listenToQueue(*o)
    }

    /**
     * Parses the command-line options.
     *
     * Prints usage and exits the JVM with status -1 when the command line is malformed,
     * when the exchange is missing, or when unexpected positional arguments are present.
     *
     * @param args raw command-line arguments
     * @return a three-element list: [exchange, routingKey, prettyPrint]
     */
    def parseArgs(args) {
        def exchange = null
        def routingKey = '#'
        def prettyPrint = true
        def errors = []
        def cli = new CliBuilder(usage: 'rabbit-listen.groovy [-e exchange] [-k routingKey] [-p prettyprint]')
        cli.e(longOpt: 'exchange', args: 1, argName: 'exchange', 'The exchange to bind to')
        cli.k(longOpt: 'routingkey', args: 1, argName: 'routing key', 'Routing key to use [#]')
        cli.p(longOpt: 'prettyprint', args: 0, argName: 'pretty print', 'To disable pretty print of json message')

        def options = cli.parse(args)
        // CliBuilder.parse reports the problem, prints usage and returns null on a
        // malformed command line; bail out instead of dereferencing null below.
        if (options == null) {
            System.exit(-1)
        }

        if (options.'exchange') exchange = options.'exchange'
        if (options.'routingkey') routingKey = options.'routingkey'
        // The flag is registered as 'prettyprint'; looking it up under any other name
        // never matches, which would make -p a no-op.
        if (options.'prettyprint') prettyPrint = false

        if (exchange == null) {
            errors << "Exchange is a required parameter!"
        }
        if (options.arguments() || errors) {
            errors.each {
                println "${it}\n"
            }
            cli.usage()
            System.exit(-1)
        }
        return [exchange, routingKey, prettyPrint]
    }

    /**
     * Binds a fresh server-named queue to the exchange and prints deliveries forever.
     *
     * The connection and channel are released on every exit path, including failures
     * while the channel is being created or the queue is being bound.
     *
     * @param exchange    name of the exchange to bind to
     * @param routingKey  binding key used for the queue binding
     * @param prettyPrint when true, bodies are parsed as JSON and pretty-printed;
     *                    otherwise they are printed as received
     */
    def listenToQueue(exchange, routingKey, prettyPrint) {
        ConnectionFactory factory = new ConnectionFactory()
        factory.setHost("localhost")
        Connection conn = null
        Channel channel = null
        try {
            conn = factory.newConnection()
            channel = conn.createChannel()
            def queueName = channel.queueDeclare().getQueue()

            println "Binding to exchange ${exchange} with routing key ${routingKey}"
            try {
                channel.queueBind(queueName, exchange, routingKey)
            } catch (IOException e) {
                println "Error binding to exchange"
                throw e
            }

            def consumer = new QueueingConsumer(channel)
            println "[*] Waiting for messages. To exit press CTRL+C"
            println "+" * 100
            // Second argument true = auto-acknowledge deliveries.
            channel.basicConsume(queueName, true, consumer)

            while (true) {
                def delivery = consumer.nextDelivery()
                def rk = delivery.envelope.routingKey
                println "Routing Key : ${rk}"
                def exc = delivery.envelope.exchange
                println "Exchange : ${exc}"
                println "=" * 100
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                def body = new String(delivery.body, 'UTF-8')
                println(prettyPrint ? prettyJson(body) : body)
                println "+" * 100
            }
        } finally {
            closeQuietly(channel)
            closeQuietly(conn)
        }
    }

    /**
     * Returns the body re-serialized as indented JSON, or the body unchanged when it
     * cannot be parsed, so that one malformed message does not stop the listener.
     */
    private String prettyJson(String body) {
        try {
            def node = mapper.readTree(body)
            if (node != null) {
                return mapper.defaultPrettyPrintingWriter().writeValueAsString(node)
            }
        } catch (IOException e) {
            // Jackson 1.x parse failures are IOException subclasses.
            println "Message body is not valid JSON (${e.message}); printing raw body"
        }
        return body
    }

    /**
     * Closes a channel or connection if it is still open. Close failures are reported
     * on stderr rather than thrown so they cannot mask the error that triggered cleanup.
     */
    private void closeQuietly(resource) {
        try {
            if (resource?.isOpen()) {
                resource.close()
            }
        } catch (Exception e) {
            System.err.println "Error while closing ${resource.getClass().simpleName}: ${e.message}"
        }
    }
}
// Script entry point: hand the raw command-line arguments to a fresh listener.
def listener = new Test()
listener.go(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment