Last active
May 1, 2022 14:30
-
-
Save davidbau/5d72c68c550946249f9d9636a007bff2 to your computer and use it in GitHub Desktop.
npycat: cat utility and swiss army knife for npy and npz files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
npycat: cat utility and swiss army knife for numpy and pytorch files. | |
""" | |
import numpy, argparse | |
args = None | |
def main():
    """Parse command-line options into the module-global `args`, then run
    the worker, either letting errors propagate (--raise) or printing them
    briefly (the default)."""
    global args
    parser = argparse.ArgumentParser(
        description="""\
prints the contents of numpy .npy or .npz, or pytorch .pt files.
""",
        epilog="""\
examples:
just print the metadata (shape and type) for data.npy
npycat data.npy --nodata
show every number, and the mean and variance, in a 1-d slice of a 5-d tensor
npycat tensor.npy[0,0,:,0,1] --noabbrev --mean --var
""",
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('files', metavar='file', nargs='*',
                        help='filenames with optional slices such as file.npy[:,0]')
    parser.add_argument('--slice',
                        metavar='slice', default=None,
                        type=parse_slice,
                        help='slice to apply to all files')
    # nargs='?' with const=-1: bare --unpackbits unpacks along the last axis.
    parser.add_argument('--unpackbits',
                        metavar='axis', nargs='?',
                        default=None, const=-1, type=int,
                        help='unpack single-bits from byte array')
    parser.add_argument('--key', metavar='key', default=None,
                        help='key to dereference in npz dictionary')
    # Boolean toggles; each accepts --X, --noX and --no-X spellings.
    # Those with default True are on unless explicitly negated.
    add_boolean_argument(parser, 'shape', True,
                         help='show array shape')
    add_boolean_argument(parser, 'type', True,
                         help='show array data type')
    add_boolean_argument(parser, 'mean',
                         help='compute mean')
    add_boolean_argument(parser, 'std',
                         help='compute stdev')
    add_boolean_argument(parser, 'var',
                         help='compute variance')
    add_boolean_argument(parser, 'min',
                         help='compute min')
    add_boolean_argument(parser, 'max',
                         help='compute max')
    add_boolean_argument(parser, 'l0',
                         help='compute L0 norm, number of nonzeros')
    add_boolean_argument(parser, 'l1',
                         help='compute L1 norm, sum of absolute values')
    add_boolean_argument(parser, 'l2',
                         help='compute L2 norm, euclidean size')
    add_boolean_argument(parser, 'linf',
                         help='compute L-infinity norm, max absolute value')
    add_boolean_argument(parser, 'bincount',
                         help='compute bincount histogram')
    add_boolean_argument(parser, 'sort',
                         help='compute sorted order')
    add_boolean_argument(parser, 'meta', True,
                         help='use --nometa to suppress metadata')
    add_boolean_argument(parser, 'data', True,
                         help='use --nodata to suppress data')
    add_boolean_argument(parser, 'abbrev', True,
                         help='use --noabbrev to suppress abbreviation of data')
    add_boolean_argument(parser, 'name', False,
                         help='show filename with metadata')
    add_boolean_argument(parser, 'kname', True,
                         help='show key name from npz dictionaries')
    add_boolean_argument(parser, 'raise', False,
                         help='raise errors instead of catching them')
    args = parser.parse_args()
    # 'raise' is a Python keyword, so the attribute must be read via getattr.
    if getattr(args, 'raise'):
        _worker()
    else:
        try:
            _worker()
        except Exception as e:
            # Default mode: show a one-line error rather than a traceback.
            print(e)
def format_meta(arr, key=None, filename=None):
    """Build the metadata summary for arr.

    Returns (metaline, metasection): metaline is a one-line summary
    (optional filename, key, dtype, shape, and any requested statistics);
    metasection is a multi-line block for --bincount / --sort output.
    Reads display options from the module-global `args`.  Statistics that
    do not apply to arr's type are silently omitted (best-effort).
    """
    metaline = []
    metasection = []
    if args.name and filename is not None:
        metaline.append(filename)
    # Bug fix: the original tested `args.kname is not None`, which is always
    # true (kname is True or False, never None), so --nokname had no effect.
    if args.kname and key is not None:
        metaline.append('%s:' % key)
    if args.type:
        metaline.append(typestr(arr))
    if args.shape:
        if hasattr(arr, 'shape') and len(arr.shape) > 0:
            metaline.append('size=' +
                            'x'.join(str(s) for s in arr.shape))
    # Each statistic below is best-effort: types that cannot compute it are
    # skipped.  `except Exception` (not a bare except) so KeyboardInterrupt
    # and SystemExit still propagate.
    if args.mean:
        try:
            metaline.append('mean=%f' % arr.mean())
        except Exception:
            pass
    if args.std:
        try:
            metaline.append('std=%f' % arr.std())
        except Exception:
            pass
    if args.var:
        try:
            metaline.append('var=%f' % arr.var())
        except Exception:
            pass
    if args.max:
        try:
            metaline.append('max=%f' % arr.max())
        except Exception:
            pass
    if args.min:
        try:
            metaline.append('min=%f' % arr.min())
        except Exception:
            pass
    if args.l0:
        try:
            # L0 "norm": count of nonzero entries.
            metaline.append('l0=%d' % len(arr.nonzero()[0]))
        except Exception:
            pass
    if args.l1:
        try:
            metaline.append('l1=%f' % numpy.abs(arr).sum())
        except Exception:
            pass
    if args.l2:
        try:
            metaline.append('l2=%f' % numpy.sqrt(numpy.square(arr).sum()))
        except Exception:
            pass
    if args.linf:
        try:
            metaline.append('linf=%f' % max(abs(arr.max()), abs(arr.min())))
        except Exception:
            pass
    if args.bincount:
        metasection.append('bincount:')
        for i, count in enumerate(numpy.bincount(arr.flatten())):
            metasection.append('%4d: %d' % (i, count))
    if args.sort:
        metasection.append('sort:')
        flatarr = arr.flatten()
        for i, ind in enumerate(numpy.argsort(flatarr)):
            metasection.append('%4d: %g' % (ind, flatarr[ind]))
    metaline = ' '.join(metaline)
    metasection = '\n'.join(metasection)
    return metaline, metasection
def _worker():
    """Load each requested file, then print its metadata and/or contents
    according to the options in the module-global `args`."""
    slice_arg = None
    if args.slice is None:
        # A bare bracketed argument such as "[0,:]" is treated as a slice
        # applied to every file, not as a filename.
        bracketed = [a for a in args.files
                     if a.startswith('[') and a.endswith(']')]
        if len(bracketed) == 1:
            slice_arg = bracketed[0]
            args.slice = parse_slice(slice_arg)
    printed_any = False
    for filename in args.files:
        if filename == slice_arg:
            continue  # it was the shared slice, not a file
        loaded = try_open(filename)
        if args.key:
            loaded = loaded[args.key]
        entries = (loaded.items() if hasattr(loaded, 'items')
                   else [(None, loaded)])
        for key, arr in entries:
            if printed_any and args.data:
                print('')  # blank spacer between consecutive arrays
            printed_any = True
            if args.unpackbits is not None:
                arr = numpy.unpackbits(arr, axis=args.unpackbits)
            if args.slice is not None:
                arr = arr[args.slice]
            metaline, metasection = format_meta(arr, key, filename)
            if args.meta and metaline:
                print(metaline)
            if args.meta and metasection:
                print(metasection)
            if args.data:
                if args.abbrev:
                    print_abbrev(arr)
                else:
                    print_full(arr)
def is_object_arr(data):
    """True when data is a numpy array of arbitrary Python objects."""
    if not isinstance(data, numpy.ndarray):
        return False
    return data.dtype.name == 'object'
def is_integer_type(data):
    """Return True if data holds integer or boolean values, False for floats.

    Accepts a numpy ndarray or (if torch is installed) a torch tensor.
    """
    if isinstance(data, numpy.ndarray):
        # Bug fix: `numpy.bool` was removed in numpy 1.24 (AttributeError);
        # `numpy.bool_` works across all supported numpy versions.
        return (numpy.issubdtype(data.dtype, numpy.integer) or
                data.dtype == numpy.bool_)
    # Bug fix: the original read an unbound global `torch` (it is only
    # imported inside try_open); import lazily here so numpy-only inputs
    # never require torch.
    import torch
    return not torch.is_floating_point(data)
def print_abbrev(data, colwidth=0, screenwidth=None):
    """Print an abbreviated view of data, eliding the middle of large axes.

    colwidth and screenwidth are threaded through recursive calls; at the
    top-level call (screenwidth is None) the terminal width and a per-dtype
    column width are computed here.  Handles scalars, dict-like containers,
    and numpy arrays of any rank.
    """
    # Compute some sizes for formatting
    innermost = False
    if screenwidth is None:
        # Bug fix: the original shelled out to `stty size` via subprocess,
        # which was never imported, so detection always raised NameError and
        # fell back to 80.  shutil.get_terminal_size is stdlib, portable,
        # and never raises (the tuple is its fallback).
        import shutil
        screenwidth = shutil.get_terminal_size((80, 24)).columns
        innermost = True
        colwidth = dict(float64=16, float32=9, float16=7,
                        int64=19, int32=9, int16=5, int8=3,
                        uint64=19, uint32=9, uint16=5, uint8=3).get(
                            typestr(data), 16)
    if hasattr(data, 'shape') and len(data.shape) == 0:
        data = data.item()  # unwrap 0-d arrays to plain scalars
    if not hasattr(data, 'shape'):
        # Scalar, dict-like, or other non-array payload.
        try:
            print(abbrev_num(data, colwidth, innermost))
            return
        except Exception:
            pass  # not numeric; fall through
        if hasattr(data, 'items'):
            dictlike = True
            try:
                items = data.items()
            except Exception:
                dictlike = False
            if dictlike:
                # Recursively print each entry of a dict-like container.
                print('{')
                first = True
                for k, v in items:
                    if not first:
                        print()
                    if args.meta:
                        metaline, metasection = format_meta(v, k)
                        if metaline:
                            print(metaline)
                        if metasection:
                            print(metasection)
                    else:
                        print(str(k) + ':')
                    print_abbrev(v, colwidth, screenwidth)
                    first = False
                print('}')
                return
        print(repr(data))
        return
    dots = ' ... '
    # Wrap sub-arrays in brackets for deep (>2-d) or object arrays.
    surround = len(data.shape) > 2 or is_object_arr(data)
    # How many leading/trailing entries to show along this axis.
    edges = 1 if len(data.shape) > 2 else 3 if len(data.shape) > 1 else 4
    if len(data) <= edges * 2 + 1:
        indexes = [range(len(data))]
        numparts = len(data)
        numdots = 0
    else:
        # None marks where the ' ... ' ellipsis is printed.
        indexes = [range(0, edges), [None], range(len(data) - edges, len(data))]
        numparts = edges * 2
        numdots = len(dots)
    if is_object_arr(data) or len(data.shape) > 1:
        for chunk in indexes:
            for j in chunk:
                if j is None:
                    print(dots)
                else:
                    if surround:
                        print('[')
                    print_abbrev(data[j], colwidth, screenwidth)
                    if surround:
                        try:
                            print('] %s shape=%s' % (str(data[j].dtype),
                                'x'.join(str(s) for s in data[j].shape)))
                        except Exception:
                            print(']')
    elif numparts > 0:
        # 1-d numeric row: fit numparts columns into the screen width.
        fieldwidth = min(colwidth, (screenwidth - numdots) // numparts - 1)
        print(' '.join([dots if j is None else
                        abbrev_num(data[j], fieldwidth, innermost)
                        for chunk in indexes for j in chunk]))
def print_full(data):
    """Print every element of data, one innermost row per line, with a
    blank line separating 2-d slices of deeper arrays."""
    if len(data.shape) == 0:
        print(data.item())
        return
    if len(data.shape) > 1:
        for row in data:
            print_full(row)
            if len(data.shape) > 2:
                print('')  # separator between sub-arrays
    else:
        # 1-d: pick an integer or general float format for the whole row.
        fmt = '%d' if is_integer_type(data) else '%g'
        print(' '.join(fmt % v for v in data))
def abbrev_num(num, width, strip):
    """Format num to fit within width characters by shrinking precision.

    width == 0 means "no limit" and uses plain %g.  strip removes the
    left-padding (used at the innermost nesting level).
    Bug fix: the original could drive the precision negative (e.g.
    abbrev_num(123456789, 5, ...) produced the format '%5.-1g' and raised
    ValueError); precision is now clamped to a minimum of 1, and an
    over-width string is returned in the rare case nothing fits.
    """
    if width == 0:
        return '%g' % num
    precision = width
    result = ('%%%d.%dg' % (width, precision)) % num
    while len(result) > width and precision > 1:
        # Shrink precision by the amount of overflow, but never below 1.
        precision = max(1, precision - (len(result) - width))
        result = ('%%%d.%dg' % (width, precision)) % num
    if strip:
        result = result.strip()
    return result
def typestr(arr):
    """Short type description: the dtype for arrays, else the type name
    (qualified by module unless it is a builtin)."""
    if hasattr(arr, 'dtype'):
        return str(arr.dtype)
    cls = type(arr)
    if cls.__module__ == 'builtins':
        return cls.__name__
    return '%s.%s' % (cls.__module__, cls.__name__)
def try_open(filename):
    """Open filename as numpy or pytorch data, trying several strategies:
    memmap .npy, numpy.load, torch.load, then filename-syntax fallbacks
    (trailing [slice], :key suffix, or extension glob)."""
    the_err = None
    try:
        # Fast path: memory-map a plain .npy file without reading it all.
        return numpy.lib.format.open_memmap(filename, mode='r')
    except:
        pass
    try:
        result = numpy.load(filename, allow_pickle=True)
        # A zip archive containing 'archive/data.pkl' is a pytorch save,
        # not a real npz; fall through to torch.load below for those.
        if not hasattr(result, 'items') or 'archive/data.pkl' not in result:
            return result
    except Exception as err:
        the_err = err
        pass
    try:
        # torch is optional: only imported if numpy could not load the file.
        import torch
        return torch.load(filename, map_location=torch.device('cpu'))
    except Exception as torch_err:
        if not the_err:
            the_err = torch_err
        pass
    # Loading failed outright: reinterpret the argument's syntax.
    if '[' in filename and filename.endswith(']'):
        # "file.npy[0,:]" -> open file.npy and apply the slice.
        filename, bracketed = filename[:-1].split('[', 1)
        return try_open(filename)[parse_slice(bracketed)]
    elif ':' in filename:
        # "file.npz:key" -> open file.npz and dereference the key.
        filename, key = filename.split(':', 1)
        return try_open(filename)[key]
    elif '.' not in filename:
        # Bare name: look for a unique file with a known extension.
        from glob import glob
        candidates = [f for f in glob(filename + '.*')
                      if any([f.endswith(e) for e in ['.npy', '.npz', '.pt']])]
        if len(candidates) == 1:
            return try_open(candidates[0])
    # NOTE(review): the_err can still be None here (e.g. memmap failed but
    # no later loader recorded an error), making this `raise None` a
    # TypeError — confirm whether that path is reachable in practice.
    raise the_err
def parse_slice(expr):
    """Parse a numpy-style subscript string into an index object.

    Examples: "3" -> 3; "1:3" -> slice(1, 3); "[:,0]" -> (slice(None), 0);
    "..." -> Ellipsis.  Empty fields, "None", and "newaxis" become None.
    Returns a single index for one comma-field, or a tuple for several.
    """
    def single_index(s):
        # '', 'None' and 'newaxis' all map to None (default bound / newaxis).
        return int(s) if s.strip() not in ['', 'None', 'newaxis'] else None

    def single_slice(piece):
        # Bug fix: the original compared '...' against the *method object*
        # expr.strip (missing call parentheses), so Ellipsis never parsed.
        if piece.strip() == '...':
            return Ellipsis
        fields = list(map(single_index, piece.split(':')))
        if len(fields) == 1:
            return fields[0]  # a plain index, not a slice
        return slice(*fields)

    expr = expr.strip()
    if expr.startswith('[') and expr.endswith(']'):
        expr = expr[1:-1]
    result = tuple(map(single_slice, expr.split(',')))
    return result[0] if len(result) == 1 else result
def add_boolean_argument(parser, name, default=None, **kwargs):
    """Add a boolean flag --name to parser, with hidden negating spellings
    --no-name and --noname sharing the same destination."""
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--' + name,
                       default=default,
                       action='store_true',
                       **kwargs)
    # Both negation spellings are accepted but omitted from --help output.
    for negation in ('--no-' + name, '--no' + name):
        group.add_argument(negation, dest=name, action='store_false',
                           help=argparse.SUPPRESS)
# Script entry point: only runs when executed directly, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment