Resets a Kafka consumer group's committed offsets to the given target offsets (one <topic>:<partition>:<targetOffset> triple per partition) by committing them with commitSync.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.zeroturnaround.exec.ProcessExecutor;

public class OffsetCommand {

    public static void main(String[] args) throws Exception {
        // Quick sanity check that a JVM is reachable via zt-exec before doing any Kafka work.
        String output = new ProcessExecutor().command("java", "-version")
                .readOutput(true).execute()
                .outputUTF8();
        System.out.println(output);

        if (args.length < 3) {
            System.out.println("java -cp aos-command-executor.jar OffsetCommand " +
                    "<consumerGroupId> <broker> <topic>:<partition>:<targetOffset>,<topic>:<partition>:<targetOffset>");
            System.out.println("Ex: connect-elasticsearch-Clickstream_april_08 uat-kafka01.cloud.operative.com:9091 api-clickstream:0:147116901");
            System.out.println("=====");
            System.exit(1);
        }
        System.out.println(Arrays.asList(args).toString());

        String consumerGroupId = args[0];
        String kafkaHost = args[1]; // e.g. "localhost:8091"
        String[] topicPartitions = args[2].split(",");

        // Parse every <topic>:<partition>:<targetOffset> triple, then commit them all in one call.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (String tp : topicPartitions) {
            String[] values = tp.split(":");
            String topic = values[0];
            int partition = Integer.parseInt(values[1]);
            long targetOffset = Long.parseLong(values[2]); // offsets may exceed the int range
            offsets.put(new TopicPartition(topic, partition), new OffsetAndMetadata(targetOffset));
        }
        changeOffset(consumerGroupId, kafkaHost, offsets);
    }

    static KafkaConsumer<String, String> consumer = null;

    private static void changeOffset(String consumerGroupId, String kafkaHost,
                                     Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (consumer == null) {
            consumer = getKafkaConsumer(consumerGroupId, kafkaHost);
        }
        try {
            // Committing the target offsets on behalf of the group is what performs the reset.
            consumer.commitSync(offsets);
        } catch (Exception exp) {
            exp.printStackTrace();
        }
    }

    private static KafkaConsumer<String, String> getKafkaConsumer(String consumerGroupId, String kafkaHost) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaHost);
        props.put("group.id", consumerGroupId);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Offsets are committed explicitly via commitSync above, so auto-commit stays off.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer<>(props);
    }
}
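
A sample invocation, reusing the illustrative group, broker, and topic:partition:offset values from the usage message above (multiple partitions can be passed as a comma-separated list in the third argument):

java -cp aos-command-executor.jar OffsetCommand connect-elasticsearch-Clickstream_april_08 uat-kafka01.cloud.operative.com:9091 api-clickstream:0:147116901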