package com.infochimps.kafka.consumers;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.utils.KafkaScheduler;

import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.node.Node;

// Scala runtime types, needed to hand a Scala () => Unit to KafkaScheduler from Java
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;

import org.apache.log4j.Logger;
class ElasticsearchConsumer {
  private Logger Log = Logger.getLogger(ElasticsearchConsumer.class.getName());

  // Kafka
  private ConsumerConnector kafkaConsumer;
  private String            kafkaTopic;
  // Constructor signature matches the KafkaScheduler source included below
  private KafkaScheduler    batchFlusherScheduler = new KafkaScheduler(1);

  // Configuration
  private String esIndex;
  private String esObjType;
  private int    bulkSize      = 200;       // max records per bulk request
  private int    flushInterval = 10 * 1000; // max millis between flushes

  // Elasticsearch API
  private Node   esNode;
  private Client esClient;
  private volatile BulkRequestBuilder currentRequest;

  // Used for bookkeeping purposes
  private AtomicLong totalBulkTime  = new AtomicLong();
  private AtomicLong totalBulkItems = new AtomicLong();
  private long runStartTime = System.currentTimeMillis(); // FIXME: use kafka.utils.Time (http://kafka.apache.org/coding-guide.html)
  private long lastFlushTime;
  private long lastLogTime  = 0;
  // ===========================================================================
  //
  // Elasticsearch Consumer Set up
  //

  /*
   * Construct internal Kafka consumer and Elasticsearch node and client.
   */
  public ElasticsearchConsumer(String kafkaTopic, String esIndex, String esObjType) throws Exception {
    this.kafkaConsumer  = buildKafkaConsumer();
    this.esNode         = buildEsNode();
    this.esClient       = esNode.client();
    this.currentRequest = esClient.prepareBulk();
    this.kafkaTopic     = kafkaTopic;
    this.esIndex        = esIndex;
    this.esObjType      = esObjType;
  }
  /*
   * Build and configure the Kafka consumer, drawing configuration from the
   * mandatory es.consumer.properties file location.
   */
  public ConsumerConnector buildKafkaConsumer() throws IOException {
    InputStream readIn = null;
    Properties properties = new Properties();
    if (System.getProperty("es.consumer.properties") != null) {
      Log.info("Reading consumer properties from file " + System.getProperty("es.consumer.properties"));
      File propertiesFile = new File(System.getProperty("es.consumer.properties"));
      if (!propertiesFile.exists() || !propertiesFile.canRead())
        throw new IOException("Cannot read properties file " + propertiesFile);
      readIn = new FileInputStream(propertiesFile);
    } else {
      Log.info("Reading consumer properties from es.consumer.properties file on classpath");
      readIn = getClass().getClassLoader().getResourceAsStream("es.consumer.properties");
      if (readIn == null)
        throw new IOException("Add es.consumer.properties to classpath or specify location using -Des.consumer.properties");
    }
    properties.load(readIn);
    return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
  }
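
  // For illustration, a minimal es.consumer.properties could look like the
  // following; the property names follow the 0.7-era Kafka consumer config,
  // and the values are hypothetical placeholders, not part of this gist:
  //
  //   zk.connect=localhost:2181
  //   groupid=es-loader
  //   socket.buffersize=1048576
  //   fetch.size=1048576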
  /*
   * Build and configure the embedded Elasticsearch node, drawing configuration
   * from the mandatory es.config file location.
   */
  public Node buildEsNode() throws IOException {
    if (System.getProperty("es.config") == null)
      throw new IOException("Specify the elasticsearch.yml location using the -Des.config option");
    Log.info("Joining Elasticsearch cluster");
    esNode = nodeBuilder().client(true).local(false).node();
    ClusterHealthResponse healthResponse = esNode.client().admin().cluster()
      .prepareHealth().setWaitForYellowStatus().setTimeout("30s").execute().actionGet(30000);
    if (healthResponse.isTimedOut())
      throw new IOException("Cluster not healthy; cowardly refusing to continue with export");
    Log.info("Got cluster name " + ClusterName.clusterNameFromSettings(esNode.settings()));
    return esNode;
  }
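
  // For illustration, the elasticsearch.yml pointed at by -Des.config should
  // name the target cluster (and, depending on your setup, how to discover
  // its nodes); these values are hypothetical placeholders:
  //
  //   cluster.name: my-es-cluster
  //   discovery.zen.ping.unicast.hosts: ["es-host-1:9300"]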
  // ===========================================================================
  //
  // Message Handling
  //

  /*
   * Start the message stream, and handle messages in a loop.
   */
  public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(kafkaTopic, Integer.valueOf(1));
    Map<String, List<KafkaStream<Message>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
    KafkaStream<Message> stream = consumerMap.get(kafkaTopic).get(0);
    ConsumerIterator<Message> it = stream.iterator();
    Log.info("Kafka stream started");
    this.lastFlushTime = System.currentTimeMillis();
    //
    Log.info("Starting batch flusher every " + (flushInterval / 1000) + " secs");
    batchFlusherScheduler.startup();
    // scheduleWithRate takes a Scala () => Unit; from Java, wrap processBulk()
    // in a Function0 so the flusher fires every flushInterval ms on a
    // non-daemon thread.
    batchFlusherScheduler.scheduleWithRate(new AbstractFunction0<BoxedUnit>() {
      public BoxedUnit apply() { processBulk(); return BoxedUnit.UNIT; }
    }, "es-flusher-", flushInterval, flushInterval, false);
    //
    while (it.hasNext()) {
      String msg = getMessage(it.next().message());
      writeMessage(msg);
    }
  }
  /*
   * Extract the message payload as a String (payloads are assumed UTF-8).
   */
  public String getMessage(Message message) {
    ByteBuffer buffer = message.payload();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, Charset.forName("UTF-8"));
  }

  // Synchronized: the consumer thread and the flusher thread both touch currentRequest.
  public synchronized void writeMessage(String message) {
    currentRequest.add(Requests.indexRequest(esIndex).type(esObjType).source(message));
    totalBulkItems.incrementAndGet();
    processBulkIfNeeded();
  }
  /*
   * Indexes content to elasticsearch when <b>bulkSize</b> records have accumulated
   * or <b>flushInterval</b> milliseconds have passed since the last flush.
   */
  private void processBulkIfNeeded() {
    if ((currentRequest.numberOfActions() >= bulkSize) ||
        ((System.currentTimeMillis() - lastFlushTime) > flushInterval)) {
      processBulk();
    }
  }
  /*
   * Send the accumulated batch and start a new one.
   */
  private synchronized void processBulk() {
    if (currentRequest.numberOfActions() < 1) { return; }
    long startTime = System.currentTimeMillis();
    BulkResponse response;
    try {
      if (Log.isDebugEnabled())
        Log.debug("Sending " + currentRequest.numberOfActions() + " items, " + ((System.currentTimeMillis() - lastFlushTime) / 1000) + "s since last write");
      response = currentRequest.execute().actionGet();
      totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
      reportStatus(startTime);
      //
      if (response.hasFailures()) {
        Log.warn("Bulk request has failures");
        String failures = response.buildFailureMessage();
        Log.info(failures.substring(0, Math.min(failures.length(), 2000)));
      }
    } catch (Exception err) {
      Log.warn("Bulk request failed: " + err.getMessage());
      throw new RuntimeException(err);
    }
    this.lastFlushTime  = System.currentTimeMillis();
    this.currentRequest = esClient.prepareBulk();
  }
  private void reportStatus(long startTime) {
    boolean loggable = ((System.currentTimeMillis() - this.lastLogTime >= 10000) || Log.isDebugEnabled());
    if (loggable) {
      Log.info("Indexed [" + currentRequest.numberOfActions() + "] items " +
        "in [" + ((System.currentTimeMillis() - startTime) / 1000) + "]s; " +
        "avg [" + (float)(1000.0 * totalBulkItems.get()) / (System.currentTimeMillis() - runStartTime) + "] rec/s " +
        "(total [" + totalBulkItems.get() + "] items " +
        "indexed in [" + (totalBulkTime.get() / 1000) + "]s, " +
        "wall clock [" + ((System.currentTimeMillis() - runStartTime) / 1000) + "]s)");
      this.lastLogTime = System.currentTimeMillis();
    }
  }
  // ===========================================================================
  //
  // Lifecycle
  //

  public void close() {
    batchFlusherScheduler.shutdown();
    this.lastLogTime = 0; // forces report
    Log.info("Sending all pending records");
    processBulk();
    //
    Log.info("Closing embedded elasticsearch node");
    if (esClient != null) esClient.close();
    if (esNode   != null) esNode.close();
    Log.info("Shutting down");
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 3) {
      System.err.println("usage: ElasticsearchConsumer <kafka_topic> <es_index> <es_obj_type>");
      System.exit(2);
    }
    ElasticsearchConsumer consumer = null;
    try {
      consumer = new ElasticsearchConsumer(args[0], args[1], args[2]);
      consumer.run();
    } catch (Exception ex) {
      if (consumer != null) consumer.close();
      ex.printStackTrace();
      System.exit(1);
    }
  }
}
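
// Example invocation (illustrative; the properties paths and the topic, index,
// and type arguments are hypothetical placeholders):
//
//   java -Des.consumer.properties=/path/to/es.consumer.properties \
//        -Des.config=/path/to/elasticsearch.yml \
//        com.infochimps.kafka.consumers.ElasticsearchConsumer tweets tweet_index tweet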
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.utils
import java.util.concurrent._
import atomic._
import collection.mutable.HashMap
/**
 * A scheduler for running jobs in the background.
 */
class KafkaScheduler(val numThreads: Int) extends Logging {
  private var executor: ScheduledThreadPoolExecutor = null
  private val daemonThreadFactory = new ThreadFactory() {
    def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
  }
  private val nonDaemonThreadFactory = new ThreadFactory() {
    def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
  }
  private val threadNamesAndIds = new HashMap[String, AtomicInteger]()

  def startup() = {
    executor = new ScheduledThreadPoolExecutor(numThreads)
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  }

  def hasShutdown: Boolean = executor.isShutdown

  private def ensureExecutorHasStarted = {
    if(executor == null)
      throw new IllegalStateException("Kafka scheduler has not been started")
  }

  def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
    ensureExecutorHasStarted
    if(isDaemon)
      executor.setThreadFactory(daemonThreadFactory)
    else
      executor.setThreadFactory(nonDaemonThreadFactory)
    val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
    executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
      TimeUnit.MILLISECONDS)
  }

  def shutdownNow() {
    ensureExecutorHasStarted
    executor.shutdownNow()
    info("Forcing shutdown of Kafka scheduler")
  }

  def shutdown() {
    ensureExecutorHasStarted
    executor.shutdown()
    info("Shutdown Kafka scheduler")
  }
}
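
// Usage sketch against the API above (from Scala; `flushBatch` is a
// hypothetical task body):
//
//   val scheduler = new KafkaScheduler(numThreads = 1)
//   scheduler.startup()
//   scheduler.scheduleWithRate(() => flushBatch(), "flusher-", 1000L, 1000L, isDaemon = false)
//   // ... later ...
//   scheduler.shutdown()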