package com.infochimps.kafka.consumers;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import scala.runtime.AbstractFunction0;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.utils.KafkaScheduler;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.node.Node;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;

import org.apache.log4j.Logger;
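
/*
 * ElasticsearchConsumer: drains a Kafka topic and bulk-indexes each message
 * (assumed to be a JSON document usable as an index source) into Elasticsearch.
 * A batch is sent once it reaches bulkSize records, and a background flusher
 * sends partial batches every flushInterval milliseconds so records never
 * linger too long.
 */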
class ElasticsearchConsumer {
  private static final Logger Log = Logger.getLogger(ElasticsearchConsumer.class);

  // Kafka
  private ConsumerConnector kafkaConsumer;
  private String            kafkaTopic;
  private KafkaScheduler    batchFlusherScheduler = new KafkaScheduler(1, "es-flusher-", false);

  // Configuration
  private String esIndex;
  private String esObjType;
  private int    bulkSize        = 1000;      // records to send at once
  private int    flushInterval   = 15 * 1000; // send a partial batch anyway every N milliseconds
  private int    loggingInterval = 10 * 1000; // space INFO-level progress reports by N milliseconds

  // Elasticsearch API
  private Node   esNode;
  private Client esClient;
  private volatile BulkRequestBuilder currentRequest;

  // Used for bookkeeping purposes
  private AtomicLong totalBulkTime  = new AtomicLong();
  private AtomicLong totalBulkItems = new AtomicLong();
  private long runStartTime  = System.currentTimeMillis(); // FIXME: use kafka.utils.Time (http://kafka.apache.org/coding-guide.html)
  private long lastFlushTime;
  private long lastLogTime   = 0;
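
  // Design note: records accumulate in currentRequest (a bulk request under
  // construction). Either the consumer loop (once bulkSize records pile up) or
  // the background flusher (every flushInterval ms) swaps it for a fresh one
  // and sends it; the synchronized flush path below keeps that swap safe.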

  // ===========================================================================
  //
  // Elasticsearch Consumer Setup
  //

  /*
   * Construct the internal Kafka consumer and the Elasticsearch node and client.
   */
  public ElasticsearchConsumer(String kafkaTopic, String esIndex, String esObjType) throws Exception {
    this.kafkaConsumer  = buildKafkaConsumer();
    this.esNode         = buildEsNode();
    this.esClient       = esNode.client();
    this.currentRequest = esClient.prepareBulk();
    this.kafkaTopic     = kafkaTopic;
    this.esIndex        = esIndex;
    this.esObjType      = esObjType;
  }

  /*
   * Build and configure the Kafka consumer, drawing configuration from the
   * mandatory es.consumer.properties file location.
   */
  public ConsumerConnector buildKafkaConsumer() throws IOException {
    InputStream readIn;
    Properties properties = new Properties();
    if (System.getProperty("es.consumer.properties") != null) {
      Log.info("Reading consumer properties from file " + System.getProperty("es.consumer.properties"));
      File propertiesFile = new File(System.getProperty("es.consumer.properties"));
      if (!propertiesFile.exists() || !propertiesFile.canRead())
        throw new IOException("Cannot read properties file " + propertiesFile.getPath());
      readIn = new FileInputStream(propertiesFile);
    } else {
      Log.info("Reading consumer properties from es.consumer.properties file on classpath");
      readIn = getClass().getClassLoader().getResourceAsStream("es.consumer.properties");
      if (readIn == null)
        throw new IOException("Add es.consumer.properties to the classpath or specify its location using -Des.consumer.properties");
    }
    try {
      properties.load(readIn);
    } finally {
      readIn.close(); // don't leak the stream
    }
    return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
  }
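
  // An illustrative es.consumer.properties (Kafka 0.7-era property names;
  // values are examples, not part of this gist):
  //
  //   zk.connect=localhost:2181
  //   groupid=es_kafka_loader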

  /*
   * Build and configure the Elasticsearch node, drawing configuration from the
   * mandatory es.config file location.
   */
  public Node buildEsNode() throws IOException {
    if (System.getProperty("es.config") == null)
      throw new IOException("Specify the elasticsearch.yml location using the -Des.config option");
    Log.info("Joining Elasticsearch cluster");
    esNode = nodeBuilder().client(true).local(false).node();
    ClusterHealthResponse healthResponse = esNode.client().admin().cluster()
      .prepareHealth().setWaitForYellowStatus().setTimeout("30s").execute().actionGet(30000);
    if (healthResponse.isTimedOut())
      throw new IOException("cluster not healthy, cowardly refusing to continue with export");
    Log.info("Got cluster name " + ClusterName.clusterNameFromSettings(esNode.settings()));
    return esNode;
  }
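
  // An illustrative elasticsearch.yml for this embedded client-only node
  // (values are examples; point them at your real cluster):
  //
  //   cluster.name: my_cluster
  //   discovery.zen.ping.unicast.hosts: ["es-host-1:9300"]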

  // ===========================================================================
  //
  // Message Handling
  //

  /*
   * * Start the message stream.
   * * Start a scheduled task so events don't wait around too long in a partial bulk request.
   * * Handle messages in a loop.
   */
  public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(kafkaTopic, Integer.valueOf(1)); // one stream for the topic
    Map<String, List<KafkaStream<Message>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
    KafkaStream<Message> stream = consumerMap.get(kafkaTopic).get(0);
    ConsumerIterator<Message> it = stream.iterator();
    Log.info("Kafka stream started");
    this.lastFlushTime = System.currentTimeMillis();

    startFlusher();

    while (it.hasNext()) {
      String msg = getMessage(it.next().message());
      writeMessage(msg);
    }
  }
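
  // The consumer loop above and the flusher thread started in startFlusher()
  // both funnel into processBulk(), so writeMessage() and processBulk() are
  // synchronized to keep the batch swap safe across the two threads.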

  public String getMessage(Message message) {
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, Charset.forName("UTF-8")); // messages are assumed to be UTF-8 text
  }

  public synchronized void writeMessage(String message) {
    currentRequest.add(Requests.indexRequest(esIndex).type(esObjType).source(message));
    totalBulkItems.incrementAndGet();
    processBulkIfNeeded();
  }

  /*
   * Indexes content to Elasticsearch once bulkSize records have been accumulated.
   */
  private void processBulkIfNeeded() {
    if (currentRequest.numberOfActions() >= bulkSize) {
      processBulk();
    }
  }

  /*
   * Send the batch: swap in a fresh bulk request, then flush the old one.
   */
  private synchronized void processBulk() {
    if (currentRequest.numberOfActions() < 1) { return; }
    long startTime = System.currentTimeMillis();
    BulkResponse response;
    BulkRequestBuilder flushableRequest = this.currentRequest;
    this.currentRequest = esClient.prepareBulk();
    try {
      if (Log.isDebugEnabled())
        Log.debug("Sending " + flushableRequest.numberOfActions() + " items, " + ((System.currentTimeMillis() - lastFlushTime) / 1000) + "s since last write");
      response = flushableRequest.execute().actionGet();
      totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
      reportStatus(startTime, flushableRequest.numberOfActions());

      if (response.hasFailures()) {
        Log.warn("Bulk request has failures");
        String failures = response.buildFailureMessage();
        Log.info(failures.substring(0, Math.min(failures.length(), 2000))); // guard against failure messages shorter than 2000 chars
      }
    } catch (Exception err) {
      Log.warn("Bulk request failed: " + err.getMessage());
      throw new RuntimeException(err);
    }
    this.lastFlushTime = System.currentTimeMillis();
  }
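
  // kafka.utils.KafkaScheduler takes a Scala function for its callback; the
  // raw AbstractFunction0 below is the Java-side bridge (kept raw on purpose,
  // since a parameterized Function0<Object> would not match the Scala
  // signature).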

  private void startFlusher() {
    AbstractFunction0 flushCallback = new AbstractFunction0<Object>() {
      public Object apply() { processBulk(); return true; }
    };
    Log.info("Starting batch flusher every " + (flushInterval / 1000) + " secs");
    batchFlusherScheduler.scheduleWithRate(flushCallback, flushInterval, flushInterval);
  }

  private void reportStatus(long startTime, long actions) {
    boolean loggable = ((System.currentTimeMillis() - this.lastLogTime >= this.loggingInterval) || Log.isDebugEnabled());
    if (loggable) {
      Log.info("Indexed [" + actions + "] items " +
        "in [" + ((System.currentTimeMillis() - startTime) / 1000) + "] s; " +
        "avg [" + (float)(1000.0 * totalBulkItems.get()) / (System.currentTimeMillis() - runStartTime) + "] rec/s " +
        "(total [" + totalBulkItems.get() + "] items " +
        "indexed in [" + (totalBulkTime.get() / 1000) + "] s, " +
        "wall clock [" + ((System.currentTimeMillis() - runStartTime) / 1000) + "] s)");
      this.lastLogTime = System.currentTimeMillis();
    }
  }

  // ===========================================================================
  //
  // Lifecycle
  //

  public void close() {
    batchFlusherScheduler.shutdown();
    this.lastLogTime = 0; // forces a final report
    Log.info("Sending all pending records");
    processBulk();

    Log.info("Closing embedded elasticsearch node");
    if (esClient != null) esClient.close();
    if (esNode   != null) esNode.close();
    Log.info("Shutting down");
  }
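
  // Example invocation (paths are illustrative):
  //
  //   java -Des.consumer.properties=/path/to/es.consumer.properties \
  //        -Des.config=/path/to/elasticsearch.yml \
  //        com.infochimps.kafka.consumers.ElasticsearchConsumer <kafka_topic> <es_index> <es_obj_type>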

  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println("usage: ElasticsearchConsumer <kafka_topic> <es_index> <es_obj_type>");
      System.exit(2);
    }
    ElasticsearchConsumer consumer = null;
    try {
      consumer = new ElasticsearchConsumer(args[0], args[1], args[2]);
      consumer.run();
    } catch (Exception ex) {
      if (consumer != null) consumer.close();
      ex.printStackTrace();
      System.exit(1);
    }
  }
}