Skip to content

Instantly share code, notes, and snippets.

@tueda
Last active August 11, 2016 17:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tueda/8146d9a44b5b1ec18fee to your computer and use it in GitHub Desktop.
Save tueda/8146d9a44b5b1ec18fee to your computer and use it in GitHub Desktop.
A program to compare numerical values in files. #bin #python
#!/bin/sh
""":" .
exec python "$0" "$@"
"""
import argparse
import math
import os
import sys
__doc__ = """Compare numerical values in files."""
# Define `raise_from(value, from_value)` in a way that suits the running
# interpreter.  The `raise ... from ...` statement is a syntax error for old
# interpreters, so the variants that use it are compiled from strings.
if sys.version_info[:2] < (3, 2):
    def raise_from(value, from_value):
        """Wrapper of `raise ... from ...`."""
        # No exception chaining here: the cause is simply dropped.
        raise value
elif sys.version_info[:2] == (3, 2):
    # This variant avoids `raise ... from None` by raising `value` directly
    # when no cause is given.
    exec("""def raise_from(value, from_value):
    if from_value is None:
        raise value
    raise value from from_value
""")
else:
    # Later versions: chain unconditionally.
    exec("""def raise_from(value, from_value):
    raise value from from_value
""")
class NumberReader(object):
    """Reader that parses the numbers found on each line of a text file.

    Every non-comment line is split on whitespace and each token is
    converted with `numfunc`.  The object can be used as a context manager;
    leaving the context closes the file.
    """

    def __init__(self, filename, comments='#', numfunc=float, mode='abserr'):
        """Construct a reader object and open `filename`.

        `comments` holds the characters that start a comment line and
        `numfunc` converts each token to a number.  `mode` is accepted but
        not used by this class.
        """
        self._comments = comments
        self._numfunc = numfunc
        self._file = self._filename = self._lineno = None
        self.open(filename)

    def __enter__(self):
        """Enter the runtime context and return the reader itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the runtime context, closing the file."""
        self.close()

    def open(self, filename):
        """Open a file, closing the currently opened one first."""
        self.close()
        self._file = open(filename, 'r')
        self._filename = filename
        self._lineno = 0

    def close(self):
        """Close the file (if any) and reset the reader state."""
        handle = self._file
        if handle:
            handle.close()
        self._file = self._filename = self._lineno = None

    def readline(self):
        """Read numbers in the next line.

        Return the list of numbers on the next non-comment line, an empty
        list for a blank line, or None at the end of the file.  Raise
        IOError if no file is opened; whatever `numfunc` raises for a bad
        token propagates to the caller.
        """
        if not self._file:
            raise IOError('file not opened')
        # readline() yields '' only at the end of the stream.
        for raw in iter(self._file.readline, ''):
            self._lineno += 1
            text = raw.strip()
            if not text:
                # An empty line.
                return []
            if self._comments and self._comments.find(text[0]) != -1:
                # A comment line; skip it.
                continue
            return [self._numfunc(token) for token in text.split()]
        # The end of the stream.
        return None

    @property
    def filename(self):
        """Return the filename (None when closed)."""
        return self._filename

    @property
    def lineno(self):
        """Return the current line number (None when closed)."""
        return self._lineno
def abs_diff(first_value, *other_values):
    """Compute the absolute difference.

    Return the largest `abs(x - first)` over all the values after the first
    one, which acts as the reference value.  The values may be given either
    as separate arguments or as a single sequence.  A single value yields 0;
    an empty sequence raises ValueError.
    """
    if not len(other_values):
        # Called with one argument: treat it as the sequence of values.
        if not len(first_value):
            raise ValueError('abs_diff() arg is an empty sequence')
        first_value, other_values = first_value[0], first_value[1:]
        if not len(other_values):
            return 0
    reference = first_value
    # Seed with 0 so that the result is never negative.
    return max([0] + [abs(value - reference) for value in other_values])
def rel_diff(first_value, *other_values):
    """Compute the relative difference.

    Return the largest `abs((x - first) / first)` over all the values after
    the first one, which acts as the reference value.  The values may be
    given either as separate arguments or as a single sequence.  A single
    value yields 0; an empty sequence raises ValueError.  When the reference
    is zero, the result is 0 if every other value is zero as well and
    infinity otherwise.
    """
    if not len(other_values):
        # Called with one argument: treat it as the sequence of values.
        if not len(first_value):
            raise ValueError('rel_diff() arg is an empty sequence')
        first_value, other_values = first_value[0], first_value[1:]
        if not len(other_values):
            return 0
    reference = first_value
    if reference == 0:
        # Division by the reference is impossible: any value that is not
        # zero too makes the relative difference infinite.
        for value in other_values:
            if not value == 0:
                return float('inf')
        return 0
    # Seed with 0 so that the result is never negative.
    return max([0] + [abs((value - reference) / reference)
                      for value in other_values])
def compare_files(fnames, comments='#', output=None, dps=None, mode='absdiff'):
    """Compare files.

    Read the given files in lockstep, one line at a time, compare the
    numbers found at the same position in each file, and write a per-line
    result followed by summary statistics to `output`.

    Parameters
    ----------
    fnames : sequence of str
        The (one or more) input file names.
    comments : str, optional
        The characters used for the start of a comment; default: '#'. The
        first character also prefixes the header and statistics lines
        written to `output`.
    output : file or None, optional
        The output stream; default: `sys.stdout`. Nothing is written when
        a false value other than None is given.
    dps : int or None, optional
        The number of decimal places used for the comparisons. Requires
        `mpmath` package; default: None.
    mode : str, optional
        The mode of printing compared results. Must be one of 'absdiff',
        'reldiff', 'maxval' and 'minval'; default: 'absdiff'.

    Raises
    ------
    ValueError
        If an argument is invalid, if a non-numeric value is found, or if
        the files do not provide the same number of values on
        corresponding lines.
    """
    if len(fnames) < 1:
        raise ValueError('need one or more files to be compared')
    if not comments:
        raise ValueError('invalid comments characters')
    if dps is not None:
        if dps <= 0:
            raise ValueError('invalid dps')
        try:
            import mpmath
        except ImportError:
            raise ValueError('dps={0} requires mpmath package'.format(dps))
    if mode not in ('absdiff', 'reldiff', 'maxval', 'minval'):
        raise ValueError('unknown mode')
    if output is None:
        output = sys.stdout

    def my_isfinite(x):
        # Fallback for number types/interpreters without an isfinite().
        return not math_isinf(x) and not math_isnan(x)

    if dps is None:
        math_isinf = math.isinf
        math_isnan = math.isnan
        numfunc = float  # str to float
        strfunc = str  # float to str
        isfinite = math.isfinite if hasattr(math, 'isfinite') else my_isfinite
    else:
        math_isinf = mpmath.isinf
        math_isnan = mpmath.isnan
        mpmath.mp.dps = dps
        numfunc = mpmath.mpf
        strfunc = str
        isfinite = my_isfinite

    comment_char = comments[0]
    inf = numfunc('inf')

    nelems = 0  # the total number of values in the files
    # The number of columns in the files: 0 until the first row of numbers
    # is seen, -1 once rows with different numbers of columns were found.
    ncols = 0

    # Running statistics: lists (one entry per column) while ncols > 0,
    # plain scalars once the columns have been flattened (ncols == -1).
    max_val = None
    min_val = None
    max_abs_diff = None
    max_rel_diff = None
    only_finite = None

    nfiles = len(fnames)
    files = []

    try:
        for f in fnames:
            files.append(NumberReader(f, comments, numfunc))

        if output:
            # Print the header.
            prog = os.path.basename(sys.argv[0])
            items = []
            items.append('--{0}'.format(mode))
            if dps is not None:
                items.append('--dps={0}'.format(dps))
            items.extend(fnames)
            output.write('{0} {1} {2}\n'.format(
                comment_char, prog, ' '.join(items)))

        while True:
            # Read a line from each file.
            lines = []
            for reader in files:
                try:
                    lines.append(reader.readline())
                except ValueError as ex:
                    desc = ' {0}:{1}'.format(reader.filename, reader.lineno)
                    raise_from(ValueError(
                        'non-numeric value found:' + desc), ex)

            if all(l is None for l in lines):
                # All the files ended.
                break

            if any(l is None for l in lines):
                # At least one file ended.
                desc = ' ' + ', '.join(
                    '{0}:{1}{2}'.format(
                        reader.filename,
                        'eof' if row is None else reader.lineno,
                        '' if row is None else '({0})'.format(len(row)))
                    for reader, row in zip(files, lines)
                )
                raise ValueError(
                    'different numbers of values to compare:' + desc)

            counts = set(len(l) for l in lines)

            if len(counts) >= 2:
                # At least one file has a different number of elements.
                desc = ' ' + ', '.join(
                    '{0}:{1}({2})'.format(
                        reader.filename, reader.lineno, len(row))
                    for reader, row in zip(files, lines)
                )
                raise ValueError(
                    'different numbers of values to compare:' + desc)

            count = list(counts)[0]

            if count == 0:
                # An empty line.
                if output:
                    output.write('\n')
                continue

            nelems += count

            if ncols == 0:
                # First time.
                ncols = count
                max_val = [-inf] * count
                min_val = [inf] * count
                max_abs_diff = [-inf] * count
                max_rel_diff = [-inf] * count
                only_finite = [True] * count
            elif ncols > 0 and ncols != count:
                # Not fixed number of columns. Give up comparison of
                # each column and flatten the numbers.  The sequences are
                # passed as single arguments so that a one-column history
                # works too, and the minimum is reduced with min().
                ncols = -1
                max_val = max(max_val)
                min_val = min(min_val)
                max_abs_diff = max(max_abs_diff)
                max_rel_diff = max(max_rel_diff)
                only_finite = all(only_finite)

            # Compare the numbers.
            new_max_val = [None] * count
            new_min_val = [None] * count
            new_abs_diff = [None] * count
            new_rel_diff = [None] * count

            for i in range(count):
                values = [l[i] for l in lines]
                new_max_val[i] = max(values)
                new_min_val[i] = min(values)
                new_abs_diff[i] = abs_diff(values)
                new_rel_diff[i] = rel_diff(values)

            # Compare the obtained results with the previous ones.
            if ncols >= 1:
                for i in range(ncols):
                    max_val[i] = max(max_val[i], new_max_val[i])
                    min_val[i] = min(min_val[i], new_min_val[i])
                    max_abs_diff[i] = max(max_abs_diff[i], new_abs_diff[i])
                    max_rel_diff[i] = max(max_rel_diff[i], new_rel_diff[i])
            else:
                max_val = max(max_val, *new_max_val)
                min_val = min(min_val, *new_min_val)
                max_abs_diff = max(max_abs_diff, *new_abs_diff)
                max_rel_diff = max(max_rel_diff, *new_rel_diff)

            # Check the finiteness.
            if ncols >= 1:
                for i in range(ncols):
                    only_finite[i] = (only_finite[i] and
                                      all(isfinite(l[i]) for l in lines))
            else:
                only_finite = (only_finite and
                               all(all(isfinite(x) for x in l) for l in lines))

            # Now print the result.
            if output:
                if mode == 'absdiff':
                    result = [strfunc(e) for e in new_abs_diff]
                elif mode == 'reldiff':
                    result = [strfunc(e) for e in new_rel_diff]
                elif mode == 'maxval':
                    result = [strfunc(e) for e in new_max_val]
                elif mode == 'minval':
                    result = [strfunc(e) for e in new_min_val]
                output.write(' {0}\n'.format(' '.join(result)))
    finally:
        for f in files:
            f.close()

    # Print statistics.
    if output:
        if nelems == 0:
            output.write('{0} no values\n'.format(comment_char))
        elif ncols > 0:
            output.write('{0} {1} rows * {2} columns = {3} elements\n'.format(
                comment_char, nelems // ncols, ncols, nelems))
            for i in range(ncols):
                output.write('{0} column {1}:\n'.format(comment_char, i + 1))
                output.write('{0} max val = {1}\n'.format(
                    comment_char, strfunc(max_val[i])))
                output.write('{0} min val = {1}\n'.format(
                    comment_char, strfunc(min_val[i])))
                if nfiles >= 2:
                    output.write('{0} max abs diff = {1}\n'.format(
                        comment_char, strfunc(max_abs_diff[i])))
                    output.write('{0} max rel diff = {1}\n'.format(
                        comment_char, strfunc(max_rel_diff[i])))
                if not only_finite[i]:
                    output.write('{0} infinite number(s) found\n'.format(
                        comment_char))
            # Reduce the per-column statistics to overall ones.  Passing
            # the list itself (not unpacked) also handles a single column.
            max_val = max(max_val)
            min_val = min(min_val)
            max_abs_diff = max(max_abs_diff)
            max_rel_diff = max(max_rel_diff)
            only_finite = all(only_finite)
            output.write('{0} all elements:\n'.format(comment_char))
        else:
            output.write('{0} {1} elements\n'.format(comment_char, nelems))
        if nelems > 0:
            output.write('{0} max val = {1}\n'.format(
                comment_char, strfunc(max_val)))
            output.write('{0} min val = {1}\n'.format(
                comment_char, strfunc(min_val)))
            if nfiles >= 2:
                output.write('{0} max abs diff = {1}\n'.format(
                    comment_char, strfunc(max_abs_diff)))
                output.write('{0} max rel diff = {1}\n'.format(
                    comment_char, strfunc(max_rel_diff)))
            if not only_finite:
                output.write('{0} infinite number(s) found\n'.format(
                    comment_char))
def _main(argv=None):
    """Entry point.

    Parse the command line and run `compare_files` on the given files.

    Parameters
    ----------
    argv : sequence of str or None, optional
        The command line arguments, without the program name; default:
        None, which makes `argparse` read them from `sys.argv`.
    """
    # Parse command line arguments.
    parser = argparse.ArgumentParser(
        usage='%(prog)s [options] files..'
    )
    # The four mode options share one destination; the last one given wins.
    parser.add_argument('--absdiff',
                        action='store_const',
                        const='absdiff',
                        default='absdiff',
                        help='print absolute differences (default)',
                        dest='mode')
    parser.add_argument('--reldiff',
                        action='store_const',
                        const='reldiff',
                        help='print relative differences',
                        dest='mode')
    parser.add_argument('--maxval',
                        action='store_const',
                        const='maxval',
                        help='print maximum values',
                        dest='mode')
    parser.add_argument('--minval',
                        action='store_const',
                        const='minval',
                        help='print minimum values',
                        dest='mode')
    parser.add_argument('--dps',
                        action='store',
                        type=int,
                        help='use N decimal places with mpmath '
                             '(default: machine precision)',
                        metavar='N')
    parser.add_argument('--comment-chars',
                        action='store',
                        default='#',
                        help='use STR as the start of comments '
                             '(default: \'#\')',
                        metavar='STR')
    # The files are hidden from the generated help because the custom usage
    # string above already mentions them.
    parser.add_argument('files',
                        nargs='*',
                        help=argparse.SUPPRESS)
    args = parser.parse_args(argv)

    if not args.files:
        # Report the missing operands as a usage error instead of letting
        # compare_files() fail with a traceback.
        parser.error('need one or more files to be compared')

    compare_files(
        args.files,
        comments=args.comment_chars,
        dps=args.dps,
        mode=args.mode
    )
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    _main()
# vim: ft=python et ts=8 sts=4 sw=4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment