#!/usr/bin/python3
'''
recombine the multistream index files into an output
file path of your choice, for a given wiki and date
'''
import os
import re
import bz2
import getopt
import sys
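
# start-of-stream magic for a bz2 stream: 'BZh' + compression level '9'
# + the six-byte block header magic 0x314159265359 (pi in BCD); each
# stream in a multistream dump file begins with this byte sequence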
BZIP2MARKER = b'\x42\x5a\x68\x39\x31\x41\x59\x26\x53\x59'


def get_header_offset(filename, marker):
    '''
    get the offset of where the mediawiki and siteinfo xml header
    ends, and return it; because these are multistream files,
    this will end on a byte-aligned boundary, yippee!
    '''
    with open(filename, "rb") as infile:
        # skip the first byte so we don't match the bz2 magic at the
        # very start of the file, only the start of the second stream
        try:
            infile.seek(1, os.SEEK_SET)
            max_offset = 1000000
            buffer = infile.read(max_offset)
        except IOError:
            return None
        buffer_offset = buffer.find(marker)
        if buffer_offset >= 0:
            # because we skipped the first byte, add that here
            return buffer_offset + 1
        return None
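
# worked example for get_header_offset, with hypothetical numbers: if the
# second stream's magic is found at buffer offset 644, the header stream
# occupies bytes 0 through 644 of the file and the function returns 645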


def get_footer_offset(filename, marker):
    '''
    get the offset of where the mediawiki close tag starts,
    and return it; because these are multistream files,
    this will start on a byte-aligned boundary, yippee!
    '''
    with open(filename, "rb") as infile:
        # empty files or files with only a footer will return None
        # here (too short) and that's ok, we might as well fail out on them;
        # by now they should have already been moved out of the way
        # by the previous job but, just in case...
        max_offset = 100
        try:
            filesize = infile.seek(0, os.SEEK_END)
            infile.seek(filesize - max_offset, os.SEEK_SET)
            buffer = infile.read()
        except IOError:
            return None
        buffer_offset = buffer.find(marker)
        if buffer_offset >= 0:
            return filesize - (len(buffer) - buffer_offset)
        return None
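
# worked example for get_footer_offset, with hypothetical numbers: for a
# 5000000-byte file we read the last 100 bytes; if the footer stream's
# magic is found at buffer offset 40, the footer starts at
# 5000000 - (100 - 40), i.e. byte 4999940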


def get_new_offset(content_file, offset, marker):
    '''
    this will be the cumulative offset into the combined page content
    file so far, given the old offset and the name of the content file
    just processed
    '''
    footer_marker = get_footer_offset(content_file, marker)
    header_marker = get_header_offset(content_file, marker)
    body_size = footer_marker - header_marker
    # offsets in an index file are relative to its own content file and
    # include that file's header; we rewrite them by adding the cumulative
    # size of everything already written to the combined content file
    # (the first header plus all bodies so far) and subtracting the
    # current file's own header
    return offset + body_size
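
# worked example for get_new_offset, with hypothetical numbers: if a content
# part's header ends at byte 645 and its footer starts at byte 1999355, its
# body is 1998710 bytes, so every offset in the next part's index lines must
# grow by that much more than the previous part's offsets did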


def get_content_file_from_index(index_filename):
    '''
    given the full path to an index file, transform it into the
    corresponding page content file
    '''
    # ..../elwikt-20190119-pages-articles-multistream-index1.txt.bz2 becomes
    # ..../elwikt-20190119-pages-articles-multistream1.xml.bz2
    # ..../nlwiki-20190201-pages-articles-multistream-index2.txt-p123352p450973.bz2 becomes
    # ..../nlwiki-20190201-pages-articles-multistream2.xml-p123352p450973.bz2
    return index_filename.replace('-index', '').replace('.txt.bz2', '.xml.bz2').replace(
        '.txt-p', '.xml-p')


def get_output_path(output_dir, wikiname, rundate):
    '''
    generate and return the full path to the combined index file which will be written
    '''
    path = os.path.join(output_dir, '{w}-{d}-pages-articles-multistream-index.txt.bz2')
    return path.format(w=wikiname, d=rundate)


# taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net)
def sort_dumpfilenames(mylist):
    """
    Sort the given list in the way that humans expect.
    args:
        list of filenames
    """
    convert = lambda text: int(text) if text.isdigit() else text
    alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
    mylist = sorted(mylist, key=alphanum_key)
    return mylist
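
# example of the natural sort above: '...multistream-index10.txt.bz2' sorts
# after '...multistream-index2.txt.bz2', whereas plain sorted() would put
# index10 first because it compares the strings character by character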


def get_index_filenames(wikiname, rundate, dumpsdir):
    '''
    get existing numbered multistream index files for specified wiki and date,
    return sorted list
    '''
    basedir = os.path.join(dumpsdir, wikiname, rundate)
    files = os.listdir(basedir)
    index_filenames = [os.path.join(basedir, fname) for fname in files
                       if '-index' in fname and 'index.txt' not in fname and
                       fname.startswith(wikiname)]
    return sort_dumpfilenames(index_filenames)
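
# for instance, elwikt-20190119-pages-articles-multistream-index1.txt.bz2
# passes the filter above, while an already-combined
# elwikt-20190119-pages-articles-multistream-index.txt.bz2 is skipped
# because it contains the substring 'index.txt'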


def index_files_recombine(index_files, output_file):
    '''
    recombine index files to produce a specified output file,
    with the combined index file having the correct offsets into the
    combined content file that is produced separately
    we do this without the usual progress callback that shows the file
    size as it grows, because a) it's easier and b) as of this writing
    it takes all of 2 minutes to write the combined index file, so who cares.
    '''
    # initially the offset into the combined page content file is 0 plus whatever
    # the first index file says, for any page; this will change as we move into the
    # part of the combined page content file that has the contents of the second
    # page content part and the corresponding index file that has offsets starting
    # again from 0, etc.
    offset = 0
    first_content_file = get_content_file_from_index(index_files[0])
    first_header_size = get_header_offset(first_content_file, BZIP2MARKER)
    combined_indexfile = bz2.open(output_file, 'wt', encoding='utf-8')
    for infile_counter, input_file in enumerate(index_files):
        content_file = get_content_file_from_index(input_file)
        if infile_counter == 1:
            offset += first_header_size
        if infile_counter:
            header_size = get_header_offset(content_file, BZIP2MARKER)
        else:
            # the first file's header is kept in the combined output,
            # so don't subtract it from that file's offsets
            header_size = 0
        with bz2.open(input_file, mode='rt', encoding='utf-8') as partial_indexfile:
            added_offset = offset - header_size
            for line in partial_indexfile:
                if line:
                    partial_offset, title = line.split(':', 1)
                    partial_offset = str(int(partial_offset) + added_offset)
                    # title (really 'pageid:pagetitle') will still have
                    # the newline on the end of it
                    combined_indexfile.write(partial_offset + ":" + title)
        offset = get_new_offset(content_file, offset, BZIP2MARKER)
    combined_indexfile.close()
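
# worked example, with hypothetical numbers: take the line
# '645:12345:Some_page_title' from the second part's index file, a 645-byte
# header on each part, and a 1998710-byte first body; then
# added_offset = (0 + 1998710 + 645) - 645 = 1998710, and the line is
# written to the combined index as '1999355:12345:Some_page_title'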


def usage(message=None):
    '''
    display a helpful usage message with
    an optional introductory message first
    '''
    if message is not None:
        sys.stderr.write(message)
        sys.stderr.write("\n")
    usage_message = """
Usage: python3 recombine_index.py --dumpsdir path --wikiname string --rundate YYYYMMDD
                 [--outdir path] [--verbose]
       python3 recombine_index.py --help

This script recombines the numbered multistream index files for the given
wiki and dump run date into a single index file, rewriting the offset in
each line so that it points into the combined page content file that is
produced separately.

Arguments:

 --dumpsdir (-d): path to dumps directory tree
 --outdir   (-o): directory where output file will be written
                  default: cwd
 --wikiname (-w): name of wiki database (e.g. enwiki)
 --rundate  (-r): date in YYYYMMDD format of dump run for which to do the recombine
 --verbose  (-v): display progress messages as the script runs
 --help     (-h): display this help message
"""
    sys.stderr.write(usage_message)
    sys.exit(1)


def check_args(dumpsdir, wikiname, rundate, remainder):
    '''
    check that mandatory args are supplied and that there's
    no extra cruft, or whine
    '''
    if dumpsdir is None:
        usage("Mandatory argument 'dumpsdir' is missing")
    if wikiname is None:
        usage("Mandatory argument 'wikiname' is missing")
    if rundate is None:
        usage("Mandatory argument 'rundate' is missing")
    if remainder:
        usage("Unknown option(s) specified: <%s>" % remainder[0])


def do_main():
    '''
    entry point
    '''
    wikiname = None
    rundate = None
    dumpsdir = None
    verbose = False
    output_dir = "."

    try:
        (options, remainder) = getopt.gnu_getopt(
            sys.argv[1:], "d:o:r:w:vh", [
                "dumpsdir=", "outdir=", "rundate=", "wikiname=",
                "verbose", "help"])
    except getopt.GetoptError as err:
        usage("Unknown option specified: " + str(err))

    for (opt, val) in options:
        if opt in ["-d", "--dumpsdir"]:
            dumpsdir = val
        elif opt in ["-o", "--outdir"]:
            output_dir = val
        elif opt in ["-r", "--rundate"]:
            rundate = val
        elif opt in ["-v", "--verbose"]:
            verbose = True
        elif opt in ["-w", "--wikiname"]:
            wikiname = val
        elif opt in ["-h", "--help"]:
            usage("Help for this script")

    check_args(dumpsdir, wikiname, rundate, remainder)
    index_files = get_index_filenames(wikiname, rundate, dumpsdir)
    if verbose:
        print("index files are", index_files)
    output_file = get_output_path(output_dir, wikiname, rundate)
    if verbose:
        print("output file is", output_file)
    index_files_recombine(index_files, output_file)


if __name__ == '__main__':
    do_main()
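
# example invocation (hypothetical paths):
# python3 recombine_index.py --dumpsdir /public/dumps --wikiname elwikt \
#         --rundate 20190119 --outdir /tmp --verbose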