Skip to content

Instantly share code, notes, and snippets.

@jayv
Last active October 6, 2017 01:12
Show Gist options
  • Save jayv/a598fffdd24365d2c2524e6f18ee64f4 to your computer and use it in GitHub Desktop.
Save jayv/a598fffdd24365d2c2524e6f18ee64f4 to your computer and use it in GitHub Desktop.
import re
import sys
import subprocess
from collections import defaultdict
def move_tail(topic, groups, path="/opt/kafka/bin/", env="staging"):
command = r"%skafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-seed-1.%s.livefyre.com:9092 --topic %s --time -1 "
cli = command % (path, env, topic)
proc = subprocess.Popen(cli, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
p = re.compile(r"%s:(\d+):(\d+)" % topic)
offsets = []
for line in out.split("\n"):
match = p.match(line)
if not match: continue
(part, off) = match.groups()
for group in groups:
offsets.append("/consumers/%s/offsets/%s/%s:%s" % (group, topic, part, off))
offsetlines = "\n".join(offsets)
print "Setting offsets for %s \n%s" % (topic, offsetlines)
command = r"%skafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zk-seed-1.%s.livefyre.com --input-file /dev/stdin"
cli = command % (path, env)
proc = subprocess.Popen(cli, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
(out, err) = proc.communicate(input=offsetlines)
print "Done\n%s\n%s" % (out, err)
def get_all_group_offsets(path="/opt/kafka/bin/", env="staging"):
command = r"%skafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zk-seed-1.%s.livefyre.com --output-file /dev/stdout"
cli = command % (path, env)
proc = subprocess.Popen(cli, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
topics = defaultdict(set)
topic_re = re.compile(r"/consumers/([a-zA-Z0-9\-._]+)/offsets/([a-zA-Z0-9\-._]+)/\d+:\d+")
for line in out.split("\n"):
match = topic_re.match(line)
if not match: continue
(group, topic) = match.groups()
topics[topic].add(group)
return topics
if __name__ == '__main__':
for topic, groups in get_all_group_offsets(env="").iteritems():
move_tail(topic, groups, env="")
@jayv
Copy link
Author

jayv commented Apr 7, 2016

Specify env in __main__

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment