Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Custom message handler for the Splunk Kafka Modular Input that outputs only the raw message received
package com.splunk.modinput.kafka;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.splunk.modinput.Stream;
import com.splunk.modinput.StreamEvent;
import com.splunk.modinput.kafka.KafkaModularInput.MessageReceiver;
/**
 * Message handler that emits only the decoded body of each received message.
 *
 * <p>The body is decoded with a configurable charset, split into chunks of at
 * most {@link #DEFAULT_CHUNK_SIZE} characters, and each chunk is wrapped in an
 * unbroken {@link StreamEvent}; the final chunk carries the "done" marker.
 *
 * <p>The charset can be overridden through the {@code charset} parameter passed
 * to {@link #setParams(Map)}; otherwise the platform default charset is used.
 */
public class BodyOnlyMessageHandler extends AbstractMessageHandler {

    /** Parameter key used to override the charset in {@link #setParams(Map)}. */
    private static final String CHARSET_PARAM = "charset";

    /** Maximum number of characters placed into a single stream event. */
    private static final int DEFAULT_CHUNK_SIZE = 1024;

    /** Name of the charset used to decode message bytes; never null or blank. */
    String charset = Charset.defaultCharset().name();

    /**
     * Converts the raw message bytes into a stream of unbroken events.
     *
     * @param messageContents raw bytes of the received message
     * @param context receiver whose {@code stanzaName} is stamped on every event
     * @return a stream holding one event per chunk of the decoded body; the
     *         stream has no events when the decoded body is empty
     * @throws Exception if decoding the message body fails
     */
    @Override
    public Stream handleMessage(byte[] messageContents, MessageReceiver context)
            throws Exception {
        // getMessageBody is not defined in this class; presumably inherited
        // from AbstractMessageHandler.
        String text = getMessageBody(messageContents, charset);

        List<String> chunks = chunkData(text, DEFAULT_CHUNK_SIZE);
        int lastIndex = chunks.size() - 1;

        // Kept as ArrayList because the parameter type of Stream.setEvents is
        // not visible from this file.
        ArrayList<StreamEvent> events = new ArrayList<StreamEvent>(chunks.size());
        for (int i = 0; i <= lastIndex; i++) {
            StreamEvent event = new StreamEvent();
            event.setUnbroken("1");
            event.setData(chunks.get(i));
            event.setStanza(context.stanzaName);
            // Only the last chunk carries the "done" element.
            if (i == lastIndex) {
                event.setDone(" ");
            }
            events.add(event);
        }

        Stream stream = new Stream();
        stream.setEvents(events);
        return stream;
    }

    /**
     * Splits {@code text} into consecutive chunks of at most {@code size}
     * characters. Concatenating the returned chunks yields the original text.
     *
     * <p>A chunk boundary is never placed between the two halves of a UTF-16
     * surrogate pair, because a lone surrogate cannot be encoded on its own.
     * When a boundary would split a pair, the chunk is shortened by one
     * character; if {@code size} is 1 the chunk is instead extended to hold the
     * whole pair, which is the only case where a chunk exceeds {@code size}.
     *
     * @param text the text to split; {@code null} is treated as empty
     * @param size maximum chunk length in characters; must be positive
     * @return the chunks in order; empty when {@code text} is null or empty
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    public static List<String> chunkData(String text, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException(
                    "chunk size must be positive but was " + size);
        }
        if (text == null || text.length() == 0) {
            return new ArrayList<String>(0);
        }

        int length = text.length();
        // Capacity hint only; written to avoid the int overflow that
        // (length + size - 1) can hit for very large sizes.
        List<String> ret = new ArrayList<String>(length / size + 1);

        int start = 0;
        while (start < length) {
            // Compare against the remaining length instead of computing
            // start + size first, so that a huge size cannot overflow.
            int end = (size >= length - start) ? length : start + size;

            if (end < length
                    && Character.isHighSurrogate(text.charAt(end - 1))
                    && Character.isLowSurrogate(text.charAt(end))) {
                // Boundary would split a surrogate pair: move it so the pair
                // stays together while the chunk still makes progress.
                if (end - start > 1) {
                    end--;
                } else {
                    end++;
                }
            }

            ret.add(text.substring(start, end));
            start = end;
        }
        return ret;
    }

    /**
     * Applies handler parameters. Only the {@code charset} key is read; a
     * missing, null or blank value leaves the current charset unchanged.
     *
     * @param params handler parameters; may be {@code null}
     */
    @Override
    public void setParams(Map<String, String> params) {
        if (params == null) {
            return;
        }
        String configured = params.get(CHARSET_PARAM);
        if (configured == null) {
            return;
        }
        String trimmed = configured.trim();
        if (trimmed.length() > 0) {
            this.charset = trimmed;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.