Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Measures compression and decompression speed and size ratio on many files
#!/usr/bin/python3
import argparse
import csv
import os
import re
import shlex
import shutil
import statistics
import subprocess
import sys
from collections import namedtuple
# One benchmark record per processed file. Every field after `filename` is
# numeric, which is what lets the report aggregate those columns.
Stat = namedtuple(
    'Stat',
    (
        'filename',
        # compressed_size / original_size
        'size_ratio',
        # throughput of each direction
        'compress_MB_sec',
        'decompress_MB_sec',
        # sizes in bytes
        'original_size',
        'compressed_size',
        # runtimes in microseconds, as reported by the benchmarked command
        'compress_runtime_usec',
        'decompress_runtime_usec',
    ),
)
def _parse_args():
    """Build the command line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(description='run compression on files in folder, and calculate stats')
    parser.add_argument('dirs', nargs='+', help='directory to process files from')
    parser.add_argument('--output', '-o', required=True, help='report CSV file')
    parser.add_argument('--tmpfs', '-t', required=True, help='tmpfs dir to move the file to')
    parser.add_argument('--pic-suffix', '-p', required=False, default='.ppm', help='process images with this suffix only')
    parser.add_argument('--suffix', '-s', required=False, default='.out', help='suffix of output files')
    parser.add_argument('--decompress', '-d', required=True, help='''command line to run on file, replace {in/out} with file name''')
    parser.add_argument('--compress', '-c', required=True, help='''command line to run on file, replace {in/out} with file name
For example:
./runner.py dir -c 'zstd -encode {in} -level 9 -o {out}' ...
would run, for each matching file in dir, 'zstd -encode {in} -level 9 -o {out}'
with {in} replaced with the copy of the file placed in the tmpfs dir
and {out} replaced with that path + suffix (as received in command line arguments).
We search the standard output of the command for the time it took to run:
there MUST be a printf of runtime_usec:1214221 describing how many usec
it took to process the image
''')
    return parser.parse_args()


def _benchmark_file(source_path, args):
    """Copy one file into the tmpfs dir, compress and decompress it there, and return its Stat."""
    # Every file is benchmarked under the same temporary name inside the
    # tmpfs dir given on the command line.
    fullpath = os.path.join(args.tmpfs, 'tempfile' + args.pic_suffix)
    shutil.copy(source_path, fullpath)
    output_fullpath = fullpath + args.suffix
    decompressed_output_fullpath = output_fullpath + '.decompressed'

    compress_runtime_usec = run_and_return_runtime(args.compress, fullpath, output_fullpath)
    decompress_runtime_usec = run_and_return_runtime(args.decompress, output_fullpath, decompressed_output_fullpath)

    original_size = os.stat(fullpath).st_size
    compressed_size = os.stat(output_fullpath).st_size
    return Stat(
        # Record the real source path: the tmpfs copy has the same name for
        # every file, which would make the report rows indistinguishable.
        filename=source_path,
        size_ratio=compressed_size / original_size,
        # Both throughputs are measured against the uncompressed size.
        compress_MB_sec=MB_per_sec_from_size_and_runtime(original_size, compress_runtime_usec),
        decompress_MB_sec=MB_per_sec_from_size_and_runtime(original_size, decompress_runtime_usec),
        original_size=original_size,
        compressed_size=compressed_size,
        compress_runtime_usec=compress_runtime_usec,
        decompress_runtime_usec=decompress_runtime_usec,
    )


def _write_report(output_path, stats_groups):
    """Write the CSV report: overall accumulations first, then one section per directory."""
    # newline='' is what the csv module asks for, so the writer controls the
    # line endings itself.
    with open(output_path, 'w', newline='') as fp:
        csvwriter = csv.writer(fp)
        csvwriter.writerow(Stat._fields)

        all_stats = [stat for stats in stats_groups.values() for stat in stats]
        csvwriter.writerow(['all dirs accumulations'])
        write_stats(csvwriter, all_stats)
        csvwriter.writerows([[]] * 3)  # blank separator rows

        for d, stats in stats_groups.items():
            csvwriter.writerow([d])
            # A Stat is a tuple in field order, so it can be written as a row.
            csvwriter.writerows(stats)
            write_stats(csvwriter, stats)
            csvwriter.writerows([[]] * 3)


def main():
    """Benchmark the compress/decompress commands on every matching file and write the CSV report.

    Walks each directory given on the command line, processes the files whose
    name ends with --pic-suffix, and writes per-file rows plus accumulated
    statistics to the --output file.
    """
    args = _parse_args()
    stats_groups = {}
    print('dirs to process: ' + ','.join(args.dirs))
    for d in args.dirs:
        stats = []
        stats_groups[d] = stats
        for root, _dirs, files in os.walk(d):
            for filename in files:
                if not filename.endswith(args.pic_suffix):
                    continue
                stats.append(_benchmark_file(os.path.join(root, filename), args))
    _write_report(args.output, stats_groups)
def write_stats(csvwriter, stats):
    """Write one row per accumulation (mean, stdev, variance, median) over `stats`.

    Args:
        csvwriter: object with a ``writerow`` method, e.g. a ``csv.writer``.
        stats: sequence of namedtuple records; the first field is treated as
            a label and skipped, every other field is aggregated.

    Each row starts with the accumulation name followed by one value per
    aggregated field. Nothing is written when `stats` is empty. A cell is left
    empty when the accumulation is undefined for the data (stdev and variance
    need at least two samples).
    """
    if not stats:
        return
    fields = stats[0]._fields[1:]
    # Gather each column once instead of once per accumulation.
    columns = [[getattr(s, field) for s in stats] for field in fields]
    for acc_name in ('mean', 'stdev', 'variance', 'median'):
        acc_fun = getattr(statistics, acc_name)
        acc_row = [acc_name]
        for values in columns:
            try:
                acc_row.append(acc_fun(values))
            except statistics.StatisticsError:
                # e.g. stdev/variance of a single sample
                acc_row.append('')
        csvwriter.writerow(acc_row)
def MB_per_sec_from_size_and_runtime(size_bytes, runtime_usec):
    """Convert a byte count and a runtime in microseconds to a throughput.

    Args:
        size_bytes: number of bytes processed.
        runtime_usec: time taken, in microseconds.

    Returns:
        Throughput in units of 1024**2 bytes per second. When `runtime_usec`
        is 0 the rate cannot be computed: 0.0 is returned if no bytes were
        processed either, otherwise ``float('inf')``.
    """
    if runtime_usec == 0:
        # A tool may report 0 usec for a tiny input; do not abort the whole
        # benchmark run with a ZeroDivisionError.
        return float('inf') if size_bytes else 0.0
    bytes_per_sec = size_bytes * 1_000_000 / runtime_usec
    return bytes_per_sec / 1_024**2
# The benchmarked command must print "runtime_usec:<digits>" on its standard output.
re_for_runtime = re.compile(r'runtime_usec:([0-9]+)')


def run_and_return_runtime(cmd, input_file, output_file):
    """Run a command template through bash and return the runtime it reports.

    Args:
        cmd: shell command line; each ``{in}`` and ``{out}`` placeholder is
            replaced with the shell-quoted input and output path.
        input_file: path substituted for ``{in}``.
        output_file: path substituted for ``{out}``; if a regular file already
            exists there it is removed before the command runs.

    Returns:
        The first ``runtime_usec:<n>`` value found in the command's standard
        output, as an int.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero status.
        SystemExit: (status 1) the output contains no ``runtime_usec:<n>``.
    """
    # shlex.quote keeps paths containing spaces or quotes as a single shell word.
    cmd = cmd.replace('{in}', shlex.quote(input_file))
    cmd = cmd.replace('{out}', shlex.quote(output_file))
    if os.path.isfile(output_file):
        os.remove(output_file)
    # Decode instead of str(bytes) so the search runs on the real text rather
    # than on a bytes repr; undecodable bytes must not abort the run.
    output = subprocess.check_output(['bash', '-xvc', cmd]).decode(errors='replace')
    match = re_for_runtime.search(output)
    if match is None:
        print('Cannot find runtime in process, aborting', file=sys.stderr)
        sys.exit(1)
    return int(match.group(1))
# Script entry point: run the benchmark only when this file is executed
# directly, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment