import logging
import os
import re
import signal
import subprocess
import threading
import time

from apscheduler.schedulers.background import BackgroundScheduler
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

BROKER_API = 'backend-kafka:9092'
LOG_FILE = 'kafka-backup.log'

consumer = KafkaConsumer(bootstrap_servers=[BROKER_API], group_id='connect-backup-sink')
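
# Overall flow of this script:
#   1. snapshot the end offsets of every topic/partition (the backup target)
#   2. commit offset 0 for the 'connect-backup-sink' group so the backup
#      reads each partition from the beginning
#   3. launch backup-standalone.sh in a background thread
#   4. a scheduled watcher tails the backup log and marks a partition done
#      once its committed offset reaches the snapshotted end offset
#   5. once every partition is done, the backup process group is terminated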

def get_topics():
    # get the list of topics to back up
    topics = consumer.topics()
    return topics

def save_topics(topics):
    topics_str = ', '.join(topics)
    with open("/kafka-backup/topics.txt", "w") as text_file:
        text_file.write(topics_str)
    return topics_str

def back_up():
    global backup_completed
    # topics_str is set in __main__ before this thread is started;
    # stdout goes to LOG_FILE via the shell redirect, so no pipe is needed
    cmd = (f"exec backup-standalone.sh --bootstrap-server {BROKER_API} "
           f"--target-dir /kafka-backup/ --topics '{topics_str}' "
           f"--debug > {LOG_FILE}")
    backup = subprocess.Popen(cmd, shell=True, preexec_fn=os.setsid)
    # poll until the watcher marks the backup as complete
    while not backup_completed:
        logging.info(f"Back_up status {backup_completed}")
        time.sleep(5)
    # the backup script keeps running after it is done, so terminate
    # its whole process group (created by preexec_fn=os.setsid)
    os.killpg(os.getpgid(backup.pid), signal.SIGTERM)
    logging.info("Back_up task completed")

# get the currently available (end) offsets for all partitions and topics, e.g.
# {'example_topic1': {0: 0}, 'example_topic2': {0: 337}}
def get_offsets(topics):
    partitions = {}
    offsets = {}
    # get the list of all available partitions for each topic
    for topic in topics:
        partitions[topic] = consumer.partitions_for_topic(topic)
    for topic in partitions:
        topic_offsets = {}
        for partition in partitions[topic]:
            topic_partition = TopicPartition(topic=topic, partition=partition)
            # get the exact offset number instead of the TopicPartition-keyed dict
            topic_offsets[partition] = consumer.end_offsets([topic_partition])[topic_partition]
        offsets[topic] = topic_offsets
    return offsets

# get the consumer group's current position for all partitions and topics
def get_current_offsets(topics):
    partitions = {}
    current_offsets = {}
    # get the list of all available partitions for each topic
    for topic in topics:
        partitions[topic] = consumer.partitions_for_topic(topic)
    for topic in partitions:
        topic_offsets = {}
        for partition in partitions[topic]:
            topic_partition = TopicPartition(topic=topic, partition=partition)
            consumer.assign([topic_partition])
            # get the exact offset number instead of the TopicPartition-keyed dict
            topic_offsets[partition] = consumer.position(topic_partition)
        current_offsets[topic] = topic_offsets
    return current_offsets
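
# Note: consumer.position() returns the offset of the next record that would
# be fetched for the assigned partition, while consumer.end_offsets() returns
# the partition's high-water mark, so comparing the two shows how far the
# group is from the latest data.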

# create a dict of backed-up flags per topic/partition,
# e.g. {'example_topic1': {0: False}, 'example_topic2': {0: False}}
def create_backed_offsets(offsets):
    backed_offsets = {}
    for topic in offsets:
        backed_offsets[topic] = {}
        for partition in offsets[topic]:
            backed_offsets[topic][partition] = False
    return backed_offsets

# commit zero offsets for all topics
def commit_zero_offset(offsets):
    for topic in offsets:
        for partition in offsets[topic]:
            topic_partition = TopicPartition(topic=topic, partition=partition)
            consumer.assign([topic_partition])
            consumer.commit({
                topic_partition: OffsetAndMetadata(0, None)
            })
            logging.info(f"commit zero offset for {topic} with partition {partition}")

def parse_log_file(log_file):
    with open(log_file, 'r') as logfile:
        text = logfile.readlines()
    return text

def watch_function():
    global backup_completed
    global backup_offsets
    logging.info("backup check started")
    logging.info("check for completed backup")
    text = parse_log_file(LOG_FILE)
    for line in text:
        if "Committing offsets asynchronously using sequence number" in line:
            for topic in offsets:
                for partition in offsets[topic]:
                    offset = offsets[topic][partition]
                    committed = parse_line_offset(topic, partition, line)
                    # the line may not mention this topic/partition at all
                    if committed is not None and int(committed) >= offset:
                        backup_offsets[topic][partition] = True
    if is_backed_up(backup_offsets):
        logging.info("backup check completed")
        backup_completed = True
    return backup_completed

# find the committed offset for a topic/partition in a log line
def parse_line_offset(topic, partition, line=''):
    pattern = rf".+:.*{topic}-{partition}=OffsetAndMetadata{{offset=(\d*)"
    m = re.search(pattern, line)
    if m:
        return m.group(1)
    return None
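
# Example (hypothetical log line shaped to match the pattern above):
#   line = "...: {example_topic2-0=OffsetAndMetadata{offset=337, metadata=''}}"
#   parse_line_offset('example_topic2', 0, line)  # -> '337'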

# check whether every partition has been backed up
def is_backed_up(backed_offsets):
    for topic in backed_offsets:
        for partition in backed_offsets[topic]:
            if not backed_offsets[topic][partition]:
                return False
    return True

if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    backup_completed = False
    topics = get_topics()
    offsets = get_offsets(topics)
    logging.info(get_current_offsets(topics))
    commit_zero_offset(offsets)
    backup_offsets = create_backed_offsets(offsets)
    topics_str = save_topics(topics)
    backup = threading.Thread(target=back_up)
    scheduler = BackgroundScheduler()
    scheduler.add_job(watch_function, 'interval', seconds=3)
    scheduler.start()
    logging.info("Main : before running thread")
    backup.daemon = True
    backup.start()
    logging.info("wait for the backup to finish")
    backup.join()
    scheduler.shutdown()
    logging.info("Main : all done")