Skip to content

Instantly share code, notes, and snippets.

@kbkaran
Created November 30, 2012 08:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kbkaran/4174494 to your computer and use it in GitHub Desktop.
Save kbkaran/4174494 to your computer and use it in GitHub Desktop.
package com.kiru.flume.sink.splunk;
import java.io.IOException;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flume.event.EventHelper;
import com.splunk.Service;
import com.splunk.Index;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.util.Calendar;
import java.util.HashMap;
import java.text.SimpleDateFormat;
/**
*
*/
public class SplunkSink extends AbstractSink implements Configurable {
private static final String INDEX = "flumeindex";
private static final Logger logger = LoggerFactory
.getLogger(SplunkSink.class);
static SimpleDateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
static Calendar cal = Calendar.getInstance();
private Socket stream;
private OutputStreamWriter splunkStream;
private Index index;
public SplunkSink() {
}
@Override
public void start() {
super.start();
HashMap<String, Object> arg0 = new HashMap<String, Object>();
arg0.put("index", INDEX);
arg0.put("itype", "submit");
arg0.put("host", "localhost");
arg0.put("username", "admin");
arg0.put("password", "changeme");
Service service = Service.connect(arg0);
this.index = service.getIndexes().get(INDEX);
try {
this.stream = index.attach();
splunkStream = new OutputStreamWriter(stream.getOutputStream(),
"UTF8");
} catch (IOException ioe) {
throw new RuntimeException("Unable to open a connection to Splunk");
}
}
@Override
public void stop() {
try {
splunkStream.flush();
splunkStream.close();
} catch (IOException ioe) {
System.out.println("Problem closing Splunk streams");
}
}
@SuppressWarnings("unchecked")
@Override
public void configure(Context context) {
}
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
try {
transaction.begin();
event = channel.take();
if (event != null) {
String eventstr = EventHelper.dumpEvent(event);
System.out.println("SplunkSink: " + eventstr);
index.submit(String.format("%s %s", dateFormat.format(cal.getTime()), eventstr)) ;
} else {
// No event found, request back-off semantics from the sink
// runner
result = Status.BACKOFF;
}
transaction.commit();
} catch (Exception ex) {
transaction.rollback();
throw new EventDeliveryException("Failed to log event: " + event,
ex);
} finally {
transaction.close();
}
return result;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment