A handy tool for profiling different aspects of packing methods used in ecCodes / GRIB API.
#!/usr/bin/env python
"""
Measure encoding time, decoding time and message size for different
packingTypes.
"""
from __future__ import print_function

import argparse
from argparse import RawDescriptionHelpFormatter
import gc
import logging
from logging import info
import os
import shutil
from tempfile import mkdtemp
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from eccodes import (CodesInternalError,
                     codes_count_in_file,
                     codes_clone,
                     codes_get_message_size,
                     codes_get,
                     codes_get_values,
                     codes_grib_new_from_file,
                     codes_release,
                     codes_set,
                     codes_set_values,
                     codes_write)

DESCRIPTION = """
Benchmark encoding/decoding for different packing methods using ecCodes.
"""

EPILOG = """
Benchmarks are provided for encoding time, decoding time and storage size.
Run this on many different GRIBs containing different kinds of data in order
to obtain a clear picture of how each packing method treats the specific data
in question. Processor time is measured with garbage collection deactivated.

Be aware that not all packing types are fully supported for all input GRIBs.
Also, ecCodes sometimes silently fails to re-encode the data. The script
catches exceptions raised by the library and verifies that the data was
actually re-encoded; results are only shown for data that was.

Some errors cannot be caught because ecCodes exits the process directly. If
that happens with your data, restrict the run to the packing types that are
supported for it. You can select which packing types to test by commenting or
uncommenting entries in PACKING_TYPES at the top of the script.
"""
PACKING_TYPES = (
    "grid_ieee",
    "grid_jpeg",
    # "grid_png",
    "grid_second_order",
    # "grid_second_order_SPD1",
    # "grid_second_order_SPD2",
    # "grid_second_order_SPD3",
    # "grid_second_order_boustrophedonic",
    "grid_second_order_no_SPD",
    "grid_second_order_no_boustrophedonic",
    "grid_simple",
    # "grid_simple_log_preprocessing",
    "grid_simple_matrix",
    # "grid_ccsds",
    # "grid_second_order_constant_width",
    # "grid_second_order_general_grib1",
    # "grid_second_order_row_by_row",
    "spectral_complex",
    "spectral_ieee",
    "spectral_simple"
)

UNITS = {"encode": "s", "decode": "s", "size": "byte"}


def array_for_each_grib(file_handle):
    """Return empty np.array with length == n messages in file."""
    return np.empty(codes_count_in_file(file_handle))
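
# Timings are taken with the garbage collector disabled so that a collection
# pause cannot distort an individual measurement; time.clock() returns
# processor time on Unix, matching the note in EPILOG.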
def time_execution(function, *args, **kwargs):
    """Time the execution of a function with garbage collection disabled."""
    gc.disable()
    start = time.clock()
    function(*args, **kwargs)
    end = time.clock()
    gc.enable()
    return end - start
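
# The actual re-encoding happens when codes_set_values() is called on a clone
# whose packingType has been changed, so exactly that call is timed below.
# The first message of the output file is then read back to catch the silent
# failures mentioned in EPILOG.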
def repack(input_file, packing_type, outfile):
    """Repack input_file with packing_type, write result to outfile."""
    with open(input_file, "rb") as infile:
        encoding_times = array_for_each_grib(infile)
        i = 0
        while True:
            in_gid = codes_grib_new_from_file(infile)
            if in_gid is None:
                break
            payload = codes_get_values(in_gid)
            clone_id = codes_clone(in_gid)
            codes_set(clone_id, "packingType", packing_type)
            encoding_times[i] = time_execution(codes_set_values, clone_id,
                                               payload)
            with open(outfile, "ab") as output:
                codes_write(clone_id, output)
            codes_release(clone_id)
            codes_release(in_gid)
            i += 1
    with open(outfile, "rb") as output:
        gid = codes_grib_new_from_file(output)
        encoded_type = codes_get(gid, "packingType")
        codes_release(gid)
    if encoded_type != packing_type:
        err_msg = "Repacking failed silently."
        info(err_msg)
        raise RuntimeError(err_msg)
    return encoding_times


def unpack(input_file):
    """Time unpacking of all GRIBs in input_file."""
    with open(input_file, "rb") as infile:
        unpacking_times = array_for_each_grib(infile)
        i = 0
        while True:
            in_gid = codes_grib_new_from_file(infile)
            if in_gid is None:
                break
            unpacking_times[i] = time_execution(codes_get_values, in_gid)
            codes_release(in_gid)
            i += 1
    return unpacking_times


def measure_sizes(input_file):
    """Measure size in bytes of all GRIBs in input_file."""
    with open(input_file, "rb") as infile:
        sizes = array_for_each_grib(infile)
        i = 0
        while True:
            in_gid = codes_grib_new_from_file(infile)
            if in_gid is None:
                break
            sizes[i] = codes_get_message_size(in_gid)
            codes_release(in_gid)
            i += 1
    return sizes
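
# Failures are expected for unsupported packing types: CodesInternalError
# comes from the library itself, RuntimeError from the silent-failure check
# in repack(). Failed types are simply absent from the resulting DataFrames.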
def profile_packing_types(input_file):
    """Profile all packing types for the given input file."""
    encode_times = {}
    decode_times = {}
    sizes = {}
    for packing_type in PACKING_TYPES:
        info("Packing type {}...".format(packing_type))
        filename = "{}.grib".format(packing_type)
        info("Encoding...")
        try:
            encode_times[packing_type] = repack(input_file,
                                                packing_type,
                                                filename)
            info("Decoding...")
            decode_times[packing_type] = unpack(filename)
            info("Measuring sizes...")
            sizes[packing_type] = measure_sizes(filename)
        except (CodesInternalError, RuntimeError):
            info("Skipping {}: re-encoding failed.".format(packing_type))
    return {"encode": pd.DataFrame(encode_times),
            "decode": pd.DataFrame(decode_times),
            "size": pd.DataFrame(sizes)}
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=DESCRIPTION, epilog=EPILOG,
                                     formatter_class=RawDescriptionHelpFormatter)
    parser.add_argument("grib", nargs="+", help="GRIB file(s) to process")
    parser.add_argument("--show", action="store_true",
                        help="Display results as boxplots")
    parser.add_argument("--csv", help="Save results as CSV to specified file",
                        default=None)
    parser.add_argument("--keep", action="store_true",
                        help="Keep repacked files for inspection")
    parser.add_argument("-v", action="store_true", help="Verbose output")
    args = parser.parse_args()

    gribfiles = [os.path.realpath(x) for x in args.grib]
    if args.csv is not None:
        args.csv = os.path.realpath(args.csv)
    if args.v:
        logging.basicConfig(format="%(levelname)s: %(message)s",
                            level=logging.INFO)

    tmpdir = mkdtemp()
    try:
        os.chdir(tmpdir)
        total_results = {"encode": pd.DataFrame(),
                         "decode": pd.DataFrame(),
                         "size": pd.DataFrame()}
        for gribfile in gribfiles:
            info("Working with GRIBs in {}".format(gribfile))
            local_results = profile_packing_types(gribfile)
            for key in local_results:
                total_results[key] = pd.concat((total_results[key],
                                                local_results[key]))
    finally:
        if args.keep:
            print("Results are in {}".format(tmpdir))
        else:
            shutil.rmtree(tmpdir)

    aggregated_results = pd.DataFrame()
    for op, df in total_results.items():
        op_res = pd.DataFrame({"{}_min".format(op): df.min(),
                               "{}_avg".format(op): df.mean(),
                               "{}_max".format(op): df.max()})
        aggregated_results = pd.concat((aggregated_results, op_res), axis=1)

    ngribs = total_results["encode"].shape[0]
    print("{} GRIBs tested.".format(ngribs))
    print(aggregated_results)

    if args.show:
        plt.style.use("ggplot")
        for op in total_results:
            unit = UNITS[op]
            total_results[op].plot(kind="box")
            plt.title(op)
            plt.ylabel(unit, rotation="horizontal")
            plt.xticks(rotation="vertical")
            plt.tight_layout()
            plt.show()
    if args.csv:
        aggregated_results.to_csv(args.csv)
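
Usage note: a typical invocation (the script name is whatever you saved the
gist as; "profile_packing.py" below is hypothetical):

    python profile_packing.py -v --show --csv results.csv forecast.grib

The aggregated table written by --csv can be read back later, for example to
compare two runs. A minimal sketch, assuming two such files exist (the names
"baseline.csv" and "candidate.csv" are hypothetical):

    import pandas as pd

    # Rows are packing types; columns are the aggregated metrics written by
    # the script (encode_min, encode_avg, ..., size_max).
    baseline = pd.read_csv("baseline.csv", index_col=0)
    candidate = pd.read_csv("candidate.csv", index_col=0)

    # Relative change of every metric per packing type between the two runs.
    print((candidate / baseline - 1).round(3))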