@miguno
Last active July 6, 2023 19:53
A simple Ops helper script for Apache Kafka to generate a partition reassignment JSON snippet for moving partition leadership away from a given Kafka broker. Use cases include 1) safely restarting a broker while minimizing risk of data loss, 2) replacing a broker, 3) preparing a broker for maintenance.
#!/usr/bin/env bash
#
# File: kafka-move-leadership.sh
#
# Description
# ===========
#
# Generates a Kafka partition reassignment JSON snippet to STDOUT to move the leadership
# of any replicas away from the provided "source" broker to different, randomly selected
# "target" brokers. Run this script with `-h` to show detailed usage instructions.
#
#
# Requirements
# ============
# - Kafka 0.8.1.1 and later.
#
#
# Usage
# =====
#
# To show usage instructions run this script with `-h` or `--help`.
#
#
# Full workflow
# =============
#
# High-level overview
# -------------------
#
# 1. Use this script to generate a partition reassignment JSON file.
# 2. Start the actual reassignment operation via Kafka's `kafka-reassign-partitions.sh` script and this JSON file.
# 3. Monitor the progress of the reassignment operation with Kafka's `kafka-reassign-partitions.sh` script.
#
# Example
# -------
#
# NOTE: If you have installed the Confluent package of Kafka, then the CLI tool
# `kafka-reassign-partitions.sh` is called `kafka-reassign-partitions`.
#
# Step 1 (generate reassignment JSON; this script):
#
# $ kafka-move-leadership.sh --broker-id 4 --first-broker-id 0 --last-broker-id 8 --zookeeper zookeeper1:2181 > partitions-to-move.json
#
# Step 2 (start reassignment process; Kafka built-in script):
#
# $ kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file partitions-to-move.json --execute
#
# Step 3 (monitor progress of reassignment process; Kafka built-in script):
#
# $ kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file partitions-to-move.json --verify
declare -r MYSELF="$(basename "$0")"
print_usage() {
echo "$MYSELF - generates a Kafka partition reassignment JSON snippet to move partition leadership away from a broker (details below)"
echo
echo "Usage: $MYSELF [OPTION]..."
echo
echo " --broker-id Move leadership of all replicas, if any, from this broker"
echo " to different, randomly selected brokers. Example: 4"
echo " --first-broker-id First (= lowest) Kafka broker ID in the cluster. Used as"
echo " the start index for the range of broker IDs from which"
echo " replacement brokers will be randomly selected. Example: 0"
echo " --last-broker-id Last (= highest) Kafka broker ID in the cluster. Used as"
echo " the end index for the range of broker IDs from which"
echo " replacement brokers will be randomly selected. Example: 8"
echo " --zookeeper Comma-separated list of ZK servers with which the brokers"
echo " are registered. Example: zookeeper1:2181,zookeeper2:2181"
echo " -h, --help Print this help message and exit."
echo
echo "Example"
echo "-------"
echo
echo "The following example prints a partition reassignment JSON snippet to STDOUT that moves leadership"
echo "from the broker with ID 4 to brokers randomly selected from the ID range 0,1,2,3,4,5,6,7,8"
echo "(though 4 itself will be excluded from the range automatically):"
echo
echo " $ $MYSELF --broker-id 4 --first-broker-id 0 --last-broker-id 8 --zookeeper zookeeper1:2181"
echo
echo "Use cases include:"
echo "------------------"
echo " 1. Safely restarting a broker while minimizing risk of data loss."
echo " 2. Replacing a broker."
echo " 3. Preparing a broker for maintenance."
echo
echo "Detailed description"
echo "--------------------"
echo "Generates a Kafka partition reassignment JSON snippet to STDOUT"
echo "to move the leadership of any replicas from the provided broker ID to"
echo "different, randomly selected broker IDs."
echo
echo "This JSON snippet can be saved to a file and then be used as an argument for:"
echo
echo " $ kafka-reassign-partitions.sh --reassignment-json-file my.json"
echo
echo "Further information"
echo "-------------------"
echo "- http://kafka.apache.org/documentation.html#basic_ops_cluster_expansion"
echo "- https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool"
}
if [[ $# -eq 0 ]]; then
print_usage
exit 97
fi
while [[ $# -gt 0 ]]; do
case "$1" in
--broker-id)
shift
declare -r BROKER="$1"
shift
;;
--zookeeper)
shift
declare -r ZOOKEEPER_CONNECT="$1"
shift
;;
--first-broker-id)
shift
declare -r KAFKA_FIRST_BROKER_ID="$1"
shift
;;
--last-broker-id)
shift
declare -r KAFKA_LAST_BROKER_ID="$1"
shift
;;
-h|--help)
print_usage
exit 98
;;
*)
echo "ERROR: Unexpected option ${1}"
echo
print_usage
exit 99
;;
esac
done
# Input validation (errors go to STDERR so they never end up in a redirected JSON file)
if [ -z "$BROKER" ]; then
echo "ERROR: You must set the parameter --broker-id" >&2
exit 80
fi
if [ -z "$ZOOKEEPER_CONNECT" ]; then
echo "ERROR: You must set the parameter --zookeeper" >&2
exit 81
fi
if [ -z "$KAFKA_FIRST_BROKER_ID" ]; then
echo "ERROR: You must set the parameter --first-broker-id" >&2
exit 82
fi
if [ -z "$KAFKA_LAST_BROKER_ID" ]; then
echo "ERROR: You must set the parameter --last-broker-id" >&2
exit 83
fi
###############################################################################
### DEPENDENCIES
###############################################################################
declare -r KAFKA_TOPICS_SCRIPT_NAME_APACHE="kafka-topics.sh"
declare -r KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT="kafka-topics"
declare -r FALLBACK_PATH="/opt/kafka/bin"
which "$KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT" &>/dev/null
if [ $? -ne 0 ]; then
which "$KAFKA_TOPICS_SCRIPT_NAME_APACHE" &>/dev/null
if [ $? -ne 0 ]; then
declare -r FALLBACK_BIN="$FALLBACK_PATH/$KAFKA_TOPICS_SCRIPT_NAME_APACHE"
which "$FALLBACK_BIN" &>/dev/null
if [ $? -ne 0 ]; then
echo "ERROR: kafka-topics CLI tool (ships with Kafka) not found in PATH."
exit 70
else
declare -r KAFKA_TOPICS_BIN="$FALLBACK_BIN"
fi
else
declare -r KAFKA_TOPICS_BIN="$KAFKA_TOPICS_SCRIPT_NAME_APACHE"
fi
else
declare -r KAFKA_TOPICS_BIN="$KAFKA_TOPICS_SCRIPT_NAME_CONFLUENT"
fi
###############################################################################
### MISC CONFIGURATION - DO NOT TOUCH UNLESS YOU KNOW WHAT YOU ARE DOING
###############################################################################
declare -r OLD_IFS="$IFS"
###############################################################################
### UTILITY FUNCTIONS
###############################################################################
# Checks whether an array (first param) contains an element (second param).
# Returns 0 if the array contains the element, and 1 if it does not.
#
# Usage: array_contains myArray myElement
function array_contains {
local array="$1[@]"
local seeking=$2
local in=1
for element in "${!array}"; do
if [[ $element == $seeking ]]; then
in=0
break
fi
done
return $in
}
# Randomly selects a broker ID in the range specified by
# KAFKA_FIRST_BROKER_ID (including) and KAFKA_LAST_BROKER_ID (including).
#
# Usage: random_broker => may return e.g. "6"
function random_broker {
shuf -i ${KAFKA_FIRST_BROKER_ID}-${KAFKA_LAST_BROKER_ID} -n 1
}
# Randomly selects, from the list of available brokers (range specified by
# KAFKA_FIRST_BROKER_ID and KAFKA_LAST_BROKER_ID), a broker ID that is not
# already listed in the provided brokers (first param).
#
# Usage: other_broker "1,4,6" => may return e.g. "2"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function other_broker {
local brokers_string=$1
local all_brokers_string=`seq -s "," ${KAFKA_FIRST_BROKER_ID} ${KAFKA_LAST_BROKER_ID}`
if [ ${#brokers_string} -ge ${#all_brokers_string} ]; then
local no_other_broker_available=""
echo $no_other_broker_available
else
IFS=$',' read -a brokers <<< "$brokers_string"
local new_broker=`random_broker`
while array_contains brokers $new_broker; do
new_broker=`random_broker`
done
echo $new_broker
fi
}
# Returns a list of broker IDs by removing the provided broker ID (second param)
# from the provided list of original broker IDs (first param). If the original
# broker list does not contain the provided broker, the list is returned as is.
#
# The list of broker IDs must be a comma-separated list of numbers, e.g. "1,2".
#
# Usage: all_but_broker "1,2,3" "3" => returns "1,2"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function all_but_broker {
local brokers_string=$1
local broker=$2
IFS=$',' read -a brokers <<< "$brokers_string"
local new_brokers=""
for curr_broker in "${brokers[@]}"; do
if [ "$curr_broker" != "$broker" ]; then
new_brokers="$new_brokers,$curr_broker"
fi
done
# Remove leading comma, if any.
new_brokers=${new_brokers#","}
echo $new_brokers
}
# Returns a list of broker IDs based on a provided list of broker IDs (first
# param), where the provided broker ID (second param) is replaced by a
# randomly selected broker ID that is not already in the original list.
#
# Usage: replace_broker "1,2,3" "2" => may return e.g. "1,3,4"
#
# Note: Do NOT put spaces in the string. "1,2" is ok, "1, 2" is not.
function replace_broker {
local brokers_string=$1
local broker=$2
local remaining_brokers=`all_but_broker $brokers_string $broker`
local replacement_broker=`other_broker $brokers_string`
local new_brokers="$remaining_brokers,$replacement_broker"
# Remove leading comma, if any.
new_brokers=${new_brokers#","}
# Remove trailing comma, if any.
new_brokers=${new_brokers%","}
echo $new_brokers
}
###############################################################################
### MAIN
###############################################################################
# "Header" of JSON file for Kafka partition reassignment
json="{\n"
json="$json \"partitions\": [\n"
# Actual partition reassignments
# Note: `grep -w` matches whole words only, so broker 1 does not also match "Leader: 10".
for topicPartitionReplicas in `$KAFKA_TOPICS_BIN --zookeeper $ZOOKEEPER_CONNECT --describe | grep -w "Leader: $BROKER" | awk '{ print $2"#"$4"#"$8 }'`; do
# Note: We use '#' as field separator in awk (see above) and here
# because it is not a valid character for a Kafka topic name.
IFS=$'#' read -a array <<< "$topicPartitionReplicas"
topic="${array[0]}" # e.g. "zerg.hydra"
partition="${array[1]}" # e.g. "4"
replicas="${array[2]}" # e.g. "0,8" (= comma-separated list of broker IDs)
new_replicas=`replace_broker $replicas $BROKER`
if [ -z "$new_replicas" ]; then
echo "ERROR: Cannot find any replacement broker. Maybe you have only a single broker in your cluster?" >&2
exit 60
fi
json="$json {\"topic\": \"${topic}\", \"partition\": ${partition}, \"replicas\": [${new_replicas}] },\n"
done
# Remove trailing comma, if any.
json=${json%",\n"}
json="${json}\n"
# "Footer" of JSON file
json="$json ],\n"
json="$json \"version\": 1\n"
json="${json}}\n"
# Print JSON to STDOUT
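# Quote the variable so the shell does not glob-expand "[1,2]" against files in the current directory.
echo -e "$json"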
###############################################################################
### CLEANUP
###############################################################################
IFS="$OLD_IFS"
@yashk

yashk commented Apr 9, 2016

@miguno Say we use this script for a safe rolling restart of a Kafka cluster. How do I restore the leadership of a broker after it has restarted successfully?

Some options I see are:

kafka-preferred-replica-election.sh

or

kafka-reassign-partitions.sh
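
For the first option, a minimal sketch of how the input file could be built. The helper name `election_json` is hypothetical, and the sketch assumes a Kafka version that still ships `kafka-preferred-replica-election.sh` and accepts a JSON file listing topic/partition pairs; check the tool's `--help` output for your version before relying on it.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the gist): builds the JSON input for a
# preferred replica election from "topic:partition" arguments.
#
# Usage: election_json "zerg.hydra:4" "zerg.hydra:7"
election_json() {
  local entries="" pair topic partition
  for pair in "$@"; do
    topic="${pair%:*}"        # everything before the last ':'
    partition="${pair##*:}"   # everything after the last ':'
    entries="${entries}{\"topic\": \"${topic}\", \"partition\": ${partition}},"
  done
  # Strip the trailing comma and wrap the entries in the top-level object.
  printf '{"partitions": [%s]}\n' "${entries%,}"
}

election_json "zerg.hydra:4" "zerg.hydra:7"
```

The output can be saved to a file and passed to the election tool, for example `kafka-preferred-replica-election.sh --zookeeper zookeeper1:2181 --path-to-json-file partitions-to-elect.json` (the file name is only an example).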

@miguno
Author

miguno commented May 4, 2016

In general there's no need to restore leadership to the "original" one. The point is that brokers are interchangeable, and they may come and go at any time (either temporarily because of a failure, or permanently because new machines were added, or ...).

@eswara-reddy

eswara-reddy commented Aug 10, 2016

there's no need to restore leadership

You mean leader.imbalance.check.interval.seconds takes care of this automatically, and hence there is no need for manual restoration? Otherwise, I think there is a possibility of too few brokers becoming leaders of too many partitions.
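
One way to check whether leadership has in fact become lopsided is to count leaders per broker. The sketch below is an illustration only: `leaders_per_broker` is a hypothetical helper, and it assumes the per-partition lines of `kafka-topics.sh --describe` contain a `Leader:` label followed by the broker ID, as the gist's own `awk` call does. The sample input is made up.

```shell
#!/usr/bin/env bash
# Hypothetical helper: reads `kafka-topics.sh --describe` output on STDIN and
# prints "<broker id> <number of partitions it leads>", sorted by broker ID.
#
# Usage: kafka-topics.sh --zookeeper zookeeper1:2181 --describe | leaders_per_broker
leaders_per_broker() {
  awk '{
    # Find the "Leader:" label and count the broker ID in the next field.
    for (i = 1; i < NF; i++) {
      if ($i == "Leader:") { count[$(i + 1)]++ }
    }
  }
  END { for (b in count) print b, count[b] }' | sort -n
}

# Made-up sample input in the assumed format (tab-separated fields).
printf 'Topic: a\tPartition: 0\tLeader: 1\tReplicas: 1,2\tIsr: 1,2\n'\
'Topic: a\tPartition: 1\tLeader: 2\tReplicas: 2,1\tIsr: 2,1\n'\
'Topic: a\tPartition: 2\tLeader: 1\tReplicas: 1,2\tIsr: 1,2\n' | leaders_per_broker
```

For the sample input this prints `1 2` and `2 1`, i.e. broker 1 leads two partitions and broker 2 leads one.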

@orshemesh

Hi,
Did you have issues with the variable ($json) not being quoted in the final echo?
I ran into a problem where the replicas section in the JSON file is not an array but a single bare number,
i.e. {"topic": "first", "partition": 1, "replicas": 1 } instead of {"topic": "first", "partition": 1, "replicas": [1,2] }

Why not change the one row before the last from: echo -e $json to: echo -e "$json"?
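
A likely cause of the behavior described above is pathname expansion: unquoted, the shell treats `[1,2]` as a glob pattern and replaces it with any matching file name in the current directory. The sketch below reproduces this in a throwaway directory; the function name `glob_demo` and the file name `1` are illustrative assumptions, not details from the report.

```shell
#!/usr/bin/env bash
# Runs in a subshell (note the parentheses) so the caller's working directory
# is left untouched.
glob_demo() (
  dir="$(mktemp -d)" || exit 1
  cd "$dir" || exit 1
  touch 1                        # a file whose name matches the glob [1,2]
  json='"replicas": [1,2] }'
  echo "unquoted: $(echo $json)"     # [1,2] is glob-expanded to the file name
  echo "quoted:   $(echo "$json")"   # the text is printed unchanged
  cd / && rm -rf "$dir"
)

glob_demo
```

With a file named `1` present, the unquoted form prints `"replicas": 1 }`, while the quoted form keeps `[1,2]` intact.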

@orshemesh

Second question,
Why do you not also move the follower replicas of the decommissioned node to the other brokers? Is there a reason for this?

@simonwahlgren

@orshemesh I had the same question, I solved it by updating row 307 to this:

TOPIC_PARTITION_REPLICAS=$($KAFKA_TOPICS_BIN --zookeeper $ZOOKEEPER_CONNECT --describe | egrep "Leader: $BROKER|Replicas: .*$BROKER.*Isr" | awk '{ print $2"#"$4"#"$8 }')
for topicPartitionReplicas in $TOPIC_PARTITION_REPLICAS; do
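
A regular expression such as `.*$BROKER.*` can also match other broker IDs that merely contain the same digits (for example, 4 inside 14). As an alternative sketch, the selection could compare IDs exactly in `awk`. The helper name `partitions_on_broker` is hypothetical, and the field positions assume the same `--describe` line layout that the gist's `awk` call relies on (topic in field 2, partition in field 4, leader in field 6, replicas in field 8).

```shell
#!/usr/bin/env bash
# Hypothetical helper: prints "topic#partition#replicas" for every partition
# on which the given broker is the leader or one of the replicas.
#
# Usage: kafka-topics.sh --zookeeper zookeeper1:2181 --describe | partitions_on_broker 4
partitions_on_broker() {
  local broker="$1"
  awk -v b="$broker" '
    $5 == "Leader:" {
      hit = ($6 == b)                 # broker is the leader
      n = split($8, ids, ",")         # replica list, e.g. "14,4"
      for (i = 1; i <= n; i++) if (ids[i] == b) hit = 1
      if (hit) print $2 "#" $4 "#" $8
    }'
}

# Made-up sample input: broker 4 appears only on partition 1.
printf 'Topic: a\tPartition: 0\tLeader: 1\tReplicas: 1,14\tIsr: 1,14\n'\
'Topic: a\tPartition: 1\tLeader: 14\tReplicas: 14,4\tIsr: 14,4\n'\
'Topic: a\tPartition: 2\tLeader: 2\tReplicas: 2,3\tIsr: 2,3\n' | partitions_on_broker 4
```

For the sample input only `a#1#14,4` is printed; partition 0, whose replica list contains 14 but not 4, is skipped.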
