Created
April 25, 2023 12:06
-
-
Save james-hinton/88876c7e8709e788546a4e0ca03e4eb7 to your computer and use it in GitHub Desktop.
Check Cloud Optimized GeoTIFF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pathlib | |
import rasterio | |
from rasterio.env import GDALVersion | |
import click | |
from typing import Union, Optional, Tuple, List, Dict, Any | |
def is_cog( | |
src_path: Union[str, pathlib.PurePath], | |
strict: bool = False, | |
config: Optional[Dict] = None, | |
quiet: bool = False, | |
) -> Tuple[bool, List[str], List[str]]: | |
""" | |
Validate Cloud Optimized Geotiff. | |
This script is the rasterio equivalent of | |
https://svn.osgeo.org/gdal/trunk/gdal/swig/python/samples/validate_cloud_optimized_geotiff.py | |
Parameters | |
---------- | |
src_path: str or PathLike object | |
A dataset path or URL. Will be opened in "r" mode. | |
strict: bool | |
Treat warnings as errors | |
quiet: bool | |
Remove standard outputs | |
Returns | |
------- | |
is_valid: bool | |
True is src_path is a valid COG. | |
errors: list | |
List of validation errors. | |
warnings: list | |
List of validation warnings. | |
""" | |
errors: List[str] = [] | |
warnings: List[str] = [] | |
details: Dict[str, Any] = {} | |
config = config or {} | |
if not GDALVersion.runtime().at_least("2.2"): | |
raise Exception("GDAL 2.2 or above required") | |
with rasterio.Env(**config): | |
with rasterio.open(src_path) as src: | |
if not src.driver == "GTiff": | |
errors.append("The file is not a GeoTIFF") | |
if not quiet: | |
click.secho("The following errors were found:", fg="red", err=True) | |
for e in errors: | |
click.echo("- " + e, err=True) | |
return False, errors, warnings | |
if any(pathlib.Path(x).suffix.lower() == ".ovr" for x in src.files): | |
errors.append( | |
"Overviews found in external .ovr file. They should be internal" | |
) | |
overviews = src.overviews(1) | |
if src.width > 512 and src.height > 512: | |
if not src.is_tiled: | |
errors.append( | |
"The file is greater than 512xH or 512xW, but is not tiled" | |
) | |
if not overviews: | |
warnings.append( | |
"The file is greater than 512xH or 512xW, it is recommended " | |
"to include internal overviews" | |
) | |
ifd_offset = int(src.get_tag_item("IFD_OFFSET", "TIFF", bidx=1)) | |
# Starting from GDAL 3.1, GeoTIFF and COG have ghost headers | |
# e.g: | |
# """ | |
# GDAL_STRUCTURAL_METADATA_SIZE=000140 bytes | |
# LAYOUT=IFDS_BEFORE_DATA | |
# BLOCK_ORDER=ROW_MAJOR | |
# BLOCK_LEADER=SIZE_AS_UINT4 | |
# BLOCK_TRAILER=LAST_4_BYTES_REPEATED | |
# KNOWN_INCOMPATIBLE_EDITION=NO | |
# """ | |
# | |
# This header should be < 200bytes | |
if ifd_offset > 300: | |
errors.append( | |
f"The offset of the main IFD should be < 300. It is {ifd_offset} instead" | |
) | |
ifd_offsets = [ifd_offset] | |
details["ifd_offsets"] = {} | |
details["ifd_offsets"]["main"] = ifd_offset | |
if overviews and overviews != sorted(overviews): | |
errors.append("Overviews should be sorted") | |
for ix, dec in enumerate(overviews): | |
# NOTE: Size check is handled in rasterio `src.overviews` methods | |
# https://github.com/mapbox/rasterio/blob/4ebdaa08cdcc65b141ed3fe95cf8bbdd9117bc0b/rasterio/_base.pyx | |
# We just need to make sure the decimation level is > 1 | |
if not dec > 1: | |
errors.append( | |
"Invalid Decimation {} for overview level {}".format(dec, ix) | |
) | |
# Check that the IFD of descending overviews are sorted by increasing | |
# offsets | |
ifd_offset = int(src.get_tag_item("IFD_OFFSET", "TIFF", bidx=1, ovr=ix)) | |
ifd_offsets.append(ifd_offset) | |
details["ifd_offsets"]["overview_{}".format(ix)] = ifd_offset | |
if ifd_offsets[-1] < ifd_offsets[-2]: | |
if ix == 0: | |
errors.append( | |
"The offset of the IFD for overview of index {} is {}, " | |
"whereas it should be greater than the one of the main " | |
"image, which is at byte {}".format( | |
ix, ifd_offsets[-1], ifd_offsets[-2] | |
) | |
) | |
else: | |
errors.append( | |
"The offset of the IFD for overview of index {} is {}, " | |
"whereas it should be greater than the one of index {}, " | |
"which is at byte {}".format( | |
ix, ifd_offsets[-1], ix - 1, ifd_offsets[-2] | |
) | |
) | |
block_offset = src.get_tag_item("BLOCK_OFFSET_0_0", "TIFF", bidx=1) | |
data_offset = int(block_offset) if block_offset else 0 | |
data_offsets = [data_offset] | |
details["data_offsets"] = {} | |
details["data_offsets"]["main"] = data_offset | |
for ix, _dec in enumerate(overviews): | |
block_offset = src.get_tag_item( | |
"BLOCK_OFFSET_0_0", "TIFF", bidx=1, ovr=ix | |
) | |
data_offset = int(block_offset) if block_offset else 0 | |
data_offsets.append(data_offset) | |
details["data_offsets"]["overview_{}".format(ix)] = data_offset | |
if data_offsets[-1] != 0 and data_offsets[-1] < ifd_offsets[-1]: | |
if len(overviews) > 0: | |
errors.append( | |
"The offset of the first block of the smallest overview " | |
"should be after its IFD" | |
) | |
else: | |
errors.append( | |
"The offset of the first block of the image should " | |
"be after its IFD" | |
) | |
for i in range(len(data_offsets) - 2, 0, -1): | |
if data_offsets[i] < data_offsets[i + 1]: | |
errors.append( | |
"The offset of the first block of overview of index {} should " | |
"be after the one of the overview of index {}".format(i - 1, i) | |
) | |
if len(data_offsets) >= 2 and data_offsets[0] < data_offsets[1]: | |
errors.append( | |
"The offset of the first block of the main resolution image " | |
"should be after the one of the overview of index {}".format( | |
len(overviews) - 1 | |
) | |
) | |
for ix, _dec in enumerate(overviews): | |
with rasterio.open(src_path, OVERVIEW_LEVEL=ix) as ovr_dst: | |
if ovr_dst.width > 512 and ovr_dst.height > 512: | |
if not ovr_dst.is_tiled: | |
errors.append("Overview of index {} is not tiled".format(ix)) | |
if warnings and not quiet: | |
click.secho("The following warnings were found:", fg="yellow", err=True) | |
for w in warnings: | |
click.echo("- " + w, err=True) | |
click.echo(err=True) | |
if errors and not quiet: | |
click.secho("The following errors were found:", fg="red", err=True) | |
for e in errors: | |
click.echo("- " + e, err=True) | |
is_valid = False if errors or (warnings and strict) else True | |
return is_valid, errors, warnings | |
if __name__ == "__main__": | |
print(is_cog('coga.tif')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment