Created
September 12, 2022 13:28
-
-
Save alaniwi/7df9fe4e10d051010969a65cce05a5a3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Tool description, passed to argparse in parse_args().
# NOTE: this module-level name shadows the builtin help(); it is kept because
# parse_args() refers to it by this name.
help = ("Compresses a netCDF file by compressing time slices and "
        "concatenating them. Requires NCO utilities.")
import argparse
import os
import shutil
import subprocess
import sys
from multiprocessing import Pool

from netCDF4 import Dataset
def _chunk_bounds(length, chunk_length):
    """Return inclusive (start, end) index pairs that tile ``range(length)``.

    Each pair spans at most ``chunk_length`` indices; the last may be shorter.
    """
    return [(start, min(start + chunk_length, length) - 1)
            for start in range(0, length, chunk_length)]


def _remove_files(paths):
    """Delete each existing path, printing a warning instead of raising on failure."""
    for path in paths:
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                print(f"warning: couldn't delete temp file {path}")


def compress(input_file, output_file, chunk_length, compress_level, dim_name,
             pool_size, temp_file_location):
    """Compress a netCDF file chunk by chunk along an unlimited dimension.

    The input is split into hyperslabs of at most ``chunk_length`` records
    along ``dim_name``. Each hyperslab is written to a temporary file with
    ``ncks -4 -L <compress_level>``, run in a process pool of ``pool_size``
    workers. The pieces are joined with ``ncrcat`` and the result is moved to
    ``output_file``. Temporary files are removed whether or not this succeeds.

    Args:
        input_file: path of the netCDF file to compress.
        output_file: destination path. If None, it is derived from
            ``input_file`` by replacing the trailing ``.nc`` with
            ``_compressed.nc``.
        chunk_length: maximum number of records per chunk (must be >= 1).
        compress_level: deflate level passed to ``ncks -L``.
        dim_name: name of the unlimited dimension to split along.
        pool_size: number of parallel ``ncks`` processes.
        temp_file_location: directory in which temporary files are written.

    Raises:
        ValueError: if ``output_file`` is None and ``input_file`` does not end
            in ``.nc``; if ``chunk_length`` < 1; if ``dim_name`` is missing or
            not unlimited in the input; or if that dimension has zero length.
        Exception: raised by run_prog / run_ncks if an NCO command exits with
            a non-zero status.
    """
    if output_file is None:
        if not input_file.endswith(".nc"):
            raise ValueError("filename must end .nc")
        output_file = f"{input_file[:-3]}_compressed.nc"
    # range() raises for a zero step and silently yields no chunks for a
    # negative one, so reject both up front with a clear message.
    if chunk_length < 1:
        raise ValueError(f"chunk length must be at least 1 (got {chunk_length})")

    with Dataset(input_file) as ds:
        dim = ds.dimensions.get(dim_name)
        # isunlimited() must be *called*: the bare bound method is always
        # truthy, so without the parentheses this check could never fail.
        if dim is None or not dim.isunlimited():
            raise ValueError(f"'{dim_name}' not unlimited dimension")
        # Take the length from the dimension itself. A coordinate variable
        # with the same name is conventional but not guaranteed to exist.
        length = len(dim)
    if length == 0:
        # With no chunks, ncrcat would be invoked with no input files.
        raise ValueError(f"'{dim_name}' dimension is empty; nothing to compress")

    # All temporary files live in temp_file_location. They are named from the
    # output's basename, so a directory component in output_file is not
    # reproduced under the temp dir. They also carry this process's PID, so
    # concurrent runs sharing a temp directory don't clobber each other.
    temp_output_file = (os.path.join(temp_file_location,
                                     os.path.basename(output_file))
                        + f"_{os.getpid()}")
    tmp_files = []
    try:
        all_ncks_args = []
        for chunk_index, (chunk_start, chunk_end) in enumerate(
                _chunk_bounds(length, chunk_length)):
            tmp_file = f"{temp_output_file}_part{chunk_index}"
            tmp_files.append(tmp_file)
            # -4: netCDF-4 output; -L: deflate level; -d: inclusive index
            # range of this chunk along the requested dimension.
            all_ncks_args.append(["-4",
                                  "-L", f"{compress_level}",
                                  "-d", f"{dim_name},{chunk_start},{chunk_end}",
                                  input_file, tmp_file])
        print(f"Compressing {len(all_ncks_args)} chunks "
              f"(parallel pool size {pool_size})")
        with Pool(processes=pool_size) as pool:
            # map() blocks until every chunk is done and re-raises any worker
            # exception here.
            pool.map(run_ncks, all_ncks_args)

        print("Concatenating compressed chunks")
        run_prog("ncrcat", tmp_files + [temp_output_file])

        print("Moving temporary output file")
        shutil.move(temp_output_file, output_file)
        final_size = os.path.getsize(output_file)
        initial_size = os.path.getsize(input_file)
        print(f"{input_file} ({initial_size} bytes) => "
              f"{output_file} ({final_size} bytes), "
              f"compression {(1 - final_size / initial_size) * 100:.1f}%")
    finally:
        print("removing temporary files")
        # Also remove the concatenated file if it never got moved into place
        # (for example, if ncrcat or the move failed).
        _remove_files(tmp_files + [temp_output_file])
def run_ncks(args):
    """Run ``ncks`` with *args* and append the size of the file it produced.

    Used as the worker function for the process pool in compress(). The last
    element of *args* is taken to be the output file. Returns run_prog's
    summary string followed by `` - output file size <bytes>``; any error
    raised by run_prog propagates unchanged.
    """
    summary = run_prog("ncks", args)
    produced = args[-1]
    size_in_bytes = os.path.getsize(produced)
    return f"{summary} - output file size {size_in_bytes}"
def run_prog(exe_name, args):
    """Run an external program and return a one-line summary of what ran.

    Args:
        exe_name: program to execute (resolved via PATH by the OS).
        args: sequence of command-line arguments, excluding the program name.

    Returns:
        ``"Ran: <command line>"``.

    Raises:
        RuntimeError: if the program exits with a non-zero status. The message
            includes the command line, the decoded stdout/stderr and the exit
            code. RuntimeError subclasses Exception, so existing
            ``except Exception`` handlers still catch it.
    """
    all_args = [exe_name] + list(args)
    cmdline = " ".join(all_args)
    # Passing a list (no shell) means filenames are never shell-interpreted.
    result = subprocess.run(all_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Output is captured as bytes. Decode it so the message shows readable
        # text instead of b'...' literals with escaped newlines.
        stdout = result.stdout.decode(errors="replace").strip()
        stderr = result.stderr.decode(errors="replace").strip()
        raise RuntimeError(f"Failed while running {cmdline}. "
                           f"Command stdout: {stdout}. "
                           f"Command stderr: {stderr}. "
                           f"Command exit code: {result.returncode}.")
    return f"Ran: {cmdline}"
def parse_args(argv=None):
    """Build the command-line parser and parse *argv*.

    Args:
        argv: list of argument strings. None (the default) makes argparse
            parse ``sys.argv[1:]``, as before.

    Returns:
        argparse.Namespace with ``chunk_length``, ``compress_level``,
        ``temp_file_location``, ``dimension``, ``pool_size``, ``input_file``
        and ``output_file`` (None if omitted).
    """
    # ``help`` is the module-level tool description. It must be given as
    # description=...: the first positional parameter of ArgumentParser is
    # ``prog``, so passing it positionally would replace the program name in
    # the usage line with the whole description.
    parser = argparse.ArgumentParser(description=help)
    parser.add_argument("-l", "--chunk-length", type=int,
                        help="chunk size in timesteps, defaults to 100",
                        default=100)
    parser.add_argument("-c", "--compress-level", type=int,
                        help="compress level (should be 1-9, default 6)",
                        default=6)
    parser.add_argument("-t", "--temp-file-location",
                        help="directory for temporary files",
                        default=".")
    parser.add_argument("-d", "--dimension",
                        help="dimension for chunks (defaults to 'time')",
                        default="time")
    parser.add_argument("-p", "--pool-size",
                        help="number of parallel processes for compression",
                        type=int,
                        default=4)
    parser.add_argument("input_file", help="input filename")
    # The help text matches the default name compress() builds:
    # "<input without .nc>_compressed.nc".
    parser.add_argument("output_file", nargs="?",
                        help=("output filename (defaults to input filename "
                              "but with '.nc' => '_compressed.nc')"))
    return parser.parse_args(argv)
def main():
    """Command-line entry point: parse the arguments and run compress()."""
    args = parse_args()
    compress(args.input_file, args.output_file, args.chunk_length,
             args.compress_level, args.dimension, args.pool_size,
             args.temp_file_location)
# Only run the CLI when executed as a script. Under multiprocessing's "spawn"
# start method, each Pool worker re-imports the main module; an unguarded
# main() would then start the whole compression again inside every worker.
# The guard also lets compress() be imported without side effects.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment