Skip to content

Instantly share code, notes, and snippets.

@mgedmin
Created September 6, 2019 06:05
Show Gist options
  • Save mgedmin/522305fb3a716654c1a2d641dc452825 to your computer and use it in GitHub Desktop.
Save mgedmin/522305fb3a716654c1a2d641dc452825 to your computer and use it in GitHub Desktop.
A script we use to compare a bunch of PDFs pairwise
#!/usr/bin/env python
"""
Compare PDF reports pixel-wise.
When reportgen functional tests fail, you may want to see visually what
exactly changed. To do that:
(1) set your changes aside, e.g.
git stash
(2) run bin/compare-reportgen-output to generate pristine PDFs in the
reportgen-output directory
(3) move your changes back
git stash pop
(4) run bin/compare-reportgen-output to compare pristine PDFs with new ones
If you make further changes in your sandbox, to re-run the comparison do
(5) bin/compare-reportgen-output -r
To compare the current output with some older version (if you've got the PDFs
for that older version pregenerated), run
bin/compare-reportgen-output oldver
To compare the outputs of two versions (if you've got PDFs for them), run
bin/compare-reportgen-output ver1 ver2
Use Git revspecs to specify versions.
To compare the outputs of two ReportLab versions (if you've got PDFs for them),
run
bin/compare-reportgen-output reportlab-ver1 reportlab-ver2
Before you can do that you'll have to generate the PDFs for a particular
ReportLab version, which is a bit complicated:
.tox/py27-latest/bin/pip install reportlab==rlver
.tox/py27-latest/bin/python scripts/compare_reportgen_output.py
You'll need imagemagick and xdg-utils installed.
"""
# TODO:
# - figure out how to check out a fresh copy of the sw to get pristine PDFs
# of a non-current version (virtualenv or something)
from __future__ import print_function
import os
import sys
import glob
import hashlib
import tempfile
import shutil
import subprocess
import filecmp
import optparse
from six.moves import cStringIO as StringIO
try:
# Python 3
from html import escape
from urllib.request import pathname2url
except ImportError:
# Python 2
from cgi import escape
from urllib import pathname2url
# Note: *no* ReportLab/PIL/lxml imports here since we fiddle with sys.path in
# main()
class EmptyReportMaker(object):
    """Reporter that accepts every comparison event and does nothing.

    It spells out the set of hooks a reporter provides; every hook here
    returns None.
    """

    def start_pdf(self, one_pdf_file, another_pdf_file):
        """Hook: a pair of PDF files is about to be compared."""

    def start_page(self, one_png_file, another_png_file):
        """Hook: a pair of differing page images was found."""

    def end_page(self):
        """Hook: handling of the current page pair is finished."""

    def different_page_counts(self, one_count, another_count):
        """Hook: the two PDFs have different numbers of pages."""

    def end_pdf(self):
        """Hook: handling of the current PDF pair is finished."""

    def done(self, total_pdfs, differing_pdfs):
        """Hook: the whole comparison run is finished."""
class ReportMaker(EmptyReportMaker):
    """Reporter that writes an HTML summary of the differences found.

    The report (``index.html``) is created lazily: nothing is written until
    the first difference for some PDF pair is reported.  Differing pages are
    shown as before / after / diff thumbnails linking to the full images.
    """

    stylesheet = "summary > h2, summary > h3 { display: inline; cursor: pointer; }"

    def __init__(self, title, output_dir=None, launch_browser=False):
        """Remember the report settings; no files are touched yet.

        title -- used for the HTML <title> and the top-level heading.
        output_dir -- where index.html is written and where diff image paths
            point; a temporary directory is created when it is None.
        launch_browser -- open the finished report with xdg-open.
        """
        self.title = title
        self.output_dir = output_dir
        self.output_file = None
        self.out = None
        # (before, after) pair announced by start_pdf() whose title has not
        # been written yet.
        self.cur_pdf = None
        # differing pages in the PDF currently being reported
        self.page_differences = 0
        # differing pages over the whole run
        self.total_page_differences = 0
        # True while a per-PDF <details> element is still open
        self.pending_pdf_end = False
        self.launch_browser = launch_browser

    def start_writing(self):
        """Create the output directory if needed and open index.html."""
        if self.output_dir is None:
            self.output_dir = tempfile.mkdtemp(prefix='cmp-reportgen-output-')
        try:
            os.makedirs(self.output_dir)
        except OSError:
            # Typically: the directory already exists.
            pass
        self.output_file = os.path.join(self.output_dir, 'index.html')
        self.out = open(self.output_file, 'w')
        print("creating %s" % self.output_file)

    def write_header(self):
        """Write the HTML head and the top-level heading."""
        print("<html><head><title>%s</title><style>%s</style></head>" %
              (escape(self.title), self.stylesheet), file=self.out)
        print("<body><h1>%s</h1>" %
              escape(self.title), file=self.out)

    def write_stats(self, total_pdfs, differences):
        """Write the summary counters shown at the end of the report."""
        print("<h2>Stats</h2>", file=self.out)
        print("<p>Total PDFs compared: %d</p>" % total_pdfs, file=self.out)
        print("<p>PDFs with differences: %d</p>" % differences, file=self.out)
        print("<p>Total pages with differences: %d</p>"
              % self.total_page_differences, file=self.out)

    def write_footer(self):
        """Close the HTML document."""
        print("</body></html>", file=self.out)

    def urlize(self, filename):
        """Return a file:// URL for *filename* (made absolute first)."""
        # XXX: there's a Linux vs Windows difference wrt the necessity to add
        # // after file:, but I don't care about Windows.
        return 'file://' + pathname2url(os.path.abspath(filename))

    def write_pdf_title(self, one_pdf_file, another_pdf_file):
        """Open the report section for one pair of PDFs.

        Starts the report file on first use; otherwise closes the previous
        PDF section if it is still open.
        """
        if not self.out:
            self.start_writing()
            self.write_header()
        elif self.pending_pdf_end:
            self.write_pdf_end()
        one_title = os.path.basename(one_pdf_file)
        another_title = os.path.basename(another_pdf_file)
        one_url = self.urlize(one_pdf_file)
        another_url = self.urlize(another_pdf_file)
        print("<details open>", file=self.out)
        print("<summary>", file=self.out)
        print("<h2>%s</h2>" % escape(one_title), file=self.out)
        print("</summary>", file=self.out)
        print('<p>Before: <a href="%s">%s</a></p>' % (
            escape(one_url, True), escape(one_title)), file=self.out)
        print('<p>After: <a href="%s">%s</a></p>' % (
            escape(another_url, True), escape(another_title)), file=self.out)
        self.pending_pdf_end = True

    def write_pdf_end(self):
        """Close the currently open PDF section."""
        if self.page_differences:
            self.write_page_differences()
        print("</details>", file=self.out)
        self.pending_pdf_end = False

    def write_page_thumbnail(self, one_png_file, another_png_file, diff_file):
        """Write the before/after/diff thumbnails for one differing page."""
        one_url = self.urlize(one_png_file)
        another_url = self.urlize(another_png_file)
        diff_url = self.urlize(diff_file)
        # Page files are named "<name>-<zero-based index>.png".
        page_number = one_png_file[:-len('.png')].rsplit('-', 1)[-1]
        page_number = int(page_number) + 1
        w, h = 595, 842  # A4 PDF rendered to png at imagemagick's standard dpi
        scale_factor = 3
        size = ' width="%d" height="%d"' % (w // scale_factor, h // scale_factor)
        print("<details open>", file=self.out)
        print("<summary>", file=self.out)
        print("<h3>Page %d</h3>" % page_number, file=self.out)
        print("</summary>", file=self.out)
        print('<p><a href="%s"><img src="%s" %s alt="before" /></a>' % (
            escape(one_url, True), escape(one_url, True), size), file=self.out)
        print(' <a href="%s"><img src="%s" %s alt="after" /></a>' % (
            escape(another_url, True), escape(another_url, True), size), file=self.out)
        print(' <a href="%s"><img src="%s" %s alt="diff" /></a>' % (
            escape(diff_url, True), escape(diff_url, True), size), file=self.out)
        print('<br />Page %d: before, after, difference.</p>' % page_number, file=self.out)
        print("</details>", file=self.out)
        self.out.flush()
        self.page_differences += 1
        self.total_page_differences += 1

    def write_page_counts(self, one_count, another_count):
        """Write the page counts of both PDFs."""
        print('<p>Before: %d pages.</p>' % one_count, file=self.out)
        print('<p>After: %d pages.</p>' % another_count, file=self.out)
        self.out.flush()

    def write_page_differences(self):
        """Write the per-PDF count of differing pages and reset it."""
        # Fixed: this message used to be misspelled "Differeces".
        print('<p>Differences in %d page%s.</p>' % (
            self.page_differences, 's' if self.page_differences != 1 else ''), file=self.out)
        self.out.flush()
        self.page_differences = 0

    def start_pdf(self, one_pdf_file, another_pdf_file):
        """Remember the PDF pair; its title is written on first difference."""
        self.cur_pdf = one_pdf_file, another_pdf_file

    def start_page(self, one_png_file, another_png_file):
        """Report a differing page pair.

        Returns the path (inside the output directory) where the caller is
        expected to put the diff image.
        """
        if self.cur_pdf:
            self.write_pdf_title(*self.cur_pdf)
            self.cur_pdf = None
        diff_file = os.path.join(self.output_dir,
                                 os.path.basename(one_png_file))
        self.write_page_thumbnail(one_png_file, another_png_file, diff_file)
        return diff_file

    def different_page_counts(self, one_count, another_count):
        """Report that the two PDFs have different numbers of pages."""
        if self.cur_pdf:
            self.write_pdf_title(*self.cur_pdf)
            self.cur_pdf = None
        self.write_page_counts(one_count, another_count)

    def done(self, total_pdfs, differing_pdfs):
        """Finish the report, if one was started, and tell the user about it."""
        if self.out:
            if self.pending_pdf_end:
                self.write_pdf_end()
            self.write_stats(total_pdfs, differing_pdfs)
            self.write_footer()
            self.out.close()
            self.out = None
            # NOTE(review): SOURCE lost its indentation; the announcement is
            # placed inside "if self.out" because output_file is only set
            # once a report has actually been started.
            if self.launch_browser:
                print("Launching %s in a browser..." % self.output_file)
                subprocess.call(["xdg-open", self.output_file])
                print("Please 'rm -r %s' when done" % self.output_dir)
            else:
                print("Output in %s" % self.output_dir)
class Job(object):
    """A callable unit of work with an optional human-readable name."""

    def __init__(self, name=None):
        # The name, when set, is what gets printed as the job is processed.
        self.name = name

    def __call__(self):
        """Run the job; the base implementation does nothing.

        Subclasses possibly return a JobError().
        """
        return None
class JobError(object):
    """Value a job returns (not raises) to signal failure."""

    def __init__(self, message):
        # Text describing what went wrong.
        self.message = message
class GeneratePdfJob(Job):
    """Job that renders one reportgen XML input file into a PDF."""

    def __init__(self, input_file, output_file):
        super(GeneratePdfJob, self).__init__('building %s' % output_file)
        self.input_file = input_file
        self.output_file = output_file

    def __call__(self):
        """Build the PDF; return a JobError if no output file appeared."""
        # Imported here rather than at module level: main() adjusts sys.path
        # before any job runs.
        from ivija.reportgen import engine, zLOG

        source = self.input_file
        target = self.output_file
        original_cwd = os.getcwd()
        scratch_dir = tempfile.mkdtemp(prefix='cmp-reportgen-output-')
        log_buffer = StringIO()
        built = False
        try:
            source = os.path.abspath(source)
            target = os.path.abspath(target)

            def deliver(filename):
                # Move whatever the generator produced to its final place.
                return shutil.move(filename, target)

            # The generator is run from the directory of the input file.
            os.chdir(os.path.dirname(source))
            engine.run_as_command = True  # hack :(
            # Capture log output so it can be shown if the build fails.
            zLOG._set_log_dest(log_buffer)
            generator = engine.ReportGenerator(scratch_dir, '.', source,
                                               date='2007-06-22',
                                               output_callback=deliver)
            generator.generate()
            built = os.path.exists(target)
        finally:
            os.chdir(original_cwd)
            shutil.rmtree(scratch_dir)
        if built:
            return None
        return JobError("could not build %s\n\n%s" % (target, log_buffer.getvalue()))
class ConvertPdfToPngJob(Job):
    """Job that renders every page of a PDF to a PNG using ImageMagick."""

    def __init__(self, pdf_file):
        Job.__init__(self, "converting %s to PNG files" % pdf_file)
        self.pdf_file = pdf_file

    def __call__(self):
        """Run ``convert``; return a JobError if it fails or cannot be run.

        Pages are written next to the PDF as ``<name>-<index>.png``.
        """
        # An explicit %d makes ImageMagick number the output even when the
        # PDF has a single page.  With a plain "<name>.png" target a one-page
        # PDF produces "<name>.png", which the page lookup (it only accepts
        # "<name>-<digits>.png") never finds, so such PDFs were silently
        # compared as having zero pages.
        png_pattern = os.path.splitext(self.pdf_file)[0] + '-%d.png'
        try:
            status = subprocess.call(
                ["convert", self.pdf_file, "-alpha", "remove", png_pattern])
        except OSError as e:
            return JobError("could not run convert on %s: %s"
                            % (self.pdf_file, e))
        if status != 0:
            return JobError("convert failed on %s (exit status %d)"
                            % (self.pdf_file, status))
class CompareImagesJob(Job):
    """Unnamed job that asks ImageMagick's ``compare`` for a diff image."""

    def __init__(self, one_file, another_file, diff_file):
        super(CompareImagesJob, self).__init__()
        self.one_file = one_file
        self.another_file = another_file
        self.diff_file = diff_file

    def __call__(self):
        """Run ``compare one another diff``; the exit status is ignored."""
        command = ["compare", self.one_file, self.another_file, self.diff_file]
        subprocess.call(command)
class TheGreatPdfComparator(object):
    """Build, cache and compare reportgen PDFs page by page.

    PDFs (and their per-page PNG renderings) are cached under
    ``<output_cache>/reportlab-<rl version>/ivija-<branch>-<version>/``.
    Differences are forwarded to ``self.report``.
    """

    output_cache = 'reportgen-output'
    verbose = False
    quiet = False
    report = EmptyReportMaker()

    def chatter(self, msg):
        """Print *msg* only in verbose mode."""
        if self.verbose:
            print(msg)

    def info(self, msg):
        """Print *msg* unless in quiet mode."""
        if not self.quiet:
            print(msg)

    def warn(self, msg):
        """Print a warning to stderr."""
        print(msg, file=sys.stderr)

    def error(self, msg):
        """Print an error to stderr."""
        print(msg, file=sys.stderr)

    def find_input_files(self):
        """Return the sorted list of functional-test XML inputs."""
        return sorted(glob.glob('src/ivija/reportgen/ftests/*.xml'))

    def determine_reportlab_version(self):
        """Return the version of the importable ReportLab."""
        import reportlab
        return reportlab.Version

    def perform_jobs(self, jobs):
        """Run *jobs* in a multiprocessing pool, reporting each in order."""
        if not jobs:
            return
        from multiprocessing import Pool
        pool = Pool()
        outstanding = []
        for job in jobs:
            result = pool.apply_async(job)
            outstanding.append((job, result))
        pool.close()
        for job, result in outstanding:
            self._perform_job(job, result)
        pool.join()

    def _perform_job(self, job, result=None):
        """Announce *job*, obtain its outcome and report any failure.

        result -- the AsyncResult of a job already submitted to a pool; when
            None the job is run synchronously right here.
        """
        if job.name:
            self.info(job.name)
        if result is not None:
            try:
                # Fixed: this used result.wait(), which always returns None,
                # so a JobError returned by a pooled job was never reported.
                retval = result.get()
            except Exception as e:
                # get() re-raises whatever the worker raised.  Such failures
                # used to be dropped silently; report them but keep going so
                # one bad job does not abort the whole run.
                self.error("%s failed: %s"
                           % (job.name or type(job).__name__, e))
                return
        else:
            retval = job()
        if isinstance(retval, JobError):
            self.error(retval.message)

    def pipe(self, cmd):
        """Run *cmd* and return its stdout, stripped and decoded as UTF-8."""
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        # communicate() reads to EOF *and* waits for the child; previously
        # only the pipe was closed and the process was never reaped.
        output, _ = process.communicate()
        return output.strip().decode('UTF-8')

    def resolve_version(self, revspec):
        """Return the abbreviated commit id that *revspec* refers to."""
        return self.pipe(['git', 'describe', '--match', '^$', '--always', revspec])

    def resolve_branch(self, revspec):
        """Return the name of a local branch from which *revspec* is reachable."""
        branch = self.pipe(['git', 'name-rev', '--name-only', '--refs=refs/heads/*', revspec])
        # name-rev may answer "branch~N"; keep only the branch name.
        return branch.partition('~')[0]

    def determine_version(self):
        """Return the current commit id, with a -dirty suffix if modified."""
        return self.pipe(['git', 'describe', '--match', '^$', '--always', '--dirty'])

    def determine_branch(self):
        """Return the short name of the currently checked-out branch."""
        return self.pipe(['git', 'symbolic-ref', '--short', 'HEAD'])

    def produce_a_pdf(self, input_file, output_file):
        """Build a single PDF synchronously."""
        self._perform_job(GeneratePdfJob(input_file, output_file))

    def list_known_reportlab_versions(self):
        """Return the sorted ReportLab versions that have a cache directory."""
        return sorted(n[len('reportlab-'):]
                      for n in os.listdir(self.output_cache)
                      if n.startswith('reportlab-'))

    def make_output_dir_name(self, ivija_branch=None, reportlab_version=None, ivija_version=None):
        """Return the cache directory for the given combination.

        Anything left as None is determined from the current environment.
        """
        if reportlab_version is None:
            reportlab_version = self.determine_reportlab_version()
        if ivija_version is None:
            # NOTE(review): SOURCE lost its indentation; both assignments are
            # taken to belong to this branch (otherwise a branch passed by the
            # caller would always be overwritten) -- confirm.
            ivija_version = self.determine_version()
            ivija_branch = self.determine_branch()
        if ivija_branch is None:
            ivija_branch = self.resolve_branch(ivija_version) or self.determine_branch()
        return os.path.join(self.output_cache,
                            'reportlab-%s' % reportlab_version,
                            'ivija-%s' % ivija_version if ivija_branch is None
                            else 'ivija-%s-%s' % (ivija_branch, ivija_version))

    def make_output_file_name(self, input_file):
        """Return the cached PDF path corresponding to *input_file*."""
        basename = os.path.basename(input_file)
        name = os.path.splitext(basename)[0]
        output_dir = self.make_output_dir_name()
        return os.path.join(output_dir, name + '.pdf')

    def wipe_current_pdf_cache(self):
        """Delete the cache directory of the current version, if any."""
        output_dir = self.make_output_dir_name()
        try:
            shutil.rmtree(output_dir)
        except OSError:
            # Best effort: typically the directory does not exist.
            pass

    def populate_pdf_cache(self):
        """Build PDFs and page PNGs for every input that is not cached yet."""
        self.info("building PDFs for rev %s with ReportLab %s"
                  % (self.determine_version(),
                     self.determine_reportlab_version()))
        pdf_jobs = []
        png_jobs = []
        for input_file in self.find_input_files():
            output_file = self.make_output_file_name(input_file)
            if not os.path.exists(output_file):
                try:
                    os.makedirs(os.path.dirname(output_file))
                except OSError:
                    # Typically: the directory already exists.
                    pass
                pdf_jobs.append(GeneratePdfJob(input_file, output_file))
                png_jobs.append(ConvertPdfToPngJob(output_file))
        # All PDFs must exist before any of them is converted.
        self.perform_jobs(pdf_jobs)
        self.perform_jobs(png_jobs)

    def generate_pngs(self, pdf_file):
        """Render the pages of *pdf_file* to PNG synchronously."""
        self._perform_job(ConvertPdfToPngJob(pdf_file))

    def glob_pages(self, pdf_file):
        """Return the page PNGs of *pdf_file*, ordered by page number.

        Page files live next to the PDF and are named
        ``<pdf name>-<digits>.png``.
        """
        dirname = os.path.dirname(pdf_file)
        prefix = os.path.splitext(os.path.basename(pdf_file))[0] + '-'
        suffix = '.png'

        def page_index(fn):
            # The text between "<name>-" and ".png".
            return fn[len(prefix):-len(suffix)]

        page_files = [fn for fn in os.listdir(dirname)
                      if fn.startswith(prefix) and fn.endswith(suffix)
                      and page_index(fn).isdigit()]
        # Numeric sort, so that page 10 comes after page 9.
        page_files.sort(key=lambda fn: int(page_index(fn)))
        return [os.path.join(dirname, fn) for fn in page_files]

    def get_pngs(self, pdf_file):
        """Return the page PNGs of *pdf_file*, rendering them if missing."""
        all_files = self.glob_pages(pdf_file)
        if not all_files:
            self.generate_pngs(pdf_file)
            all_files = self.glob_pages(pdf_file)
        return all_files

    def compare_two_pdfs(self, one_file, another_file):
        """Compare two PDFs page by page; return True if they differ."""
        self.chatter("comparing %s and %s" % (one_file, another_file))
        self.report.start_pdf(one_file, another_file)
        pages_of_one_file = self.get_pngs(one_file)
        pages_of_another_file = self.get_pngs(another_file)
        diffs = False
        jobs = []  # diff-image jobs collected here and run together below
        for (one, another) in zip(pages_of_one_file, pages_of_another_file):
            diffs = self.compare_two_pngs(one, another, jobs) or diffs
        self.perform_jobs(jobs)
        # zip() stops at the shorter list, so extra pages are detected here.
        if len(pages_of_one_file) != len(pages_of_another_file):
            diffs = True
            self.report.different_page_counts(len(pages_of_one_file), len(pages_of_another_file))
            self.warn('%s has %d pages, but %s has %d pages' % (one_file, len(pages_of_one_file),
                                                                 another_file, len(pages_of_another_file)))
        self.report.end_pdf()
        return diffs

    def pixels_of(self, filename):
        """Return the raw pixel data of the image in *filename*."""
        from PIL import Image
        return Image.open(filename).tobytes()

    def pixel_hash_of(self, filename):
        """Return the SHA-256 hex digest of the image's pixel data.

        The digest is cached in ``<filename>.pixelhash``.
        """
        hashfile = filename + ".pixelhash"
        if os.path.exists(hashfile):
            with open(hashfile) as fp:
                return fp.read().strip()
        pixeldata = self.pixels_of(filename)
        hashvalue = hashlib.sha256(pixeldata).hexdigest()
        with open(hashfile, 'w') as fp:
            print(hashvalue, file=fp)
        return hashvalue

    def compare_two_pngs(self, one_file, another_file, jobs=None):
        """Compare two page images; return True if their pixels differ.

        jobs -- when given, the diff-image job is appended to this list
            instead of being run immediately.
        """
        self.chatter("comparing %s and %s" % (one_file, another_file))
        if filecmp.cmp(one_file, another_file):
            # files are bitwise identical
            return False
        # make sure it's not some irrelevant metadata
        if self.pixel_hash_of(one_file) == self.pixel_hash_of(another_file):
            self.chatter("%s and %s differ in metadata only" % (one_file, another_file))
            return False
        self.warn("%s and %s differ" % (one_file, another_file))
        diff_file = self.report.start_page(one_file, another_file)
        if not os.path.exists(diff_file):
            # NB: this helps only if the user passes -o to reuse the output
            # directory
            job = CompareImagesJob(one_file, another_file, diff_file)
            if jobs is not None:
                jobs.append(job)
            else:
                self._perform_job(job)
        self.report.end_page()
        return True

    def find_all_pdfs(self, dir):
        """Return the sorted names of the ``.pdf`` files in *dir*."""
        return sorted(fn for fn in os.listdir(dir) if fn.endswith('.pdf'))

    def compare_all_pdfs(self, one_dir, another_dir):
        """Compare every PDF found in *one_dir* with its namesake in *another_dir*."""
        files_in_one_dir = self.find_all_pdfs(one_dir)
        files_in_another_dir = self.find_all_pdfs(another_dir)
        if files_in_one_dir != files_in_another_dir:
            self.warn('%s and %s have different PDF files'
                      % (one_dir, another_dir))
            only_in_one_dir = sorted(set(files_in_one_dir) - set(files_in_another_dir))
            if only_in_one_dir:
                self.warn('only in %s: %s' % (one_dir, ' '.join(only_in_one_dir)))
            only_in_another_dir = sorted(set(files_in_another_dir) - set(files_in_one_dir))
            if only_in_another_dir:
                self.warn('only in %s: %s' % (another_dir, ' '.join(only_in_another_dir)))
        diffs = 0
        for filename in files_in_one_dir:
            one_pdf = os.path.join(one_dir, filename)
            another_pdf = os.path.join(another_dir, filename)
            # compare_two_pdfs() returns a bool; True counts as one.
            diffs += self.compare_two_pdfs(one_pdf, another_pdf)
        self.info("%d PDFs compared, %d PDFs differ" % (len(files_in_one_dir), diffs))
        self.report.done(len(files_in_one_dir), diffs)

    def populate_pdf_cache_for_other_ver(self, ver):
        """Make sure PDFs for *ver* exist.

        Only the currently checked-out version can be built; for anything
        else instructions are printed and the process exits with status 1.
        """
        if ver == self.determine_version():
            return self.populate_pdf_cache()
        cache_dir = self.make_output_dir_name(ivija_version=ver)
        # XXX: hard to do, not implemented yet
        self.error("no cache of version %s (%s)" % (ver, cache_dir))
        self.info("please check out a fresh copy of changeset %s and run this script to produce pristine PDF files to compare against" % ver)
        self.info("e.g. git stash; git checkout %s; bin/compare-reportgen-output -r; git checkout master; git stash pop" % ver)
        self.info("then try again")
        self.info("or specify some other version to compare against (--list shows available)")
        sys.exit(1)
def main():
    """Command-line entry point: parse options, then list or compare.

    With --list, prints the cached versions and returns.  Otherwise works
    out the "old" and "new" sides (a path, a ``reportlab-<ver>`` name or a
    Git revspec each), makes sure cached PDFs exist and compares them,
    producing an HTML report when differences are found.
    """
    parser = optparse.OptionParser(
        "usage: %prog [options] [revno|rl-ver|dir] [revno|rl-ver|dir]")
    parser.add_option('-v', '--verbose', action='store_true', dest='verbose',
                      help='print names of every pair of files when comparing',
                      default=False)
    parser.add_option('-q', '--quiet', action='store_true', dest='quiet',
                      help='suppress informative messages',
                      default=False)
    parser.add_option('--list', action='store_true',
                      help='list cached versions')
    parser.add_option('-o', '--outdir',
                      help='output directory for the report')
    parser.add_option('-l', '--launch-browser', action='store_true',
                      help='launch browser if there are differences (default)',
                      dest='launch_browser', default=True)
    parser.add_option('--no-launch-browser', action='store_false',
                      dest='launch_browser')
    parser.add_option('-r', '--regenerate', action='store_true',
                      dest='regenerate', default=False,
                      help='discard cached files, generate new ones')
    parser.add_option('--path', action='append', default=[],
                      help="extra Python path to insert in front")
    parser.add_option('--ivija-version', dest='ivija_version_override',
                      help='override ivija version to be used by default')
    parser.add_option('--ivija-branch', dest='ivija_branch_override',
                      help='override ivija branch to be used by default')
    opts, args = parser.parse_args()
    # Must happen before anything imports ReportLab/PIL/ivija.
    sys.path[0:0] = opts.path
    tcmp = TheGreatPdfComparator()
    if opts.verbose:
        tcmp.verbose = True
        tcmp.quiet = False
    elif opts.quiet:
        tcmp.verbose = False
        tcmp.quiet = True
    if opts.ivija_version_override:
        cur_ver = opts.ivija_version_override
    else:
        cur_ver = tcmp.determine_version()
    if opts.list:
        my_rl_version = tcmp.determine_reportlab_version()
        had_alt_versions = False
        for rl_version in tcmp.list_known_reportlab_versions():
            directory = os.path.dirname(tcmp.make_output_dir_name(reportlab_version=rl_version))
            # The heading is printed lazily, only once a first entry is found.
            pending = "cached PDFs in %s" % directory
            if rl_version == my_rl_version:
                pending += " (current reportlab version)"
            else:
                had_alt_versions = True
            # Oldest first, by modification time.
            for fn in sorted(os.listdir(directory),
                             key=lambda fn: os.stat(os.path.join(directory, fn)).st_mtime):
                if fn.startswith('ivija-'):
                    branch_and_ver = fn[len('ivija-'):]
                    if (rl_version == my_rl_version
                            and branch_and_ver.endswith('-' + cur_ver)):
                        cur = "* "
                    else:
                        cur = " "
                    if pending:
                        print(pending)
                        pending = None
                    print(cur + branch_and_ver)
            if pending:
                print("no " + pending)
        if had_alt_versions:
            print("To compare across reportlab versions, pass a full directory name like this:")
            # Fixed: the hint used to say "reportlab-output", which is not
            # the cache directory this script uses.
            print(" %s/reportlab-<rlver>/ivija-<ver>" % tcmp.output_cache)
        return
    elif len(args) == 0:
        if not cur_ver.endswith('-dirty'):
            if opts.regenerate:
                tcmp.wipe_current_pdf_cache()
            tcmp.populate_pdf_cache()
            tcmp.info("pristine checkout: nothing to compare")
            return
        # Modified checkout: compare against the pristine commit it is
        # based on.
        old_ver = cur_ver[:-len('-dirty')]
        tcmp.chatter("trying to compare %s to %s" % (cur_ver, old_ver))
        one_dir = tcmp.make_output_dir_name(ivija_version=old_ver)
        if not os.path.exists(one_dir):
            tcmp.populate_pdf_cache_for_other_ver(old_ver)
            return
    elif len(args) == 1:
        old_ver = args[0]
    elif len(args) == 2:
        old_ver, cur_ver = args
    else:
        # Reported via error() so that --quiet cannot hide why we exit.
        tcmp.error("too many arguments")
        sys.exit(1)
    # Fixed: this check used to run "convert -version > /dev/null" through
    # a shell.  The shell itself handles a missing command, so OSError was
    # never raised and a missing ImageMagick went undetected.
    try:
        with open(os.devnull, 'w') as devnull:
            subprocess.call(["convert", "-version"], stdout=devnull)
    except OSError:
        tcmp.error("please install imagemagick")
        sys.exit(1)
    if opts.regenerate:
        tcmp.wipe_current_pdf_cache()
    # Work out the "old" side.
    if os.path.exists(old_ver):
        one_dir = old_ver
        old_title = one_dir
    elif old_ver.startswith('reportlab-'):
        rl_ver = old_ver[len('reportlab-'):]
        old_title = old_ver
        one_dir = tcmp.make_output_dir_name(ivija_branch=opts.ivija_branch_override,
                                            ivija_version=opts.ivija_version_override,
                                            reportlab_version=rl_ver)
        if not os.path.exists(one_dir):
            tcmp.error("no cache of %s (%s)" % (old_ver, one_dir))
            sys.exit(1)
    else:
        old_title = old_ver
        old_branch = tcmp.resolve_branch(old_ver)
        old_ver = tcmp.resolve_version(old_ver)
        one_dir = tcmp.make_output_dir_name(ivija_branch=old_branch,
                                            ivija_version=old_ver)
        if not os.path.exists(one_dir):
            tcmp.populate_pdf_cache_for_other_ver(old_ver)
    # Work out the "new" side.
    if os.path.exists(cur_ver):
        another_dir = cur_ver
        new_title = another_dir
    elif cur_ver.startswith('reportlab-'):
        # XXX: this is very asymmetric compared to
        # old_ver.startswith('reportlab-')! rethink this.
        rl_ver = cur_ver[len('reportlab-'):]
        new_title = cur_ver
        another_dir = tcmp.make_output_dir_name(ivija_branch=opts.ivija_branch_override,
                                                ivija_version=opts.ivija_version_override,
                                                reportlab_version=rl_ver)
        if not os.path.exists(another_dir):
            tcmp.error("no cache of %s (%s)" % (cur_ver, another_dir))
            sys.exit(1)
    else:
        new_title = cur_ver
        if cur_ver != tcmp.determine_version():
            cur_branch = tcmp.resolve_branch(cur_ver)
            cur_ver = tcmp.resolve_version(cur_ver)
        else:
            cur_branch = None
        another_dir = tcmp.make_output_dir_name(ivija_branch=cur_branch,
                                                ivija_version=cur_ver)
        if not os.path.exists(another_dir):
            tcmp.populate_pdf_cache_for_other_ver(cur_ver)
    title = "Comparing reportgen %s and %s" % (old_title, new_title)
    tcmp.report = ReportMaker(title, launch_browser=opts.launch_browser,
                              output_dir=opts.outdir)
    tcmp.info("comparing %s and %s" % (one_dir, another_dir))
    if os.path.isfile(one_dir) and os.path.isfile(another_dir):
        # Both arguments name single files: compare just that one pair.
        tcmp.report.done(1, tcmp.compare_two_pdfs(one_dir, another_dir))
    else:
        tcmp.compare_all_pdfs(one_dir, another_dir)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment