Last active
June 7, 2024 13:31
-
-
Save ram-pi/dd99df941820fab263e48d7f5cfa26d5 to your computer and use it in GitHub Desktop.
Change Replication Factor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
# Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <blacklist-prefixes> <min-isr> <extra-var> | |
# Example: ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --comment-config client.properties | |
# Example: export PATH=$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --comment-config client.properties | |
# Requires: jq, kafka-topics, kafka-reassign-partitions, kafka-configs | |
# Author: @ram-pi | |
# Version: 0.1 | |
# Date: 2023-11-06 | |
# Description: Change replication factor and min.insync.replicas for all topics in a Kafka cluster | |
# NOTE: It will not work for topics with a replication factor greater then the number of brokers in the broker-list | |
if [ $# -eq 0 ]; then | |
echo "Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <blacklist-prefixes> <min-isr> <extra-var>" | |
echo "Example: ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --comment-config client.properties" | |
echo "Example: export PATH=\$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --comment-config client.properties" | |
exit 1 | |
fi | |
# Create topics template | |
echo '{"topics":[{"topic":"!TOPIC!"}],"version":1}' | jq > topics.template.json | |
BOOTSTRAP_SERVER=$1 | |
shift | |
BROKER_LIST=$1 | |
shift | |
BLACKLIST_PREFIXES=$1 | |
shift | |
MIN_ISR=$1 | |
shift | |
# shuffle array | |
# https://stackoverflow.com/questions/5533569/simple-method-to-shuffle-the-elements-of-an-array-in-bash-shell | |
shuffle_array() { | |
local arr=("$@") | |
local i | |
for ((i=${#arr[@]}-1; i>0; i--)); do | |
local j=$((RANDOM % (i+1))) | |
local temp=${arr[i]} | |
arr[i]=${arr[j]} | |
arr[j]=$temp | |
done | |
echo "${arr[@]}" | |
} | |
EXTRA_VAR=("$@") | |
echo "Bootstrap server: $BOOTSTRAP_SERVER" | |
echo "Broker list: $BROKER_LIST" | |
echo "Blacklist: $BLACKLIST_PREFIXES" | |
echo "Min ISR: $MIN_ISR" | |
echo "EXTRA_VAR:" "${EXTRA_VAR[@]}" | |
# Get list of topics | |
topics=$(kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list "${EXTRA_VAR[@]}") | |
# Iterate over topics | |
for topic in $topics; do | |
# Split string into array | |
IFS=',' read -ra prefixes <<< "$BLACKLIST_PREFIXES" | |
# Loop over array | |
for prefix in "${prefixes[@]}" | |
do | |
# Skip blacklisted topics | |
if [[ $topic =~ ^"$prefix".* ]]; then | |
echo "Skipping blacklisted topic: $topic" | |
continue 2 | |
fi | |
done | |
# Reassign partitions for topic | |
echo "Reassign topic: $topic..." | |
sed -i'.bak' "s/\!TOPIC\!/$topic/" topics.template.json | |
mv topics.template.json topics.json ; mv topics.template.json.bak topics.template.json | |
# generate proposed assignement | |
kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --topics-to-move-json-file topics.json --broker-list "$BROKER_LIST" --generate "${EXTRA_VAR[@]}" | \ | |
tee >(awk -F: '/Current partition replica assignment/ { getline; print $0 }' | \ | |
jq > /tmp/topic.current.json) >(awk -F: '/Proposed partition reassignment configuration/ { getline; print $0 }' | jq > /tmp/topic.proposed.json) | |
jq 'del(.partitions[] | .log_dirs)' /tmp/topic.proposed.json > /tmp/topic.reassign.json | |
# initialize replicas array | |
jq '.partitions[].replicas |= []' /tmp/topic.reassign.json > /tmp/tmp.json ; mv /tmp/tmp.json /tmp/topic.reassign.json | |
# Split string into array | |
IFS=',' read -ra array <<< "$BROKER_LIST" | |
# Loop over array | |
shuffled_array=($(shuffle_array "${array[@]}")) | |
for element in "${shuffled_array[@]}" | |
do | |
jq --arg broker "$element" '.partitions[].replicas += [$broker|tonumber]' /tmp/topic.reassign.json > /tmp/tmp.json ; mv /tmp/tmp.json /tmp/topic.reassign.json | |
done | |
# Reassign partitions | |
kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --reassignment-json-file /tmp/topic.reassign.json --execute "${EXTRA_VAR[@]}" | |
kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --reassignment-json-file /tmp/topic.reassign.json --verify "${EXTRA_VAR[@]}" | |
# update min.insync.replicas | |
kafka-configs --bootstrap-server "$BOOTSTRAP_SERVER" --alter --entity-type topics --entity-name "$topic" --add-config min.insync.replicas="$MIN_ISR" "${EXTRA_VAR[@]}" | |
done |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SASL_SSL PLAIN client properties sample | |
security.protocol=SASL_SSL | |
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='client' password='client-secret'; | |
sasl.mechanism=PLAIN |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment