Skip to content

Instantly share code, notes, and snippets.

@mleuthold
Created February 15, 2020 22:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mleuthold/c565db834bed4f927947740364c16122 to your computer and use it in GitHub Desktop.
How to copy Kafka messages from one to another Kafka cluster
#!/usr/bin/env python3
"""Copy Kafka messages from one cluster to another, bounded by timestamps.

Consumes records from INPUT_TOPIC on the source cluster and republishes each
one (key, value and original timestamp preserved) to OUTPUT_TOPIC on the
target cluster.  Consumption starts at ``begin_timestamp`` (unless resuming
from committed consumer-group offsets) and stops per partition once a record
newer than ``end_timestamp`` is seen; the script exits when every assigned
partition has passed the end timestamp.
"""
import os

from kafka import KafkaConsumer
from kafka import KafkaProducer

# Dump all environment variables to aid debugging of deployed configuration.
print('*----------------------------------*')
print(os.environ)
print('*----------------------------------*')

INPUT_TOPIC = 'collector-good-prod'  # os.environ['INPUT_TOPIC']
OUTPUT_TOPIC = 'collector-good-dev'  # os.environ['OUTPUT_TOPIC']
GROUP_ID = 'jira-123-migrate-kafka-messages'  # os.environ['GROUP_ID']
# {yes/no} -- in case of script abortions we don't want to consume from
# begin_timestamp again; instead rely on consumer-group commits.
# Set to any value <> 'yes' to resume from committed offsets.
SEEK_TIMESTAMP = 'yes'
# BUGFIX: the source brokers must belong to the cluster hosting INPUT_TOPIC
# ('...-prod') and the target brokers to the cluster hosting OUTPUT_TOPIC
# ('...-dev'); the two values were swapped in the original.
INPUT_BOOTSTRAP_SERVERS = 'broker-01.prod.io:9091'  # os.environ['INPUT_BOOTSTRAP_SERVERS']
OUTPUT_BOOTSTRAP_SERVERS = 'broker-01.dev.io:9091'  # os.environ['OUTPUT_BOOTSTRAP_SERVERS']

begin_timestamp = 1548327000000  # Donnerstag, 24. Januar 2019 11:50:00 GMT+01:00
end_timestamp = 1549465000000    # Mittwoch, 6. Februar 2019 15:56:40 GMT+01:00

###############################################
# Prepare Kafka
###############################################
offset_reset_strategy = 'latest'
consumer = KafkaConsumer(INPUT_TOPIC,
                         group_id=GROUP_ID,
                         bootstrap_servers=INPUT_BOOTSTRAP_SERVERS,
                         auto_offset_reset=offset_reset_strategy,
                         enable_auto_commit=True)
producer = KafkaProducer(bootstrap_servers=OUTPUT_BOOTSTRAP_SERVERS)

consumer.poll()  # dummy poll to trigger partition assignment
assigned_partitions = consumer.assignment()

# Seek every assigned partition to the first offset at/after begin_timestamp.
if SEEK_TIMESTAMP == 'yes':
    offsets = consumer.offsets_for_times(dict.fromkeys(assigned_partitions, begin_timestamp))
    print(f"Start consuming all messages between begin_timestamp {begin_timestamp} "
          f"and end-timestamp {end_timestamp} using this start-offset-list {offsets}")
    for partition, offset_and_ts in offsets.items():
        # BUGFIX: offsets_for_times() maps a partition to None when it holds
        # no message at or after the requested timestamp; skip those instead
        # of raising AttributeError on None.offset.
        if offset_and_ts is not None:
            consumer.seek(partition, offset_and_ts.offset)

number_consumed_messages = 0
consumed_partitions = []  # partitions that have already passed end_timestamp

for message in consumer:
    if len(consumed_partitions) == len(assigned_partitions):
        # Every partition reached the end timestamp -- stop consuming.
        print(f"Stop consuming after {number_consumed_messages} messages because "
              f"end-timestamp has been reached for all {len(assigned_partitions)} partitions")
        break
    try:
        message_value = message.value
        message_key = message.key
        message_time = message.timestamp
        message_partition = message.partition
        if message_time <= end_timestamp and message_partition not in consumed_partitions:
            try:
                number_consumed_messages += 1
                producer.send(topic=OUTPUT_TOPIC, value=message_value,
                              key=message_key, timestamp_ms=message_time)
                if number_consumed_messages % 100000 == 0:
                    print(f"Consumed {number_consumed_messages} messages so far")
            except Exception as e:
                # Best-effort: log the failed record and keep going.
                print(f"This message was not produced to kafka: Offset: {message.offset}, "
                      f"timestamp: {message_time} Key: {message_key}, value: {message_value}")
                print(str(e))
        elif message_partition not in consumed_partitions:
            # First record beyond end_timestamp on this partition: retire it.
            print(f"Stop producing records from partition {message_partition} because "
                  f"timestamp {end_timestamp} has been reached "
                  f"({number_consumed_messages} messages consumed so far).")
            consumed_partitions.append(message_partition)
            print(f"ignored partitions: {consumed_partitions}")
    except Exception as e:
        # Best-effort: a malformed record must not abort the migration.
        print(e)
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))

consumer.commit()
# BUGFIX: flush the producer so buffered records are actually delivered
# before the process exits; without this, trailing messages can be lost.
producer.flush()
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment