Skip to content

Instantly share code, notes, and snippets.

@bennyistanto
Created March 13, 2024 08:26
Show Gist options
  • Save bennyistanto/a4d1bc342343c1f574235fcc17731b6c to your computer and use it in GitHub Desktop.
Save bennyistanto/a4d1bc342343c1f574235fcc17731b6c to your computer and use it in GitHub Desktop.
Generate derivative product from Vegetation Indices - quarterly timeseries
# -*- coding: utf-8 -*-
"""
NAME
12_mxd13q1_viproducts_quarter.py
Generate derivative product from Vegetation Indices
DESCRIPTION
Input data for this script will use MXD13Q1 quarterly data generate by 05_mxd13q1_8day2quarter.py
from NASA. This script can do calculation for ratio, difference, standardize anomaly
and vegetation condition index.
The calculation required timeseries VI and the long-term statistics (min, mean, max, std)
REQUIREMENT
ArcGIS must installed before using this script, as it required arcpy module.
EXAMPLES
C:\\Program Files\\ArcGIS\\Pro\\bin\\Python\\envs\\arcgispro-py3\\python 12_mxd13q1_viproducts_quarter.py
NOTES
This script is designed to work with MODIS naming convention
If using other data, some adjustment are required: parsing filename and directory
CONTACT
Benny Istanto
Climate Geographer
GOST/DECAT/DECDG, The World Bank
LICENSE
This script is in the public domain, free from copyrights or restrictions.
VERSION
$Id$
TODO
xx
"""
import os
import arcpy
from datetime import datetime, timedelta
import re
import uuid # For generating unique temporary filenames
# To avoid overwriting outputs, set overwriteOutput option to True for this script execution context
arcpy.env.overwriteOutput = True
# Raster environment settings for output consistency and optimization
arcpy.env.compression = "LZ77"
arcpy.env.pyramid = "PYRAMIDS -1 NEAREST LZ77 NO_SKIP"
# ISO3 Country Code
iso3 = "idn" # Update this to the country code you're working with
# Define input and output folders
input_folder = "D:\\temp\\modis\\{}\\gee\\04_fillnullwithstats\\evi_all_quarterly".format(iso3)
stats_folder = "D:\\temp\\modis\\{}\\gee\\03_statistics\\evi_all_quarterly".format(iso3)
ratioanom_folder = "D:\\temp\\modis\\{}\\gee\\05_ratioanom\\evi_all_quarterly".format(iso3)
diffanom_folder = "D:\\temp\\modis\\{}\\gee\\06_diffanom\\evi_all_quarterly".format(iso3)
stdanom_folder = "D:\\temp\\modis\\{}\\gee\\07_stdanom\\evi_all_quarterly".format(iso3)
vci_folder = "D:\\temp\\modis\\{}\\gee\\08_vci\\evi_all_quarterly".format(iso3)
# Quarters definition
quarters_def = ["010203", "020304", "030405", "040506", "050607", "060708", "070809", "080910", "091011", "101112", "111201", "120102"]
print("Script started...")
# Check if input folder exists
if not os.path.exists(input_folder):
print(f"Input folder does not exist: {input_folder}")
exit(1)
print("Input folder found, processing...")
# Global variable to store user's choice
user_choice = None
def set_user_decision():
"""Prompt user for decision on existing files and store it globally."""
global user_choice
if user_choice is None:
decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
while decision not in ['R', 'S', 'A']:
print("Invalid choice. Please choose again.")
decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
user_choice = decision
def derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder):
global user_choice # Use the global variable to store the user's choice
# Loop through the files in the input folder
for raster in os.listdir(input_folder):
# Only process .tif files
if raster.endswith(".tif") or raster.endswith(".tiff"):
print(f"Processing raster: {raster}")
parts = raster.split("_") # idn_phy_mxd13q1_evi_mean_2002_010203.tif
# Check the parts of the filename
print(f"Filename parts: {parts}")
# Adjusting to the actual number of parts in the filenames
if len(parts) == 7:
# Extract year and quarter from the last part of the filename
quarter = parts[6].split(".")[0]
year = parts[5] # The year is now correctly the 6th part (index 5)
print(f"Quarter: {quarter}, Year: {year}")
if quarter in quarters_def:
print(f"Quarter is valid: {quarter}")
# Construct the corresponding filenames in the stats folder
avg_raster_name = f"{iso3}_phy_mxd13q1_evi_{quarter}_2003_2022_avg.tif"
min_raster_name = f"{iso3}_phy_mxd13q1_evi_{quarter}_2003_2022_min.tif"
max_raster_name = f"{iso3}_phy_mxd13q1_evi_{quarter}_2003_2022_max.tif"
std_raster_name = f"{iso3}_phy_mxd13q1_evi_{quarter}_2003_2022_std.tif"
print(f"Expected stats raster names: {avg_raster_name}, {min_raster_name}, {max_raster_name}, {std_raster_name}")
# Full paths to the statistics rasters
avg_raster = os.path.join(stats_folder, avg_raster_name)
min_raster = os.path.join(stats_folder, min_raster_name)
max_raster = os.path.join(stats_folder, max_raster_name)
std_raster = os.path.join(stats_folder, std_raster_name)
# Verify that the statistics rasters exist
stats_rasters_exist = True
for stats_raster in [avg_raster, min_raster, max_raster, std_raster]:
if not arcpy.Exists(stats_raster):
print(f"Error: {stats_raster} not found in {stats_folder}")
stats_rasters_exist = False
break
else:
print(f"Stats raster found: {stats_raster}")
if not stats_rasters_exist:
continue
# Define output paths for anomalies and VCI
ratioanom_raster = os.path.join(ratioanom_folder, f"{iso3}_phy_mxd13q1_ratioanom_{year}_{quarter}.tif")
diffanom_raster = os.path.join(diffanom_folder, f"{iso3}_phy_mxd13q1_diffanom_{year}_{quarter}.tif")
stdanom_raster = os.path.join(stdanom_folder, f"{iso3}_phy_mxd13q1_stdanom_{year}_{quarter}.tif")
vci_raster = os.path.join(vci_folder, f"{iso3}_phy_mxd13q1_vci_{year}_{quarter}.tif")
# Now, insert a loop to check the existence of each output and prompt for user decision if necessary
outputs = [ratioanom_raster, diffanom_raster, stdanom_raster, vci_raster]
for output_raster in outputs:
if arcpy.Exists(output_raster):
if user_choice is None:
set_user_decision()
if user_choice == 'S':
print(f"Skipping existing file: {output_raster}")
continue # Skip to the next output raster
elif user_choice == 'A':
print("Aborting process.")
return # Exit the function
elif user_choice == 'R':
arcpy.Delete_management(output_raster) # Delete if replacing
try:
arcpy.CheckOutExtension("spatial")
# Prepare the input raster
in_raster = arcpy.Raster(os.path.join(input_folder, raster))
# Calculate the indices
ratioanom = arcpy.sa.Divide(arcpy.sa.Float(in_raster), arcpy.sa.Float(avg_raster)) * 100
diffanom = arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster)
stdanom = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster)), arcpy.sa.Float(std_raster))
vci_unclipped = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(min_raster)), (arcpy.sa.Float(max_raster) - arcpy.sa.Float(min_raster))) * 100
vci = arcpy.sa.Con(vci_unclipped > 100, 100, arcpy.sa.Con(vci_unclipped < 0, 0, vci_unclipped))
# Before saving each output, create a temporary filename for intermediate saving
# Then, use arcpy.management.CopyRaster to apply LZ77 compression when saving the final output
for idx, calc_raster in enumerate([ratioanom, diffanom, stdanom, vci]):
# Determine the pixel type based on the index
if idx in [2, 3]: # Assuming stdanom and vci need floating-point precision
pixel_type = "32_BIT_FLOAT"
nodata_value = "-3.402823e+38" # Or "NaN", if appropriate and supported
else: # Use 16_BIT_SIGNED for other outputs
pixel_type = "16_BIT_SIGNED"
nodata_value = "-32768" # Common nodata value for 16_BIT_SIGNED
# Create a temporary filename for intermediate saving
temp_filename = f"temp_{uuid.uuid4().hex}.tif"
temp_file_path = os.path.join(arcpy.env.scratchFolder, temp_filename)
# Save the calculation result to the temporary file
calc_raster.save(temp_file_path)
# Replace direct saving with CopyRaster for LZ77 compression
arcpy.management.CopyRaster(
in_raster=temp_file_path,
out_rasterdataset=outputs[idx],
config_keyword="",
background_value="",
nodata_value=nodata_value,
onebit_to_eightbit="NONE",
colormap_to_RGB="NONE",
pixel_type=pixel_type,
scale_pixel_value="NONE",
RGB_to_Colormap="NONE",
format="TIFF",
transform="NONE",
process_as_multidimensional="CURRENT_SLICE",
build_multidimensional_transpose="NO_TRANSPOSE"
)
# Attempt to delete the temporary file
try:
arcpy.Delete_management(temp_file_path)
print(f"Temporary file {temp_file_path} deleted.")
except Exception as e:
print(f"Warning: Unable to delete temporary file {temp_file_path}")
# Clear any locks or residual files
arcpy.ClearWorkspaceCache_management()
except Exception as e:
print(f"An error occurred: {e}")
finally:
arcpy.CheckInExtension("spatial")
print(f"{outputs[idx]} completed with LZ77 compression, calculated from {os.path.join(input_folder, raster)} and other statistics rasters.")
else:
print(f"Invalid quarter in filename: {raster}")
else:
print(f"Unexpected filename format: {raster}")
else:
print(f"Skipping non-TIFF file: {raster}")
# Main function
def main():
# Call the derivative_vi() function for the input folder
derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder)
if __name__ == '__main__':
main()
print("Script completed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment