Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Custom Message Handler for AMQP Modular Input that just dumps the raw received JSON to Splunk
package com.splunk.modinput.amqp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import com.splunk.modinput.Stream;
import com.splunk.modinput.StreamEvent;
import com.splunk.modinput.amqp.AMQPModularInput.MessageReceiver;
/**
 * Message handler that forwards the body of each received AMQP message
 * unchanged, split into fixed-size chunks that are emitted as a sequence of
 * unbroken {@link StreamEvent}s.
 *
 * <p>No JSON parsing or validation happens in this class; the text returned by
 * {@code getMessageBody} is passed through as-is. That helper is not defined
 * here (presumably it is inherited from {@link AbstractMessageHandler}), so
 * how the bytes are decoded cannot be told from this file.
 */
public class JSONMessageHandler extends AbstractMessageHandler {

    /** Maximum number of {@code char}s placed in a single event. */
    private static final int CHUNK_SIZE = 1024;

    /**
     * Converts one received message into a {@link Stream} of chunked events.
     *
     * <p>Every event is flagged as unbroken and tagged with the stanza name of
     * the receiving context; only the final event carries the "done" marker.
     * A message whose body is empty yields a stream with no events.
     *
     * @param messageContents raw bytes of the received message
     * @param envelope delivery envelope; not used by this handler
     * @param messageProperties AMQP properties; not used by this handler
     * @param context receiver that got the message; supplies the stanza name
     * @return stream holding one event per chunk of the message body
     * @throws Exception if obtaining the message body or building the events
     *         fails
     */
    @Override
    public Stream handleMessage(byte[] messageContents, Envelope envelope,
            AMQP.BasicProperties messageProperties, MessageReceiver context)
            throws Exception {
        String text = getMessageBody(messageContents);
        List<String> chunks = chunkData(text, CHUNK_SIZE);

        // Kept as ArrayList rather than List: the parameter type of
        // Stream.setEvents is not visible from this file.
        ArrayList<StreamEvent> events = new ArrayList<StreamEvent>(chunks.size());
        int lastIndex = chunks.size() - 1;
        for (int i = 0; i <= lastIndex; i++) {
            StreamEvent event = new StreamEvent();
            event.setUnbroken("1");
            event.setData(chunks.get(i));
            event.setStanza(context.stanzaName);
            // Only the last chunk gets the "done" element.
            if (i == lastIndex) {
                event.setDone(" ");
            }
            events.add(event);
        }

        Stream stream = new Stream();
        stream.setEvents(events);
        return stream;
    }

    /**
     * Splits {@code text} into consecutive pieces of at most {@code size}
     * {@code char}s whose concatenation equals {@code text}.
     *
     * <p>A boundary is never placed between the two halves of a surrogate
     * pair, so no chunk starts or ends with half of a supplementary character.
     * When a boundary would fall inside a pair the chunk is shortened by one
     * {@code char}; only when {@code size} is 1 is the chunk instead extended
     * to 2 {@code char}s so that progress is still made.
     *
     * @param text text to split; must not be {@code null}
     * @param size maximum chunk length in {@code char}s; must be positive
     * @return the chunks in order; empty when {@code text} is empty
     * @throws IllegalArgumentException if {@code size} is zero or negative
     */
    public static List<String> chunkData(String text, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive: " + size);
        }
        int length = text.length();
        // Ceiling division written so that it cannot overflow, unlike
        // (length + size - 1) / size, which goes negative for very large sizes.
        int expectedChunks = length / size + (length % size == 0 ? 0 : 1);
        List<String> chunks = new ArrayList<String>(expectedChunks);

        int start = 0;
        while (start < length) {
            // Compare against the remaining length so start + size cannot overflow.
            int end = (length - start <= size) ? length : start + size;
            if (end < length
                    && Character.isHighSurrogate(text.charAt(end - 1))
                    && Character.isLowSurrogate(text.charAt(end))) {
                // Boundary falls inside a surrogate pair: move it off the pair.
                end = (end - 1 > start) ? end - 1 : end + 1;
            }
            chunks.add(text.substring(start, end));
            start = end;
        }
        return chunks;
    }

    /**
     * Accepts handler parameters. This handler has nothing to configure, so
     * the supplied map is ignored.
     *
     * @param params handler parameters; not read
     */
    @Override
    public void setParams(Map<String, String> params) {
        // Intentionally empty: params are not used.
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.