
@bennyistanto
Created June 8, 2024 07:01
Computes the maximum precipitation value for each month from the daily and multi-day (1- to 5-day) accumulated IMERG datasets.
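"""
Monthly maximum of daily and multi-day accumulated precipitation (IMERG).

Workflow, as implemented below:
  1. For each accumulation window (1-5 days), the daily NetCDF files are grouped
     by year and concatenated along 'time' into one combined file per year.
  2. Each combined yearly file is resampled to monthly frequency, and the maximum
     value per month is written out as a separate compressed NetCDF file.

If an output file already exists, the script asks once whether to Replace, Skip,
or Abort, and reuses that answer for all later conflicts.
"""
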
import xarray as xr
import os
import glob
import sys
from tqdm import tqdm
import pandas as pd
# Define directories and filenames for max value calculations
max_calculations = [
{"days": 1, "input_dir": "/mnt/d/temp/imerg/data/fdd_1day", "output_dir": "/mnt/d/temp/imerg/data/fddmaxm_1day"},
{"days": 2, "input_dir": "/mnt/d/temp/imerg/data/fdd_2day", "output_dir": "/mnt/d/temp/imerg/data/fddmaxm_2day"},
{"days": 3, "input_dir": "/mnt/d/temp/imerg/data/fdd_3day", "output_dir": "/mnt/d/temp/imerg/data/fddmaxm_3day"},
{"days": 4, "input_dir": "/mnt/d/temp/imerg/data/fdd_4day", "output_dir": "/mnt/d/temp/imerg/data/fddmaxm_4day"},
{"days": 5, "input_dir": "/mnt/d/temp/imerg/data/fdd_5day", "output_dir": "/mnt/d/temp/imerg/data/fddmaxm_5day"},
]
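
# Input files are expected to match wld_cli_imerg_fdd_{days}day_*.nc4, where the
# trailing date token's first four characters are the year; the grouping logic
# below parses that year from the last underscore-separated part of each filename.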
# Directory to store combined yearly datasets
combined_year_dir = "/mnt/d/temp/imerg/data/combined_year_ds"
# Global variable to store user's choice
user_choice = None
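# The first answer given at the prompt (Replace/Skip/Abort) is reused for every
# later existing output file, so the user is only asked once per run.
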
def set_user_decision():
"""Prompt user for decision on existing files and store it globally."""
global user_choice
if user_choice is None:
decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
while decision not in ['R', 'S', 'A']:
print("Invalid choice. Please choose again.")
decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
user_choice = decision
def create_combined_yearly_ds(input_dir, days):
"""Create combined yearly datasets and save them to disk."""
# Check if input folder exists
if not os.path.exists(input_dir):
print(f"Input folder does not exist: {input_dir}")
        sys.exit(1)
    os.makedirs(combined_year_dir, exist_ok=True)

    # Create a list of files
    file_list = sorted(glob.glob(os.path.join(input_dir, f"wld_cli_imerg_fdd_{days}day_*.nc4")))

    # Group files by year
    files_by_year = {}
    for file_path in file_list:
        date_str = file_path.split('_')[-1].split('.')[0]
        year = date_str[:4]
        if year not in files_by_year:
            files_by_year[year] = []
        files_by_year[year].append(file_path)

    # Process each year separately and create combined dataset for each year
    with tqdm(total=len(files_by_year), desc=f"Processing {days}-day data by year") as year_pbar:
        for year, year_files in files_by_year.items():
            combined_ds_path = os.path.join(combined_year_dir, f"combined_{days}day_{year}.nc4")
            if not os.path.exists(combined_ds_path):
                with tqdm(total=len(year_files), desc=f"Combining datasets for {year}", leave=False) as combine_pbar:
                    datasets = [xr.open_dataset(f) for f in year_files]
                    combined_year_ds = xr.concat(datasets, dim='time')
                    combine_pbar.update(len(year_files))
                    combined_year_ds.to_netcdf(combined_ds_path)
                    combined_year_ds.close()
                    for ds in datasets:
                        ds.close()
                print(f"Combined dataset saved to {combined_ds_path}")
            year_pbar.update(1)


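# Main loop: for each accumulation window, build the combined yearly files if
# needed, then compute and save the monthly maxima.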
for calc in max_calculations:
    days = calc["days"]
    input_dir = calc["input_dir"]
    output_dir = calc["output_dir"]
    os.makedirs(output_dir, exist_ok=True)

    # Create combined yearly datasets if they don't already exist
    create_combined_yearly_ds(input_dir, days)

    # Load and process each combined dataset by year
    with tqdm(total=len(os.listdir(combined_year_dir)), desc=f"Calculating monthly max for {days}-day data") as year_pbar:
        for combined_ds_file in os.listdir(combined_year_dir):
            if f"combined_{days}day_" not in combined_ds_file:
                continue
            combined_ds_path = os.path.join(combined_year_dir, combined_ds_file)
            combined_ds = xr.open_dataset(combined_ds_path)

            # Resample to monthly frequency and take the maximum value for each month
            monthly_max = combined_ds.resample(time='MS').max(dim='time')
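            # Note: resample(time='MS') labels each month by its start date; the
            # '%Y%m' filename formatting below relies on that month-start label.
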
            # Save each month's maximum as a separate file
            for date, ds in tqdm(monthly_max.groupby('time', squeeze=False), desc=f"Saving {days}-day monthly max files", leave=False):
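                # With squeeze=False each group keeps a length-1 'time' dimension, so
                # each output file holds a single monthly time step; 'date' is the
                # month-start label used to build the filename.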
                year_month = pd.Timestamp(date.item()).strftime('%Y%m')
                output_filename = f"wld_cli_imerg_fddmax_{days}day_{year_month}.nc4"
                output_filepath = os.path.join(output_dir, output_filename)
                if os.path.exists(output_filepath):
                    if user_choice is None:
                        set_user_decision()
                    if user_choice == 'S':
                        print(f"Skipping existing file: {output_filepath}")
                        continue  # Skip to the next file
                    elif user_choice == 'A':
                        print("Aborting process.")
                        sys.exit(0)  # Exit the script
                    elif user_choice == 'R':
                        pass  # Replace the file
                try:
                    # Ensure CF conventions and compression
                    encoding = {var: {'zlib': True, 'complevel': 5} for var in ds.data_vars}
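                    # zlib compression at a moderate level (complevel 5) is applied to
                    # every data variable to keep the monthly output files small.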
                    ds.attrs['Conventions'] = 'CF-1.8'
                    ds.to_netcdf(output_filepath, encoding=encoding)
                    print(f"Saved: {output_filepath}")
                except Exception as e:
                    print(f"Error saving file {output_filepath}: {e}")

            # Release the file handle for this year's combined dataset
            combined_ds.close()
            print(f"{days}-day monthly max processing complete for {combined_ds_file}.")
            year_pbar.update(1)

print("All monthly max calculations processed.")