#!/usr/bin/python3
'''
recombine the multistream index file into an output
file path of your choice, for a given wiki and date
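
example invocation (paths are illustrative; wiki name and date as in the
samples further below):
python3 recombine_index.py --dumpsdir /path/to/dumps --wikiname elwikt --rundate 20190119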
'''
import os
import re
import bz2
import getopt
import sys
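# bzip2 stream header 'BZh9' (900k block size) followed by the compressed-block
# magic 0x314159265359; every stream in a multistream dump file begins with
# these bytes on a byte-aligned boundary, which is what the header and footer
# searches below look for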
BZIP2MARKER = b'\x42\x5a\x68\x39\x31\x41\x59\x26\x53\x59'
def get_header_offset(filename, marker):
    '''
    get the offset of where the mediawiki and siteinfo xml header
    ends, and return it; because these are multistream files,
    this will end on a byte-aligned boundary, yuppee!
    '''
    with open(filename, "rb") as infile:
        # skip the first byte so that we don't just match the marker of the
        # header stream itself at the very start of the file
        try:
            infile.seek(1, os.SEEK_SET)
            max_offset = 1000000
            buffer = infile.read(max_offset)
        except IOError:
            return None
        buffer_offset = buffer.find(marker)
        if buffer_offset >= 0:
            # because we skipped the first byte, add that here
            return buffer_offset + 1
        return None
def get_footer_offset(filename, marker):
    '''
    get the offset of where the mediawiki close tag starts,
    and return it; because these are multistream files,
    this will end on a byte-aligned boundary, yuppee!
    '''
    with open(filename, "rb") as infile:
        # empty files or files with only a footer will return None
        # here (too short) and that's ok, we might as well fail out on them
        # by now they should have already been moved out of the way
        # by the previous job but, just in case...
        max_offset = 100
        try:
            filesize = infile.seek(0, os.SEEK_END)
            infile.seek(filesize - max_offset, os.SEEK_SET)
            buffer = infile.read()
        except IOError:
            return None
        buffer_offset = buffer.find(marker)
        if buffer_offset >= 0:
            return filesize - (len(buffer) - buffer_offset)
        return None
def get_new_offset(content_file, offset, marker):
    '''
    this will be the cumulative offset into the combined page content
    file so far, given the old offset and the name of the content file
    just processed
    '''
    footer_marker = get_footer_offset(content_file, marker)
    header_marker = get_header_offset(content_file, marker)
    body_size = footer_marker - header_marker
    # an offset in an index file is relative to its own content file and
    # includes that file's header; to rewrite it for the combined file we
    # must add everything that precedes this file's body in the combined
    # content file (the first file's header plus all earlier bodies) and
    # subtract this file's own header, so here we just accumulate body sizes
    return offset + body_size
def get_content_file_from_index(index_filename):
    '''
    given the full path to an index file, transform it into the
    corresponding page content file
    '''
    # ..../elwikt-20190119-pages-articles-multistream-index1.txt.bz2 becomes
    # ..../elwikt-20190119-pages-articles-multistream1.xml.bz2
    # ..../nlwiki-20190201-pages-articles-multistream-index2.txt-p123352p450973.bz2 becomes
    # ..../nlwiki-20190201-pages-articles-multistream2.xml-p123352p450973.bz2
    return index_filename.replace('-index', '').replace('.txt.bz2', '.xml.bz2').replace(
        '.txt-p', '.xml-p')
def get_output_path(output_dir, wikiname, rundate):
    '''
    generate and return full path to combined index file which will be written
    '''
    path = os.path.join(output_dir, '{w}-{d}-pages-articles-multistream-index.txt.bz2')
    return path.format(w=wikiname, d=rundate)
# taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net)
def sort_dumpfilenames(mylist):
    """
    Sort the given list in the way that humans expect.

    args:
        list of filenames
    """
    convert = lambda text: int(text) if text.isdigit() else text
    alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
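    # natural sort: numeric chunks compare as integers, so for example
    # ...-index2.txt.bz2 sorts before ...-index10.txt.bz2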
    mylist = sorted(mylist, key=alphanum_key)
    return mylist
def get_index_filenames(wikiname, rundate, dumpsdir):
    '''
    get existing numbered multistream index files for specified wiki and date,
    return sorted list
    '''
    basedir = os.path.join(dumpsdir, wikiname, rundate)
    files = os.listdir(basedir)
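    # match the per-part index files (...-multistream-index1.txt.bz2 and so on)
    # but skip any already-combined ...-multistream-index.txt.bz2 file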
    index_filenames = [os.path.join(basedir, fname) for fname in files
                       if '-index' in fname and 'index.txt' not in fname and
                       fname.startswith(wikiname)]
    return sort_dumpfilenames(index_filenames)
def index_files_recombine(index_files, output_file):
    '''
    recombine index files to produce a specified output file,
    with the combined index file having the correct offsets into the
    combined content file that is produced separately
    we do this without the usual progress callback that shows the file
    size as it grows, because a) it's easier and b) as of this writing
    it takes all of 2 minutes to write the combined index file so who cares.
    '''
    # initially the offset into the combined page content file is 0 plus whatever
    # the first index file says, for any page; this will change as we move into the
    # part of the combined page content file that has the contents of the second
    # page content part and the corresponding index file that has offsets starting
    # again from 0, etc.
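    # illustrative example (sizes made up): if part 1's header stream is 600
    # bytes and its body streams total 5,000,000 bytes, then a stream listed at
    # offset 700 in part 2's index (part 2's own header also being 600 bytes)
    # lands at 600 + 5,000,000 + (700 - 600) = 5,000,700 in the combined file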
    offset = 0
    first_content_file = get_content_file_from_index(index_files[0])
    first_header_size = get_header_offset(first_content_file, BZIP2MARKER)
    combined_indexfile = bz2.open(output_file, 'wt', encoding='utf-8')
    for infile_counter, input_file in enumerate(index_files):
        content_file = get_content_file_from_index(input_file)
        if infile_counter == 1:
            offset += first_header_size
        if infile_counter:
            header_size = get_header_offset(content_file, BZIP2MARKER)
        else:
            # the first file's header is kept (written out) in the combined
            # content file, so its offsets pass through unchanged
            header_size = 0
        with bz2.open(input_file, mode='rt', encoding='utf-8') as partial_indexfile:
            added_offset = offset - header_size
            for line in partial_indexfile:
                if line:
                    partial_offset, title = line.split(':', 1)
                    partial_offset = str(int(partial_offset) + added_offset)
                    # title will still have the newline on the end of it
                    combined_indexfile.write(partial_offset + ":" + title)
        offset = get_new_offset(content_file, offset, BZIP2MARKER)
    combined_indexfile.close()
def usage(message=None):
    '''
    display a helpful usage message with
    an optional introductory message first
    '''
    if message is not None:
        sys.stderr.write(message)
        sys.stderr.write("\n")
    usage_message = """
Usage: python3 recombine_index.py --dumpsdir path --wikiname string --rundate YYYYMMDD
                                  [--outdir path] [--verbose]
       recombine_index.py --help

This script recombines the numbered multistream index files for the specified
wiki and dump run date into a single combined index file, rewriting the offset
in each entry so that it points into the combined page content file which is
produced separately.

Arguments:

 --dumpsdir (-d):  path to dumps directory tree
 --outdir   (-o):  directory where output file will be written
                   default: cwd
 --wikiname (-w):  name of wiki database (e.g. enwiki)
 --rundate  (-r):  date in YYYYMMDD format of dump run for which to do the recombine
 --verbose  (-v):  display extra information about the files being processed
 --help     (-h):  display this help message
"""
    sys.stderr.write(usage_message)
    sys.exit(1)
def check_args(dumpsdir, wikiname, rundate, remainder):
    '''
    check that mandatory args are supplied and that there's
    no extra cruft, or whine
    '''
    if dumpsdir is None:
        usage("Mandatory argument 'dumpsdir' is missing")
    if wikiname is None:
        usage("Mandatory argument 'wikiname' is missing")
    if rundate is None:
        usage("Mandatory argument 'rundate' is missing")
    if remainder:
        usage("Unknown option(s) specified: <%s>" % remainder[0])
def do_main():
    '''
    entry point
    '''
    wikiname = None
    rundate = None
    dumpsdir = None
    verbose = False
    output_dir = "."
    try:
        (options, remainder) = getopt.gnu_getopt(
            sys.argv[1:], "d:o:r:w:vh", [
                "dumpsdir=", "outdir=", "rundate=", "wikiname=",
                "verbose", "help"])
    except getopt.GetoptError as err:
        usage("Unknown option specified: " + str(err))
    for (opt, val) in options:
        if opt in ["-d", "--dumpsdir"]:
            dumpsdir = val
        elif opt in ["-o", "--outdir"]:
            output_dir = val
        elif opt in ["-r", "--rundate"]:
            rundate = val
        elif opt in ["-v", "--verbose"]:
            verbose = True
        elif opt in ["-w", "--wikiname"]:
            wikiname = val
        elif opt in ["-h", "--help"]:
            usage("Help for this script")
    check_args(dumpsdir, wikiname, rundate, remainder)
    index_files = get_index_filenames(wikiname, rundate, dumpsdir)
    if verbose:
        print("index files are", index_files)
    output_file = get_output_path(output_dir, wikiname, rundate)
    if verbose:
        print("output file is", output_file)
    index_files_recombine(index_files, output_file)
if __name__ == '__main__':
    do_main()