Skip to content

Instantly share code, notes, and snippets.

@bennyistanto
Created March 13, 2024 08:22
Show Gist options
  • Save bennyistanto/21767f51364a08bd7355e102f4897b29 to your computer and use it in GitHub Desktop.
Save bennyistanto/21767f51364a08bd7355e102f4897b29 to your computer and use it in GitHub Desktop.
Generate derivative product from Vegetation Indices - 8day timeseries
# -*- coding: utf-8 -*-
"""
NAME
10_mxd13q1_viproducts_8day.py
Generate derivative product from Vegetation Indices
DESCRIPTION
Input data for this script will use MXD13Q1 8-days data generate from GEE or downloaded
from NASA. This script can do calculation for ratio, difference, standardize anomaly
and vegetation condition index.
The calculation required timeseries VI and the long-term statistics (min, mean, max, std)
REQUIREMENT
ArcGIS must installed before using this script, as it required arcpy module.
EXAMPLES
C:\\Program Files\\ArcGIS\\Pro\\bin\\Python\\envs\\arcgispro-py3\\python 10_mxd13q1_viproducts_8day.py
NOTES
This script is designed to work with MODIS naming convention
If using other data, some adjustment are required: parsing filename and directory
CONTACT
Benny Istanto
Climate Geographer
GOST/DECAT/DECDG, The World Bank
LICENSE
This script is in the public domain, free from copyrights or restrictions.
VERSION
$Id$
TODO
xx
"""
import os
import arcpy
from datetime import datetime, timedelta
import re
import uuid # For generating unique temporary filenames
# To avoid overwriting outputs, set overwriteOutput option to True for this script execution context
arcpy.env.overwriteOutput = True
# Raster environment settings for output consistency and optimization
arcpy.env.compression = "LZ77"
arcpy.env.pyramid = "PYRAMIDS -1 NEAREST LZ77 NO_SKIP"
# ISO3 Country Code
iso3 = "idn" # Update this to the country code you're working with
# Define input and output folders
input_folder = "D:\\temp\\modis\\{}\\gee\\04_fillnullwithstats\\evi_all_8day".format(iso3)
stats_folder = "D:\\temp\\modis\\{}\\gee\\03_statistics\\evi_all_8day".format(iso3)
ratioanom_folder = "D:\\temp\\modis\\{}\\gee\\05_ratioanom\\evi_all_8day".format(iso3)
diffanom_folder = "D:\\temp\\modis\\{}\\gee\\06_diffanom\\evi_all_8day".format(iso3)
stdanom_folder = "D:\\temp\\modis\\{}\\gee\\07_stdanom\\evi_all_8day".format(iso3)
vci_folder = "D:\\temp\\modis\\{}\\gee\\08_vci\\evi_all_8day".format(iso3)
# Check if input folder exists
if not os.path.exists(input_folder):
print(f"Input folder does not exist: {input_folder}")
exit(1)
print("Input folder found, processing...")
# Global variable to store user's choice
user_choice = None
def set_user_decision():
"""Prompt user for decision on existing files and store it globally."""
global user_choice
if user_choice is None:
decision = input("An output file already exists. Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
while decision not in ['R', 'S', 'A']:
print("Invalid choice. Please choose again.")
decision = input("Choose an action - Replace (R), Skip (S), Abort (A): ").upper()
user_choice = decision
def derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder):
global user_choice # Use the global variable to store the user's choice
# Loop through the files in the input folder
for raster in os.listdir(input_folder):
# Check if the file is a TIFF file
if raster.endswith(".tif") or raster.endswith(".tiff"):
# Get the year, month, and day or day of year from the raster name
# {iso3}_phy_mxd13q1_evi_{yyyymmdd}.tif or {iso3}_phy_mxd13q1_evi_{yyyydoy}.tif
if "_" in raster:
year = int(raster[20:24])
month = int(raster[24:26])
day = int(raster[26:28])
date = datetime(year, month, day)
doy = date.strftime('%j')
else:
year = int(raster[20:24])
doy = int(raster[24:27])
leap_year = (datetime(year, 2, 29).strftime('%j') != '059')
if leap_year:
doy += 1
date = datetime(year, 1, 1) + timedelta(doy - 1)
# Get the DOY from the date
doy = date.strftime('%j')
# Construct the corresponding filenames in the stats folder
avg_raster = "{0}_phy_mxd13q1_20yr_avg_{1}.tif".format(iso3, doy.zfill(3))
min_raster = "{0}_phy_mxd13q1_20yr_min_{1}.tif".format(iso3, doy.zfill(3))
max_raster = "{0}_phy_mxd13q1_20yr_max_{1}.tif".format(iso3, doy.zfill(3))
std_raster = "{0}_phy_mxd13q1_20yr_std_{1}.tif".format(iso3, doy.zfill(3))
# Full paths to the statistics rasters
avg_raster = os.path.join(stats_folder, avg_raster)
min_raster = os.path.join(stats_folder, min_raster)
max_raster = os.path.join(stats_folder, max_raster)
std_raster = os.path.join(stats_folder, std_raster)
# Verify that the statistics rasters exist
stats_rasters_exist = True
for stats_raster in [avg_raster, min_raster, max_raster, std_raster]:
if not arcpy.Exists(stats_raster):
print(f"Error: {stats_raster} not found in {stats_folder}")
stats_rasters_exist = False
break
else:
print(f"Stats raster found: {stats_raster}")
if not stats_rasters_exist:
continue
# Create the output raster name with the appropriate format
# Ratio anomaly
if "_" in stats_raster:
ratioanom_raster = os.path.join(ratioanom_folder, "{}_phy_mxd13q1_ratioanom_{}{:02d}{:02d}.tif".format(iso3, year, month, day))
else:
ratioanom_raster = os.path.join(ratioanom_folder, "{}_phy_mxd13q1_ratioanom_{}{}.tif".format(iso3, year, doy.zfill(3)))
# Difference anomaly
if "_" in stats_raster:
diffanom_raster = os.path.join(diffanom_folder, "{}_phy_mxd13q1_diffanom_{}{:02d}{:02d}.tif".format(iso3, year, month, day))
else:
diffanom_raster = os.path.join(diffanom_folder, "{}_phy_mxd13q1_diffanom_{}{}.tif".format(iso3, year, doy.zfill(3)))
# Standardize anomaly
if "_" in stats_raster:
stdanom_raster = os.path.join(stdanom_folder, "{}_phy_mxd13q1_stdanom_{}{:02d}{:02d}.tif".format(iso3, year, month, day))
else:
stdanom_raster = os.path.join(stdanom_folder, "{}_phy_mxd13q1_stdanom_{}{}.tif".format(iso3, year, doy.zfill(3)))
# Vegetation condition index
if "_" in stats_raster:
vci_raster = os.path.join(vci_folder, "{}_phy_mxd13q1_vci_{}{:02d}{:02d}.tif".format(iso3, year, month, day))
else:
vci_raster = os.path.join(vci_folder, "{}_phy_mxd13q1_vci_{}{}.tif".format(iso3, year, doy.zfill(3)))
# Now, insert a loop to check the existence of each output and prompt for user decision if necessary
outputs = [ratioanom_raster, diffanom_raster, stdanom_raster, vci_raster]
for output_raster in outputs:
if arcpy.Exists(output_raster):
if user_choice is None:
set_user_decision()
if user_choice == 'S':
print(f"Skipping existing file: {output_raster}")
continue # Skip to the next output raster
elif user_choice == 'A':
print("Aborting process.")
return # Exit the function
elif user_choice == 'R':
arcpy.Delete_management(output_raster) # Delete if replacing
try:
arcpy.CheckOutExtension("spatial")
# Prepared the input
in_raster = arcpy.Raster(os.path.join(input_folder, raster))
print(in_raster)
avg_raster = arcpy.Raster(avg_raster)
print(avg_raster)
min_raster = arcpy.Raster(min_raster)
print(min_raster)
max_raster = arcpy.Raster(max_raster)
print(max_raster)
std_raster = arcpy.Raster(std_raster)
print(std_raster)
# Calculate the indices
ratioanom = arcpy.sa.Divide(arcpy.sa.Float(in_raster), arcpy.sa.Float(avg_raster)) * 100
diffanom = arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster)
stdanom = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(avg_raster)), arcpy.sa.Float(std_raster))
vci_unclipped = arcpy.sa.Divide((arcpy.sa.Float(in_raster) - arcpy.sa.Float(min_raster)), (arcpy.sa.Float(max_raster) - arcpy.sa.Float(min_raster))) * 100
vci = arcpy.sa.Con(vci_unclipped > 100, 100, arcpy.sa.Con(vci_unclipped < 0, 0, vci_unclipped))
# Before saving each output, create a temporary filename for intermediate saving
# Then, use arcpy.management.CopyRaster to apply LZ77 compression when saving the final output
for idx, calc_raster in enumerate([ratioanom, diffanom, stdanom, vci]):
# Determine the pixel type based on the index
if idx in [2, 3]: # Assuming stdanom and vci need floating-point precision
pixel_type = "32_BIT_FLOAT"
nodata_value = "-3.402823e+38" # Or "NaN", if appropriate and supported
else: # Use 16_BIT_SIGNED for other outputs
pixel_type = "16_BIT_SIGNED"
nodata_value = "-32768" # Common nodata value for 16_BIT_SIGNED
# Create a temporary filename for intermediate saving
temp_filename = f"temp_{uuid.uuid4().hex}.tif"
temp_file_path = os.path.join(arcpy.env.scratchFolder, temp_filename)
# Save the calculation result to the temporary file
calc_raster.save(temp_file_path)
# Replace direct saving with CopyRaster for LZ77 compression
arcpy.management.CopyRaster(
in_raster=temp_file_path,
out_rasterdataset=outputs[idx],
config_keyword="",
background_value="",
nodata_value=nodata_value,
onebit_to_eightbit="NONE",
colormap_to_RGB="NONE",
pixel_type=pixel_type,
scale_pixel_value="NONE",
RGB_to_Colormap="NONE",
format="TIFF",
transform="NONE",
process_as_multidimensional="CURRENT_SLICE",
build_multidimensional_transpose="NO_TRANSPOSE"
)
# Attempt to delete the temporary file
try:
arcpy.Delete_management(temp_file_path)
print(f"Temporary file {temp_file_path} deleted.")
except Exception as e:
print(f"Warning: Unable to delete temporary file {temp_file_path}")
# Clear any locks or residual files
arcpy.ClearWorkspaceCache_management()
except Exception as e:
print(f"An error occurred: {e}")
finally:
arcpy.CheckInExtension("spatial")
print(f"{outputs[idx]} completed with LZ77 compression, calculated from {os.path.join(input_folder, raster)} and other statistics rasters.")
# Remove the incorrectly placed 'else' statement here.
else:
print(f"Skipping non-TIFF file: {raster}")
# Main function
def main():
# Call the derivative_vi() function for the input folder
derivative_vi(input_folder, stats_folder, ratioanom_folder, diffanom_folder, stdanom_folder, vci_folder)
if __name__ == '__main__':
main()
print("Script completed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment