// Limitation: the consumer group must be alive and have its offsets stored in the __offset topic
// (presumably the internal __consumer_offsets topic -- TODO confirm); otherwise no checkpoint is
// produced and the offsets cannot be translated!
RemoteClusterUtils.translateOffsets(new MirrorSourceConfig(props).targetAdminConfig("ROLE_ID"), sourceClusterAlias, "消费组ID", Duration.ofSeconds(10));
/**
 * Translates a consumer group's upstream (source) offset into a downstream (target) offset,
 * using the offset sync returned by {@code latestOffsetSync} for the given partition and offset.
 *
 * @param group                consumer group the offset belongs to (used for logging only)
 * @param sourceTopicPartition source topic-partition the offset refers to
 * @param upstreamOffset       the group's offset on the source topic-partition
 * @return an empty result when the initial read of the offset syncs has not completed or no
 *         offset sync is found; {@code -1} when the offset sync's upstream offset is greater
 *         than {@code upstreamOffset}; otherwise the sync's downstream offset, advanced by one
 *         when the group is ahead of the sync
 */
OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
    // Until the syncs topic has been read to the end at least once, refuse to translate at all,
    // so that stale offsets are not emitted during the initial read.
    if (!readToEnd) {
        log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
                group, sourceTopicPartition, upstreamOffset);
        return OptionalLong.empty();
    }

    Optional<OffsetSync> candidate = latestOffsetSync(sourceTopicPartition, upstreamOffset);
    if (!candidate.isPresent()) {
        log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
                group, sourceTopicPartition, upstreamOffset);
        return OptionalLong.empty();
    }

    OffsetSync sync = candidate.get();
    if (sync.upstreamOffset() > upstreamOffset) {
        // The group's offset lies before the sync: too far in the past to translate accurately.
        log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
                group, sourceTopicPartition, upstreamOffset,
                sync, upstreamOffset);
        return OptionalLong.of(-1L);
    }

    // When the group is ahead of the sync, only a single downstream offset past the sync can be
    // claimed: later records are known to land after the sync, but how many upstream offsets were
    // replicated versus dropped is unknown. Overestimating could skip the correct offset and lose
    // data. The same rule covers groups positioned past the last valid record at the end of a
    // topic. Depending on how old the sync is, this may cause some records to be re-read.
    //   s=offset sync pair, ?=record may or may not be replicated,
    //   g=consumer group offset, r=re-read record
    //   source |-s?????r???g-|
    //            |  ______/
    //            | /
    //            vv
    //   target |-sg----r-----|
    boolean exactlyAtSync = upstreamOffset == sync.upstreamOffset();
    long upstreamStep = exactlyAtSync ? 0 : 1;
    long translated = sync.downstreamOffset() + upstreamStep;
    log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
            group, sourceTopicPartition, upstreamOffset,
            translated,
            sync
    );
    return OptionalLong.of(translated);
}