Skip to content

Instantly share code, notes, and snippets.

@ryanvgates
Created March 26, 2024 18:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryanvgates/d5988fe790581e2915d775f4b0963a23 to your computer and use it in GitHub Desktop.
Save ryanvgates/d5988fe790581e2915d775f4b0963a23 to your computer and use it in GitHub Desktop.
AWS MSK Create & List Topics
from confluent_kafka.admin import AdminClient, NewTopic
import os
# Settings for the Kafka admin client. The broker list and the AWS
# credentials are read from the environment; any variable that is unset
# is passed through as None.
# NOTE(review): confirm that the installed librdkafka build actually
# supports the 'AWS_MSK_IAM' mechanism and the 'sasl.aws.*' properties.
admin_settings = {
    "bootstrap.servers": os.getenv("AWS_KAFKA_BOOTSTRAP_SERVERS"),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "AWS_MSK_IAM",
    "sasl.aws.access.key.id": os.getenv("AWS_ACCESS_KEY_ID"),
    "sasl.aws.secret.access.key": os.getenv("AWS_SECRET_ACCESS_KEY"),
    "sasl.aws.security.token": os.getenv("AWS_SESSION_TOKEN"),
    "sasl.aws.region": "us-east-1",
    "ssl.ca.location": "/etc/ssl/certs/ca-certificates.crt",
}
a = AdminClient(admin_settings)
# Describe the topics to create: three partitions each, with the
# replication factor left unspecified.
# Note: production deployments more typically use a replication_factor
# of 3 for durability.
topic_names = ["topic1", "topic2"]
new_topics = [NewTopic(name, num_partitions=3) for name in topic_names]

# create_topics() is asynchronous: it returns a dict of <topic, future>
# right away, and each future has to be waited on to learn the outcome.
fs = a.create_topics(new_topics)
# Wait for each operation to finish and report its outcome. A failure for
# one topic is printed and the loop moves on to the remaining topics.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
    else:
        # Only reached when result() returned without raising.
        print("Topic {} created".format(topic))
# Fetch the cluster metadata and report every topic it lists. An explicit
# timeout (in seconds) is passed because the default of -1 waits without
# limit, which would hang the script if the brokers cannot be reached.
topics = a.list_topics(timeout=30).topics
print("There are currently {} topics. They are {}".format(len(topics), topics))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment