Created
October 22, 2011 05:13
-
-
Save charliek/1305660 to your computer and use it in GitHub Desktop.
simple rabbitmq json listener
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env groovy | |
@Grapes([ | |
@Grab(group = 'com.rabbitmq', module = 'amqp-client', version = '2.6.1'), | |
@Grab(group = 'org.codehaus.jackson', module = 'jackson-mapper-asl', version = '1.8.3') | |
]) | |
import com.rabbitmq.client.ConnectionFactory; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.QueueingConsumer | |
import org.codehaus.jackson.map.ObjectMapper; | |
class Test { | |
static ObjectMapper mapper = new ObjectMapper() | |
def go(args){ | |
def o = parseArgs(args) | |
listenToQueue(*o) | |
} | |
def parseArgs(args) { | |
def exchange = null | |
def routingKey = '#' | |
def prettyPrint = true | |
def errors = [] | |
def cli = new CliBuilder(usage: 'rabbit-listen.groovy [-e exchange] [-k routingKey] [-p prettyprint]') | |
cli.e(longOpt: 'exchange', args: 1, argName: 'exchange', 'The exchange to bind to') | |
cli.k(longOpt: 'routingkey', args: 1, argName: 'routing key', 'Routing key to use [#]') | |
cli.p(longOpt: 'prettyprint', args: 0, argName: 'pretty print', 'To disable pretty print of json message') | |
def options = cli.parse(args) | |
if (options.'exchange') exchange = options.'exchange' | |
if (options.'routingkey') routingKey = options.'routingkey' | |
if (options.'pretty') prettyPrint = false | |
if(exchange == null){ | |
errors << "Exchange is a required parameter!" | |
} | |
if (options.arguments() || errors) { | |
errors.each { | |
println "${it}\n" | |
} | |
cli.usage() | |
System.exit(-1) | |
} | |
return [exchange, routingKey, prettyPrint] | |
} | |
def listenToQueue(exchange, routingKey, prettyPrint) { | |
ConnectionFactory factory = new ConnectionFactory() | |
factory.setHost("localhost") | |
Connection conn = factory.newConnection(); | |
Channel channel = conn.createChannel(); | |
// def exchange = 'sync.domain'; | |
def queueName = channel.queueDeclare().getQueue(); | |
// channel.queueDeclare queueName | |
println "Binding to exchange ${exchange} with routing key ${routingKey}" | |
try { | |
channel.queueBind(queueName, exchange, routingKey) | |
} catch (IOException e){ | |
println "Error binding to exchange" | |
throw e | |
} | |
def consumer = new QueueingConsumer(channel) | |
println "[*] Waiting for messages. To exit press CTRL+C" | |
println "+" * 100 | |
channel.basicConsume(queueName, true, consumer) | |
try { | |
while (true) { | |
def delivery = consumer.nextDelivery() | |
def rk = delivery.envelope.routingKey | |
println "Routing Key : ${rk}" | |
def exc = delivery.envelope.exchange | |
println "Exchange : ${exc}" | |
println "=" * 100 | |
def body = new String(delivery.body) | |
if(prettyPrint){ | |
def node = mapper.readTree(body) | |
println mapper.defaultPrettyPrintingWriter().writeValueAsString(node) | |
} else { | |
println body | |
} | |
println "+" * 100 | |
} | |
} finally { | |
channel.close() | |
conn.close() | |
} | |
} | |
} | |
new Test().go(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment