Skip to content

Instantly share code, notes, and snippets.

@barronh
Last active December 4, 2024 13:51
Show Gist options
  • Save barronh/05c78cca220142c395e7c9efe602bc64 to your computer and use it in GitHub Desktop.
Save barronh/05c78cca220142c395e7c9efe602bc64 to your computer and use it in GitHub Desktop.
CMAQ AMFs for TEMPO
__version__ = '0.3.1'
__doc__ = """
CMAQ AMF calculator
===================
---
author: Barron H. Henderson
last updated: 2024-12-03
---
Overview
--------
Designed to apply CMAQ-derived AMFs to TEMPO based on the trace gas user
manual v1.1. Data quality flags are not applied within the code, but should
be applied to the TEMPO file used as input.
Prerequisites
-------------
* python3
* numpy
* pandas
* netcdf4
* xarray
* pyrsig
* s3fs
* scipy
Example Application
-------------------
The code has be tested against the Expedited Modeling of Burn Events Results
(EMBER) dataset for 2023-09-01. The code uses s3fs
import tempocmaq
import matplotlib.pyplot as plt
proot = '/work/MOD3DEV/jkumm/EMBER/CMAQ/36US3'
m3tmpl = f'{proot}/input/mcip/metcro3d_%Y%m%d.ncf'
m2tmpl = f'{proot}/input/mcip/metcro2d_%Y%m%d.ncf'
croot = f'{proot}/output/2023fires.v2/noASSIM_full'
ctmpl = f'{croot}/CMAQv54_cb6r5_ae7_aq.36US3.35.noASSIM.CONC.%Y%j'
cmaq = tempocmaq.cmaq(ctmpl, m2tmpl, m3tmpl, exdate='2023-09-01')
# requires .netrc with URS credentials
asdc = tempocmaq.asdc_s3_access()
l2paths = asdc.ls('asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/2023.09.01')
gpaths = [k for k in l2paths if k.endswith('G02.nc') and 'T19' in k]
keep = [
'vertical_column_troposphere', 'amf_troposphere', 'surface_pressure',
'tropopause_pressure', 'scattering_weights', 'gas_profile'
]
gpath = gpaths[0]
print(gpath)
tf = asdc.open_tempo(gpath)
qacheck = (
(tf['eff_cloud_fraction'] < 0.2)
& (tf['solar_zenith_angle'] < 70)
)
tf = tf[keep].where(qacheck)
amfds = cmaq.get_amf(tf, withprofiles=True)
amfds['vertical_column_troposphere_orig'] = (
tf['vertical_column_troposphere']
)
amfds['amf_troposphere_orig'] = tf['amf_troposphere']
amfds['vertical_column_troposphere'] = (
tf['vertical_column_troposphere'] * tf['amf_troposphere']
/ amfds['amf_troposphere']
)
fig, ax = plt.subplots()
x = amfds['vertical_column_troposphere_orig']
y = amfds['vertical_column_troposphere']
xval = float(max(x.max(), y.max()))
opts = dict(mincnt=1, cmap='nipy_spectral', linewidth=0.1)
q = ax.hexbin(x, y, extent=(0, xval, 0, xval), **opts)
ax.axline((0, 0), slope=2, color='r', ls='--')
ax.axline((0, 0), slope=1.5, color='r', ls=':')
ax.axline((0, 0), slope=1/1.5, color='r', ls=':')
ax.axline((0, 0), slope=0.5, color='r', ls='--')
fig.colorbar(q, label='count')
fig.savefig('example.png')
desckeys = [
'vertical_column_troposphere_orig', 'vertical_column_troposphere'
]
print(amfds[desckeys].to_dataframe()[desckeys].describe().to_markdown())
# | | vert_column_troposphere_orig | vert_column_troposphere |
# |:------|-------------------------------:|--------------------------:|
# | count | 138037 | 137894 |
# | mean | 5.14357e+14 | 4.59546e+14 |
# | std | 6.62414e+14 | 5.71615e+14 |
# | min | -7.11184e+15 | -4.87431e+15 |
# | 25% | 1.06325e+14 | 9.7405e+13 |
# | 50% | 4.91573e+14 | 4.45004e+14 |
# | 75% | 8.9656e+14 | 8.0622e+14 |
# | max | 1.96225e+16 | 8.81033e+15 |
"""
class asdc_s3_access:
def __init__(self):
import pandas as pd
self._fs = None
self._expiration = pd.to_datetime('1970-01-01 00:00:00+00:00')
def get_cred(self):
import requests
# Get credentials using ~/.netrc
print('Getting AWS credentials', end='..', flush=True)
credsr = requests.get(
'https://data.asdc.earthdata.nasa.gov/s3credentials'
)
credsr.raise_for_status()
try:
creds = credsr.json()
except Exception:
print(credsr.text)
print('.done', flush=True)
return creds
def ls(self, path):
fs = self.get_fs()
return fs.ls(path)
def get_fs(self, refresh=False):
"""
Assumes credentials are in ~/.netrc
Returns
-------
fs : s3fs.FileSystem
"""
import s3fs
import pandas as pd
now = pd.to_datetime('now', utc=True)
dt = (now - self._expiration)
mindt = pd.to_timedelta('-2min')
expired = dt > mindt
refresh = refresh or expired
if self._fs is None or refresh:
# Build s3fs filesystem (fs) object with credentials for remote
# file operations
creds = self.get_cred()
self._expiration = pd.to_datetime(creds['expiration'])
fs = self._fs = s3fs.S3FileSystem(
key=creds['accessKeyId'], secret=creds['secretAccessKey'],
anon=False, token=creds['sessionToken']
)
else:
fs = self._fs
return fs
def open_tempo(self, path):
import os
import xarray as xr
import botocore.exceptions
# If not available, download it
if not os.path.exists(path):
os.makedirs(os.path.dirname(path), exist_ok=True)
try:
fs = self.get_fs()
fs.download(path, path)
except botocore.exceptions.ClientError:
# If fails to read, then assume credentials timed out and
# force a refresh of the fs
fs = self.get_fs(refresh=True)
fs.download(path, path)
sf = xr.open_dataset(path, group='support_data')
gf = xr.open_dataset(path, group='geolocation')
rf = xr.open_dataset(path, group='/')
pf = xr.open_dataset(path, group='product')
return xr.merge([rf, pf, gf, sf])
class cmaq:
def __init__(self, ctmpl, m2tmpl, m3tmpl, spc='NO2', exdate=None):
"""
CMAQ object that provides functions to calculate the hybrid coordinate
system (get_hyb), uses that system to interpolate CMAQ to the TEMPO
prior (get_profiles), and uses the interpolated data to calculate an
alternative AMF (get_amf).
Arguments
---------
ctmpl : str
Path strftime template for 3D IOAPI file CONC file with VGLVLS
defined as the relative pressure component of the hybrid coordinate
without accounting for the fractional absolute pressure. CONC must
have spc as a variable.
m2tmpl : str
Path strftime template for 2D IOAPI file METCRO2D file with surface
pressure (PRSFC) variable in Pascals
m3tmpl : str
Path strftime template for 3D IOAPI file METCRO3D file with dry
density (DENS kg/m3), pressure (PRES Pa), temperature (TA K),
layer full height (ZF m) variables.
spc : str
Species name in CMAQ (NO2 or FORM)
exdate : str
Date to use for example data (ie. initializing the hybrid
coordinate)
"""
# rearrangement of equation 1 in
# https://doi.org/10.1175/MWR-D-18-0334.1
import pandas as pd
import glob
self._ctmpl = ctmpl
self._m2tmpl = m2tmpl
self._m3tmpl = m3tmpl
if exdate is not None:
qpath = pd.to_datetime(exdate).strftime(ctmpl)
else:
qpat = ctmpl.replace('%Y', '*').replace('%y', '*')
qpat = qpat.replace('%m', '*').replace('%d', '*')
qpat = qpat.replace('%j', '*')
qpat = qpat.replace('**', '*').replace('**', '*')
qpat = qpat.replace('**', '*').replace('**', '*')
qpat = qpat.replace('**', '*').replace('**', '*')
qpaths = sorted(glob.glob(qpat))
if len(qpaths) == 0:
print(qpat)
raise ValueError('Could not find example file; supply exdate')
qpath = qpaths[0]
print('Example file', qpath)
self._hf = self.get_hyb(qpath)
self._spc = spc
self._exdate = exdate
def get_hyb(self, qpath):
"""
Arguments
---------
qpath : str
Path to CMAQ or MCIP 3D file
Returns
-------
hf : xarray.Dataset
Dataset with hybi, hyai, hybm, and hyam
"""
import numpy as np
import pandas as pd
import pyrsig
import xarray as xr
qf = pyrsig.open_ioapi(qpath)
pt = qf.VGTOP
hi = qf.VGLVLS.astype('d') # at interfaces
r = 0.2
p0 = 1e5
hm = (hi[1:] + hi[:-1]) / 2 # mid-levels
c1 = 2. * r**2 / (1 - r)**3
c2 = -r * (4. + r + r**2) / (1 - r)**3
c3 = 2 * (1. + r + r**2) / (1 - r)**3
c4 = -(1 + r) / (1 - r)**3
dfs = {}
for key, h in [('ilev', hi), ('lev', hm)]:
B = c1 + c2 * h + c3 * h**2 + c4 * h**3
B = np.where(h >= 1, 1, B)
B = np.where(h <= r, 0, B)
A = (h - B) * (p0 - pt) + pt - B * pt
df = pd.DataFrame({key: h, 'A': A, 'B': B}).set_index(key)
dfs[key] = df.copy()
cq_hyai = dfs['ilev']['A'].to_xarray()
cq_hyai.attrs.update(
long_name='absolute pressure component of interface',
units='Pa', description='pedge =ps * hybi + hyai'
)
cq_hybi = dfs['ilev']['B'].to_xarray()
cq_hybi.attrs.update(
long_name='relative surface pressure component of interface',
units='Pa', description='pedge =ps * hybi + hyai'
)
cq_hyam = dfs['lev']['A'].to_xarray()
cq_hyam.attrs.update(
long_name='absolute pressure component of mid-level',
units='Pa', description='pmid = ps * hybm + hyam'
)
cq_hybm = dfs['lev']['B'].to_xarray()
cq_hybm.attrs.update(
long_name='relative surface pressure component of mid-level',
units='Pa', description='pmid = ps * hybm + hyam'
)
hf = xr.Dataset()
hf.attrs.update(description='P = ps * B + A [=] Pa')
hf['hyai'] = cq_hyai
hf['hybi'] = cq_hybi
hf['hyam'] = cq_hyam
hf['hybm'] = cq_hybm
return hf
def get_profiles(self, tf, cfstrat=False):
"""
Calculate the partial column of NO2
Arguments
---------
tf : xarray.Dataset
TEMPO L2 file
cfstrat : bool
If cfstrat is True, then overwrite the partial column
from CMAQ with the GEOS-CF prior partial column above
the tropopause pressure.
Returns
-------
outf : xarray.Dataset
has CMAQ gas_profile and MCIP temperature
"""
import xarray as xr
import pyrsig
# replace with Avogadro = 6.02214076e+23 to remove scipy dependency
from scipy.constants import Avogadro
import pyproj
import pandas as pd
import numpy as np
spc = self._spc
MWAIR = 28.9628e-3 # Based on CMAQ CONST.EXT
date = pd.to_datetime(tf['time'][0].values)
sh = date.hour
cpath = date.strftime(self._ctmpl)
m2path = date.strftime(self._m2tmpl)
m3path = date.strftime(self._m3tmpl)
cf = pyrsig.open_ioapi(cpath).isel(TSTEP=sh, drop=True)
# conceptually, PEDGE and CVCD and PRES and TA
# could be subset for just the pixels that will be used.
# that should dramatically speed things up.
proj = pyproj.Proj(cf.crs_proj4)
tx, ty = proj(tf['longitude'], tf['latitude'])
tx = xr.DataArray(tx, dims=tf['longitude'].dims)
ty = xr.DataArray(ty, dims=tf['latitude'].dims)
tx = tx.where((tx > 0) & (tx < cf.NCOLS))
ty = ty.where((ty > 0) & (ty < cf.NROWS))
if tx.isnull().all() or ty.isnull().all():
raise ValueError('all TEMPO pixels outside domain')
cslice = slice(float(tx.min()) - 2, float(tx.max()) + 2) # slicing overly wide
rslice = slice(float(ty.min()) - 2, float(ty.max()) + 2) # slicing overly wide
# open the rest of teh files
m2f = pyrsig.open_ioapi(m2path).isel(TSTEP=sh, drop=True)
m3f = pyrsig.open_ioapi(m3path).isel(TSTEP=sh, drop=True)
DZ = m3f['ZF'].copy()
DZ[1:] = m3f['ZF'][1:].values - m3f['ZF'][:-1].values
# transform x [ppm] to PVCD [molec/cm2]
# micromol_i/mol_a * kg_a/m3 * m
# * (mol_a/kg_a * 1e-6 * m2/cm2 * #/mol)
PVCD = (
cf[spc] * m3f['DENS'] * DZ
* (1 / MWAIR * 1e-6 * 1e-4 * Avogadro)
)
PVCD.attrs['long_name'] = 'partical vertical column density'
PVCD.attrs['units'] = 'molecules/cm2'
nl = cf.sizes['LAY']
nr = cf.sizes['ROW']
nc = cf.sizes['COL']
CVCD = xr.DataArray(
np.zeros((nl + 1, nr, nc), dtype='f'),
dims=('ilev', 'ROW', 'COL'),
)
CVCD[1:] = PVCD.cumsum('LAY').values
CVCD.coords['ROW'] = PVCD.ROW
CVCD.coords['COL'] = PVCD.COL
CVCD.attrs['long_name'] = 'cumulative vertical column density'
CVCD.attrs['units'] = 'molecules/cm2'
PEDGE = (
m2f['PRSFC'].isel(LAY=0, drop=True)
* self._hf['hybi'] + self._hf['hyai']
).transpose('ilev', 'ROW', 'COL')
PEDGE.attrs['long_name'] = 'interface pressure'
PEDGE.attrs['units'] = 'Pa'
gc_ps = tf['surface_pressure'] # hPa
gc_hyai = xr.DataArray(gc_ps.Eta_A, dims=('ilev',))
gc_hybi = xr.DataArray(gc_ps.Eta_B, dims=('ilev',))
gc_hyam = xr.DataArray(
(gc_ps.Eta_A[1:] + gc_ps.Eta_A[:-1]) / 2,
dims=('swt_level',)
)
gc_hybm = xr.DataArray(
(gc_ps.Eta_B[1:] + gc_ps.Eta_B[:-1]) / 2,
dims=('swt_level',)
)
ydum = np.ones(gc_hybm.size, dtype='f') * np.nan
def xint(xp, fp):
if np.isnan(fp).all():
return ydum
x = (xp[0] * gc_hybi + gc_hyai * 100)
return np.diff(np.interp(x, xp[::-1], fp[::-1]))
cq_hyam_0 = self._hf['hyam'][0]
cq_hybm_0 = self._hf['hybm'][0]
def tint(ps, xp, fp):
if np.isnan(fp).all():
return ydum
# ps = (xp[0] - cq_hyam_0) / cq_hybm_0
x = (ps * gc_hybm + gc_hyam * 100)
return np.interp(x, xp[::-1], fp[::-1])
xp = PEDGE.sel(COL=cslice, ROW=rslice)
fp = CVCD.sel(COL=cslice, ROW=rslice)
pvcd = xr.apply_ufunc(
xint, xp, fp,
input_core_dims=[['ilev'], ['ilev']],
output_core_dims=[['swt_level']],
exclude_dims=set(('ilev',)),
vectorize=True,
)
prsfc = PEDGE.max('ilev').sel(COL=cslice, ROW=rslice)
xp = m3f.PRES.sel(COL=cslice, ROW=rslice)
fp = m3f.TA.sel(COL=cslice, ROW=rslice)
ta = xr.apply_ufunc(
tint, prsfc, xp, fp,
input_core_dims=[[], ['LAY'], ['LAY']],
output_core_dims=[['swt_level']],
exclude_dims=set(('LAY',)),
vectorize=True,
)
tpvcd = pvcd.sel(ROW=ty, COL=tx, method='nearest', drop=True).where(
~(tx.isnull() | ty.isnull())
)
if cfstrat:
gc_pmid = (gc_hybm * gc_ps + gc_hyam)
tpvcd = tpvcd.where(
gc_pmid > tf['tropopause_pressure'],
tf['gas_profile']
)
try:
tpvcd.attrs.update(tf['gas_profile'].attrs)
except Exception:
tpvcd.attrs.update({
'long_name': f'vertical profile of {spc} partial column',
'units': 'molecules/cm^2',
'valid_min': 0.0
})
tta = ta.sel(ROW=ty, COL=tx, method='nearest', drop=True).where(
~(tx.isnull() | ty.isnull())
)
tta.attrs.update({
'long_name': 'vertical profile of temperature from METCRO3D',
'units': 'K',
'valid_min': 0.0
})
outf = xr.Dataset()
outf['gas_profile'] = tpvcd
outf['temperature'] = tta
cf.close()
m2f.close()
m3f.close()
del cf, m2f, m3f
return outf
def get_amf(
self, tf, trop=None, total=None, strat=None,
cfstrat=False, withprofiles=False
):
"""
Arguments
---------
tf : xarray.Dataset
TEMPO L2 file
trop : bool
If True, calculate the tropospheric amf
default for NO2 is True and FORM is False
total : bool
If True, calculate the total amf
default for NO2 is False and FORM is True
strat : bool
If True, calculate the stratospheric amf
default for NO2 is False and FORM is False
cfstrat : bool
If True, replace CMAQ partial column data with the GEOS-CF prior
above the tropopause.
withprofiles : bool
If True, add profiles to output
Returns
-------
amf : xarray.Dataset
AMF calculated based on CMAQ gas_profile and MCIP temperature.
"""
import xarray as xr
import pandas as pd
spc = self._spc
if trop is None:
trop = {'NO2': True, 'FORM': False}.get(spc, False)
if total is None:
total = {'NO2': False, 'FORM': True}.get(spc, False)
if strat is None:
strat = {'NO2': False, 'FORM': False}.get(spc, False)
# Get the CMAQ partial column profile and temperature file
cprof = self.get_profiles(tf, cfstrat=cfstrat)
# TEMPO L2 Trace Gas and Cloud Level 2 and 3 Data Products: User Guide
# Version 1.1 See pg 19
a = 0.00316
b = 3.39e-6
T0 = 220.
dT = cprof['temperature'] - T0
alpha = 1 - a * dT + b * dT**2
gc_ps = tf['surface_pressure']
gc_hyam = xr.DataArray(
(gc_ps.Eta_A[1:] + gc_ps.Eta_A[:-1]) / 2,
dims=('swt_level',)
)
gc_hybm = xr.DataArray(
(gc_ps.Eta_B[1:] + gc_ps.Eta_B[:-1]) / 2,
dims=('swt_level',)
)
gc_p = (gc_ps * gc_hybm + gc_hyam)
gc_p.attrs['units'] = 'hPa'
amfds = xr.Dataset()
now = pd.to_datetime('now', utc=True).strftime('%Y-%m-%dT%H:%M:%SZ')
desc = f'CMAQ dervied variables for TEMPO by tempocmaq {__version__}'
amfds.attrs.update(
description=desc,
created=now
)
if total:
cgp = cprof['gas_profile']
vcd_total = cgp.sum('swt_level')
vcd_total.attrs.update({
'long_name': f'total {spc} CMAQ vertical column',
'units': 'molecules/cm^2'
})
amf_total = (
cgp * tf['scattering_weights'] * alpha
).sum('swt_level') / vcd_total
try:
amf_total.attrs.update(tf['amf_total'].attrs)
except Exception:
amf_total.attrs.update({
'long_name': f'{spc} CMAQ total air mass factor',
'valid_min': 0.0
})
amfds['amf_total'] = amf_total
amfds['vertical_column_total'] = vcd_total
if trop:
cgp = cprof['gas_profile'].where(gc_p > tf['tropopause_pressure'])
vcd_trop = cgp.sum('swt_level')
vcd_trop.attrs.update({
'long_name': f'troposphere {spc} CMAQ vertical column',
'units': 'molecules/cm^2'
})
amf_troposphere = (
cgp * tf['scattering_weights'] * alpha
).sum('swt_level') / vcd_trop
try:
amf_troposphere.attrs.update(tf['amf_troposphere'].attrs)
except Exception:
amf_troposphere.attrs.update({
'long_name': f'{spc} CMAQ tropospheric air mass factor',
'valid_min': 0.0
})
amfds['amf_troposphere'] = amf_troposphere
amfds['vertical_column_troposphere'] = vcd_trop
if strat:
cgp = cprof['gas_profile'].where(gc_p < tf['tropopause_pressure'])
vcd_strat = cgp.sum('swt_level')
vcd_strat.attrs.update({
'long_name': f'stratosphere {spc} CMAQ vertical column',
'units': 'molecules/cm^2'
})
amf_stratosphere = (
cgp * tf['scattering_weights'] * alpha
).sum('swt_level') / vcd_strat
try:
amf_stratosphere.attrs.update(tf['amf_stratosphere'].attrs)
except Exception:
amf_stratosphere.attrs.update({
'long_name': f'{spc} CMAQ stratospheric air mass factor',
'valid_min': 0.0
})
amfds['amf_stratosphere'] = amf_stratosphere
amfds['vertical_column_stratosphere'] = vcd_strat
if withprofiles:
amfds['temperature'] = cprof['temperature']
amfds['gas_profile'] = cprof['gas_profile']
return amfds
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment