Last active
December 14, 2015 12:19
-
-
Save mrflip/5085980 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.infochimps.kafka.consumers; | |
import java.io.IOException; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.InputStream; | |
import java.nio.ByteBuffer; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import kafka.consumer.ConsumerConfig; | |
import kafka.consumer.ConsumerIterator; | |
import kafka.consumer.KafkaStream; | |
import kafka.javaapi.consumer.ConsumerConnector; | |
import kafka.message.Message; | |
import kafka.utils.KafkaScheduler; | |
import static org.elasticsearch.node.NodeBuilder.nodeBuilder; | |
import org.elasticsearch.ElasticSearchParseException; | |
import org.elasticsearch.client.Client; | |
import org.elasticsearch.client.transport.TransportClient; | |
import org.elasticsearch.cluster.ClusterName; | |
import org.elasticsearch.node.Node; | |
import org.elasticsearch.action.bulk.BulkRequestBuilder; | |
import org.elasticsearch.indices.IndexAlreadyExistsException; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.ExceptionsHelper; | |
import org.elasticsearch.client.Requests; | |
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; | |
import java.util.concurrent.atomic.AtomicLong; | |
import org.apache.log4j.Logger; | |
class ElasticsearchConsumer { | |
private Logger Log = Logger.getLogger(ElasticsearchConsumer.class.getName()); | |
// Kafka | |
private ConsumerConnector kafkaConsumer; | |
private String kafkaTopic; | |
private KafkaScheduler batchFlusherScheduler = new KafkaScheduler(1, "es-flusher-", false); | |
// Configuration | |
private String esIndex; | |
private String esObjType; | |
private int bulkSize = 200; | |
private int flushInterval = 10 * 1000; | |
// Elasticsearch API | |
private Node esNode; | |
private Client esClient; | |
private volatile BulkRequestBuilder currentRequest; | |
// Used for bookkeeping purposes | |
private AtomicLong totalBulkTime = new AtomicLong(); | |
private AtomicLong totalBulkItems = new AtomicLong(); | |
private long runStartTime = System.currentTimeMillis(); // FIXME: use kafka.utils.Time (http://kafka.apache.org/coding-guide.html) | |
private long lastFlushTime; | |
private long lastLogTime = 0; | |
// =========================================================================== | |
// | |
// Elasticsearch Consumer Set up | |
// | |
/* | |
* Construct internal Kafka consumer and Elasticsearch node and client | |
*/ | |
public ElasticsearchConsumer(String kafkaTopic, String esIndex, String esObjType) throws Exception { | |
try { | |
this.kafkaConsumer = buildKafkaConsumer(); | |
this.esNode = buildEsNode(); | |
this.esClient = esNode.client(); | |
this.currentRequest = esClient.prepareBulk(); | |
} catch (Exception ex) { | |
throw ex; | |
} | |
this.kafkaTopic = kafkaTopic; | |
this.esIndex = esIndex; | |
this.esObjType = esObjType; | |
} | |
/* | |
* Build and configure Kafka consumer, drawing configuration from the | |
* mandatory es.consumer.properties file location | |
*/ | |
public ConsumerConnector buildKafkaConsumer() throws IOException { | |
InputStream readIn = null; | |
Properties properties = new Properties(); | |
if (System.getProperty("es.consumer.properties") != null) { | |
Log.info("Reading consumer properties from file " + System.getProperty("es.consumer.properties")); | |
File propertiesFile = new File(System.getProperty("es.consumer.properties")); | |
if (!propertiesFile.exists() || !propertiesFile.canRead()) | |
throw new IOException("Cannot read file specified"); | |
readIn = new FileInputStream(propertiesFile); | |
} else { | |
Log.info("Reading consumer properties from es.consumer.properties file on classpath"); | |
readIn = getClass().getClassLoader().getResourceAsStream("es.consumer.properties"); | |
if (readIn == null) | |
throw new IOException("Add es.consumer.properties to classpath or specify location using -Des.consumer.properties"); | |
} | |
properties.load(readIn); | |
return kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); | |
} | |
/* | |
* Build and configure Elasticsearch Node, drawing configuration from the | |
* mandatory es.config file location | |
*/ | |
public Node buildEsNode() throws IOException { | |
if (System.getProperty("es.config") == null) | |
throw new IOException("Specify elasticsearch.yml locations using -Des.config option"); | |
Log.info("Joining Elasticsearch cluster"); | |
esNode = nodeBuilder().client(true).local(false).node(); | |
ClusterHealthResponse healthResponse = esNode.client().admin().cluster() | |
.prepareHealth().setWaitForYellowStatus().setTimeout("30s").execute().actionGet(30000); | |
if (healthResponse.isTimedOut()) | |
throw new IOException("cluster not healthy, cowardly refusing to continue with export"); | |
Log.info("Got cluster name " + ClusterName.clusterNameFromSettings(esNode.settings())); | |
return esNode; | |
} | |
// =========================================================================== | |
// | |
// Message Handling | |
// | |
/* | |
* Start the message stream, and handle messages in a loop. | |
*/ | |
public void run() { | |
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); | |
topicCountMap.put(kafkaTopic, new Integer(1)); | |
Map<String, List<KafkaStream<Message>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap); | |
KafkaStream<Message> stream = consumerMap.get(kafkaTopic).get(0); | |
ConsumerIterator<Message> it = stream.iterator(); | |
Log.info("Kafka stream started"); | |
this.lastFlushTime = System.currentTimeMillis(); | |
// | |
Log.info("Starting batch flusher every " + (flushInterval/1000) + " secs"); | |
batchFlusherScheduler.scheduleWithRate("processBulk", 3, 3); // flushInterval, flushInterval) | |
// | |
while (it.hasNext()) { | |
String msg = getMessage(it.next().message()); | |
writeMessage(msg); | |
} | |
} | |
public String getMessage(Message message) { | |
ByteBuffer buffer = message.payload(); | |
byte[] bytes = new byte[buffer.remaining()]; | |
buffer.get(bytes); | |
return new String(bytes); | |
} | |
public void writeMessage(String message) { | |
currentRequest.add(Requests.indexRequest(esIndex).type(esObjType).source(message)); | |
totalBulkItems.incrementAndGet(); | |
processBulkIfNeeded(); | |
} | |
/* | |
* Indexes content to elasticsearch when <b>bulkSize</b> records have been accumulated. | |
*/ | |
private void processBulkIfNeeded() { | |
if ((currentRequest.numberOfActions() >= bulkSize) || | |
((System.currentTimeMillis() - lastFlushTime) > flushInterval)) { | |
processBulk(); | |
} | |
} | |
/* | |
* Send the batch | |
*/ | |
private void processBulk() { | |
Log.debug("Hiya processBulk"); | |
if (currentRequest.numberOfActions() < 1){ return; } | |
long startTime = System.currentTimeMillis(); | |
BulkResponse response; | |
try { | |
if (Log.isDebugEnabled()) | |
Log.debug("Sending " + currentRequest.numberOfActions() + " items, " + ((System.currentTimeMillis() - lastFlushTime)/1000) + "s since last write"); | |
response = currentRequest.execute().actionGet(); | |
totalBulkTime.addAndGet(System.currentTimeMillis() - startTime); | |
reportStatus(startTime); | |
// | |
if (response.hasFailures()) { | |
Log.warn("Bulk request has failures"); | |
Log.info(response.buildFailureMessage().substring(0,2000)); | |
} | |
} catch (Exception err) { | |
Log.warn("Bulk request failed: " + err.getMessage()); | |
throw new RuntimeException(err); | |
} | |
this.lastFlushTime = System.currentTimeMillis(); | |
this.currentRequest = esClient.prepareBulk(); | |
} | |
private void reportStatus(long startTime) { | |
boolean loggable = ((System.currentTimeMillis() - this.lastLogTime >= 10000) || Log.isDebugEnabled()); | |
if (loggable) { | |
Log.info("Indexed [" + (currentRequest.numberOfActions()) + "]items " + | |
"in [" + ((System.currentTimeMillis() - startTime)/1000) + "]s; " + | |
"avg [" + (float)(1000.0*totalBulkItems.get())/(System.currentTimeMillis() - runStartTime) + "]rec/s " + | |
"(total [" + totalBulkItems.get() + "]items " + | |
"indexed in [" + (totalBulkTime.get()/1000) + "]s, " + | |
"wall clock [" + ((System.currentTimeMillis() - runStartTime)/1000) + "]s)"); | |
this.lastLogTime = System.currentTimeMillis(); | |
} | |
} | |
// =========================================================================== | |
// | |
// Lifecycle | |
// | |
public void close() { | |
batchFlusherScheduler.shutdown(); | |
this.lastLogTime = 0; // forces report | |
Log.info("Sending all pending records"); | |
processBulk(); | |
// | |
Log.info("Closing embedded elasticsearch node"); | |
if (esClient != null) esClient.close(); | |
if (esNode != null) esNode.close(); | |
Log.info("Shutting down"); | |
} | |
public static void main(String[] args) throws Exception { | |
ElasticsearchConsumer consumer = null; | |
try { | |
consumer = new ElasticsearchConsumer(args[0], args[1], args[2]); | |
consumer.run(); | |
} catch (Exception ex) { | |
if (consumer != null) consumer.close(); | |
ex.printStackTrace(); | |
System.exit(1); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package kafka.utils | |
import java.util.concurrent._ | |
import atomic._ | |
import collection.mutable.HashMap | |
/**
 * A scheduler for running jobs in the background.
 *
 * Wraps a `ScheduledThreadPoolExecutor` that is only created by `startup()`.
 * `scheduleWithRate`, `shutdown` and `shutdownNow` throw `IllegalStateException`
 * if `startup()` has not been called yet.
 *
 * @param numThreads core pool size of the underlying executor
 */
class KafkaScheduler(val numThreads: Int) extends Logging { | |
// null until startup() assigns it
private var executor:ScheduledThreadPoolExecutor = null | |
// Thread factories handed to the executor by scheduleWithRate. The boolean passed to
// Utils.newThread is presumably the daemon flag — confirm in kafka.utils.Utils.
private val daemonThreadFactory = new ThreadFactory() { | |
def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true) | |
} | |
private val nonDaemonThreadFactory = new ThreadFactory() { | |
def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false) | |
} | |
// Per-job-name counters: each scheduleWithRate call for a name takes the next number
// and appends it to the name handed to Utils.loggedRunnable.
// NOTE(review): a plain mutable HashMap with no synchronization — not safe if
// scheduleWithRate is called from several threads at once.
private val threadNamesAndIds = new HashMap[String, AtomicInteger]() | |
/**
 * Creates the executor. After shutdown, queued periodic and delayed tasks are not run.
 * NOTE(review): calling startup() again replaces the executor without shutting down
 * the previous one.
 */
def startup() = { | |
executor = new ScheduledThreadPoolExecutor(numThreads) | |
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) | |
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) | |
} | |
/**
 * Whether the executor has been shut down.
 * NOTE(review): throws NullPointerException before startup(), unlike the other methods,
 * which throw IllegalStateException via ensureExecutorHasStarted.
 */
def hasShutdown: Boolean = executor.isShutdown | |
// Guard used by every public operation except startup() and hasShutdown.
private def ensureExecutorHasStarted = { | |
if(executor == null) | |
throw new IllegalStateException("Kafka scheduler has not been started") | |
} | |
/**
 * Runs `fun` at a fixed rate: first after `delayMs` ms, then every `periodMs` ms.
 * The task is wrapped by Utils.loggedRunnable under `name` plus a per-name counter.
 *
 * NOTE(review): setThreadFactory replaces the factory for the whole executor and only
 * affects threads it creates afterwards, so `isDaemon` is not a per-job setting; pool
 * threads that already exist keep the daemon status they were created with.
 *
 * @return the ScheduledFuture returned by scheduleAtFixedRate
 */
def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = { | |
ensureExecutorHasStarted | |
if(isDaemon) | |
executor.setThreadFactory(daemonThreadFactory) | |
else | |
executor.setThreadFactory(nonDaemonThreadFactory) | |
val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0)) | |
executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs, | |
TimeUnit.MILLISECONDS) | |
} | |
/** Stops the executor via ThreadPoolExecutor.shutdownNow, then logs. */
def shutdownNow() { | |
ensureExecutorHasStarted | |
executor.shutdownNow() | |
info("Forcing shutdown of Kafka scheduler") | |
} | |
/** Initiates an orderly executor shutdown (no new tasks accepted), then logs. */
def shutdown() { | |
ensureExecutorHasStarted | |
executor.shutdown() | |
info("Shutdown Kafka scheduler") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment