Skip to content

Instantly share code, notes, and snippets.

@also
Created October 3, 2014 20:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save also/9f823d9eb9dc0a410796 to your computer and use it in GitHub Desktop.
Save also/9f823d9eb9dc0a410796 to your computer and use it in GitHub Desktop.
Split Kafka log files into chunks that Kafka can actually read.
# written in anger at 4AM. it's a bad idea to use this.
import struct
import sys
import glob
import os
# Usage: <script> SOURCE_DIR DEST_DIR TOPIC
#
# For every directory SOURCE_DIR/TOPIC-* this copies each *.log file into
# DEST_DIR/<same directory name>/, starting a new output file whenever the
# current one has grown past MAX_SEGMENT_BYTES.  Output files are named after
# the offset of the first entry they contain, zero-padded to 20 digits.

# Size threshold (in bytes) after which a new output file is started.
MAX_SEGMENT_BYTES = 1024 * 1024 * 1024

# Every entry begins with an 8-byte offset followed by a 4-byte payload size.
OFFSET_LEN = 8
SIZE_LEN = 4


def split_log_file(filename, dest_dir, max_segment_bytes=MAX_SEGMENT_BYTES):
    """Copy the entries of one log file into size-limited files in dest_dir.

    Each entry is read as an 8-byte big-endian signed offset, a 4-byte
    big-endian signed size, and then ``size`` bytes of payload.  Entries are
    copied verbatim.  The limit is only checked between entries, so an output
    file can exceed ``max_segment_bytes`` by up to one entry.

    Args:
        filename: path of the source ``.log`` file.
        dest_dir: existing directory that receives the output files.
        max_segment_bytes: start a new output file once the current one is
            larger than this many bytes.

    Raises:
        ValueError: if the file ends in the middle of an entry or an entry
            declares a negative size.
    """
    out = None
    try:
        # Binary mode: the data is raw bytes, not text.
        with open(filename, 'rb') as f:
            while True:
                offset_s = f.read(OFFSET_LEN)
                if len(offset_s) == 0:
                    # Clean end of file, exactly on an entry boundary.
                    print('done')
                    break
                if len(offset_s) != OFFSET_LEN:
                    raise ValueError('bad offset size %d' % len(offset_s))
                size_s = f.read(SIZE_LEN)
                if len(size_s) != SIZE_LEN:
                    raise ValueError('bad size size %d' % len(size_s))
                size = struct.unpack('>i', size_s)[0]
                offset = struct.unpack('>q', offset_s)[0]
                if size < 0:
                    # read() with a negative count would swallow the rest of
                    # the file, so reject it explicitly.
                    raise ValueError(
                        'negative message size %d at offset %d' % (size, offset))
                payload = f.read(size)
                if len(payload) != size:
                    raise ValueError(
                        'truncated message at offset %d: expected %d bytes, '
                        'got %d' % (offset, size, len(payload)))
                if out is None or out.tell() > max_segment_bytes:
                    if out is not None:
                        out.close()
                    out = open('%s/%020d.log' % (dest_dir, offset), 'wb')
                out.write(offset_s)
                out.write(size_s)
                out.write(payload)
    finally:
        # out stays None for an empty source file; otherwise make sure the
        # last output file is closed even when an error is raised.
        if out is not None:
            out.close()


def split_topic(source, dest, topic):
    """Split every ``*.log`` file found under ``source/topic-*`` into dest.

    For each matching directory a directory of the same name is created under
    ``dest`` (if it does not exist yet) and every log file inside is passed to
    split_log_file().  The directory name is printed as it is processed.
    """
    for p_dir in glob.glob('%s/%s-*' % (source, topic)):
        files = glob.glob('%s/*.log' % p_dir)
        p_dir_basename = os.path.basename(p_dir)
        print(p_dir_basename)
        dest_dir = '%s/%s' % (dest, p_dir_basename)
        if not os.path.isdir(dest_dir):
            os.mkdir(dest_dir)
        for filename in files:
            split_log_file(filename, dest_dir)


if __name__ == '__main__':
    if len(sys.argv) < 4:
        sys.stderr.write(
            'usage: %s SOURCE_DIR DEST_DIR TOPIC\n' % sys.argv[0])
        sys.exit(2)
    split_topic(sys.argv[1], sys.argv[2], sys.argv[3])
@also
Copy link
Author

also commented May 20, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment