"""Script to archive MetOffice .pp files to netCDF.

Gist by @mangecoeur, created March 24, 2017.
"""
import datetime
import shutil
import tempfile
import tarfile
from collections import namedtuple
from pathlib import Path
from enum import IntEnum

import numpy as np
import netCDF4 as ncdf
from dateutil.rrule import rrule, HOURLY
class ParameterCodes(IntEnum):
    temperature = 3236
    precipitation = 88888
    irradiance = 1235
    irradiance_longwave = 2207
    humidity = 3245
    wind_speed = 99999
PARAMETERS = {
    3236: {
        'desc': 'Temperature (Kelvin)',
        'name': 'temperature',
        'units': 'K'
    },
    88888: {
        'desc': 'Precipitation (kg/m^2 s)',
        'name': 'precipitation',
        'units': 'kg m-2 s-1'
    },
    1235: {
        'desc': 'Shortwave radiation (Watts/m^2)',
        'name': 'irradiance',
        'units': 'W m-2'
    },
    2207: {
        'desc': 'Reflected longwave radiation (Watts/m^2)',
        'name': 'irradiance_longwave',
        'units': 'W m-2'
    },
    3245: {
        'desc': 'Relative humidity (%)',
        'name': 'humidity',
        'units': '%'
    },
    99999: {
        'desc': 'Windspeed (m/s)',
        'name': 'wind_speed',
        'units': 'm s-1'
    }
}
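# For reference, the IntEnum members double as PARAMETERS keys, so field
# metadata can be looked up either from a raw header code or from the enum.
# A usage sketch (illustrative only, not called by the pipeline):
#
#     PARAMETERS[ParameterCodes.temperature]['units']  # -> 'K'
#     PARAMETERS[3236]['name']                         # -> 'temperature'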
# ideally would expand to include all 64 header terms, but can't be arsed...
Header = namedtuple('Header', ['year', 'month', 'day', 'hour', 'data_len', 'ny', 'nx',
                               'field_code', 'level', 'p_lat', 'p_lon', 'zy', 'dy', 'zx', 'dx'])
def get_pp_header(file):
    # The first 64 values are the header, stored as 32-bit big-endian words
    # (64 x 4 bytes each). For some reason 3 extra words must be read: the first
    # and last decode to 256, which looks like Fortran sequential-unformatted
    # record markers (each record is wrapped in 4-byte length fields, and
    # 256 = 64 words x 4 bytes). So: padding at the start and end of the header
    # record (2 x 4 bytes) plus the length marker at the start of the data
    # record (1 x 4 bytes) = 3 x 4 bytes extra.
    header_bytes = file.read(67 * 4)
    # The fortran code defines the header as containing both ints and floats
    # using an equivalence, so decode the same bytes both ways.
    header = np.frombuffer(header_bytes, dtype='>i', offset=4, count=64)
    rheader = np.frombuffer(header_bytes, dtype='>f', offset=4, count=64)
    # From extract_pp.f90, these are the header positions corresponding to each
    # bit of metadata. Note that we use zero-indexing, so everything is off by
    # one relative to the fortran version.
    yyyy = header[0]
    mm = header[1]
    dd = header[2]
    hh = header[3]
    data_len = header[14]
    # grid size in positions 18 and 19 (fortran numbering)
    ny = header[17]
    nx = header[18]
    fieldcode = header[41]
    level = header[33]
    # all floats
    b_plat = rheader[55]
    b_plon = rheader[56]
    b_zy = rheader[58]
    b_dy = rheader[59]
    b_zx = rheader[60]
    b_dx = rheader[61]
    return Header(yyyy, mm, dd, hh, data_len, ny, nx, fieldcode,
                  level, b_plat, b_plon, b_zy, b_dy, b_zx, b_dx)
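# A small sanity check of the record-marker interpretation above (an
# illustrative sketch, not called by the pipeline): if the .pp file really is
# a Fortran sequential unformatted file, the 4 bytes on either side of the
# 64-word header should both decode to the record length, 256.
def check_header_record_markers(file_path):
    with open(str(file_path), 'rb') as f:
        start_marker = np.frombuffer(f.read(4), dtype='>i4')[0]
        f.seek(4 + 64 * 4)  # skip leading marker plus the 64-word header
        end_marker = np.frombuffer(f.read(4), dtype='>i4')[0]
    return start_marker == 256 and end_marker == 256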
def get_num_hours(header):
    # simple divisible-by-4 leap year test; exact for years 1901-2099
    if header.year % 4 == 0:
        num_hours = 8784
    else:
        num_hours = 8760
    return num_hours
def get_units(header):
    return PARAMETERS[header.field_code]['units']


def get_var_name(header: Header):
    field_code = header.field_code
    var_name = PARAMETERS[field_code]['name']
    return var_name
def get_pp_contents(file, data_len) -> np.ndarray:
    """
    Read the data section of a pp file as 32-bit big-endian floats.

    Seeks past the header explicitly, so it doesn't matter whether the
    header has already been read from this file object.

    :param file: open binary file object
    :param data_len: length of the data, from the header
    :return:
    """
    file.seek(67 * 4)
    file_contents = np.fromfile(file, dtype='>f4')
    return file_contents[:data_len]
def init_array(nx, ny, nt):
    # note the (ny, nx, nt) output order: rows (latitude) first
    return np.full((ny, nx, nt), np.nan)
def load_single(file_path, year):
    with file_path.open("rb") as file:
        header = get_pp_header(file)
        # Fail fast if this is the wrong year!
        if header.year != year:
            # return None, None  # alternative: skip silently instead of raising
            raise ValueError('Wrong year in header: {} given, {} required'.format(header.year, year))
        start_date = datetime.datetime(header.year, header.month, header.day, header.hour)
        day_of_year = start_date.timetuple().tm_yday
        # Get the index corresponding to the file's timestamp (from zero to n_hours_in_year)
        d_idx = 24 * (day_of_year - 1) + header.hour
        arr = get_pp_contents(file, header.data_len)
        arr = arr.reshape((header.ny, header.nx))
    return d_idx, arr
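# Worked example of the time index above: a file stamped 2015-02-01 06:00 has
# day_of_year == 32, so d_idx == 24 * (32 - 1) + 6 == 750, i.e. the 751st
# hourly slot of the year (slots are zero-indexed).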
def load_files_to_array(files, header):
    # Some files appear to be mixed up with different years.
    # Use the year from the first file header to filter input files.
    year = header.year
    num_hours = get_num_hours(header)
    out = init_array(header.nx, header.ny, num_hours)
    for file_path in files:
        d_idx, arr = load_single(file_path, year)
        # the None check only matters if load_single is switched to
        # return (None, None) instead of raising on a year mismatch
        if d_idx is not None:
            out[:, :, d_idx] = arr
    return out
def configure_dset(dset, header, latitudes, longitudes, shape, start_date, units, var_name):
    dset.title = 'NWP {} data on ~4km grid, non-standard lat/lon'.format(var_name)
    dset.institution = 'MetOffice'
    dset.source = 'NWP model'
    dset.comment = 'Archived from MetOffice .pp files'
    dset.p_lat = header.p_lat
    dset.p_lon = header.p_lon
    dset.zx = header.zx
    dset.zy = header.zy
    dset.nx = header.nx
    dset.ny = header.ny
    dset.variable = var_name
    nlat, nlon, num_hours = shape
    # unlimited dimension named after the variable (not used by any variable below)
    dset.createDimension(var_name, None)
    dset.createDimension('time', num_hours)
    dset.createDimension('lat', nlat)
    dset.createDimension('lon', nlon)
    lat_var = dset.createVariable('lat', 'f4', ('lat',))
    long_var = dset.createVariable('lon', 'f4', ('lon',))
    main_var = dset.createVariable(var_name, 'f4', ('lat', 'lon', 'time',),
                                   zlib=True,
                                   least_significant_digit=4,  # quantizing helps compression!
                                   chunksizes=(72, 72, 168)  # 72x72 spatial tiles, one week of hours
                                   # chunksizes=(nlat, nlon, 1)
                                   )
    main_var.units = units
    main_var.long_name = var_name
    main_var.standard_name = var_name
    # main_var.grid_mapping = 'crs'
    # main_var.crs = 4326
    # work on this later...
    lat_var.units = 'degrees_north'
    lat_var.long_name = 'latitude'
    lat_var.standard_name = 'latitude'
    lat_var[:] = latitudes
    long_var.units = 'degrees_east'
    long_var.long_name = 'longitude'
    long_var.standard_name = 'longitude'
    long_var[:] = longitudes
    times = dset.createVariable('time', 'u4', ('time',))
    times.units = 'seconds since 1970-1-1'
    times.calendar = 'gregorian'
    times.long_name = 'time'
    times.standard_name = 'time'
    rd_datetimes = list(rrule(HOURLY, dtstart=start_date, count=num_hours))
    times[:] = ncdf.date2num(rd_datetimes, units=times.units)
    # dset.set_auto_mask(True)
    return dset
def load_array_to_ncdf(outfile_name, data, var_name, header):
    # data dimensions
    ny = header.ny
    nx = header.nx
    num_hours = get_num_hours(header)
    units = get_units(header)
    # Start on the first day of the first month, hour zero
    start_date = datetime.datetime(header.year, 1, 1, 0)
    # Some fiddlyness with getting the right number of points and interval.
    # A generator version would be an exact translation of the fortran, but
    # it gives the same result as linspace.
    rlat = np.linspace(header.zy + header.dy, header.zy + ny * header.dy, ny,
                       dtype=np.float32)
    rlon = np.linspace(header.zx + header.dx, header.zx + nx * header.dx, nx,
                       dtype=np.float32)
    # longitudes can't be > 360
    rlon[rlon > 360] -= 360
    shape = (ny, nx, num_hours)
    with ncdf.Dataset(str(outfile_name), 'w', format='NETCDF4') as dset:
        print('init netCDF at: ', outfile_name)
        dset = configure_dset(dset, header, rlat, rlon, shape, start_date, units, var_name)
        print('adding data to netcdf')
        dset.variables[var_name][:, :, :] = data
def header_from_folder(pp_folder, variable_code=None):
    if variable_code is not None:
        glob_pattern = '2*_{}_09999.pp'.format(str(variable_code).zfill(5))
    else:
        glob_pattern = '2*.pp'
    files = sorted(pp_folder.glob(glob_pattern))
    if not files:
        raise ValueError('No .pp files matching {} in {}'.format(glob_pattern, pp_folder))
    # get metadata from first file
    with files[0].open("rb") as file:
        header = get_pp_header(file)
    return files, header
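# Worked example of the glob pattern above: variable_code=3236 zero-pads to
# '03236', so the pattern becomes '2*_03236_09999.pp', matching e.g. a file
# named '20150101_0000_03236_09999.pp' (hypothetical file name).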
def pp_to_netcdf(pp_folder, output_folder, variable_code=None):
    """
    Archive a folder full of pp files into a single netCDF file.

    This is generally used to archive hourly files for a given variable for one
    year into a single netCDF file for that year.

    :param pp_folder: folder containing the .pp files
    :param output_folder: folder to write the .nc file to
    :param variable_code: optional field code used to filter the input files
    :return:
    """
    if not isinstance(pp_folder, Path):
        pp_folder = Path(pp_folder)
    if not pp_folder.is_dir():
        raise ValueError('Given path is not a directory: ', pp_folder)
    print('Getting file info')
    files, header = header_from_folder(pp_folder, variable_code)
    var_name = get_var_name(header)
    print('loading {} to netcdf'.format(var_name))
    print('load data to array')
    arr_out = load_files_to_array(files, header)
    print('save array to netcdf')
    # create .nc file named after the variable and year, in the output folder
    ncdf_path = output_folder / (var_name + '-{}'.format(header.year) + '.nc')
    load_array_to_ncdf(ncdf_path, arr_out, var_name, header)
    print('done')
def pp_to_netcdf_lowmem(pp_folder, output_folder, variable_code=None):
    """
    Archive a folder full of pp files into a single netCDF file.

    This is generally used to archive hourly files for a given variable for one
    year into a single netCDF file for that year.

    This alternative implementation uses much less memory by writing each pp
    file directly into the netCDF, instead of loading them all into memory and
    then writing in bulk. However, it is much slower - so as long as you have
    plenty of RAM (about 20GB uncompressed, 12GB on systems with compressed RAM
    such as OSX), don't use this.

    :param pp_folder: folder containing the .pp files
    :param output_folder: folder to write the .nc file to
    :param variable_code: optional field code used to filter the input files
    :return:
    """
    if not isinstance(pp_folder, Path):
        pp_folder = Path(pp_folder)
    if not pp_folder.is_dir():
        raise ValueError('Given path is not a directory: ', pp_folder)
    print('Getting file info')
    files, header = header_from_folder(pp_folder, variable_code)
    var_name = get_var_name(header)
    print('loading {} to netcdf'.format(var_name))
    # create .nc file named after the variable and year, in the output folder
    ncdf_path = output_folder / (var_name + '-{}'.format(header.year) + '.nc')
    # data dimensions
    ny = header.ny
    nx = header.nx
    num_hours = get_num_hours(header)
    units = get_units(header)
    # Start on the first day of the first month, hour zero
    start_date = datetime.datetime(header.year, 1, 1, 0)
    # Some fiddlyness with getting the right number of points and interval.
    # A generator version would be an exact translation of the fortran, but
    # it gives the same result as linspace.
    rlat = np.linspace(header.zy + header.dy, header.zy + ny * header.dy, ny,
                       dtype=np.float32)
    rlon = np.linspace(header.zx + header.dx, header.zx + nx * header.dx, nx,
                       dtype=np.float32)
    # longitudes can't be > 360
    rlon[rlon > 360] -= 360
    shape = (ny, nx, num_hours)
    with ncdf.Dataset(str(ncdf_path), 'w', format='NETCDF4') as dset:
        print('init netCDF at: ', ncdf_path)
        dset = configure_dset(dset, header, rlat, rlon, shape, start_date, units, var_name)
        print('adding data to netcdf')
        for file_path in files:
            d_idx, arr = load_single(file_path, header.year)
            if d_idx is not None:
                dset.variables[var_name][:, :, d_idx] = arr
    print('done')
def archive_pp_to_netcdf(archive_path: Path, output_folder, variable_codes=None, extract=True):
    """
    Extract an archive of .pp files and convert its contents to netCDF.

    Unfortunately, they lobbed both radiation vars together in one archive,
    which is a bit annoying...

    :param archive_path:
    :param output_folder:
    :param variable_codes: special-case list of codes for when an archive
        contains more than one variable
    :param extract: if False, treat archive_path as an already-extracted folder
    :return:
    """
    # TODO: Need to change this for Legion
    with tempfile.TemporaryDirectory() as extract_loc:
        if extract:
            print('unpacking to: ', extract_loc)
            shutil.unpack_archive(str(archive_path), extract_loc)
        else:
            # ignore the temp dir and assume the files are in the folder already
            print('using existing files in: ', str(archive_path))
            extract_loc = archive_path
        extract_loc = Path(extract_loc)
        # Some archives contain their data 'flat' (i.e. not in a top-level directory)
        # while others do have a top level. Need to work around this.
        extracted_files = list(extract_loc.glob('*.pp'))
        if len(extracted_files) > 0:
            # looks like extract_loc contains the pp files directly, with no top-level dir
            pp_folder = extract_loc
        else:
            # NOTE: if extract is False, we should not get here.
            # Get the folder name from the archive itself, to protect against the case
            # where there is other junk in the temp extraction location.
            top_folder = None
            with tarfile.open(str(archive_path)) as file:
                for finfo in file:
                    # get the first file name that doesn't start with a dot
                    if not finfo.name.startswith('.'):
                        top_folder = finfo.name
                        break
            extracted_files = list((extract_loc / top_folder).glob('*.pp'))
            if len(extracted_files) > 0:
                pp_folder = extract_loc / top_folder
            else:
                pp_folder = extract_loc
        print('extracted to folder: ', pp_folder)
        print('converting pp to netcdf')
        # deal with the special case where an archive contains several variable codes
        if variable_codes is not None:
            # must be a list
            for variable_code in variable_codes:
                pp_to_netcdf(pp_folder, output_folder, variable_code=variable_code)
                # pp_to_netcdf_lowmem(pp_folder, output_folder, variable_code=variable_code)
        else:
            pp_to_netcdf(pp_folder, output_folder)
            # pp_to_netcdf_lowmem(pp_folder, output_folder)
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(
        description='Archive metoffice .tar.gz archives of .pp files to netcdf format')
    parser.add_argument('file_path', type=str,
                        help='path of the archive (or folder) to process')
    parser.add_argument('output_path', type=str,
                        help='output folder path')
    parser.add_argument('--noextract', action="store_false", default=True,
                        help='treat file_path as an already-extracted folder of .pp files')
    args = parser.parse_args()
    filepath = Path(args.file_path)
    output_path = Path(args.output_path)
    do_extract = args.noextract
    print("Processing Archive:")
    print(str(filepath))
    print("Saving to folder:")
    print(str(output_path))
    print("Extracting?")
    print(do_extract)
    archive_pp_to_netcdf(filepath, output_path, extract=do_extract)
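# Example invocations (hypothetical paths and script name):
#   python pp_archive.py /data/metoffice/temperature_2015.tar.gz /data/netcdf
#   python pp_archive.py /data/metoffice/temperature_2015 /data/netcdf --noextract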