Skip to content

Instantly share code, notes, and snippets.

@C0urante
Created May 25, 2023 20:39
Show Gist options
  • Select an option

  • Save C0urante/30dba7b9dce567f33df0526d68765860 to your computer and use it in GitHub Desktop.

Select an option

Save C0urante/30dba7b9dce567f33df0526d68765860 to your computer and use it in GitHub Desktop.
Resets the offsets for a topic that is being replicated by MirrorMaker 2
#! /usr/bin/env bash
usage() {
cat << EOM
Usage: $0 -b <bootstrap_server> -c <client_config> -a <source_cluster_alias> -t <topic> -p <partitions> [-o <offsets_topic>]
-h Print this message, and then exit
-b Address for the Kafka cluster that hosts the offsets topic for MirrorMaker (by default, this is the target cluster in the replication flow)
-c Client configuration file with certificates and credentials to access Kafka
-a Alias for the source cluster in the MirrorMaker replication flow
-t Topic to reset
-p Number of partitions in the topic
-o Offsets topic for MirrorMaker; defaults to mm2-offsets.\$ALIAS.internal, where \$ALIAS is the source cluster alias
EOM
}
while getopts 'b:c:a:t:p:o:h' o; do
case "$o" in
b)
BOOTSTRAP_SERVER="$OPTARG"
;;
c)
CLIENT_CONFIG="$OPTARG"
;;
a)
SOURCE_CLUSTER_ALIAS="$OPTARG"
;;
t)
TOPIC="$OPTARG"
;;
p)
PARTITIONS="$OPTARG"
;;
o)
OFFSETS_TOPIC="$OPTARG"
;;
h)
usage
exit 0
;;
*)
usage
exit 1
;;
esac
done
if [[ -z "$BOOTSTRAP_SERVER" ]]; then
echo "Missing required option: -b"
usage
exit 1
fi
if [[ -z "$CLIENT_CONFIG" ]]; then
echo "Missing required option: -c"
usage
exit 1
fi
if [[ -z "$SOURCE_CLUSTER_ALIAS" ]]; then
echo "Missing required option: -a"
usage
exit 1
fi
if [[ -z "$TOPIC" ]]; then
echo "Missing required option: -t"
usage
exit 1
fi
if [[ -z "$PARTITIONS" ]]; then
echo "Missing required option: -p"
usage
exit 1
fi
if [[ -z "$OFFSETS_TOPIC" ]]; then
OFFSETS_TOPIC="mm2-offsets.$SOURCE_CLUSTER_ALIAS.internal"
fi
reset_partition() {
partition=$1
echo '["MirrorSourceConnector",{"cluster":"'"$SOURCE_CLUSTER_ALIAS"'","partition":'"$partition"',"topic":"'"$TOPIC"'"}]|null'
}
reset_partitions() {
partitions=$1
for partition in $(seq 0 $((PARTITIONS - 1))); do
reset_partition $partition
done
}
write_offsets() {
kafka-console-producer \
--bootstrap-server $BOOTSTRAP_SERVER \
--producer.config $CLIENT_CONFIG \
--topic $OFFSETS_TOPIC \
--property parse.key=true \
--property null.marker=null \
--property key.separator='|'
}
reset_partitions $PARTITIONS | write_offsets
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment