KafkaOffsetManager
package com.yahoo.mail.force.asyncf.consumer;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import kafka.api.ConsumerMetadataRequest;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.network.BlockingChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.utils.ClientUtils;
import com.google.common.base.Preconditions;

/**
 * An offset manager that commits Kafka partition offsets to the broker in the
 * background. It tracks per-partition job completion so that only the highest
 * contiguous finished offset is committed.
 */
public class OffsetManager {

    private static final Log LOG = LogFactory.getLog(OffsetManager.class);

    // Per partition: a sorted map of offset -> job status, used to find the
    // highest contiguous finished offset that is safe to commit.
    private final ConcurrentMap<Integer, ConcurrentSkipListMap<Long, JobStatus>> partitionsAndOffsets =
            new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(1);

    private static final String CLIENT_ID = "test";
    private static final int NO_OF_RETRIES = 5;

    private final long delay;
    private final TimeUnit unit;
    private final String topicName;
    private final List<InetSocketAddress> brokerNames;
    private final String consumerName;

    public OffsetManager(String brokerNames, String topicName, String consumerName,
            long delay, TimeUnit unit) {
        Preconditions.checkNotNull(brokerNames);
        this.delay = delay;
        this.unit = unit;
        this.brokerNames = ClientUtils.parseAndValidateAddresses(getUrls(brokerNames));
        Preconditions.checkArgument(this.brokerNames.size() > 0);
        this.topicName = topicName;
        this.consumerName = consumerName;
    }

    private static List<String> getUrls(String brokerNames) {
        String[] brokers = brokerNames.split(",");
        return Arrays.asList(brokers);
    }

    public void startOffsetUpdates() {
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    commitOffsetsWithBroker();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    LOG.error("Interrupted while committing offsets.", e);
                } catch (Exception e) {
                    // A run that throws would cancel all future executions of
                    // this task, so log and keep the schedule alive.
                    LOG.error("Exception occurred while committing offsets.", e);
                }
            }
        }, 0, delay, unit);
    }

    public void stopOffsetUpdates() {
        scheduledExecutorService.shutdown();
    }

    /** Records that the job for the given offset has been picked up and is running. */
    public void addOffsetForPartition(int partitionId, long offset) {
        partitionsAndOffsets.putIfAbsent(partitionId, new ConcurrentSkipListMap<Long, JobStatus>());
        partitionsAndOffsets.get(partitionId).put(offset, JobStatus.RUNNING);
    }

    /** Records that the job for the given offset has finished, making it eligible for commit. */
    public void updateOffsetForPartition(int partitionId, long offset) {
        partitionsAndOffsets.putIfAbsent(partitionId, new ConcurrentSkipListMap<Long, JobStatus>());
        partitionsAndOffsets.get(partitionId).put(offset, JobStatus.FINISHED);
    }

    private void commitOffsetsWithBroker() throws InterruptedException {
        LOG.info("Updating offsets - starting.");
        for (Map.Entry<Integer, ConcurrentSkipListMap<Long, JobStatus>> entry : partitionsAndOffsets
                .entrySet()) {
            int partitionId = entry.getKey();
            ConcurrentSkipListMap<Long, JobStatus> sMap = entry.getValue();
            // Walk offsets in ascending order to find the highest contiguous
            // FINISHED offset; stop at the first offset whose job is still running.
            Long offset = null;
            for (Map.Entry<Long, JobStatus> offsets : sMap.entrySet()) {
                if (offsets.getValue() == JobStatus.FINISHED) {
                    offset = offsets.getKey();
                    sMap.remove(offsets.getKey());
                } else {
                    break;
                }
            }
            if (offset != null) {
                // Retry with exponential backoff, rotating through the brokers round-robin.
                long sleepTime = 1000;
                int brokerIndex = 0;
                boolean successful = false;
                for (int i = 1; i <= NO_OF_RETRIES; i++) {
                    successful = commitOffset(
                            brokerNames.get(brokerIndex).getHostName(),
                            brokerNames.get(brokerIndex).getPort(),
                            consumerName, topicName, partitionId, offset);
                    if (successful)
                        break;
                    Thread.sleep(sleepTime);
                    sleepTime *= 2;
                    brokerIndex = (brokerIndex + 1) % brokerNames.size();
                }
                if (successful) {
                    LOG.info("Updated offset for partitionId=" + partitionId
                            + " offset=" + offset);
                } else {
                    LOG.error("Failed to commit offset for partitionId=" + partitionId
                            + " offset=" + offset + " after " + NO_OF_RETRIES + " attempts.");
                }
            }
        }
        LOG.info("Updating offsets - finished.");
    }

    /**
     * Commits the given offset to Kafka's offset manager for the consumer group.
     *
     * @param brokerName        host name of a bootstrap broker
     * @param portNo            port of the bootstrap broker
     * @param consumerGroupName consumer group that owns the offset
     * @param topicName         topic being consumed
     * @param partition         partition whose offset is being committed
     * @param offset            offset to commit
     * @return true on a successful commit, false on failure
     */
    private static boolean commitOffset(String brokerName, int portNo,
            String consumerGroupName, String topicName, int partition,
            long offset) {
        BlockingChannel channel = new BlockingChannel(brokerName, portNo,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */);
        channel.connect();
        int correlationId = 0;
        final TopicAndPartition topicAndPartition = new TopicAndPartition(
                topicName, partition);
        try {
            // Ask the bootstrap broker for the consumer group's offset coordinator.
            channel.send(new ConsumerMetadataRequest(consumerGroupName,
                    ConsumerMetadataRequest.CurrentVersion(), correlationId++,
                    CLIENT_ID));
            ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse
                    .readFrom(channel.receive().buffer());
            if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
                // Reconnect to the coordinator returned in the metadata response.
                Broker offsetManager = metadataResponse.coordinator();
                channel.disconnect();
                channel = new BlockingChannel(offsetManager.host(),
                        offsetManager.port(),
                        BlockingChannel.UseDefaultBufferSize(),
                        BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */);
                channel.connect();
            }

            long now = System.currentTimeMillis();
            Map<TopicAndPartition, OffsetAndMetadata> offsets =
                    new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>();
            offsets.put(topicAndPartition, new OffsetAndMetadata(offset,
                    "associated metadata", now));
            OffsetCommitRequest commitRequest = new OffsetCommitRequest(
                    consumerGroupName, offsets, correlationId++, CLIENT_ID,
                    (short) 1 /* version 1: offsets are stored in Kafka */);
            channel.send(commitRequest.underlying());
            OffsetCommitResponse commitResponse = OffsetCommitResponse
                    .readFrom(channel.receive().buffer());
            if (commitResponse.hasError()) {
                LOG.error("Error occurred while committing offset.");
                for (Object partitionErrorCode : commitResponse.errors().values()) {
                    short errorCode = (Short) partitionErrorCode;
                    if (errorCode == ErrorMapping.NotCoordinatorForConsumerCode()
                            || errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                        // Stale coordinator: drop the connection so the next
                        // retry rediscovers the coordinator.
                        channel.disconnect();
                    } else {
                        LOG.error("Unable to commit offset to kafka topic="
                                + topicName + " for partition=" + partition
                                + " due to errorCode=" + errorCode);
                    }
                }
                return false;
            }
            LOG.info("Committed the offset successfully.");
            return true;
        } finally {
            channel.disconnect();
        }
    }
}
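
The class references a JobStatus enum that is not included in the gist. A minimal sketch consistent with how it is used above (RUNNING and FINISHED are the only states referenced):

// Minimal sketch of the JobStatus enum referenced above; not part of the
// original gist, inferred from its usage in OffsetManager.
enum JobStatus {
    RUNNING,  // the message at this offset is still being processed
    FINISHED  // processing is done; the offset is eligible for commit
}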
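
A minimal usage sketch, assuming hypothetical broker addresses, topic, and consumer group names:

// Hypothetical wiring; the broker list, topic, and group names are placeholders.
OffsetManager offsetManager = new OffsetManager(
        "broker1:9092,broker2:9092", "my-topic", "my-consumer-group",
        30, TimeUnit.SECONDS);
offsetManager.startOffsetUpdates();

// In the consumer loop: record each offset as its job starts, then mark it
// finished once processing succeeds. The background task commits only the
// highest contiguous finished offset per partition.
offsetManager.addOffsetForPartition(0, 42L);
// ... process the message at offset 42 ...
offsetManager.updateOffsetForPartition(0, 42L);

// On shutdown:
offsetManager.stopOffsetUpdates();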