@joferkington
Last active November 28, 2023 15:14
#!/usr/bin/env python
"""
# Copyright 2019 Planet Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Example of a CLI-based tool to download, merge, and clip data for multiple
mosaics in a series. Expects the PL_API_KEY environment variable to be set and
contain your Planet API key.
Note that the merging and clipping functionality depends on having the GDAL
command-line tools installed.
Examples::
# Download quads into folders named after each mosaic
python download_mosaic_series.py "Global Monthly" --region=-85,35,-85,35
# Download, merge, and clip quads into a single file per mosaic
python download_mosaic_series.py "Global Monthly" --region=-85,35,-85,35 --clip
"""
import os
import re
import glob
import json
import shutil
import argparse
import subprocess
import datetime as dt
import itertools as it
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor
import requests
from requests.adapters import HTTPAdapter
def main():
descrip = "Download quads for multiple mosaics in a series"
parser = argparse.ArgumentParser(description=descrip)
parser.add_argument('series_name',
help='Name of the mosaic series, e.g. "Global Monthly"')
parser.add_argument('--region',
help='Region to download quads for. Expected to be a local geojson file'
' or lon_min,lat_min,lon_max,lat_max.')
parser.add_argument('--start-date',
help='Beginning of time interval in YYYY-MM-DD format.')
parser.add_argument('--end-date',
help='End of time interval in YYYY-MM-DD format.')
parser.add_argument('--merge-quads', action='store_true',
help='If specified, merge the downloaded quads into one file. Requires'
' gdal command-line utilities to be installed.')
parser.add_argument('--clip', action='store_true',
help='If specified, merge and clip the quads to the exact --region.'
' Requires gdal command-line utilities to be installed.')
parser.add_argument('--api-key',
help='Explicitly specify a Planet api key. If not specified, this will'
' be read from the PL_API_KEY environment variable')
parser.add_argument('--no-folder', action='store_true',
help='By default, quads from each mosaic will be put in a separate'
' folder. This option does not create separate folders and adds'
' the mosaic name to the filename instead. i.e. all files will'
' be written to the current folder.')
args = parser.parse_args()
if args.start_date is not None:
args.start_date = dt.datetime.strptime(args.start_date, '%Y-%m-%d')
if args.end_date is not None:
args.end_date = dt.datetime.strptime(args.end_date, '%Y-%m-%d')
if args.clip:
args.merge_quads = True
if not args.region:
poly = None
args.clip = False
else:
bounds = args.region.split(',')
if len(bounds) == 4:
xmin, ymin, xmax, ymax = map(float, bounds)
            if not (xmax >= xmin and ymax >= ymin):
                raise ValueError('Invalid bounding box: {}'.format(args.region))
poly = bounding_box_to_poly(xmin, ymin, xmax, ymax)
else:
poly = geojson_to_geometry(args.region)
client = Client(api_key=args.api_key)
if not client.api_key:
msg = ('API key not set! Either set PL_API_KEY in the environment or'
' use the --api-key option')
raise ValueError(msg)
series = client.series(name=args.series_name)
mosaics = list(series.mosaics(args.start_date, args.end_date))
quads = series.download_quads(region=poly, start_date=args.start_date,
end_date=args.end_date, flat=args.no_folder)
for path in quads:
print('Downloaded {}'.format(path))
if args.merge_quads:
clip = poly if args.clip else None
folders = [os.path.join(os.getcwd(), x.name) for x in mosaics]
merge_quads(folders, clip=clip, flat=args.no_folder)
def merge_quads(folders, clip=None, flat=False):
cutline = None
if clip is not None:
with open('cutline.geojson', 'w') as outfile:
json.dump(clip, outfile)
cutline = 'cutline.gpkg'
cmd = ['ogr2ogr', '-q', '-s_srs', 'EPSG:4326', '-t_srs', 'EPSG:3857',
'-of', 'GPKG', cutline, 'cutline.geojson']
subprocess.check_call(cmd)
os.remove('cutline.geojson')
for dirname in folders:
if flat:
filenames = list(glob.glob(dirname + '*.tif'))
else:
filenames = list(glob.glob(dirname + '/*.tif'))
basename = os.path.split(dirname)[-1]
vrtname = basename + '.vrt'
tifname = basename + '.tif'
subprocess.check_call(['gdalbuildvrt', '-q', vrtname] + filenames)
if cutline:
cutname = basename + '.cut.tif'
cmd = ['gdalwarp', '-q', '-cutline', cutline, '-crop_to_cutline',
vrtname, cutname]
print('...Clipping quads...')
subprocess.check_call(cmd)
os.remove(vrtname)
vrtname = cutname
cmd = ['gdal_translate', '-q', vrtname, tifname,
'-co', 'TILED=YES', '-co', 'COMPRESS=LZW']
subprocess.check_call(cmd)
cmd = ['gdaladdo', '-q', '-r', 'average', tifname, '2', '4', '8', '16']
subprocess.check_call(cmd)
try:
shutil.rmtree(dirname)
except Exception:
pass
os.remove(vrtname)
        for filename in filenames:
            # Assume we might have a "dirty" current folder where the globbing
            # found a file with the same name from a previous run. Quads inside
            # per-mosaic folders will already have been removed by rmtree above.
            if os.path.exists(filename) and os.path.basename(filename) != tifname:
                os.remove(filename)
print('Merged quads into {}'.format(tifname))
if cutline:
os.remove(cutline)
def geojson_to_geometry(filename):
with open(filename, 'r') as infile:
data = json.load(infile)
try:
featuretype = data['type']
except KeyError:
raise ValueError('{} does not appear to be GeoJSON!'.format(filename))
if featuretype in ['Polygon', 'MultiPolygon']:
return data
elif featuretype == 'FeatureCollection':
geoms = []
for item in data['features']:
if item['geometry']['type'] == 'Polygon':
geoms.append(item['geometry']['coordinates'])
elif item['geometry']['type'] == 'MultiPolygon':
geoms.extend(item['geometry']['coordinates'])
else:
msg = 'Unsupported type: {}'.format(item['geometry']['type'])
raise ValueError(msg)
return {'type': 'MultiPolygon', 'coordinates': geoms}
else:
raise ValueError('Unsupported geometry type: {}'.format(featuretype))
def bounding_box_to_poly(xmin, ymin, xmax, ymax):
aoi = {
"type": "Polygon",
"coordinates": [
[[xmin, ymin],
[xmax, ymin],
[xmax, ymax],
[xmin, ymax],
[xmin, ymin]]
]
}
return aoi
def _get_client(client):
if client is None:
client = Client()
return client
def _chunks(iterable, chunksize):
iterable = iter(iterable)
while True:
v = tuple(it.islice(iterable, chunksize))
if v:
yield v
else:
return
class Client(object):
"""Base client for working with the Planet mosaics API"""
base_url = 'https://api.planet.com/basemaps/v1'
def __init__(self, api_key=None):
"""
:param str api_key:
Your Planet API key. If not specified, this will be read from the
PL_API_KEY environment variable.
"""
if api_key is None:
api_key = os.getenv('PL_API_KEY')
self.api_key = api_key
self.session = requests.Session()
self.session.auth = (api_key, '')
retries = Retry(total=5, backoff_factor=0.2, status_forcelist=[429])
self.session.mount('https://', HTTPAdapter(max_retries=retries))
def _url(self, endpoint):
return '{}/{}'.format(self.base_url, endpoint)
def _consume_pages(self, endpoint, key, **params):
"""General pagination structure for Planet APIs."""
url = self._url(endpoint)
while True:
response = self._get(url, **params)
for item in response[key]:
yield item
if '_next' in response['_links']:
url = response['_links']['_next']
else:
break
def _query(self, endpoint, key, json_query):
"""Post and then get for pagination. Being lazy here and repeating."""
url = None
while True:
if url is None:
url = self._url(endpoint)
response = self._post(url, json_query)
else:
response = self._get(url)
for item in response[key]:
yield item
if '_next' in response['_links']:
url = response['_links']['_next']
else:
break
def _list(self, endpoint, key=None, **params):
key = key or endpoint
for item in self._consume_pages(endpoint, key, **params):
yield item
def _get(self, url, **params):
rv = self.session.get(url, params=params)
rv.raise_for_status()
return rv.json()
def _post(self, url, json_data):
rv = self.session.post(url, json=json_data)
rv.raise_for_status()
return rv.json()
def _item(self, endpoint, **params):
return self._get(self._url(endpoint), **params)
def _download(self, url, filename=None, output_dir=None):
response = self.session.get(url, stream=True)
response.raise_for_status()
        disposition = response.headers.get('Content-Disposition', '')
        if filename is None:
            matches = re.findall(r'filename="(.+)"', disposition)
            if not matches:
                msg = 'Filename not specified and no content-disposition info!'
                raise ValueError(msg)
            filename = matches[0]
if output_dir is not None:
try:
os.mkdir(output_dir)
except OSError:
# Due to threading, the directory may be created simultaneously
pass
filename = os.path.join(output_dir, filename)
# Download in chunks.
with open(filename, 'wb') as outfile:
shutil.copyfileobj(response.raw, outfile)
del response
return filename
def series(self, name=None, series_id=None):
"""
Retrieve a MosaicSeries object for a specific series by either name or
ID. You must specify either name or series_id, but not both.
:param str name:
The name of the series (e.g. "Global Monthly")
:param str series_id:
The series ID (e.g. "431b62a0-eaf9-45e7-acf1-d58278176d52")
:returns MosaicSeries:
An object representing the series
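
        Example (a minimal usage sketch; the series name is a placeholder)::

            client = Client()
            series = client.series(name="Global Monthly")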
"""
if name is not None:
return MosaicSeries.from_name(name, self)
elif series_id is not None:
return MosaicSeries.from_id(series_id, self)
else:
raise ValueError('You must specify either name or series_id!')
def list_series(self, name_contains=None):
"""
Iterate through all mosaic series you have access to, optionally
filtering based on name. Yields ``MosaicSeries`` instances.
:param str name_contains:
Search only for series that contain the specified substring in
their names
"""
for item in self._list('series', name__contains=name_contains):
yield MosaicSeries(item, self)
def mosaic(self, name=None, mosaic_id=None):
"""
Retrieve a `Mosaic` object for a particular mosaic, either by name or
ID. You must specify either name or mosaic_id, but not both.
:param str name:
The name of the mosaic (e.g. "global_monthly_2019_09_mosaic")
:param str mosaic_id:
The mosaic ID (e.g. "431b62a0-eaf9-45e7-acf1-d58278176d52")
:returns Mosaic:
An object representing the mosaic
"""
if name is not None:
return Mosaic.from_name(name, self)
elif mosaic_id is not None:
return Mosaic.from_id(mosaic_id, self)
else:
raise ValueError('You must specify either name or mosaic_id!')
class MosaicSeries(object):
"""Represents a collection of mosaics, usually on a regular interval."""
def __init__(self, info, client=None):
"""
:param dict info:
The JSON api response for the series from the mosaic api
:param Client client:
A specific Client instance to use. Will be created if not specified.
"""
self.client = _get_client(client)
self.id = info['id']
self.name = info['name']
self.info = info
@classmethod
def from_name(cls, name, client=None):
"""
Initialize a series based on its exact name.
"""
client = _get_client(client)
info = next(client._list('series', name__is=name))
return cls(info, client)
@classmethod
def from_id(cls, series_id, client=None):
"""
Initialize a series based on its ID.
"""
client = _get_client(client)
        info = client._item('series/{}'.format(series_id))
return cls(info, client)
def mosaics(self, start_date=None, end_date=None):
"""
Iterate through mosaics in this series, optionally between the specified
start and end dates.
:param datetime start_date:
The earliest date to use. Note that this check is based on the
first_acquired metadata for the mosaic.
        :param datetime end_date:
            The latest date to use. Note that this check is based on the
            last_acquired metadata for the mosaic.
"""
if start_date is None:
start_date = dt.datetime.min
if end_date is None:
end_date = dt.datetime.max
endpoint = 'series/{}/mosaics'.format(self.id)
for info in self.client._list(endpoint, key='mosaics'):
mosaic = Mosaic(info, self.client)
if (mosaic.start_date >= start_date) and (mosaic.end_date <= end_date):
yield mosaic
def download_quads(self, region=None, bbox=None, start_date=None,
end_date=None, nthreads=16, flat=False,
filename_template=None):
"""
Download quads for all mosaics in the series. Will be downloaded into
separate folders based on mosaic names.
:param dict region:
A GeoJSON polygon region
:param tuple bbox:
A min_lon, min_lat, max_lon, max_lat tuple.
:param datetime start_date:
The earliest date to use. Note that this check is based on the
first_acquired metadata for the mosaic.
:param datetime end_date:
            The latest date to use. Note that this check is based on the
last_acquired metadata for the mosaic.
:param int nthreads:
Number of concurrent downloads.
:param bool flat:
By default, quads will be placed in separate, newly-created
folders. This option places all quads in the specified folder with
the mosaic name included in the filename.
:param str filename_template:
A {} style format string with the keys "mosaic", "level", "x", "y".
            Defaults to the Content-Disposition specified by the API
            (typically "L{z}-{x}E-{y}N.tif").
"""
if flat and not filename_template:
filename_template = '{mosaic}-L{level}-{x:04d}E-{y:04d}N.tif'
def all_quads():
for mosaic in self.mosaics(start_date, end_date):
for quad in mosaic.quads(bbox, region):
if quad.downloadable:
yield quad
def download(quad):
filename, output_dir = None, None
if not flat:
output_dir = quad.mosaic_name
if filename_template is not None:
filename = filename_template.format(mosaic=quad.mosaic_name,
level=quad.level,
x=quad.x,
y=quad.y)
return quad.download(filename=filename, output_dir=output_dir)
groups = _chunks(all_quads(), 4 * nthreads)
with ThreadPoolExecutor(nthreads) as executor:
for group in groups:
for path in executor.map(download, group):
yield path
class Mosaic(object):
"""Representation of a single mosaic."""
def __init__(self, info, client=None):
self.client = _get_client(client)
self.id = info['id']
self.name = info['name']
self.level = info['level']
self.info = info
iso = '%Y-%m-%dT%H:%M:%S.%fZ'
self.start_date = dt.datetime.strptime(self.info['first_acquired'], iso)
self.end_date = dt.datetime.strptime(self.info['last_acquired'], iso)
@classmethod
def from_name(cls, name, client=None):
"""Look up a mosaic by name in the Planet Basemaps API."""
client = _get_client(client)
info = next(client._list('mosaics', name__is=name))
return cls(info, client)
@classmethod
def from_id(cls, mosaic_id, client=None):
"""Look up a mosaic by ID in the Planet Basemaps API."""
client = _get_client(client)
info = client._item('mosaics/{}'.format(mosaic_id))
return cls(info, client)
def _bbox_search(self, bbox):
if bbox is None:
bbox = self.info['bbox']
endpoint = 'mosaics/{}/quads'.format(self.id)
bbox = ','.join(str(item) for item in bbox)
        return self.client._consume_pages(endpoint, 'items', bbox=bbox)
def _region_search(self, region):
endpoint = 'mosaics/{}/quads/search'.format(self.id)
return self.client._query(endpoint, 'items', region)
def quads(self, bbox=None, region=None):
"""
Retrieve info for all quads within a specific AOI specified as either a
lon/lat ``bbox`` or a geojson ``region``.
:param tuple bbox:
A 4-item tuple of floats. Expected to be (longitude_min,
latitude_min, longitude_max, latitude_max).
:param dict region:
A GeoJSON geometry (usually polygon or multipolygon, not a feature
collection) in WGS84 representing the exact AOI.
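
        Example (a minimal sketch; the mosaic name and bbox values are
        placeholders)::

            mosaic = Client().mosaic(name="global_monthly_2019_09_mosaic")
            for quad in mosaic.quads(bbox=(-85.0, 35.0, -84.0, 36.0)):
                print(quad.id, quad.downloadable)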
"""
if region:
quads = self._region_search(region)
else:
quads = self._bbox_search(bbox)
for info in quads:
yield MosaicQuad(info, self, self.client)
def download_quads(self, output_dir=None, bbox=None, region=None,
nthreads=16, filename_template=None):
"""
Download mosaic data to a local directory for a specific AOI specified
as either a lon/lat ``bbox`` or a geojson ``region``.
:param tuple bbox:
A 4-item tuple of floats. Expected to be (longitude_min,
latitude_min, longitude_max, latitude_max).
:param dict region:
A GeoJSON geometry (usually polygon or multipolygon, not a feature
collection) in WGS84 representing the exact AOI.
:param int nthreads:
Number of concurrent downloads.
:param str filename_template:
A {} style format string with the keys "mosaic", "level", "x", "y".
            Defaults to the Content-Disposition specified by the API
            (typically "L{z}-{x}E-{y}N.tif").
"""
def download(quad):
if filename_template is not None:
filename = filename_template.format(mosaic=self.name,
level=quad.level,
x=quad.x,
y=quad.y)
else:
filename = None
            return quad.download(filename=filename, output_dir=output_dir)
quads = self.quads(bbox, region)
groups = _chunks(quads, 4 * nthreads)
with ThreadPoolExecutor(nthreads) as executor:
for group in groups:
for path in executor.map(download, group):
yield path
class MosaicQuad(object):
"""A representation of a single quad."""
def __init__(self, info, mosaic=None, client=None):
self.client = _get_client(client)
self.info = info
self.id = info['id']
self.coverage = self.info.get('percent_complete')
self.bounds = self.info.get('bbox')
if mosaic is None:
# Bit odd that quads don't include a mosaic ID directly...
mosaic_id = self.info['_links']['_self'].split('/')[-3]
mosaic = Mosaic.from_id(mosaic_id)
self.mosaic = mosaic
self.mosaic_name = mosaic.name
x, y = self.id.split('-')
self.x = int(x)
self.y = int(y)
self.level = self.mosaic.level
@property
def downloadable(self):
"""Whether or not you have download permissions for the quad."""
return 'download' in self.info['_links']
def download(self, filename=None, output_dir=None):
"""Download quad data locally."""
url = self.info['_links'].get('download')
if url:
return self.client._download(url, filename, output_dir)
def contribution(self):
"""
Planet data api URLs for each scene that contributed to this quad.
"""
url = self.info['_links'].get('items')
if url:
data = self.client._get(url)
return [item['link'] for item in data['items']]
else:
return []
if __name__ == '__main__':
main()
@brendancol

@joferkington Hey thanks for this script!

I'm testing it locally with the example command (python download_mosaic_series.py "Global Monthly" --region=-85,35,-85,35) and verified my API env var is set correctly.

I'm currently getting a requests.exceptions.HTTPError: 400 Client Error on the quads/search POST request.

I'll keep testing... but any ideas on how to fix it?

STACK TRACE

  File "/Users/brendan/nrc-planet-discovery/download_mosaic_series.p
y", line 279, in _query
    response = self._post(url, json_query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/brendan/nrc-planet-discovery/download_mosaic_series.p
y", line 303, in _post
    rv.raise_for_status()
  File "/Users/  File "/Users/brendan/nrc-planet-discovery/download_mosaic_series.p
y", line 279, in _query
    response = self._post(url, json_query)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/brendan/nrc-planet-discovery/download_mosaic_series.p
y", line 303, in _post
    rv.raise_for_status()
  File "/Users/me/miniconda3/envs/gis/lib/python3.11/site-packa
ges/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url
: https://api.planet.com/basemaps/v1/mosaics/48fff803-4104-49bc-b913
-7467b7a5ffb5/quads/search
/miniconda3/envs/gis/lib/python3.11/site-packa
ges/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)



requests.exceptions.HTTPError: 400 Client Error: Bad Request for url
: https://api.planet.com/basemaps/v1/mosaics/48fff803-4104-49bc-b913
-7467b7a5ffb5/quads/search
