@mrflip — Created March 4, 2013
package com.infochimps.kafka.consumers;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import scala.runtime.AbstractFunction0;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.utils.KafkaScheduler;

import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.node.Node;

import org.apache.log4j.Logger;

class ElasticsearchConsumer {
  private static final Logger Log = Logger.getLogger(ElasticsearchConsumer.class);

  // Kafka
  private ConsumerConnector kafkaConsumer;
  private String kafkaTopic;
  private KafkaScheduler batchFlusherScheduler = new KafkaScheduler(1, "es-flusher-", false);

  // Configuration
  private String esIndex;
  private String esObjType;
  private int bulkSize        = 1000;      // Records to send at once
  private int flushInterval   = 15 * 1000; // Send partial batch anyway every N ms
  private int loggingInterval = 10 * 1000; // Space INFO-level progress reports by N ms

  // Elasticsearch API
  private Node esNode;
  private Client esClient;
  private volatile BulkRequestBuilder currentRequest;

  // Used for bookkeeping purposes
  private AtomicLong totalBulkTime  = new AtomicLong();
  private AtomicLong totalBulkItems = new AtomicLong();
  private long runStartTime = System.currentTimeMillis(); // FIXME: use kafka.utils.Time (http://kafka.apache.org/coding-guide.html)
  private long lastFlushTime;
  private long lastLogTime = 0;
  // ===========================================================================
  //
  // Elasticsearch Consumer Setup
  //

  /*
   * Construct internal Kafka consumer and Elasticsearch node and client
   */
  public ElasticsearchConsumer(String kafkaTopic, String esIndex, String esObjType) throws Exception {
    this.kafkaConsumer  = buildKafkaConsumer();
    this.esNode         = buildEsNode();
    this.esClient       = esNode.client();
    this.currentRequest = esClient.prepareBulk();
    this.kafkaTopic     = kafkaTopic;
    this.esIndex        = esIndex;
    this.esObjType      = esObjType;
  }
  /*
   * Build and configure the Kafka consumer, drawing configuration from the
   * mandatory es.consumer.properties file location
   */
  public ConsumerConnector buildKafkaConsumer() throws IOException {
    InputStream readIn = null;
    Properties properties = new Properties();
    if (System.getProperty("es.consumer.properties") != null) {
      Log.info("Reading consumer properties from file " + System.getProperty("es.consumer.properties"));
      File propertiesFile = new File(System.getProperty("es.consumer.properties"));
      if (!propertiesFile.exists() || !propertiesFile.canRead())
        throw new IOException("Cannot read properties file " + propertiesFile.getPath());
      readIn = new FileInputStream(propertiesFile);
    } else {
      Log.info("Reading consumer properties from es.consumer.properties file on classpath");
      readIn = getClass().getClassLoader().getResourceAsStream("es.consumer.properties");
      if (readIn == null)
        throw new IOException("Add es.consumer.properties to classpath or specify its location using -Des.consumer.properties");
    }
    try {
      properties.load(readIn);
    } finally {
      readIn.close(); // don't leak the properties file handle
    }
    return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
  }
  /*
   * Build and configure the Elasticsearch node, drawing configuration from the
   * mandatory es.config file location
   */
  public Node buildEsNode() throws IOException {
    if (System.getProperty("es.config") == null)
      throw new IOException("Specify the elasticsearch.yml location using the -Des.config option");
    Log.info("Joining Elasticsearch cluster");
    esNode = nodeBuilder().client(true).local(false).node();
    ClusterHealthResponse healthResponse = esNode.client().admin().cluster()
      .prepareHealth().setWaitForYellowStatus().setTimeout("30s").execute().actionGet(30000);
    if (healthResponse.isTimedOut())
      throw new IOException("Cluster not healthy; cowardly refusing to continue with export");
    Log.info("Got cluster name " + ClusterName.clusterNameFromSettings(esNode.settings()));
    return esNode;
  }
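
  /*
   * A minimal sketch of the elasticsearch.yml that -Des.config points at. The
   * cluster name and host list are assumptions — match them to your cluster:
   *
   *   cluster.name: my-es-cluster
   *   discovery.zen.ping.multicast.enabled: false
   *   discovery.zen.ping.unicast.hosts: ["es-host-1:9300", "es-host-2:9300"]
   */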
  // ===========================================================================
  //
  // Message Handling
  //

  /*
   * * start the message stream
   * * start a scheduled task to ensure events don't wait around too long in a partial bulk request
   * * handle messages in a loop
   */
  public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(kafkaTopic, Integer.valueOf(1));
    Map<String, List<KafkaStream<Message>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
    KafkaStream<Message> stream = consumerMap.get(kafkaTopic).get(0);
    ConsumerIterator<Message> it = stream.iterator();
    Log.info("Kafka stream started");
    this.lastFlushTime = System.currentTimeMillis();
    //
    startFlusher();
    //
    while (it.hasNext()) {
      String msg = getMessage(it.next().message());
      writeMessage(msg);
    }
  }
  public String getMessage(Message message) {
    // Copy the message payload out of its ByteBuffer and decode it as UTF-8
    // (rather than the platform default charset)
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, Charset.forName("UTF-8"));
  }
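
  /*
   * Each Kafka message is handed to Elasticsearch verbatim as the document
   * source, so messages are expected to be self-contained JSON documents,
   * e.g. (illustrative): {"user":"mrflip","ts":1362437520,"text":"hello"}
   */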
  public void writeMessage(String message) {
    currentRequest.add(Requests.indexRequest(esIndex).type(esObjType).source(message));
    totalBulkItems.incrementAndGet();
    processBulkIfNeeded();
  }
  /*
   * Indexes content to Elasticsearch when <b>bulkSize</b> records have been accumulated.
   */
  private void processBulkIfNeeded() {
    if (currentRequest.numberOfActions() >= bulkSize) {
      processBulk();
    }
  }
  /*
   * Send the batch
   */
  private void processBulk() {
    if (currentRequest.numberOfActions() < 1) { return; }
    long startTime = System.currentTimeMillis();
    BulkResponse response;
    // Swap in a fresh builder before sending. The swap is not synchronized with
    // writeMessage(), so a concurrent add can land on the builder that is
    // already being flushed — a known limitation of this simple design.
    BulkRequestBuilder flushableRequest = this.currentRequest;
    this.currentRequest = esClient.prepareBulk();
    try {
      if (Log.isDebugEnabled())
        Log.debug("Sending " + flushableRequest.numberOfActions() + " items, " + ((System.currentTimeMillis() - lastFlushTime) / 1000) + "s since last write");
      response = flushableRequest.execute().actionGet();
      totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
      reportStatus(startTime, flushableRequest.numberOfActions());
      //
      if (response.hasFailures()) {
        String failures = response.buildFailureMessage();
        Log.warn("Bulk request has failures");
        Log.info(failures.substring(0, Math.min(failures.length(), 2000)));
      }
    } catch (Exception err) {
      Log.warn("Bulk request failed: " + err.getMessage());
      throw new RuntimeException(err);
    }
    this.lastFlushTime = System.currentTimeMillis();
  }
  private void startFlusher() {
    AbstractFunction0<Object> flushCallback = new AbstractFunction0<Object>() {
      public Object apply() { processBulk(); return true; }
    };
    Log.info("Starting batch flusher every " + (flushInterval / 1000) + " secs");
    batchFlusherScheduler.scheduleWithRate(flushCallback, flushInterval, flushInterval);
  }
  private void reportStatus(long startTime, long actions) {
    boolean loggable = ((System.currentTimeMillis() - this.lastLogTime >= this.loggingInterval) || Log.isDebugEnabled());
    if (loggable) {
      Log.info("Indexed [" + actions + "] items " +
        "in [" + ((System.currentTimeMillis() - startTime) / 1000) + "]s; " +
        "avg [" + (float)(1000.0 * totalBulkItems.get()) / (System.currentTimeMillis() - runStartTime) + "] rec/s " +
        "(total [" + totalBulkItems.get() + "] items " +
        "indexed in [" + (totalBulkTime.get() / 1000) + "]s, " +
        "wall clock [" + ((System.currentTimeMillis() - runStartTime) / 1000) + "]s)");
      this.lastLogTime = System.currentTimeMillis();
    }
  }
  // ===========================================================================
  //
  // Lifecycle
  //

  public void close() {
    batchFlusherScheduler.shutdown();
    this.lastLogTime = 0; // forces a final progress report
    Log.info("Sending all pending records");
    processBulk();
    //
    Log.info("Closing embedded Elasticsearch node");
    if (esClient != null) esClient.close();
    if (esNode != null) esNode.close();
    Log.info("Shutting down");
  }
  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println("Usage: ElasticsearchConsumer <kafka_topic> <es_index> <es_obj_type>");
      System.exit(2);
    }
    ElasticsearchConsumer consumer = null;
    try {
      consumer = new ElasticsearchConsumer(args[0], args[1], args[2]);
      consumer.run();
      consumer.close();
    } catch (Exception ex) {
      if (consumer != null) consumer.close();
      ex.printStackTrace();
      System.exit(1);
    }
  }
}
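
Invocation sketch — the paths, jar name, topic, index, and type below are
placeholders, and the Kafka, Scala, Elasticsearch, and log4j dependencies must
be on the classpath:

  java -Des.consumer.properties=/etc/kafka/es.consumer.properties \
       -Des.config=/etc/elasticsearch/elasticsearch.yml \
       -cp 'es-kafka-consumer.jar:lib/*' \
       com.infochimps.kafka.consumers.ElasticsearchConsumer my_topic my_index my_type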