Skip to content

Instantly share code, notes, and snippets.

@james-hinton
Created April 25, 2023 12:06
Show Gist options
  • Save james-hinton/88876c7e8709e788546a4e0ca03e4eb7 to your computer and use it in GitHub Desktop.
Save james-hinton/88876c7e8709e788546a4e0ca03e4eb7 to your computer and use it in GitHub Desktop.
Check Cloud Optimized GeoTIFF
import pathlib
import rasterio
from rasterio.env import GDALVersion
import click
from typing import Union, Optional, Tuple, List, Dict, Any
def is_cog(
src_path: Union[str, pathlib.PurePath],
strict: bool = False,
config: Optional[Dict] = None,
quiet: bool = False,
) -> Tuple[bool, List[str], List[str]]:
"""
Validate Cloud Optimized Geotiff.
This script is the rasterio equivalent of
https://svn.osgeo.org/gdal/trunk/gdal/swig/python/samples/validate_cloud_optimized_geotiff.py
Parameters
----------
src_path: str or PathLike object
A dataset path or URL. Will be opened in "r" mode.
strict: bool
Treat warnings as errors
quiet: bool
Remove standard outputs
Returns
-------
is_valid: bool
True is src_path is a valid COG.
errors: list
List of validation errors.
warnings: list
List of validation warnings.
"""
errors: List[str] = []
warnings: List[str] = []
details: Dict[str, Any] = {}
config = config or {}
if not GDALVersion.runtime().at_least("2.2"):
raise Exception("GDAL 2.2 or above required")
with rasterio.Env(**config):
with rasterio.open(src_path) as src:
if not src.driver == "GTiff":
errors.append("The file is not a GeoTIFF")
if not quiet:
click.secho("The following errors were found:", fg="red", err=True)
for e in errors:
click.echo("- " + e, err=True)
return False, errors, warnings
if any(pathlib.Path(x).suffix.lower() == ".ovr" for x in src.files):
errors.append(
"Overviews found in external .ovr file. They should be internal"
)
overviews = src.overviews(1)
if src.width > 512 and src.height > 512:
if not src.is_tiled:
errors.append(
"The file is greater than 512xH or 512xW, but is not tiled"
)
if not overviews:
warnings.append(
"The file is greater than 512xH or 512xW, it is recommended "
"to include internal overviews"
)
ifd_offset = int(src.get_tag_item("IFD_OFFSET", "TIFF", bidx=1))
# Starting from GDAL 3.1, GeoTIFF and COG have ghost headers
# e.g:
# """
# GDAL_STRUCTURAL_METADATA_SIZE=000140 bytes
# LAYOUT=IFDS_BEFORE_DATA
# BLOCK_ORDER=ROW_MAJOR
# BLOCK_LEADER=SIZE_AS_UINT4
# BLOCK_TRAILER=LAST_4_BYTES_REPEATED
# KNOWN_INCOMPATIBLE_EDITION=NO
# """
#
# This header should be < 200bytes
if ifd_offset > 300:
errors.append(
f"The offset of the main IFD should be < 300. It is {ifd_offset} instead"
)
ifd_offsets = [ifd_offset]
details["ifd_offsets"] = {}
details["ifd_offsets"]["main"] = ifd_offset
if overviews and overviews != sorted(overviews):
errors.append("Overviews should be sorted")
for ix, dec in enumerate(overviews):
# NOTE: Size check is handled in rasterio `src.overviews` methods
# https://github.com/mapbox/rasterio/blob/4ebdaa08cdcc65b141ed3fe95cf8bbdd9117bc0b/rasterio/_base.pyx
# We just need to make sure the decimation level is > 1
if not dec > 1:
errors.append(
"Invalid Decimation {} for overview level {}".format(dec, ix)
)
# Check that the IFD of descending overviews are sorted by increasing
# offsets
ifd_offset = int(src.get_tag_item("IFD_OFFSET", "TIFF", bidx=1, ovr=ix))
ifd_offsets.append(ifd_offset)
details["ifd_offsets"]["overview_{}".format(ix)] = ifd_offset
if ifd_offsets[-1] < ifd_offsets[-2]:
if ix == 0:
errors.append(
"The offset of the IFD for overview of index {} is {}, "
"whereas it should be greater than the one of the main "
"image, which is at byte {}".format(
ix, ifd_offsets[-1], ifd_offsets[-2]
)
)
else:
errors.append(
"The offset of the IFD for overview of index {} is {}, "
"whereas it should be greater than the one of index {}, "
"which is at byte {}".format(
ix, ifd_offsets[-1], ix - 1, ifd_offsets[-2]
)
)
block_offset = src.get_tag_item("BLOCK_OFFSET_0_0", "TIFF", bidx=1)
data_offset = int(block_offset) if block_offset else 0
data_offsets = [data_offset]
details["data_offsets"] = {}
details["data_offsets"]["main"] = data_offset
for ix, _dec in enumerate(overviews):
block_offset = src.get_tag_item(
"BLOCK_OFFSET_0_0", "TIFF", bidx=1, ovr=ix
)
data_offset = int(block_offset) if block_offset else 0
data_offsets.append(data_offset)
details["data_offsets"]["overview_{}".format(ix)] = data_offset
if data_offsets[-1] != 0 and data_offsets[-1] < ifd_offsets[-1]:
if len(overviews) > 0:
errors.append(
"The offset of the first block of the smallest overview "
"should be after its IFD"
)
else:
errors.append(
"The offset of the first block of the image should "
"be after its IFD"
)
for i in range(len(data_offsets) - 2, 0, -1):
if data_offsets[i] < data_offsets[i + 1]:
errors.append(
"The offset of the first block of overview of index {} should "
"be after the one of the overview of index {}".format(i - 1, i)
)
if len(data_offsets) >= 2 and data_offsets[0] < data_offsets[1]:
errors.append(
"The offset of the first block of the main resolution image "
"should be after the one of the overview of index {}".format(
len(overviews) - 1
)
)
for ix, _dec in enumerate(overviews):
with rasterio.open(src_path, OVERVIEW_LEVEL=ix) as ovr_dst:
if ovr_dst.width > 512 and ovr_dst.height > 512:
if not ovr_dst.is_tiled:
errors.append("Overview of index {} is not tiled".format(ix))
if warnings and not quiet:
click.secho("The following warnings were found:", fg="yellow", err=True)
for w in warnings:
click.echo("- " + w, err=True)
click.echo(err=True)
if errors and not quiet:
click.secho("The following errors were found:", fg="red", err=True)
for e in errors:
click.echo("- " + e, err=True)
is_valid = False if errors or (warnings and strict) else True
return is_valid, errors, warnings
if __name__ == "__main__":
print(is_cog('coga.tif'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment