A workaround for https://github.com/itadventurer/kafka-backup/issues/52: run kafka-backup as a one-shot full backup and stop it once it has caught up.
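In short: the script resets the `connect-backup-sink` consumer group to offset 0 on every topic partition, records the current end offsets, starts `backup-standalone.sh` in a subprocess, and watches its log until the committed offsets reach those end offsets; the backup process group is then terminated with SIGTERM.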
import logging
import os
import re
import signal
import subprocess
import threading
import time

from apscheduler.schedulers.background import BackgroundScheduler
from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

brokerApi = 'backend-kafka:9092'
log_file = 'kafka-backup.log'

# use the same consumer group as the kafka-backup sink connector
consumer = KafkaConsumer(bootstrap_servers=[brokerApi], group_id='connect-backup-sink')
def get_topics():
    # get the list of topics to back up
    topics = consumer.topics()
    return topics

def save_topics(topics):
    topics_str = ', '.join(topics)
    with open("/kafka-backup/topics.txt", "w") as text_file:
        text_file.write(topics_str)
    return topics_str
def back_up():
    global backup_completed
    # run the kafka-backup sink and redirect its output into the log file
    cmd = (f"exec backup-standalone.sh --bootstrap-server {brokerApi} "
           f"--target-dir /kafka-backup/ --topics '{topics_str}' --debug > {log_file}")
    backup = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                              shell=True, preexec_fn=os.setsid)
    # poll until the watcher marks the backup as completed,
    # then stop the whole process group
    while not backup_completed:
        logging.info(f"Back_up status {backup_completed}")
        time.sleep(5)
    os.killpg(os.getpgid(backup.pid), signal.SIGTERM)
    logging.info("Back_up task completed")
# get currently available end offsets for all partitions and topics,
# e.g. {'example_topic1': {0: 0}, 'example_topic2': {0: 337}}
def get_offsets(topics):
    partitions = {}
    offsets = {}
    # get the list of all available partitions per topic
    for topic in topics:
        partitions[topic] = consumer.partitions_for_topic(topic)
    for topic in partitions:
        topic_offsets = {}
        for partition in partitions[topic]:
            topicType = TopicPartition(topic=topic, partition=partition)
            # get the exact offset number instead of the TopicPartition mapping
            topic_offsets[partition] = consumer.end_offsets([topicType])[topicType]
        offsets[topic] = topic_offsets
    return offsets
# get the consumer group's current position for all partitions and topics
def get_current_offsets(topics):
    partitions = {}
    current_offsets = {}
    # get the list of all available partitions per topic
    for topic in topics:
        partitions[topic] = consumer.partitions_for_topic(topic)
    for topic in partitions:
        topic_offsets = {}
        for partition in partitions[topic]:
            topicPart = TopicPartition(topic=topic, partition=partition)
            consumer.assign([topicPart])
            # get the exact offset number instead of the TopicPartition mapping
            topic_offsets[partition] = consumer.position(topicPart)
        current_offsets[topic] = topic_offsets
    return current_offsets
# create a dict of backed-up flags per topic and partition,
# e.g. {'example_topic1': {0: False}, 'example_topic2': {0: False}}
def create_backed_offsets(offsets):
    backed_offsets = {}
    for topic in offsets:
        backed_offsets[topic] = {}
        for partition in offsets[topic]:
            backed_offsets[topic][partition] = False
    return backed_offsets
# commit zero offsets for all topics so the sink re-reads everything
def commit_zero_offset(offsets):
    for topic in offsets:
        for partition in offsets[topic]:
            topicPart = TopicPartition(topic=topic, partition=partition)
            consumer.assign([topicPart])
            consumer.commit({
                topicPart: OffsetAndMetadata(0, None)
            })
            logging.info(f"commit zero offset for {topic} with partition {partition}")
def parse_log_file(log_file):
    with open(log_file, 'r') as logfile:
        text = logfile.readlines()
    return text
def watch_function():
    global backup_completed
    global backup_offsets
    logging.info("backup check started")
    logging.info("check for completed backup")
    text = parse_log_file(log_file)
    for line in text:
        if "Committing offsets asynchronously using sequence number" in line:
            logging.debug("found an offset commit line")
            for topic in offsets:
                for partition in offsets[topic]:
                    offset = offsets[topic][partition]
                    committed = parse_line_offset(topic, partition, line)
                    # the commit line may not mention every topic-partition
                    if committed is not None and int(committed) >= offset:
                        backup_offsets[topic][partition] = True
    if is_backuped(backup_offsets):
        logging.info("backup check completed")
        backup_completed = True
    return backup_completed
# find the committed offset for a topic-partition in a log line
def parse_line_offset(topic, partition, line=''):
    pattern = rf".+:.*{re.escape(topic)}-{partition}=OffsetAndMetadata{{offset=(\d*)"
    m = re.search(pattern, line)
    if m:
        return m.group(1)

# check whether all partitions have been backed up
def is_backuped(backed_offsets):
    for topic in backed_offsets:
        for partition in backed_offsets[topic]:
            if not backed_offsets[topic][partition]:
                return False
    return True
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    backup_completed = False
    topics = get_topics()
    offsets = get_offsets(topics)
    logging.info(get_current_offsets(topics))
    # rewind the backup consumer group so the sink re-reads every partition
    commit_zero_offset(offsets)
    backup_offsets = create_backed_offsets(offsets)
    topics_str = save_topics(topics)
    backup = threading.Thread(target=back_up)
    # scan the sink's log every few seconds for committed offsets
    scheduler = BackgroundScheduler()
    scheduler.add_job(watch_function, 'interval', seconds=3)
    scheduler.start()
    logging.info("Main : before running thread")
    backup.daemon = True
    backup.start()
    logging.info("wait for the backup to finish")
    backup.join()
    logging.info("Main : all done")