KafkaMirrorMakerRewriterExample
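This example replays MirrorMaker 2 checkpoints by hand: it consumes the single-partition "source.checkpoints.internal" topic on the target cluster, keeps only the most recent translated offset for every [consumer group, topic partition] pair, and then applies those offsets to the target cluster's consumer groups via AdminClient.alterConsumerGroupOffsets.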
package example;

import java.time.Duration;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.mirror.Checkpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
public class KafkaMirrorMakerRewriterExample {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMirrorMakerRewriterExample.class);

    // Target cluster.
    private static final String TARGET_CLUSTER_SERVERS = "localhost:9092";

    // Source cluster alias.
    private static final String SOURCE_CLUSTER_ALIAS = "source";

    private static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal";

    // Checkpoint topic partition. By design, this topic is going to have only one partition.
    private static final TopicPartition CHECKPOINT_PARTITION =
            new TopicPartition(SOURCE_CLUSTER_ALIAS + CHECKPOINTS_TOPIC_SUFFIX, 0);

    public static void main(final String[] args) throws Exception {
        final Table<String, TopicPartition, OffsetAndMetadata> offsets = computeConsumerGroupOffsets();
        // 'offsets' contains the most recent mappings [group, partition] -> offset.
        updateConsumerGroupOffsets(offsets);
    }
    private static Table<String, TopicPartition, OffsetAndMetadata> computeConsumerGroupOffsets() {
        // A simple table that keeps the mapping from a [group, partition] pair to offset data.
        final Table<String, TopicPartition, OffsetAndMetadata> groupAndPartitionToOffset = HashBasedTable.create();

        final Properties targetClusterConfig = new Properties();
        targetClusterConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TARGET_CLUSTER_SERVERS);

        // Compare: org.apache.kafka.connect.mirror.MirrorClient.remoteConsumerOffsets(String, String, Duration)
        try (KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(targetClusterConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {

            consumer.assign(Arrays.asList(CHECKPOINT_PARTITION));
            consumer.seekToBeginning(Arrays.asList(CHECKPOINT_PARTITION));

            final Long endOffset = consumer.endOffsets(Arrays.asList(CHECKPOINT_PARTITION)).get(CHECKPOINT_PARTITION);
            if (null == endOffset) {
                throw new IllegalStateException("Could not find end offset for checkpoint partition");
            }

            // Keep consuming until all messages up to the end offset have been received.
            // Checking the position before polling also avoids blocking forever on an empty checkpoint topic.
            while (consumer.position(CHECKPOINT_PARTITION) < endOffset) {
                // No time limit here.
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                for (final ConsumerRecord<byte[], byte[]> record : records) {
                    final Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
                    // As the checkpoint topic has only one partition, and records are received in order,
                    // it is safe to just update the table with the most recent offsets.
                    groupAndPartitionToOffset.put(
                            checkpoint.consumerGroupId(),
                            checkpoint.topicPartition(),
                            checkpoint.offsetAndMetadata());
                }
            }
        }
        return groupAndPartitionToOffset;
    }
    private static void updateConsumerGroupOffsets(final Table<String, TopicPartition, OffsetAndMetadata> groupAndPartitionToOffset) {
        final Properties adminConfig = new Properties();
        adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TARGET_CLUSTER_SERVERS);

        try (final AdminClient admin = KafkaAdminClient.create(adminConfig)) {
            final List<AlterConsumerGroupOffsetsResult> alterResults = new LinkedList<>();

            // Let's update offsets for each group, group by group.
            for (final Entry<String, Map<TopicPartition, OffsetAndMetadata>> entry : groupAndPartitionToOffset.rowMap().entrySet()) {
                final String groupId = entry.getKey();
                final Map<TopicPartition, OffsetAndMetadata> partitionOffsets = entry.getValue();
                LOG.info("Altering group [{}], partitions: {}", groupId, partitionOffsets.keySet());
                final AlterConsumerGroupOffsetsResult alterResult = admin.alterConsumerGroupOffsets(groupId, partitionOffsets);
                alterResults.add(alterResult);
            }

            // Wait for each request to finish.
            for (final AlterConsumerGroupOffsetsResult ar : alterResults) {
                try {
                    // No time limit here.
                    ar.all().get();
                }
                catch (final Exception e) {
                    LOG.error("Could not alter offsets", e);
                }
            }
        }
    }
}
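For a single, known consumer group, the checkpoint-scanning half of the example can be replaced with the MirrorClient API referenced in the comment above (org.apache.kafka.connect.mirror.MirrorClient, from the connect-mirror-client artifact). A minimal sketch, assuming that artifact is on the classpath; the group id "my-group" and the broker address are placeholders:

package example;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.MirrorClient;

public class MirrorClientExample {

    public static void main(final String[] args) throws Exception {
        // Like the consumer above, MirrorClient is pointed at the *target* cluster,
        // where the checkpoint topic lives.
        final Map<String, Object> props = new HashMap<>();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final MirrorClient client = new MirrorClient(props);
        try {
            // Reads the same "source.checkpoints.internal" topic and returns the translated
            // offsets for one group; "my-group" is a placeholder group id.
            final Map<TopicPartition, OffsetAndMetadata> offsets =
                    client.remoteConsumerOffsets("my-group", "source", Duration.ofMinutes(1));
            offsets.forEach((partition, offset) -> System.out.println(partition + " -> " + offset));
        }
        finally {
            client.close();
        }
    }
}

The hand-rolled consumer in the main example is still useful when the group ids are not known up front, since it discovers every group present in the checkpoint topic in a single pass.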