Skip to content

Instantly share code, notes, and snippets.

@charliek
Last active December 10, 2015 21:48
Show Gist options
  • Save charliek/4497399 to your computer and use it in GitHub Desktop.
Save charliek/4497399 to your computer and use it in GitHub Desktop.
#!/usr/bin/env groovy
@Grapes([
@Grab(group = 'com.rabbitmq', module = 'amqp-client', version = '2.6.1'),
@Grab(group = 'org.codehaus.jackson', module = 'jackson-mapper-asl', version = '1.8.3')
])
import com.rabbitmq.client.ConnectionFactory
import com.rabbitmq.client.Connection
import com.rabbitmq.client.Channel
import com.rabbitmq.client.QueueingConsumer
import org.codehaus.jackson.map.ObjectMapper
import org.codehaus.jackson.JsonNode
/**
 * Command-line tail for log events published to a RabbitMQ exchange.
 *
 * Declares a server-named queue, binds it to the requested exchange/routing key,
 * then prints one colourised line per delivery until the process is interrupted.
 * Each delivery body is expected to be a JSON document carrying '@message' and
 * '@source_path' fields.
 */
class Test {
    /** Shared JSON parser used for every delivery. */
    static ObjectMapper mapper = new ObjectMapper()

    // ANSI escape sequences used to colourise terminal output.
    public static final String ANSI_RESET = "\u001B[0m"
    public static final String ANSI_BLACK = "\u001B[30m"
    public static final String ANSI_RED = "\u001B[31m"
    public static final String ANSI_GREEN = "\u001B[32m"
    public static final String ANSI_YELLOW = "\u001B[33m"
    public static final String ANSI_BLUE = "\u001B[34m"
    public static final String ANSI_PURPLE = "\u001B[35m"
    public static final String ANSI_CYAN = "\u001B[36m"
    public static final String ANSI_WHITE = "\u001B[37m"

    /**
     * Script entry point: parses the command line and starts listening.
     *
     * @param args raw command-line arguments
     */
    def go(args) {
        def o = parseArgs(args)
        listenToQueue(*o)
    }

    /**
     * Parses the command line. Prints usage and exits with status -1 when the
     * options are malformed or unexpected positional arguments are present.
     *
     * @param args raw command-line arguments
     * @return a three-element list: [exchange, routingKey, virtualHost]
     */
    def parseArgs(args) {
        def cli = new CliBuilder(usage: 'rabbit-listen.groovy [-e exchange] [-k routingKey] [-v virtualHost]')
        cli.e(longOpt: 'exchange', args: 1, argName: 'exchange', 'The exchange to bind to [log]')
        cli.k(longOpt: 'routingkey', args: 1, argName: 'routing key', 'Routing key to use [#]')
        cli.v(longOpt: 'virtualhost', args: 1, argName: 'virtual host', 'Virtual Host to bind to [/logging]')

        def options = cli.parse(args)
        if (options == null) {
            // CliBuilder.parse returns null on a bad command line after it has
            // already printed the error and the usage text.
            System.exit(-1)
        }
        if (options.arguments()) {
            // Positional arguments are not supported.
            cli.usage()
            System.exit(-1)
        }

        String exchange = options.'exchange' ?: 'log'
        String routingkey = options.'routingkey' ?: '#'
        String virtualHost = options.'virtualhost' ?: '/logging'
        return [exchange, routingkey, virtualHost]
    }

    /**
     * Connects to the broker on localhost, binds a temporary queue and prints
     * every delivery. Does not return normally; it ends when an exception
     * propagates (e.g. the connection is shut down) or the process is killed.
     *
     * @param exchange    exchange to bind the temporary queue to
     * @param routingKey  binding key
     * @param virtualHost broker virtual host to connect to
     * @throws IOException if connecting, declaring or binding fails
     */
    def listenToQueue(exchange, routingKey, virtualHost) {
        println "Binding to exchange ${exchange} with routing key ${routingKey} on ${virtualHost}"
        ConnectionFactory factory = new ConnectionFactory()
        factory.setHost("localhost")
        factory.setVirtualHost(virtualHost)
        Connection conn = factory.newConnection()
        // Everything after the connection is opened runs under the finally, so a
        // failed channel/queue/bind step no longer leaks the connection.
        try {
            Channel channel = conn.createChannel()
            def queueName = channel.queueDeclare().getQueue()
            try {
                channel.queueBind(queueName, exchange, routingKey)
            } catch (IOException e) {
                println "Error binding to exchange"
                throw e
            }
            def consumer = new QueueingConsumer(channel)
            println "[*] Waiting for messages. To exit press CTRL+C"
            println "+" * 100
            channel.basicConsume(queueName, true, consumer)
            while (true) {
                def delivery = consumer.nextDelivery()
                // Decode explicitly instead of relying on the platform charset.
                println formatDelivery(new String(delivery.body, 'UTF-8'))
            }
        } finally {
            try {
                // Closing the connection also closes its channels.
                conn.close()
            } catch (Exception ignored) {
                // The connection may already be closed (e.g. broker shutdown);
                // do not let cleanup mask the exception that brought us here.
            }
        }
    }

    /**
     * Renders one delivery body as a coloured console line. A payload that is not
     * valid JSON, or that lacks '@message', is printed raw rather than aborting
     * the listener.
     */
    private String formatDelivery(String body) {
        JsonNode node
        try {
            node = mapper.readTree(body)
        } catch (IOException e) {
            return "${ANSI_RED}unparsed${ANSI_YELLOW} => ${ANSI_RESET}${body}"
        }
        String type = determineLinePrefix(node)
        String message = node?.get('@message')?.textValue
        if (message == null) {
            message = body
        }
        return "${ANSI_GREEN}${type}${ANSI_YELLOW} => ${ANSI_RESET}${message}"
    }

    /**
     * Derives the line prefix from the event's '@source_path': the file name with
     * its directory and trailing extension removed (e.g. '/var/log/x/app.log'
     * gives 'app').
     *
     * @param node parsed event
     * @return the bare file name, or 'unknown' when the field is missing or empty
     */
    String determineLinePrefix(JsonNode node) {
        String path = node?.get('@source_path')?.textValue
        if (!path) {
            return 'unknown'
        }
        String fileName = path.substring(path.lastIndexOf('/') + 1)
        int dot = fileName.lastIndexOf('.')
        // dot <= 0 means no extension, or a dot-file whose whole name would vanish.
        return dot > 0 ? fileName.substring(0, dot) : fileName
    }
}
// Script entry point: hand the raw command-line arguments to a fresh listener.
Test listener = new Test()
listener.go(args)
# Input stage: events are read from log files on the local filesystem.
input {
# http://logstash.net/docs/1.1.5/inputs/file
file {
# Lines are taken as plain text rather than pre-structured events.
format => "plain"
# Glob selecting the log files to read.
path => "/var/log/radiant/*.log"
# Type label attached to events coming from this input.
type => "radiant"
}
}
# Output stage: events are published to an AMQP broker.
# The exchange ("log") and vhost ("/logging") match the defaults used by
# rabbit-listen.groovy, so the listener works against this config without flags.
output {
# http://logstash.net/docs/1.1.5/outputs/amqp
amqp {
# Exchange name (per the linked amqp output docs).
name => "log"
# Broker host; the listener script also connects to localhost.
host => "localhost"
# Topic exchange, so consumers can filter by routing-key pattern (e.g. "#").
exchange_type => "topic"
vhost => "/logging"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment