Last active
November 25, 2020 18:51
-
-
Save adamkotwasinski/4c88727d1f1fdbcaffcd519624d9ae0f to your computer and use it in GitHub Desktop.
KafkaMirrorMakerRewriterExample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example; | |
import java.time.Duration; | |
import java.util.Arrays; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.Properties; | |
import org.apache.kafka.clients.CommonClientConfigs; | |
import org.apache.kafka.clients.admin.AdminClient; | |
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult; | |
import org.apache.kafka.clients.admin.KafkaAdminClient; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.clients.consumer.ConsumerRecords; | |
import org.apache.kafka.clients.consumer.KafkaConsumer; | |
import org.apache.kafka.clients.consumer.OffsetAndMetadata; | |
import org.apache.kafka.common.TopicPartition; | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer; | |
import org.apache.kafka.connect.mirror.Checkpoint; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.google.common.collect.HashBasedTable; | |
import com.google.common.collect.Table; | |
public class KafkaMirrorMakerRewriterExample { | |
private static final Logger LOG = LoggerFactory.getLogger(KafkaMirrorMakerRewriterExample.class); | |
// Target cluster. | |
private static final String TARGET_CLUSTER_SERVERS = "localhost:9092"; | |
// Source cluster alias. | |
private static final String SOURCE_CLUSTER_ALIAS = "source"; | |
private static final String CHECKPOINTS_TOPIC_SUFFIX = ".checkpoints.internal"; | |
// Checkpoint topic partition. By design, this topic is going to have only one partition. | |
private static final TopicPartition CHECKPOINT_PARTITION = new TopicPartition(SOURCE_CLUSTER_ALIAS + CHECKPOINTS_TOPIC_SUFFIX, 0); | |
public static void main(final String[] args) throws Exception { | |
final Table<String, TopicPartition, OffsetAndMetadata> offsets = computeConsumerGroupOffsets(); | |
// 'offsets' contains the most recent mappings [group, partition] -> offset. | |
updateConsumerGroupOffsets(offsets); | |
} | |
private static Table<String, TopicPartition, OffsetAndMetadata> computeConsumerGroupOffsets() { | |
// A simple table that keeps mapping from [group, partition] pair to offset data. | |
final Table<String, TopicPartition, OffsetAndMetadata> groupAndPartitionToOffset = HashBasedTable.create(); | |
final Properties targetClusterConfig = new Properties(); | |
targetClusterConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TARGET_CLUSTER_SERVERS); | |
// Compare: org.apache.kafka.connect.mirror.MirrorClient.remoteConsumerOffsets(String, String, Duration) | |
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(targetClusterConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) { | |
consumer.assign(Arrays.asList(CHECKPOINT_PARTITION)); | |
consumer.seekToBeginning(Arrays.asList(CHECKPOINT_PARTITION)); | |
final Long endOffset = consumer.endOffsets(Arrays.asList(CHECKPOINT_PARTITION)).get(CHECKPOINT_PARTITION); | |
if (null == endOffset) { | |
throw new IllegalStateException("Could not find end offset for checkpoint partition"); | |
} | |
while (true) { | |
// No time limit here. | |
final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE)); | |
for (final ConsumerRecord<byte[], byte[]> record : records) { | |
final Checkpoint checkpoint = Checkpoint.deserializeRecord(record); | |
// As the checkpoint topic has only one partition, and records are received in order, it is safe | |
// to just update the table with the most recent offsets. | |
groupAndPartitionToOffset.put( | |
checkpoint.consumerGroupId(), | |
checkpoint.topicPartition(), | |
checkpoint.offsetAndMetadata()); | |
} | |
// Whether we have consumed all messages up to end offset. | |
if (consumer.position(CHECKPOINT_PARTITION) >= endOffset) { | |
break; | |
} | |
} | |
} | |
return groupAndPartitionToOffset; | |
} | |
private static void updateConsumerGroupOffsets(final Table<String, TopicPartition, OffsetAndMetadata> groupAndPartitionToOffset) { | |
final Properties adminConfig = new Properties(); | |
adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, TARGET_CLUSTER_SERVERS); | |
try (final AdminClient admin = KafkaAdminClient.create(adminConfig)) { | |
final List<AlterConsumerGroupOffsetsResult> alterResults = new LinkedList<>(); | |
// Let's update offsets for each group, group by group. | |
for (final Entry<String, Map<TopicPartition, OffsetAndMetadata>> entry : groupAndPartitionToOffset.rowMap().entrySet()) { | |
final String groupId = entry.getKey(); | |
final Map<TopicPartition, OffsetAndMetadata> partitionOffsets = entry.getValue(); | |
LOG.info("Altering group [{}], partitions: {}", groupId, partitionOffsets.keySet()); | |
final AlterConsumerGroupOffsetsResult alterResult = admin.alterConsumerGroupOffsets(groupId, partitionOffsets); | |
alterResults.add(alterResult); | |
} | |
// Wait for each request to finish. | |
for (final AlterConsumerGroupOffsetsResult ar : alterResults) { | |
try { | |
// No time limit here. | |
ar.all().get(); | |
} | |
catch (final Exception e) { | |
LOG.error("Could not alter offsets", e); | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment