Last active
December 4, 2024 13:51
-
-
Save barronh/05c78cca220142c395e7c9efe602bc64 to your computer and use it in GitHub Desktop.
CMAQ AMFs for TEMPO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__version__ = '0.3.1' | |
__doc__ = """ | |
CMAQ AMF calculator | |
=================== | |
--- | |
author: Barron H. Henderson | |
last updated: 2024-12-03 | |
--- | |
Overview | |
-------- | |
Designed to apply CMAQ-derived AMFs to TEMPO based on the trace gas user | |
manual v1.1. Data quality flags are not applied within the code, but should | |
be applied to the TEMPO file used as input. | |
Prerequisites | |
------------- | |
* python3 | |
* numpy | |
* pandas | |
* netcdf4 | |
* xarray | |
* pyrsig | |
* s3fs | |
* scipy | |
Example Application | |
------------------- | |
The code has be tested against the Expedited Modeling of Burn Events Results | |
(EMBER) dataset for 2023-09-01. The code uses s3fs | |
import tempocmaq | |
import matplotlib.pyplot as plt | |
proot = '/work/MOD3DEV/jkumm/EMBER/CMAQ/36US3' | |
m3tmpl = f'{proot}/input/mcip/metcro3d_%Y%m%d.ncf' | |
m2tmpl = f'{proot}/input/mcip/metcro2d_%Y%m%d.ncf' | |
croot = f'{proot}/output/2023fires.v2/noASSIM_full' | |
ctmpl = f'{croot}/CMAQv54_cb6r5_ae7_aq.36US3.35.noASSIM.CONC.%Y%j' | |
cmaq = tempocmaq.cmaq(ctmpl, m2tmpl, m3tmpl, exdate='2023-09-01') | |
# requires .netrc with URS credentials | |
asdc = tempocmaq.asdc_s3_access() | |
l2paths = asdc.ls('asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/2023.09.01') | |
gpaths = [k for k in l2paths if k.endswith('G02.nc') and 'T19' in k] | |
keep = [ | |
'vertical_column_troposphere', 'amf_troposphere', 'surface_pressure', | |
'tropopause_pressure', 'scattering_weights', 'gas_profile' | |
] | |
gpath = gpaths[0] | |
print(gpath) | |
tf = asdc.open_tempo(gpath) | |
qacheck = ( | |
(tf['eff_cloud_fraction'] < 0.2) | |
& (tf['solar_zenith_angle'] < 70) | |
) | |
tf = tf[keep].where(qacheck) | |
amfds = cmaq.get_amf(tf, withprofiles=True) | |
amfds['vertical_column_troposphere_orig'] = ( | |
tf['vertical_column_troposphere'] | |
) | |
amfds['amf_troposphere_orig'] = tf['amf_troposphere'] | |
amfds['vertical_column_troposphere'] = ( | |
tf['vertical_column_troposphere'] * tf['amf_troposphere'] | |
/ amfds['amf_troposphere'] | |
) | |
fig, ax = plt.subplots() | |
x = amfds['vertical_column_troposphere_orig'] | |
y = amfds['vertical_column_troposphere'] | |
xval = float(max(x.max(), y.max())) | |
opts = dict(mincnt=1, cmap='nipy_spectral', linewidth=0.1) | |
q = ax.hexbin(x, y, extent=(0, xval, 0, xval), **opts) | |
ax.axline((0, 0), slope=2, color='r', ls='--') | |
ax.axline((0, 0), slope=1.5, color='r', ls=':') | |
ax.axline((0, 0), slope=1/1.5, color='r', ls=':') | |
ax.axline((0, 0), slope=0.5, color='r', ls='--') | |
fig.colorbar(q, label='count') | |
fig.savefig('example.png') | |
desckeys = [ | |
'vertical_column_troposphere_orig', 'vertical_column_troposphere' | |
] | |
print(amfds[desckeys].to_dataframe()[desckeys].describe().to_markdown()) | |
# | | vert_column_troposphere_orig | vert_column_troposphere | | |
# |:------|-------------------------------:|--------------------------:| | |
# | count | 138037 | 137894 | | |
# | mean | 5.14357e+14 | 4.59546e+14 | | |
# | std | 6.62414e+14 | 5.71615e+14 | | |
# | min | -7.11184e+15 | -4.87431e+15 | | |
# | 25% | 1.06325e+14 | 9.7405e+13 | | |
# | 50% | 4.91573e+14 | 4.45004e+14 | | |
# | 75% | 8.9656e+14 | 8.0622e+14 | | |
# | max | 1.96225e+16 | 8.81033e+15 | | |
""" | |
class asdc_s3_access: | |
def __init__(self): | |
import pandas as pd | |
self._fs = None | |
self._expiration = pd.to_datetime('1970-01-01 00:00:00+00:00') | |
def get_cred(self): | |
import requests | |
# Get credentials using ~/.netrc | |
print('Getting AWS credentials', end='..', flush=True) | |
credsr = requests.get( | |
'https://data.asdc.earthdata.nasa.gov/s3credentials' | |
) | |
credsr.raise_for_status() | |
try: | |
creds = credsr.json() | |
except Exception: | |
print(credsr.text) | |
print('.done', flush=True) | |
return creds | |
def ls(self, path): | |
fs = self.get_fs() | |
return fs.ls(path) | |
def get_fs(self, refresh=False): | |
""" | |
Assumes credentials are in ~/.netrc | |
Returns | |
------- | |
fs : s3fs.FileSystem | |
""" | |
import s3fs | |
import pandas as pd | |
now = pd.to_datetime('now', utc=True) | |
dt = (now - self._expiration) | |
mindt = pd.to_timedelta('-2min') | |
expired = dt > mindt | |
refresh = refresh or expired | |
if self._fs is None or refresh: | |
# Build s3fs filesystem (fs) object with credentials for remote | |
# file operations | |
creds = self.get_cred() | |
self._expiration = pd.to_datetime(creds['expiration']) | |
fs = self._fs = s3fs.S3FileSystem( | |
key=creds['accessKeyId'], secret=creds['secretAccessKey'], | |
anon=False, token=creds['sessionToken'] | |
) | |
else: | |
fs = self._fs | |
return fs | |
def open_tempo(self, path): | |
import os | |
import xarray as xr | |
import botocore.exceptions | |
# If not available, download it | |
if not os.path.exists(path): | |
os.makedirs(os.path.dirname(path), exist_ok=True) | |
try: | |
fs = self.get_fs() | |
fs.download(path, path) | |
except botocore.exceptions.ClientError: | |
# If fails to read, then assume credentials timed out and | |
# force a refresh of the fs | |
fs = self.get_fs(refresh=True) | |
fs.download(path, path) | |
sf = xr.open_dataset(path, group='support_data') | |
gf = xr.open_dataset(path, group='geolocation') | |
rf = xr.open_dataset(path, group='/') | |
pf = xr.open_dataset(path, group='product') | |
return xr.merge([rf, pf, gf, sf]) | |
class cmaq: | |
def __init__(self, ctmpl, m2tmpl, m3tmpl, spc='NO2', exdate=None): | |
""" | |
CMAQ object that provides functions to calculate the hybrid coordinate | |
system (get_hyb), uses that system to interpolate CMAQ to the TEMPO | |
prior (get_profiles), and uses the interpolated data to calculate an | |
alternative AMF (get_amf). | |
Arguments | |
--------- | |
ctmpl : str | |
Path strftime template for 3D IOAPI file CONC file with VGLVLS | |
defined as the relative pressure component of the hybrid coordinate | |
without accounting for the fractional absolute pressure. CONC must | |
have spc as a variable. | |
m2tmpl : str | |
Path strftime template for 2D IOAPI file METCRO2D file with surface | |
pressure (PRSFC) variable in Pascals | |
m3tmpl : str | |
Path strftime template for 3D IOAPI file METCRO3D file with dry | |
density (DENS kg/m3), pressure (PRES Pa), temperature (TA K), | |
layer full height (ZF m) variables. | |
spc : str | |
Species name in CMAQ (NO2 or FORM) | |
exdate : str | |
Date to use for example data (ie. initializing the hybrid | |
coordinate) | |
""" | |
# rearrangement of equation 1 in | |
# https://doi.org/10.1175/MWR-D-18-0334.1 | |
import pandas as pd | |
import glob | |
self._ctmpl = ctmpl | |
self._m2tmpl = m2tmpl | |
self._m3tmpl = m3tmpl | |
if exdate is not None: | |
qpath = pd.to_datetime(exdate).strftime(ctmpl) | |
else: | |
qpat = ctmpl.replace('%Y', '*').replace('%y', '*') | |
qpat = qpat.replace('%m', '*').replace('%d', '*') | |
qpat = qpat.replace('%j', '*') | |
qpat = qpat.replace('**', '*').replace('**', '*') | |
qpat = qpat.replace('**', '*').replace('**', '*') | |
qpat = qpat.replace('**', '*').replace('**', '*') | |
qpaths = sorted(glob.glob(qpat)) | |
if len(qpaths) == 0: | |
print(qpat) | |
raise ValueError('Could not find example file; supply exdate') | |
qpath = qpaths[0] | |
print('Example file', qpath) | |
self._hf = self.get_hyb(qpath) | |
self._spc = spc | |
self._exdate = exdate | |
def get_hyb(self, qpath): | |
""" | |
Arguments | |
--------- | |
qpath : str | |
Path to CMAQ or MCIP 3D file | |
Returns | |
------- | |
hf : xarray.Dataset | |
Dataset with hybi, hyai, hybm, and hyam | |
""" | |
import numpy as np | |
import pandas as pd | |
import pyrsig | |
import xarray as xr | |
qf = pyrsig.open_ioapi(qpath) | |
pt = qf.VGTOP | |
hi = qf.VGLVLS.astype('d') # at interfaces | |
r = 0.2 | |
p0 = 1e5 | |
hm = (hi[1:] + hi[:-1]) / 2 # mid-levels | |
c1 = 2. * r**2 / (1 - r)**3 | |
c2 = -r * (4. + r + r**2) / (1 - r)**3 | |
c3 = 2 * (1. + r + r**2) / (1 - r)**3 | |
c4 = -(1 + r) / (1 - r)**3 | |
dfs = {} | |
for key, h in [('ilev', hi), ('lev', hm)]: | |
B = c1 + c2 * h + c3 * h**2 + c4 * h**3 | |
B = np.where(h >= 1, 1, B) | |
B = np.where(h <= r, 0, B) | |
A = (h - B) * (p0 - pt) + pt - B * pt | |
df = pd.DataFrame({key: h, 'A': A, 'B': B}).set_index(key) | |
dfs[key] = df.copy() | |
cq_hyai = dfs['ilev']['A'].to_xarray() | |
cq_hyai.attrs.update( | |
long_name='absolute pressure component of interface', | |
units='Pa', description='pedge =ps * hybi + hyai' | |
) | |
cq_hybi = dfs['ilev']['B'].to_xarray() | |
cq_hybi.attrs.update( | |
long_name='relative surface pressure component of interface', | |
units='Pa', description='pedge =ps * hybi + hyai' | |
) | |
cq_hyam = dfs['lev']['A'].to_xarray() | |
cq_hyam.attrs.update( | |
long_name='absolute pressure component of mid-level', | |
units='Pa', description='pmid = ps * hybm + hyam' | |
) | |
cq_hybm = dfs['lev']['B'].to_xarray() | |
cq_hybm.attrs.update( | |
long_name='relative surface pressure component of mid-level', | |
units='Pa', description='pmid = ps * hybm + hyam' | |
) | |
hf = xr.Dataset() | |
hf.attrs.update(description='P = ps * B + A [=] Pa') | |
hf['hyai'] = cq_hyai | |
hf['hybi'] = cq_hybi | |
hf['hyam'] = cq_hyam | |
hf['hybm'] = cq_hybm | |
return hf | |
def get_profiles(self, tf, cfstrat=False): | |
""" | |
Calculate the partial column of NO2 | |
Arguments | |
--------- | |
tf : xarray.Dataset | |
TEMPO L2 file | |
cfstrat : bool | |
If cfstrat is True, then overwrite the partial column | |
from CMAQ with the GEOS-CF prior partial column above | |
the tropopause pressure. | |
Returns | |
------- | |
outf : xarray.Dataset | |
has CMAQ gas_profile and MCIP temperature | |
""" | |
import xarray as xr | |
import pyrsig | |
# replace with Avogadro = 6.02214076e+23 to remove scipy dependency | |
from scipy.constants import Avogadro | |
import pyproj | |
import pandas as pd | |
import numpy as np | |
spc = self._spc | |
MWAIR = 28.9628e-3 # Based on CMAQ CONST.EXT | |
date = pd.to_datetime(tf['time'][0].values) | |
sh = date.hour | |
cpath = date.strftime(self._ctmpl) | |
m2path = date.strftime(self._m2tmpl) | |
m3path = date.strftime(self._m3tmpl) | |
cf = pyrsig.open_ioapi(cpath).isel(TSTEP=sh, drop=True) | |
# conceptually, PEDGE and CVCD and PRES and TA | |
# could be subset for just the pixels that will be used. | |
# that should dramatically speed things up. | |
proj = pyproj.Proj(cf.crs_proj4) | |
tx, ty = proj(tf['longitude'], tf['latitude']) | |
tx = xr.DataArray(tx, dims=tf['longitude'].dims) | |
ty = xr.DataArray(ty, dims=tf['latitude'].dims) | |
tx = tx.where((tx > 0) & (tx < cf.NCOLS)) | |
ty = ty.where((ty > 0) & (ty < cf.NROWS)) | |
if tx.isnull().all() or ty.isnull().all(): | |
raise ValueError('all TEMPO pixels outside domain') | |
cslice = slice(float(tx.min()) - 2, float(tx.max()) + 2) # slicing overly wide | |
rslice = slice(float(ty.min()) - 2, float(ty.max()) + 2) # slicing overly wide | |
# open the rest of teh files | |
m2f = pyrsig.open_ioapi(m2path).isel(TSTEP=sh, drop=True) | |
m3f = pyrsig.open_ioapi(m3path).isel(TSTEP=sh, drop=True) | |
DZ = m3f['ZF'].copy() | |
DZ[1:] = m3f['ZF'][1:].values - m3f['ZF'][:-1].values | |
# transform x [ppm] to PVCD [molec/cm2] | |
# micromol_i/mol_a * kg_a/m3 * m | |
# * (mol_a/kg_a * 1e-6 * m2/cm2 * #/mol) | |
PVCD = ( | |
cf[spc] * m3f['DENS'] * DZ | |
* (1 / MWAIR * 1e-6 * 1e-4 * Avogadro) | |
) | |
PVCD.attrs['long_name'] = 'partical vertical column density' | |
PVCD.attrs['units'] = 'molecules/cm2' | |
nl = cf.sizes['LAY'] | |
nr = cf.sizes['ROW'] | |
nc = cf.sizes['COL'] | |
CVCD = xr.DataArray( | |
np.zeros((nl + 1, nr, nc), dtype='f'), | |
dims=('ilev', 'ROW', 'COL'), | |
) | |
CVCD[1:] = PVCD.cumsum('LAY').values | |
CVCD.coords['ROW'] = PVCD.ROW | |
CVCD.coords['COL'] = PVCD.COL | |
CVCD.attrs['long_name'] = 'cumulative vertical column density' | |
CVCD.attrs['units'] = 'molecules/cm2' | |
PEDGE = ( | |
m2f['PRSFC'].isel(LAY=0, drop=True) | |
* self._hf['hybi'] + self._hf['hyai'] | |
).transpose('ilev', 'ROW', 'COL') | |
PEDGE.attrs['long_name'] = 'interface pressure' | |
PEDGE.attrs['units'] = 'Pa' | |
gc_ps = tf['surface_pressure'] # hPa | |
gc_hyai = xr.DataArray(gc_ps.Eta_A, dims=('ilev',)) | |
gc_hybi = xr.DataArray(gc_ps.Eta_B, dims=('ilev',)) | |
gc_hyam = xr.DataArray( | |
(gc_ps.Eta_A[1:] + gc_ps.Eta_A[:-1]) / 2, | |
dims=('swt_level',) | |
) | |
gc_hybm = xr.DataArray( | |
(gc_ps.Eta_B[1:] + gc_ps.Eta_B[:-1]) / 2, | |
dims=('swt_level',) | |
) | |
ydum = np.ones(gc_hybm.size, dtype='f') * np.nan | |
def xint(xp, fp): | |
if np.isnan(fp).all(): | |
return ydum | |
x = (xp[0] * gc_hybi + gc_hyai * 100) | |
return np.diff(np.interp(x, xp[::-1], fp[::-1])) | |
cq_hyam_0 = self._hf['hyam'][0] | |
cq_hybm_0 = self._hf['hybm'][0] | |
def tint(ps, xp, fp): | |
if np.isnan(fp).all(): | |
return ydum | |
# ps = (xp[0] - cq_hyam_0) / cq_hybm_0 | |
x = (ps * gc_hybm + gc_hyam * 100) | |
return np.interp(x, xp[::-1], fp[::-1]) | |
xp = PEDGE.sel(COL=cslice, ROW=rslice) | |
fp = CVCD.sel(COL=cslice, ROW=rslice) | |
pvcd = xr.apply_ufunc( | |
xint, xp, fp, | |
input_core_dims=[['ilev'], ['ilev']], | |
output_core_dims=[['swt_level']], | |
exclude_dims=set(('ilev',)), | |
vectorize=True, | |
) | |
prsfc = PEDGE.max('ilev').sel(COL=cslice, ROW=rslice) | |
xp = m3f.PRES.sel(COL=cslice, ROW=rslice) | |
fp = m3f.TA.sel(COL=cslice, ROW=rslice) | |
ta = xr.apply_ufunc( | |
tint, prsfc, xp, fp, | |
input_core_dims=[[], ['LAY'], ['LAY']], | |
output_core_dims=[['swt_level']], | |
exclude_dims=set(('LAY',)), | |
vectorize=True, | |
) | |
tpvcd = pvcd.sel(ROW=ty, COL=tx, method='nearest', drop=True).where( | |
~(tx.isnull() | ty.isnull()) | |
) | |
if cfstrat: | |
gc_pmid = (gc_hybm * gc_ps + gc_hyam) | |
tpvcd = tpvcd.where( | |
gc_pmid > tf['tropopause_pressure'], | |
tf['gas_profile'] | |
) | |
try: | |
tpvcd.attrs.update(tf['gas_profile'].attrs) | |
except Exception: | |
tpvcd.attrs.update({ | |
'long_name': f'vertical profile of {spc} partial column', | |
'units': 'molecules/cm^2', | |
'valid_min': 0.0 | |
}) | |
tta = ta.sel(ROW=ty, COL=tx, method='nearest', drop=True).where( | |
~(tx.isnull() | ty.isnull()) | |
) | |
tta.attrs.update({ | |
'long_name': 'vertical profile of temperature from METCRO3D', | |
'units': 'K', | |
'valid_min': 0.0 | |
}) | |
outf = xr.Dataset() | |
outf['gas_profile'] = tpvcd | |
outf['temperature'] = tta | |
cf.close() | |
m2f.close() | |
m3f.close() | |
del cf, m2f, m3f | |
return outf | |
def get_amf( | |
self, tf, trop=None, total=None, strat=None, | |
cfstrat=False, withprofiles=False | |
): | |
""" | |
Arguments | |
--------- | |
tf : xarray.Dataset | |
TEMPO L2 file | |
trop : bool | |
If True, calculate the tropospheric amf | |
default for NO2 is True and FORM is False | |
total : bool | |
If True, calculate the total amf | |
default for NO2 is False and FORM is True | |
strat : bool | |
If True, calculate the stratospheric amf | |
default for NO2 is False and FORM is False | |
cfstrat : bool | |
If True, replace CMAQ partial column data with the GEOS-CF prior | |
above the tropopause. | |
withprofiles : bool | |
If True, add profiles to output | |
Returns | |
------- | |
amf : xarray.Dataset | |
AMF calculated based on CMAQ gas_profile and MCIP temperature. | |
""" | |
import xarray as xr | |
import pandas as pd | |
spc = self._spc | |
if trop is None: | |
trop = {'NO2': True, 'FORM': False}.get(spc, False) | |
if total is None: | |
total = {'NO2': False, 'FORM': True}.get(spc, False) | |
if strat is None: | |
strat = {'NO2': False, 'FORM': False}.get(spc, False) | |
# Get the CMAQ partial column profile and temperature file | |
cprof = self.get_profiles(tf, cfstrat=cfstrat) | |
# TEMPO L2 Trace Gas and Cloud Level 2 and 3 Data Products: User Guide | |
# Version 1.1 See pg 19 | |
a = 0.00316 | |
b = 3.39e-6 | |
T0 = 220. | |
dT = cprof['temperature'] - T0 | |
alpha = 1 - a * dT + b * dT**2 | |
gc_ps = tf['surface_pressure'] | |
gc_hyam = xr.DataArray( | |
(gc_ps.Eta_A[1:] + gc_ps.Eta_A[:-1]) / 2, | |
dims=('swt_level',) | |
) | |
gc_hybm = xr.DataArray( | |
(gc_ps.Eta_B[1:] + gc_ps.Eta_B[:-1]) / 2, | |
dims=('swt_level',) | |
) | |
gc_p = (gc_ps * gc_hybm + gc_hyam) | |
gc_p.attrs['units'] = 'hPa' | |
amfds = xr.Dataset() | |
now = pd.to_datetime('now', utc=True).strftime('%Y-%m-%dT%H:%M:%SZ') | |
desc = f'CMAQ dervied variables for TEMPO by tempocmaq {__version__}' | |
amfds.attrs.update( | |
description=desc, | |
created=now | |
) | |
if total: | |
cgp = cprof['gas_profile'] | |
vcd_total = cgp.sum('swt_level') | |
vcd_total.attrs.update({ | |
'long_name': f'total {spc} CMAQ vertical column', | |
'units': 'molecules/cm^2' | |
}) | |
amf_total = ( | |
cgp * tf['scattering_weights'] * alpha | |
).sum('swt_level') / vcd_total | |
try: | |
amf_total.attrs.update(tf['amf_total'].attrs) | |
except Exception: | |
amf_total.attrs.update({ | |
'long_name': f'{spc} CMAQ total air mass factor', | |
'valid_min': 0.0 | |
}) | |
amfds['amf_total'] = amf_total | |
amfds['vertical_column_total'] = vcd_total | |
if trop: | |
cgp = cprof['gas_profile'].where(gc_p > tf['tropopause_pressure']) | |
vcd_trop = cgp.sum('swt_level') | |
vcd_trop.attrs.update({ | |
'long_name': f'troposphere {spc} CMAQ vertical column', | |
'units': 'molecules/cm^2' | |
}) | |
amf_troposphere = ( | |
cgp * tf['scattering_weights'] * alpha | |
).sum('swt_level') / vcd_trop | |
try: | |
amf_troposphere.attrs.update(tf['amf_troposphere'].attrs) | |
except Exception: | |
amf_troposphere.attrs.update({ | |
'long_name': f'{spc} CMAQ tropospheric air mass factor', | |
'valid_min': 0.0 | |
}) | |
amfds['amf_troposphere'] = amf_troposphere | |
amfds['vertical_column_troposphere'] = vcd_trop | |
if strat: | |
cgp = cprof['gas_profile'].where(gc_p < tf['tropopause_pressure']) | |
vcd_strat = cgp.sum('swt_level') | |
vcd_strat.attrs.update({ | |
'long_name': f'stratosphere {spc} CMAQ vertical column', | |
'units': 'molecules/cm^2' | |
}) | |
amf_stratosphere = ( | |
cgp * tf['scattering_weights'] * alpha | |
).sum('swt_level') / vcd_strat | |
try: | |
amf_stratosphere.attrs.update(tf['amf_stratosphere'].attrs) | |
except Exception: | |
amf_stratosphere.attrs.update({ | |
'long_name': f'{spc} CMAQ stratospheric air mass factor', | |
'valid_min': 0.0 | |
}) | |
amfds['amf_stratosphere'] = amf_stratosphere | |
amfds['vertical_column_stratosphere'] = vcd_strat | |
if withprofiles: | |
amfds['temperature'] = cprof['temperature'] | |
amfds['gas_profile'] = cprof['gas_profile'] | |
return amfds |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment