Skip to content

Instantly share code, notes, and snippets.

@alaniwi
Created September 12, 2022 13:28
Show Gist options
  • Save alaniwi/7df9fe4e10d051010969a65cce05a5a3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# Usage text passed to the argparse parser in parse_args() below.
# NOTE(review): this shadows the ``help`` builtin; renaming it would also
# require updating parse_args(), so it is left as-is here.
help = ("Compresses a netCDF file by compressing time slices and "
        "concatenating them. Requires NCO utilities.")
import os
import sys
import subprocess
import argparse
import shutil
from multiprocessing import Pool
from netCDF4 import Dataset
def compress(input_file, output_file, chunk_length, compress_level, dim_name, pool_size,
             temp_file_location):
    """Compress a netCDF file by compressing chunks along ``dim_name`` in
    parallel with ``ncks`` and concatenating them with ``ncrcat``.

    Parameters
    ----------
    input_file : str
        Path to the input netCDF file (must end ``.nc`` if output_file is None).
    output_file : str or None
        Output path; defaults to ``<input>_compressed.nc``.
    chunk_length : int
        Number of timesteps per compressed chunk.
    compress_level : int
        Deflate level (1-9) passed to ``ncks -L``.
    dim_name : str
        Name of the unlimited dimension to chunk along.
    pool_size : int
        Number of parallel ``ncks`` worker processes.
    temp_file_location : str
        Directory in which temporary chunk files are created.

    Raises
    ------
    ValueError
        If output_file is None and input_file does not end ``.nc``.
    Exception
        If ``dim_name`` is missing or not an unlimited dimension, or if an
        external NCO command fails.
    """
    if output_file is None:
        if not input_file.endswith(".nc"):
            raise ValueError("filename must end .nc")
        output_file = f"{input_file[:-3]}_compressed.nc"

    with Dataset(input_file) as ds:
        # Bug fix: ``isunlimited`` is a method; without the call parentheses the
        # bound-method object is always truthy, so this check never fired.
        # Also report the actual dimension name instead of hard-coded 'time'.
        if (dim_name not in ds.dimensions
                or not ds.dimensions[dim_name].isunlimited()):
            raise Exception(f"'{dim_name}' not unlimited dimension")
        length = ds.variables[dim_name].size

    tmp_files = []
    try:
        all_ncks_args = []
        for chunk_index, chunk_start in enumerate(range(0, length, chunk_length)):
            # ncks -d takes an inclusive index range.
            chunk_end = min(chunk_start + chunk_length - 1, length - 1)
            # Bug fix: use only the basename of output_file (it may contain a
            # directory path), consistent with temp_output_file below.
            tmp_file = os.path.join(
                temp_file_location,
                f"{os.path.basename(output_file)}_part{chunk_index}")
            tmp_files.append(tmp_file)
            # Bug fix: the hyperslab dimension was hard-coded to "time";
            # honour the dim_name parameter.
            ncks_args = ["-4",
                         "-L", f"{compress_level}",
                         "-d", f"{dim_name},{chunk_start},{chunk_end}",
                         input_file, tmp_file]
            all_ncks_args.append(ncks_args)

        print(f"Compressing {len(all_ncks_args)} chunks (parallel pool size {pool_size})")
        with Pool(processes=pool_size) as pool:
            # map() blocks until all chunks are done and re-raises worker errors.
            pool.map(run_ncks, all_ncks_args)

        # PID suffix keeps concurrent runs from clobbering each other's output.
        temp_output_file = (os.path.join(temp_file_location,
                                         os.path.basename(output_file))
                            + f"_{os.getpid()}")
        ncrcat_args = tmp_files + [temp_output_file]
        # Robustness: register the concatenated temp file for cleanup too, so
        # it is removed if ncrcat or the final move fails.
        tmp_files.append(temp_output_file)
        print("Concatenating compressed chunks")
        run_prog("ncrcat", ncrcat_args)

        print("Moving temporary output file")
        shutil.move(temp_output_file, output_file)
        final_size = os.path.getsize(output_file)
        initial_size = os.path.getsize(input_file)
        print(f"{input_file} ({initial_size} bytes) => {output_file} ({final_size} bytes), "
              f"compression {(1 - final_size / initial_size) * 100 : .1f}%")
    finally:
        # Best-effort cleanup: a failed delete is reported, not fatal.
        print("removing temporary files")
        for file in tmp_files:
            if os.path.exists(file):
                try:
                    os.remove(file)
                except OSError:
                    print(f"warning: couldn't delete temp file {file}")
def run_ncks(args):
    """Run one ``ncks`` invocation and return a status message that also
    reports the size of the chunk file it produced (the last argument)."""
    output_path = args[-1]
    status = run_prog("ncks", args)
    size = os.path.getsize(output_path)
    return f"{status} - output file size {size}"
def run_prog(exe_name, args):
    """Run an external program and raise on non-zero exit.

    Parameters
    ----------
    exe_name : str
        Executable name (resolved via PATH).
    args : list of str
        Command-line arguments.

    Returns
    -------
    str
        A "Ran: <command line>" message on success.

    Raises
    ------
    Exception
        If the command exits non-zero; the message includes the command line,
        captured stdout/stderr and the exit code.
    """
    all_args = [exe_name] + args
    cmdline = ' '.join(all_args)
    # subprocess.run replaces the dated Popen/communicate pair; the argument
    # list (shell=False) avoids any shell-injection risk from file names.
    result = subprocess.run(all_args,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Bug fix: the original message fused " ." onto the next sentence
        # ("...file .Command stdout:"); separate the sentences properly.
        raise Exception(f"Failed while running {cmdline}. "
                        f"Command stdout: {result.stdout}. "
                        f"Command stderr: {result.stderr}. "
                        f"Command exit code: {result.returncode}.")
    return f"Ran: {cmdline}"
def parse_args(argv=None):
    """Parse command-line options for the compression script.

    Parameters
    ----------
    argv : list of str, optional
        Argument list to parse; defaults to ``sys.argv[1:]`` (backward
        compatible with the original zero-argument call).

    Returns
    -------
    argparse.Namespace
        Parsed options: chunk_length, compress_level, temp_file_location,
        dimension, pool_size, input_file, output_file.
    """
    # Bug fix: ArgumentParser's first positional parameter is ``prog``, not the
    # description, so the original call set the program name to the usage text.
    parser = argparse.ArgumentParser(description=help)
    parser.add_argument("-l", "--chunk-length", type=int,
                        help="chunk size in timesteps, defaults to 100",
                        default=100)
    parser.add_argument("-c", "--compress-level", type=int,
                        help="compress level (should be 1-9, default 6)",
                        default=6)
    parser.add_argument("-t", "--temp-file-location",
                        help="directory for temporary files",
                        default=".")
    parser.add_argument("-d", "--dimension",
                        help="dimension for chunks (defaults to 'time')",
                        default="time")
    parser.add_argument("-p", "--pool-size",
                        help="number of parallel processes for compression",
                        type=int,
                        default=4)
    parser.add_argument("input_file", help="input filename")
    parser.add_argument("output_file",
                        help=("output filename "
                              "(defaults to input filename but with '.nc' => 'compressed.nc')"),
                        nargs="?")
    return parser.parse_args(argv)
def main():
    """Script entry point: parse CLI arguments and run the compression."""
    # Removed dead code: the unused ``filenames = sys.argv[1:]`` assignment;
    # argparse reads sys.argv itself.
    args = parse_args()
    compress(args.input_file, args.output_file, args.chunk_length, args.compress_level,
             args.dimension, args.pool_size, args.temp_file_location)


# Guarding the entry point keeps the module importable and is required for
# multiprocessing start methods that re-import the main module (e.g. spawn).
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment