@ableasdale
Created May 9, 2024 20:44
Creating a number of topics on a given Kafka Cluster using a ThreadPoolExecutor
import subprocess
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread, get_ident, get_native_id


def process_topic(topic_index):
    """Create topic-<topic_index> by running kafka-topics inside the broker1 container."""
    thread = current_thread()
    #print(f'Worker thread: name={thread.name}, ident={get_ident()}, id={get_native_id()}')
    result = subprocess.run(["docker", "exec", "-t", "broker1", "/bin/bash", "-c",
                             f'kafka-topics --bootstrap-server broker1:9092 --topic topic-{topic_index} '
                             '--replication-factor 3 --partitions 3 --create --config min.insync.replicas=2'],
                            text=True)
    # result.stdout is only populated if capture_output=True is also passed to subprocess.run
    #print("Have {} bytes in stdout: {}".format(len(result.stdout), result.stdout.strip(' \t\n\r')))
    return result.returncode


# initialise a Thread Pool (16 worker threads) for concurrent operations
with ThreadPoolExecutor(16) as executor:
    # submit one task per topic (topic-0 .. topic-31); list() consumes the results,
    # so an exception raised in a worker is re-raised here instead of being silently dropped
    return_codes = list(executor.map(process_topic, range(32)))
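The script above ignores each command's exit code, so a failed creation (for example, rerunning it when the topics already exist) is easy to miss. A minimal sketch of a variant that reports failures per topic, using `submit` and `as_completed` instead of `map`. Assumptions: `build_create_command` and `create_topics` are hypothetical names, `kafka-topics` is on the container's PATH (the gist already invokes it unqualified), `-t` is dropped because output is captured rather than shown on a terminal, and the `run` parameter is a hypothetical injection point so the logic can be exercised without Docker.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed


def build_create_command(topic, container="broker1", bootstrap="broker1:9092"):
    """argv that creates one topic with the gist's settings (3 partitions, RF 3, min ISR 2)."""
    return ["docker", "exec", container, "kafka-topics",
            "--bootstrap-server", bootstrap, "--create", "--topic", topic,
            "--partitions", "3", "--replication-factor", "3",
            "--config", "min.insync.replicas=2"]


def create_topics(topics, run=subprocess.run, max_workers=16):
    """Create topics concurrently; return {topic: combined output} for every failed command.

    `run` defaults to subprocess.run; it is a hypothetical hook so tests can pass a fake.
    """
    def create(topic):
        proc = run(build_create_command(topic), capture_output=True, text=True)
        return topic, proc.returncode, (proc.stdout or "") + (proc.stderr or "")

    failures = {}
    with ThreadPoolExecutor(max_workers) as executor:
        futures = [executor.submit(create, t) for t in topics]
        # as_completed yields each future as soon as its command finishes
        for future in as_completed(futures):
            # result() re-raises worker exceptions, e.g. FileNotFoundError if docker is missing
            topic, returncode, output = future.result()
            if returncode != 0:
                failures[topic] = output.strip()
    return failures
```

Against the running cluster this would be called as `create_topics([f"topic-{i}" for i in range(32)])`; an empty dict means every command exited with status 0.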
Comment by @ableasdale (gist author):

To test, first start this cluster (three brokers, single ZooKeeper); the script expects a running container named broker1: https://github.com/ableasdale/confluent-dockerfiles/tree/main/simple-3-brokers-single-zk

To run: python3 create-topics.py
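After a run, one way to confirm that all 32 topics exist is to list topics from inside the same container and compare the output against the expected names. A minimal sketch, assuming the container name `broker1` and listener `broker1:9092` from the gist; `expected_topics`, `missing_topics` and `list_topics` are hypothetical helper names.

```python
import subprocess


def expected_topics(count=32):
    """Names the gist creates: topic-0 .. topic-31 when count=32."""
    return {f"topic-{i}" for i in range(count)}


def missing_topics(list_output, count=32):
    """Given `kafka-topics --list` output (one name per line), return expected names not present."""
    present = {line.strip() for line in list_output.splitlines() if line.strip()}
    return sorted(expected_topics(count) - present)


def list_topics(container="broker1", bootstrap="broker1:9092"):
    """Run `kafka-topics --list` inside the container and return its stdout."""
    proc = subprocess.run(
        ["docker", "exec", container, "kafka-topics", "--bootstrap-server", bootstrap, "--list"],
        capture_output=True, text=True, check=True)  # check=True raises on a non-zero exit
    return proc.stdout
```

With the cluster up, `missing_topics(list_topics())` returns an empty list if every topic was created; internal topics such as `__consumer_offsets` in the listing are simply ignored.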
