@tuxfight3r
Last active Apr 27, 2022
KafkaCat configuration for AWS MSK

Set the following environment variable:

NOTE: kafkacat was recently renamed to kcat; from version 1.7 onwards the config variable is KCAT_CONFIG.

# Export the variable, or pass the config file to kafkacat with the -F option
export KAFKACAT_CONFIG=/home/tools/persistent/kcat/kafkacat_config
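
Because the variable name depends on the installed version, one option is to export the same path under both names. This is only a sketch: `KCAT_CONFIG_FILE` is a hypothetical helper variable, and the default path is the example path from above.

```shell
# KCAT_CONFIG_FILE is a hypothetical helper variable, not something kcat reads itself.
KCAT_CONFIG_FILE="${KCAT_CONFIG_FILE:-/home/tools/persistent/kcat/kafkacat_config}"

export KAFKACAT_CONFIG="$KCAT_CONFIG_FILE"   # name used by kafkacat (before the rename)
export KCAT_CONFIG="$KCAT_CONFIG_FILE"       # name used by kcat 1.7 onwards, per the note above
```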

Contents of kafkacat configuration

# cat kafkacat_config

security.protocol=ssl
ssl.key.password=SecretPassw0rd123
ssl.certificate.location=/home/tools/persistent/kcat/ssl/cert1.pem
ssl.key.location=/home/tools/persistent/kcat/ssl/cert1.key.pem
ssl.ca.location=/home/tools/persistent/kcat/ssl/cacerts.pem
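
Since the config file holds the key password in plain text, it may be worth creating it with owner-only permissions. A minimal sketch, assuming a writable directory; `KCAT_DIR` and `KEY_PASSWORD` are hypothetical names used only for this example.

```shell
# KCAT_DIR and KEY_PASSWORD are hypothetical names used only for this example.
KCAT_DIR="${KCAT_DIR:-$(mktemp -d)}"
KEY_PASSWORD="${KEY_PASSWORD:-changeit}"

(
    # Files created inside this subshell are readable and writable by the owner only.
    umask 077
    cat > "$KCAT_DIR/kafkacat_config" <<EOF
security.protocol=ssl
ssl.key.password=$KEY_PASSWORD
ssl.certificate.location=$KCAT_DIR/ssl/cert1.pem
ssl.key.location=$KCAT_DIR/ssl/cert1.key.pem
ssl.ca.location=$KCAT_DIR/ssl/cacerts.pem
EOF
)
```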

Commands for creating PEM files from JKS Keystore

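# Convert the JKS keystore to PKCS12
keytool -importkeystore -srckeystore keystore.jks -srcstoretype JKS -deststoretype PKCS12 -destkeystore keystore.p12
# Extract the certificates (without the private key) as PEM
openssl pkcs12 -in keystore.p12 -nokeys -out cert.pem
# Extract the private key as PEM; -nodes writes it unencrypted
openssl pkcs12 -in keystore.p12 -nodes -nocerts -out cert.key.pem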
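
After the conversion it can help to confirm that the certificate and key actually belong together before pointing kafkacat at them. The sketch below compares the public key embedded in each file. `pem_pair_matches` is a hypothetical helper, and it assumes the key file is unencrypted (as produced with -nodes) and that the first certificate in the file is the client certificate.

```shell
# pem_pair_matches CERT KEY
# Hypothetical helper: returns 0 when the certificate and the private key share
# the same public key, 1 when they differ, 2 when openssl cannot read a file.
pem_pair_matches() {
    local cert_pub key_pub
    cert_pub=$(openssl x509 -in "$1" -noout -pubkey) || return 2
    key_pub=$(openssl pkey -in "$2" -pubout) || return 2
    [ "$cert_pub" = "$key_pub" ]
}
```

Usage: `pem_pair_matches cert.pem cert.key.pem && echo "certificate and key match"`.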

Testing kafkacat

# List metadata for topic
./kafkacat -b $KAFKA_BROKER -L -t test_topic
./kafkacat -b $KAFKA_BROKER -F $KAFKACAT_CONFIG -L -t test_topic

# Consume 2 messages from the beginning
./kafkacat -b $KAFKA_BROKER -C -c2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 messages from the end.
./kafkacat -b $KAFKA_BROKER -C -c2 -o-2 -t test_topic -f 'Key: %k\nValue: %s\n'

# Consume 2 avro messages from the beginning
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o beginning -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'

# Consume 2 avro messages from the end.
./kafkacat -b $KAFKA_BROKER -s avro -r $KAFKA_SCHEMA_REGISTRY_URL -C -c2 -o-2 -t test_topic -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n'
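
The test commands above rely on KAFKA_BROKER and, when -F is used, on KAFKACAT_CONFIG. A small pre-flight check can catch a missing variable or an unreadable config file before any connection is attempted. This is a sketch only; `kcat_preflight` is a hypothetical helper and not part of kafkacat.

```shell
# kcat_preflight: hypothetical helper, not part of kafkacat itself.
# Returns 0 when KAFKA_BROKER is set and KAFKACAT_CONFIG points to a readable file.
kcat_preflight() {
    local status=0
    if [ -z "${KAFKA_BROKER:-}" ]; then
        echo "KAFKA_BROKER is not set" >&2
        status=1
    fi
    if [ -z "${KAFKACAT_CONFIG:-}" ]; then
        echo "KAFKACAT_CONFIG is not set" >&2
        status=1
    elif [ ! -r "$KAFKACAT_CONFIG" ]; then
        echo "config file is not readable: $KAFKACAT_CONFIG" >&2
        status=1
    fi
    return "$status"
}
```

Usage: `kcat_preflight && ./kafkacat -b $KAFKA_BROKER -F $KAFKACAT_CONFIG -L -t test_topic`.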

Kafkacat basic functions

export KCAT_CONFIG=/usr/local/kafkacat/kcat_config.properties
function kcat() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} "$@" ;}
function kcat_query_topic() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -L -t $1 ;}
function kcat_read_between_timestamps() {
    # expects $2 = s@<timestamp> and $3 = e@<timestamp> (timestamps in milliseconds)
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -o $3 ;}
function kcat_read_between_offsets() {
    # expects $2 = offset and $3 = count of messages
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -o $2 -c $3 ;}
function kcat_query_offset_for_timestamp() {
    # expects $1 in topic:partition:timestamp
    /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -Q -t $1 ;}
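
The s@ and e@ offsets take timestamps in milliseconds, which are awkward to type by hand. The sketch below converts a UTC date string into that form. `to_epoch_ms` is a hypothetical helper and assumes GNU date (the -d option); BSD/macOS date uses different flags.

```shell
# to_epoch_ms "YYYY-MM-DD HH:MM:SS"
# Hypothetical helper: prints the given UTC time as milliseconds since the epoch.
# Assumes GNU date; the -d option is not available in BSD/macOS date.
to_epoch_ms() {
    local secs
    secs=$(date -u -d "$1" +%s) || return 1
    printf '%s\n' "$((secs * 1000))"
}
```

Usage: `kcat_read_between_timestamps test_topic "s@$(to_epoch_ms '2022-04-27 00:00:00')" "e@$(to_epoch_ms '2022-04-27 01:00:00')"`.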

Read avro message values

# Read all avro value messages from beginning
function kcat_read_avro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e ;}
# Read n avro value messages from beginning
function kcat_read_avro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e ;}
# Read n avro value messages from end
function kcat_read_avro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e ;}
# Read n avro value messages from beginning on specific partition
function kcat_read_avro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} ;}
# Read n avro value messages from end on specific partition
function kcat_read_avro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read text message values

# Read all text messages from beginning
function kcat_read_txt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e ;}
# Read n text value messages from beginning
function kcat_read_txt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e ;}
# Read n text value messages from end
function kcat_read_txt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e ;}
# Read n text value messages from beginning on specific partition
function kcat_read_txt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} ;}
# Read n text value messages from end on specific partition
function kcat_read_txt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} ;}

Read avro messages with key=avro and value=avro

# Read all messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro  -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n';}
# Read n messages from beginning with key and value
function kcat_read_avro_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_avro_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_avro_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_avro_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read text messages with key=text and value=text

# Read all messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_txt_from_begin_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_txt_from_end_with_keys() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_txt_from_begin_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end on specific partition with key and value
function kcat_read_txt_from_end_with_keys_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -p ${3} -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}

Read messages with key=avro and value=text

# Read all messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning with key and value
function kcat_read_keyavro_valtxt_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from end with key and value
function kcat_read_keyavro_valtxt_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keyavro_valtxt_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keyavro_valtxt_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -t $1 -c $2 -o-${2} -e -s key=avro -s value=s -r ${KAFKA_SCHEMA_REGISTRY_URL}  -f "KEY: %k, VALUE: %s, PART: %p\n" -p ${3} ;}

Read messages with key=txt and value=avro

# Read all messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin_all() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning with key and value
function kcat_read_keytxt_valavro_from_begin() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from end with key and value
function kcat_read_keytxt_valavro_from_end() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' ;}
# Read n messages from beginning on specific partition with key and value
function kcat_read_keytxt_valavro_from_begin_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
# Read n messages from end on specific partition with key and value
function kcat_read_keytxt_valavro_from_end_on_part() { /usr/local/kafkacat/kcat -F ${KCAT_CONFIG} -b ${KAFKA_BROKER} -C -s key=s -s value=avro -r ${KAFKA_SCHEMA_REGISTRY_URL} -t ${1} -c ${2} -o-${2} -e -f 'KEY: %k, VALUE: %s, PART: %p\n' -p ${3} ;}
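
The functions above differ only in a few flags, and none of them checks its arguments. As a possible alternative, the text readers could be folded into one function that validates its input and builds the flags step by step. This is a sketch, not a replacement for the functions above: `kcat_read` and the `KCAT_DRY_RUN` switch are hypothetical names, and only the plain-text case is covered (the -s and -r options could be appended the same way).

```shell
# kcat_read <topic> [count] [begin|end] [partition]
# Hypothetical consolidated reader for plain-text messages.
# Set KCAT_DRY_RUN=1 to print the command instead of running it.
kcat_read() {
    local topic="${1:-}" count="${2:-}" from="${3:-begin}" part="${4:-}"
    if [ -z "$topic" ]; then
        echo "usage: kcat_read <topic> [count] [begin|end] [partition]" >&2
        return 1
    fi
    if [ "$from" = "end" ] && [ -z "$count" ]; then
        echo "kcat_read: count is required when reading from the end" >&2
        return 1
    fi
    # Rebuild the positional parameters as the kcat argument list.
    set -- -F "${KCAT_CONFIG}" -b "${KAFKA_BROKER}" -C -t "$topic" -e
    if [ -n "$count" ]; then set -- "$@" -c "$count"; fi
    if [ "$from" = "end" ]; then set -- "$@" "-o-$count"; fi
    if [ -n "$part" ]; then set -- "$@" -p "$part"; fi
    if [ -n "${KCAT_DRY_RUN:-}" ]; then
        echo "kcat $*"
    else
        /usr/local/kafkacat/kcat "$@"
    fi
}
```

Usage: `kcat_read test_topic 5 end 0` reads the last 5 messages from partition 0, and `kcat_read test_topic` reads everything from the beginning.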

nelisSpotOn commented Apr 27, 2022

First, thank you for sharing this. I have been trying to get Kcat working from a Fargate instance that I have set up as a bastion to consume messages from MSK to ensure that an MSK Connect job is actually working. I would like to set up Kcat in a secure way instead of just allowing unauthenticated clients. I did have the following questions though.

Questions about this example:

  • What type of Client Authentication was set up on the MSK side? SASL/IAM?
  • Where do the certificates come from?
    • Did you download the CA cert (ssl.ca.location) from AWS Certificate Manager?
    • The certificates you have listed in your kcat config seem to have different names than the ones you are generating.
  • What is the ssl.key.password password for? Is this a shared secret with the MSK side?

tuxfight3r (author) commented Apr 27, 2022

@nelisSpotOn I hope the answers below help.

  1. MSK is set up with TLS using an AWS Private CA (https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html).
  2. You need to create a CSR and have it signed by the AWS Private CA, which then issues a certificate (follow the document above).
  3. The certificate names are just examples. You need the certificate in PEM format from step 2; cacerts.pem can be downloaded from https://curl.se/docs/caextract.html
  4. ssl.key.password is the password for your certificate's private key, which is generated in step 2.
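
For anyone following along, the key and CSR from step 2 could be created with openssl roughly as sketched below. This is one possible route under stated assumptions, not necessarily the one used here (the gist itself starts from a JKS keystore). `CLIENT_DIR`, `CLIENT_CN`, and `KEY_PASSWORD` are hypothetical names for this example.

```shell
# CLIENT_DIR, CLIENT_CN, and KEY_PASSWORD are hypothetical names for this example.
CLIENT_DIR="${CLIENT_DIR:-$(mktemp -d)}"
CLIENT_CN="${CLIENT_CN:-kcat-client}"
export KEY_PASSWORD="${KEY_PASSWORD:-changeit}"   # later used as ssl.key.password

# Create an encrypted 2048-bit RSA private key and a matching CSR.
openssl req -new -newkey rsa:2048 \
    -keyout "$CLIENT_DIR/cert1.key.pem" \
    -out "$CLIENT_DIR/cert1.csr" \
    -subj "/CN=$CLIENT_CN" \
    -passout env:KEY_PASSWORD

# The CSR is then submitted to the private CA; the issued certificate (PEM)
# is the file that ssl.certificate.location should point to.
```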
