Skip to content

Instantly share code, notes, and snippets.

@steph-ben
Last active December 5, 2023 11:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steph-ben/2cd80f033d8f00c365bb170142206bae to your computer and use it in GitHub Desktop.
Save steph-ben/2cd80f033d8f00c365bb170142206bae to your computer and use it in GitHub Desktop.
Xarray & GRIBs > Performances improvements

Xarray & GRIBs > Performances improvements

Summary :


Expectations

Needs :

  • Explore NWP data for a long period of time (eg. years)
  • Access daily NWP data for operational tasks computation

Constraints :

  • CPU/Disk space limited
  • Operational task need to be as quick as possible
  • Reading GRIB files from CIPS DS read-only partition

Stack :

  • Python xarray cfgrib engine

Current status

Issues with heterogeneous data

Heterogeneous data means data with different coordinates, like leveltype, stepType, etc. When opening large GRIB file, when heterogeneous data, we often encounter this error :

>>> import xarray as xr
>>> # wget https://github.com/ecmwf/cfgrib/raw/master/nam.t00z.awp21100.tm00.grib2
>>> ds = xr.open_dataset("nam.t00z.awp21100.tm00.grib2", engine="cfgrib")
DatasetBuildError: multiple values for unique key, try re-open the file with one of:
    filter_by_keys={'typeOfLevel': 'meanSea'}
    filter_by_keys={'typeOfLevel': 'surface'}
    filter_by_keys={'typeOfLevel': 'isobaricInhPa'}
    filter_by_keys={'typeOfLevel': 'heightAboveGround'}
    filter_by_keys={'typeOfLevel': 'atmosphereSingleLayer'}
    filter_by_keys={'typeOfLevel': 'cloudBase'}
    filter_by_keys={'typeOfLevel': 'cloudTop'}
    filter_by_keys={'typeOfLevel': 'heightAboveGroundLayer'}
    filter_by_keys={'typeOfLevel': 'tropopause'}
    filter_by_keys={'typeOfLevel': 'maxWind'}
    filter_by_keys={'typeOfLevel': 'isothermZero'}
    filter_by_keys={'typeOfLevel': 'pressureFromGroundLayer'}

Possible solutions are :

  • Using filter_by_keys
  • Using cfgrib.open_datasets() that will automate opening all keys

But downside is that :

  • We then got different xr.Dataset , which is not ideal for manipulating data easily
  • Slow

Issues when reading from read-only partition

When reading GRIB files from read-only partition, by default cfgrib cannot create index and produce this error :

>>> xr.open_dataset("demo/nam.t00z.awp21100.tm00.grib2", 
    engine="cfgrib", 
    backend_kwargs={'filter_by_keys': {'typeOfLevel': 'surface'}}
)                                                                                     ...                        ...              
Can't create file 'demo/nam.t00z.awp21100.tm00.grib2.923a8.idx'     

Possible solution :

  • We always use the indexpath='' argument.

But the downside is that :

  • Rescan the whole GRIB file everytime, which is dramatically slow for big files

Solutions proposal

Using indexes on read-only partitions

We can specify a index path directory, with something like :

>>> Path("/tmp/indexes/demo").mkdir(parents=True, exist_ok=True)
>>> xr.open_dataset("demo/nam.t00z.awp21100.tm00.grib2", engine="cfgrib", 
        backend_kwargs={
            'filter_by_keys': {'typeOfLevel': 'surface'}, 
            'indexpath': '/tmp/indexes/{path}.idx'
        })
...

Cfgrib will store indexes like that :

/tmp/indexes/             
├── demo                                  
│   └── nam.t00z.awp21100.tm00.grib2.idx

But the downside is that :

  • We need to manually create index path directory
  • Or use cfgrib pull request ecmwf/cfgrib#360

Filter heterogeneous

By using cfgrib version from ecmwf/cfgrib#362 , one can open heterogeneous GRIB file with the following :

>>> import xarray as xr
>>> xr.open_dataset('nam.t00z.awp21100.tm00.grib2', engine='cfgrib',
        backend_kwargs={'filter_heterogeneous': True})
<xarray.Dataset>
Dimensions:                                  (y: 65, x: 93, isobaricInhPa: 19,
                                              pressureFromGroundLayer: 5,
                                              isobaricInhPa1: 5,
                                              pressureFromGroundLayer1: 2,
                                              pressureFromGroundLayer2: 2,
                                              heightAboveGroundLayer: 2)
Coordinates:
    time                      datetime64[ns] ...
    step                      timedelta64[ns] ...
    meanSea                   float64 ...
    latitude                  (y, x) float64 ...
    longitude                 (y, x) float64 ...
    valid_time                datetime64[ns] ...
    surface                   float64 ...
  * isobaricInhPa             (isobaricInhPa) float64 1e+03 950.0 ... 100.0
    cloudBase                 float64 ...
    cloudTop                  float64 ...
    maxWind                   float64 ...
    isothermZero              float64 ...
    tropopause                float64 ...
  * pressureFromGroundLayer   (pressureFromGroundLayer) float64 3e+03 ... 1.5...
  * isobaricInhPa1            (isobaricInhPa1) float64 1e+03 850.0 ... 250.0
    heightAboveGround         float64 ...
    heightAboveGround1        float64 ...
    heightAboveGround2        float64 ...
  * pressureFromGroundLayer1  (pressureFromGroundLayer1) float64 9e+03 1.8e+04
  * pressureFromGroundLayer2  (pressureFromGroundLayer2) float64 9e+03 1.8e+04
    atmosphereSingleLayer     float64 ...
  * heightAboveGroundLayer    (heightAboveGroundLayer) float64 1e+03 3e+03
    pressureFromGroundLayer3  float64 ...
    pressureFromGroundLayer4  float64 ...
Dimensions without coordinates: y, x
Data variables:
    prmsl__meanSea__instant                  (y, x) float32 ...
    gust__surface__instant                   (y, x) float32 ...
    gh__isobaricInhPa__instant               (isobaricInhPa, y, x) float32 ...
    gh__cloudBase__instant                   (y, x) float32 ...
    gh__cloudTop__instant                    (y, x) float32 ...
    gh__maxWind__instant                     (y, x) float32 ...
    gh__isothermZero__instant                (y, x) float32 ...
    t__isobaricInhPa__instant                (isobaricInhPa, y, x) float32 ...
    t__cloudTop__instant                     (y, x) float32 ...
    t__tropopause__instant                   (y, x) float32 ...
    t__pressureFromGroundLayer__instant      (pressureFromGroundLayer, y, x) float32 ...
    r__isobaricInhPa__instant                (isobaricInhPa, y, x) float32 ...
    r__isothermZero__instant                 (y, x) float32 ...
    r__pressureFromGroundLayer__instant      (pressureFromGroundLayer, y, x) float32 ...
    w__isobaricInhPa__instant                (isobaricInhPa, y, x) float32 ...
    u__isobaricInhPa__instant                (isobaricInhPa, y, x) float32 ...
    u__tropopause__instant                   (y, x) float32 ...
    u__maxWind__instant                      (y, x) float32 ...
    u__pressureFromGroundLayer__instant      (pressureFromGroundLayer, y, x) float32 ...
    v__isobaricInhPa__instant                (isobaricInhPa, y, x) float32 ...
    v__tropopause__instant                   (y, x) float32 ...
    v__maxWind__instant                      (y, x) float32 ...
    v__pressureFromGroundLayer__instant      (pressureFromGroundLayer, y, x) float32 ...
    absv__isobaricInhPa__instant             (isobaricInhPa1, y, x) float32 ...
    mslet__meanSea__instant                  (y, x) float32 ...
    sp__surface__instant                     (y, x) float32 ...
    orog__surface__instant                   (y, x) float32 ...
    t2m__heightAboveGround__instant          (y, x) float32 ...
    r2__heightAboveGround__instant           (y, x) float32 ...
    u10__heightAboveGround__instant          (y, x) float32 ...
    v10__heightAboveGround__instant          (y, x) float32 ...
    tp__surface__accum                       (y, x) float32 ...
    acpcp__surface__accum                    (y, x) float32 ...
    csnow__surface__instant                  (y, x) float32 ...
    cicep__surface__instant                  (y, x) float32 ...
    cfrzr__surface__instant                  (y, x) float32 ...
    crain__surface__instant                  (y, x) float32 ...
    cape__surface__instant                   (y, x) float32 ...
    cape__pressureFromGroundLayer__instant   (pressureFromGroundLayer1, y, x) float32 ...
    cin__surface__instant                    (y, x) float32 ...
    cin__pressureFromGroundLayer__instant    (pressureFromGroundLayer2, y, x) float32 ...
    pwat__atmosphereSingleLayer__instant     (y, x) float32 ...
    pres__cloudBase__instant                 (y, x) float32 ...
    pres__cloudTop__instant                  (y, x) float32 ...
    pres__maxWind__instant                   (y, x) float32 ...
    hlcy__heightAboveGroundLayer__instant    (heightAboveGroundLayer, y, x) float32 ...
    trpp__tropopause__instant                (y, x) float32 ...
    pli__pressureFromGroundLayer__instant    (y, x) float32 ...
    4lftx__pressureFromGroundLayer__instant  (y, x) float32 ...
    unknown__surface__instant                (y, x) float32 ...
Attributes:
    GRIB_edition:            2
    GRIB_centre:             kwbc
    GRIB_centreDescription:  US National Weather Service - NCEP
    GRIB_subCentre:          0
    Conventions:             CF-1.7
    institution:             US National Weather Service - NCEP
    history:                 2023-12-04T16:56 GRIB to CDM+CF via cfgrib-0.9.1...

But the downside is that :

  • We need to used a forked version of cfgrib
  • Or wait until cfgrib accept pull request ecmwf/cfgrib#362

Proposed workflow

Given the following GRIB files :

arome/full/
├── [654M]  arome_indo_20231128_0000_00.grib
├── [836M]  arome_indo_20231128_0000_01.grib
├── [895M]  arome_indo_20231128_0000_02.grib
├── [952M]  arome_indo_20231128_0000_03.grib
├── [1013M]  arome_indo_20231128_0000_04.grib
├── [1.0G]  arome_indo_20231128_0000_05.grib
├── [1.1G]  arome_indo_20231128_0000_06.grib
├── [1.1G]  arome_indo_20231128_0000_07.grib
├── [1.1G]  arome_indo_20231128_0000_08.grib
├── [1.1G]  arome_indo_20231128_0000_09.grib
├── [1.1G]  arome_indo_20231128_0000_10.grib
├── [1.1G]  arome_indo_20231128_0000_11.grib
├── [1.1G]  arome_indo_20231128_0000_12.grib
├── [1.1G]  arome_indo_20231128_0000_13.grib
├── [1.1G]  arome_indo_20231128_0000_14.grib
├── [1.1G]  arome_indo_20231128_0000_15.grib
├── [1.2G]  arome_indo_20231128_0000_16.grib
├── [1.2G]  arome_indo_20231128_0000_17.grib
├── [1.2G]  arome_indo_20231128_0000_18.grib
├── [1.2G]  arome_indo_20231128_0000_19.grib
├── [1.2G]  arome_indo_20231128_0000_20.grib
├── [1.2G]  arome_indo_20231128_0000_21.grib
└── [1.2G]  arome_indo_20231128_0000_22.grib
pip install git+https://github.com/steph-ben/cfgrib.git
  • Pre-generate index files in another process
$ time for f in arome/full/*; do cfgrib build_index --index-basedir /tmp/indexes $f ; done
arome/full/arome_indo_20231128_0000_00.grib: Creating index          ~5seconds
arome/full/arome_indo_20231128_0000_01.grib: Creating index
arome/full/arome_indo_20231128_0000_02.grib: Creating index
...
real    2m56.073s
user    0m41.224s
sys     0m33.831s
  • Open full dataset with :
>>> fp_list = list(Path("arome/full").glob("*.grib"))
>>> ds = xr.open_mfdataset(fp_list, 
        parallel=True,
        combine='nested',
        concat_dim='valid_time',
        backend_kwargs={
            'filter_heterogeneous': True,
            'indexpath': '/tmp/indexes/{path}.idx',
        })
>>> ds
<xarray.Dataset>
Dimensions:                              (valid_time: 22, latitude: 1201,
                                          longitude: 2601, isobaricInhPa: 23,
                                          heightAboveGround: 25,
                                          heightAboveGround1: 22,
                                          potentialVorticity: 3,
                                          heightAboveGround2: 22, level: 5,
                                          isobaricInhPa1: 18,
                                          isobaricInhPa2: 20,
                                          isobaricInhPa3: 10,
                                          isobaricInhPa4: 5, isobaricInhPa5: 2)
Coordinates: (12/33)
    time                                 datetime64[ns] 1970-01-01
    step                                 (valid_time) timedelta64[ns] 01:00:0...
    surface                              float64 0.0
  * latitude                             (latitude) float64 15.0 14.97 ... -15.0
  * longitude                            (longitude) float64 85.0 ... 150.0
  * valid_time                           (valid_time) datetime64[ns] 1970-01-...
    ...                                   ...
  * isobaricInhPa3                       (isobaricInhPa3) float64 1e+03 ... 6...
  * isobaricInhPa4                       (isobaricInhPa4) float64 850.0 ... 3...
    level1                               float64 18.0
  * isobaricInhPa5                       (isobaricInhPa5) float64 950.0 300.0
    meanSea                              float64 0.0
    level2                               float64 0.0
Data variables: (12/81)
    t__surface__instant                  (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    t__isobaricInhPa__instant            (valid_time, isobaricInhPa, latitude, longitude) float32 dask.array<chunksize=(1, 23, 1201, 2601), meta=np.ndarray>
    t__heightAboveGround__instant        (valid_time, heightAboveGround, latitude, longitude) float32 dask.array<chunksize=(1, 25, 1201, 2601), meta=np.ndarray>
    sd__surface__instant                 (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    iews__surface__accum                 (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    u__isobaricInhPa__instant            (valid_time, isobaricInhPa, latitude, longitude) float32 dask.array<chunksize=(1, 23, 1201, 2601), meta=np.ndarray>
    ...                                   ...
    ptype__surface__max                  (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    ptype__surface__avg                  (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    d__isobaricInhPa__instant            (valid_time, isobaricInhPa5, latitude, longitude) float32 dask.array<chunksize=(1, 2, 1201, 2601), meta=np.ndarray>
    prmsl__meanSea__instant              (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    sp__surface__instant                 (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
    CAPE_INS__unknown__instant           (valid_time, latitude, longitude) float32 dask.array<chunksize=(1, 1201, 2601), meta=np.ndarray>
Attributes:
    GRIB_edition:            2
    GRIB_centre:             lfpw
    GRIB_centreDescription:  French Weather Service - Toulouse
    GRIB_subCentre:          14
    Conventions:             CF-1.7
    institution:             French Weather Service - Toulouse
    history:                 2023-12-05T12:52 GRIB to CDM+CF via cfgrib-0.9.1...
    
~22seconds
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment