Skip to content

Instantly share code, notes, and snippets.

@ram-pi
Last active June 7, 2024 13:31
Show Gist options
  • Save ram-pi/dd99df941820fab263e48d7f5cfa26d5 to your computer and use it in GitHub Desktop.
Save ram-pi/dd99df941820fab263e48d7f5cfa26d5 to your computer and use it in GitHub Desktop.
Change Replication Factor
#!/usr/bin/env bash
# Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <blacklist-prefixes> <min-isr> <extra-var>
# Example: ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --command-config client.properties
# Example: export PATH=$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --command-config client.properties
# Requires: jq, kafka-topics, kafka-reassign-partitions, kafka-configs
# Author: @ram-pi
# Version: 0.1
# Date: 2023-11-06
# Description: Change replication factor and min.insync.replicas for all topics in a Kafka cluster
# NOTE: It will not work for topics with a replication factor greater than the number of brokers in the broker-list
# Validate the invocation. The four leading positional arguments
# (bootstrap server, broker list, blacklist prefixes, min ISR) are all
# mandatory; anything after them is forwarded to the Kafka CLI tools.
# Checking only for "no arguments at all" would let a partial invocation
# run with empty values, so require all four here.
if [ $# -lt 4 ]; then
  echo "Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <blacklist-prefixes> <min-isr> <extra-var>"
  # The Kafka CLI option for a client properties file is --command-config.
  echo "Example: ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --command-config client.properties"
  echo "Example: export PATH=\$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 _confluent,_schemas 2 --command-config client.properties"
  exit 1
fi
# Create the topics template in the current directory. "!TOPIC!" is a
# placeholder that is substituted with a concrete topic name later on.
# The identity filter '.' is passed explicitly so the pretty-printing does
# not depend on jq's implicit default program when none is given.
echo '{"topics":[{"topic":"!TOPIC!"}],"version":1}' | jq '.' > topics.template.json

# Consume the four leading positional arguments one at a time; whatever is
# left in "$@" afterwards is treated as extra options further down.
# Kafka bootstrap server, e.g. localhost:9092
BOOTSTRAP_SERVER=$1
shift
# Comma-separated broker ids, e.g. 1,2,3
BROKER_LIST=$1
shift
# Comma-separated topic name prefixes to leave untouched
BLACKLIST_PREFIXES=$1
shift
# Value to set for min.insync.replicas
MIN_ISR=$1
shift
# shuffle array
# https://stackoverflow.com/questions/5533569/simple-method-to-shuffle-the-elements-of-an-array-in-bash-shell
shuffle_array() {
local arr=("$@")
local i
for ((i=${#arr[@]}-1; i>0; i--)); do
local j=$((RANDOM % (i+1)))
local temp=${arr[i]}
arr[i]=${arr[j]}
arr[j]=$temp
done
echo "${arr[@]}"
}
# Whatever follows the four leading positional arguments is forwarded
# verbatim to every Kafka CLI call below.
EXTRA_VAR=("$@")
echo "Bootstrap server: $BOOTSTRAP_SERVER"
echo "Broker list: $BROKER_LIST"
echo "Blacklist: $BLACKLIST_PREFIXES"
echo "Min ISR: $MIN_ISR"
echo "EXTRA_VAR:" "${EXTRA_VAR[@]}"

# Both comma-separated lists are loop-invariant: split them once up front.
IFS=',' read -ra prefixes <<< "$BLACKLIST_PREFIXES"
IFS=',' read -ra brokers <<< "$BROKER_LIST"
if [ "${#brokers[@]}" -eq 0 ]; then
  echo "Broker list is empty, nothing to reassign to" >&2
  exit 1
fi

# Get list of topics; without it there is nothing sensible to do.
if ! topics=$(kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list "${EXTRA_VAR[@]}"); then
  echo "Unable to list topics from $BOOTSTRAP_SERVER" >&2
  exit 1
fi

# Iterate over topics, one per output line. Only the first word of each line
# is taken as the topic name, so any trailing annotation on a line is not
# mistaken for additional topics. The list is fed through fd 3 so commands
# run inside the loop cannot accidentally consume it from stdin.
while read -r topic _ <&3; do
  [ -n "$topic" ] || continue

  # Skip blacklisted topics
  for prefix in "${prefixes[@]}"; do
    # An empty prefix (e.g. from a stray comma) would match every topic.
    [ -n "$prefix" ] || continue
    if [[ $topic == "$prefix"* ]]; then
      echo "Skipping blacklisted topic: $topic"
      continue 2
    fi
  done

  # Reassign partitions for topic
  echo "Reassign topic: $topic..."
  # Render topics.json from the template; the template itself stays intact.
  if ! sed "s/!TOPIC!/$topic/" topics.template.json > topics.json; then
    echo "Could not render topics.json for topic: $topic, skipping" >&2
    continue
  fi

  # Generate the proposed assignment. The output is captured first and parsed
  # afterwards: piping it through `tee >(...) >(...)` would leave the JSON
  # files being written asynchronously while the next command reads them.
  if ! generated=$(kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --topics-to-move-json-file topics.json --broker-list "$BROKER_LIST" --generate "${EXTRA_VAR[@]}"); then
    echo "Could not generate a reassignment plan for topic: $topic, skipping" >&2
    continue
  fi
  printf '%s\n' "$generated"

  # The JSON document is the line right after each heading.
  # NOTE(review): the fixed /tmp paths are kept as-is; the current assignment
  # is presumably saved for a manual rollback - confirm before relocating.
  awk '/Current partition replica assignment/ { getline; print $0 }' <<< "$generated" \
    | jq '.' > /tmp/topic.current.json
  if ! awk '/Proposed partition reassignment configuration/ { getline; print $0 }' <<< "$generated" \
    | jq '.' > /tmp/topic.proposed.json || [ ! -s /tmp/topic.proposed.json ]; then
    echo "No usable proposed reassignment for topic: $topic, skipping" >&2
    continue
  fi

  # Every partition gets the same replica list: all brokers, in an order
  # shuffled once per topic.
  read -ra shuffled <<< "$(shuffle_array "${brokers[@]}")"
  shuffled_csv=$(IFS=','; echo "${shuffled[*]}")
  # Drop log_dirs and overwrite the replicas in a single jq pass.
  if ! jq --arg brokers "$shuffled_csv" \
    '($brokers | split(",") | map(tonumber)) as $replicas
     | del(.partitions[].log_dirs)
     | .partitions[].replicas = $replicas' \
    /tmp/topic.proposed.json > /tmp/topic.reassign.json; then
    echo "Could not build the reassignment for topic: $topic (broker ids must be numeric: $BROKER_LIST)" >&2
    exit 1
  fi

  # Reassign partitions. If this fails, min.insync.replicas is left alone so
  # it is not raised on a topic whose replica set did not change.
  if ! kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --reassignment-json-file /tmp/topic.reassign.json --execute "${EXTRA_VAR[@]}"; then
    echo "Reassignment failed for topic: $topic, leaving min.insync.replicas untouched" >&2
    continue
  fi
  kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" --reassignment-json-file /tmp/topic.reassign.json --verify "${EXTRA_VAR[@]}"

  # update min.insync.replicas
  kafka-configs --bootstrap-server "$BOOTSTRAP_SERVER" --alter --entity-type topics --entity-name "$topic" --add-config min.insync.replicas="$MIN_ISR" "${EXTRA_VAR[@]}"
done 3<<< "$topics"
# SASL_SSL PLAIN client properties sample
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='client' password='client-secret';
sasl.mechanism=PLAIN
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment