
@jakebrinkmann
Last active April 12, 2024 12:42
Python module for straightforward EarthExplorer Machine-to-Machine downloads

marine-toad-machine

For good luck

Dependencies

requests==2.18.4 (pinned in requirements.txt at the end of this gist); everything else comes from the Python 2 standard library.

Installation

Save download_m2m.py somewhere convenient and install the one third-party dependency, e.g. pip install requests==2.18.4 (or pip install -r requirements.txt).

Usage

Example:

python download_m2m.py \
	-d /d/data/TESTING/ --max 5 --threads 4 \
	--dataset ARD_TILE --products QA,SR,TOA,BT \
	--temporal "1984-01-01,2017-10-15" \
	--fields '{"Grid Region": "AK", "Spacecraft": "LANDSAT_8", "Horizontal": 3, "Vertical": 2}'

Error messages by key:

# python download_m2m.py -d ~/data/l2-albers/ -u user1234 --fields '{"Water": 29}' --dataset ARD_TILE --products QA

Search "Water" failed. Available:
* Tile Identifier
* Tile Grid Region
* Tile Grid Horizontal
* Tile Grid Vertical
* Tile Production Date (Ex. YYYY/MM/DD)
* Cloud Cover
* Cloud Shadow
* Snow Ice
* Fill (No Data)
* Spacecraft Identifier
* Sensor Identifier
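
Note that the --fields keys are matched as substrings of the full field names, so "Grid Region" resolves to "Tile Grid Region" as long as exactly one field name contains it. A condensed sketch of the matching rule (not the script verbatim; see additionalCriteriaValues below):

look_for = 'Grid Region'
names = ['Tile Identifier', 'Tile Grid Region', 'Tile Grid Horizontal']
matches = [n for n in names if look_for in n]
# exactly one match -> use it; none -> the "failed" error above;
# several -> a "not unique" error listing the candidates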

Error messages by value:

# python download_m2m.py -d ~/data/l2-albers/ -u user1234 --fields '{"Horizontal": 29, "Vertical": 29, "Sensor": "ETM+"}' --dataset ARD_TILE --products QA

"Sensor Identifier" invalid value not found: ETM+
* OLI_TIRS: OLI_TIRS
* All: None
* TM: TM
* ETM: ETM

Credentials:

# cat ~/.m2m-config.ini
[user]
username=user1234
password=secret

# export M2M_DOWNLOAD_CREDENTIALS_FILE="~/.m2m-config.ini"
# export M2M_DOWNLOAD_CREDENTIALS_PROFILE="user"
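
Taken together: an explicit -u/--username wins; otherwise the INI file named by M2M_DOWNLOAD_CREDENTIALS_FILE is read (section chosen by M2M_DOWNLOAD_CREDENTIALS_PROFILE, default "default"); failing that, the script prompts interactively. A condensed sketch of that resolution order, mirroring the credentials() and read_credentials() helpers defined in the script below:

import os

username = None  # set from -u/--username when given
if username:
    creds = {'username': username}
else:
    cfgpath = os.getenv('M2M_DOWNLOAD_CREDENTIALS_FILE')
    if cfgpath:  # INI file may hold both username and password
        section = os.getenv('M2M_DOWNLOAD_CREDENTIALS_PROFILE', 'default')
        creds = read_credentials(os.path.expanduser(cfgpath), section)
    else:
        creds = {'username': raw_input('ERS Username: ')}
# login() prompts via getpass.getpass() whenever no password was supplied
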
"""
Data download script for EarthExplorer Machine-to-Machine
download_m2m('/path/to/downloads', username='user1234', dataset='ARD_TILE',
products='TOA,BT,SR,PQA', threads=40,
fields={'Region': 'CU', 'Spacecraft': 'LANDSAT_8'})
More M2M documentation: https://earthexplorer.usgs.gov/inventory/documentation
Author: Jake Brinkmann <jacob.brinkmann.ctr@usgs.gov>
LSDS Science Research and Development (LSRD) Project
Date: 10/30/2017
This code is in the public domain.
It is provided AS-IS WITH NO WARRANTY WHATSOEVER.
"""
import requests, json, os, sys, getpass, urllib3, time
from multiprocessing.dummy import Pool  # thread pool: the downloads are I/O bound
from argparse import ArgumentParser
import ConfigParser  # Python 2 (configparser in Python 3)

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def message(msg, stop=False):
    """Write a message (or list of messages) to stderr; optionally exit."""
    stdout = sys.stderr
    if isinstance(msg, list):
        msg = '\n'.join(msg)
    if stop:
        msg = '\n' + msg + '\n'
    stdout.write(msg)
    stdout.flush()
    if stop:
        exit(1)

class EarthExplorer(object):
    """ Web-Service interface for EarthExplorer JSON Machine-to-Machine API """

    def __init__(self, version='1.4.1'):
        self.baseurl = 'https://earthexplorer.usgs.gov/inventory/json/v/%s/' % version

    def _api(self, endpoint='login', body=None):
        body = {'jsonRequest': json.dumps(body)} if body else {}
        r = requests.post(self.baseurl + endpoint, data=body)
        r.raise_for_status()
        dat = r.json()
        if dat.get('error'):
            message(': '.join([dat.get('errorCode'), dat.get('error')]), stop=True)
        return dat

    @classmethod
    def login(cls, username, password=None):
        if password is None:
            password = getpass.getpass('Password (%s): ' % username)
        payload = {'username': username, 'password': password}
        return cls()._api('login', payload).get('data')

    @classmethod
    def search(cls, **kwargs):
        return cls()._api('search', kwargs).get('data')

    @classmethod
    def idlookup(cls, **kwargs):
        return cls()._api('idlookup', kwargs).get('data')

    @classmethod
    def metadata(cls, **kwargs):
        return cls()._api('metadata', kwargs).get('data')

    @classmethod
    def download(cls, **kwargs):
        return cls()._api('download', kwargs).get('data')

    @classmethod
    def downloadoptions(cls, **kwargs):
        return cls()._api('downloadoptions', kwargs).get('data')

    @classmethod
    def datasets(cls, **kwargs):
        return cls()._api('datasets', kwargs).get('data')

    @classmethod
    def datasetfields(cls, **kwargs):
        return cls()._api('datasetfields', kwargs).get('data')
    @classmethod
    def additionalCriteriaValues(cls, api_key, dataset, filters):
        """ Attempts to build a complex search based on some simple JSON input
        Example: filters = {"Path": 29, "Row": 29, "Sensor": "ETM+"}
        TODO: Add support for AND/OR/BETWEEN searches
        """
        fields = cls.datasetfields(apiKey=api_key, datasetName=dataset)
        k = 'additionalCriteria'
        criteria = {k: {"filterType": "and", "childFilters": []}}
        for look_for, compare in filters.items():
            # Keys are matched as substrings of the dataset's field names
            field_matches = [f for f in fields if look_for in f['name']]
            if len(field_matches) > 1:
                message(['Search "%s" not unique. Found:' % look_for]
                        + ['* %s' % f['name'] for f in field_matches], stop=True)
            elif len(field_matches) < 1:
                message(['Search "%s" failed. Available:' % look_for]
                        + ['* %s' % f['name'] for f in fields], stop=True)
            field_id = int(field_matches[0]['fieldId'])
            field_name = field_matches[0]['name']
            selections = field_matches[0].get('valueList') or []  # guard: may be absent
            mapping = {str(s['name']): str(s['value']) for s in selections}
            if not isinstance(compare, (list, dict)):
                if mapping and compare not in mapping.values():
                    message(['"%s" invalid value not found: %s' % (field_name, compare)]
                            + ['* %s: %s' % (name, val) for name, val in mapping.items()],
                            stop=True)
            search = {"filterType": "value", "fieldId": field_id, "value": compare}
            criteria[k]['childFilters'].append(search)
        return criteria

    @staticmethod
    def temporalCriteria(temporal):
        dates = temporal.split(',')
        sd, ed = dates if len(dates) == 2 else dates * 2  # single date: start == end
        return {"temporalFilter": {"dateField": "search_date", "startDate": sd, "endDate": ed}}

def build_command_line_arguments():
    description = 'Search and download data (skip those already downloaded)'
    parser = ArgumentParser(description=description, add_help=False)
    parser.add_argument('--help', action='help', help='show this help message and exit')
    parser.add_argument('-d', '--directory', type=str, dest='directory', required=True, metavar='PATH',
                        help='Relative path to download all data')
    parser.add_argument('-u', '--username', type=str, dest='username', default=None, metavar='STR',
                        help='ERS Username (with full M2M download access)')
    parser.add_argument('-t', '--threads', type=int, dest='threads', default=40, metavar='INT',
                        help='Number of parallel download threads [Default: 40]')
    parser.add_argument('-b', '--batch', type=int, dest='batch', default=1000, metavar='INT',
                        help='Batch size iteration of URLs to receive [Default: 1000]')
    parser.add_argument('-m', '--max', type=int, dest='N', default=50000, metavar='INT',
                        help='Maximum number of search results to return [Default: 50000]')
    parser.add_argument('--dataset', type=str, dest='dataset', default='ARD_TILE', metavar='STR',
                        help='EE Catalog dataset [Default: ARD_TILE]')
    parser.add_argument('--products', type=str, dest='products', default='STANDARD', metavar='STR',
                        help='Comma-delimited products to download [Default: STANDARD]')
    parser.add_argument('--temporal', type=str, dest='temporal', default=None, metavar='STR',
                        help='Search Date Acquired (YYYY-MM-DD or YYYY-MM-DD,YYYY-MM-DD)')
    parser.add_argument('--fields', type=json.loads, dest='fields', default=None, metavar='JSON',
                        help='Filter results based on dataset-specific metadata fields')
    args = parser.parse_args()
    return args

def download_url(x):
    """Download a single (url, directory) pair, resuming any partial file."""
    fileurl, directory = x[0], x[1]
    head = requests.head(fileurl, timeout=300)
    filename = head.headers['Content-Disposition'].split('filename=')[-1]
    local_fname = os.path.join(directory, filename)
    if os.path.exists(local_fname):
        message('Already exists: %s \n' % local_fname)
        return
    file_size = None
    if 'Content-Length' in head.headers:
        file_size = int(head.headers['Content-Length'])
    bytes_recv = 0
    if os.path.exists(local_fname + '.part'):
        bytes_recv = os.path.getsize(local_fname + '.part')
    message("Downloading %s ... \n" % local_fname)
    # Resume from the end of any existing .part file
    resume_header = {'Range': 'bytes=%d-' % bytes_recv}
    sock = requests.get(fileurl, headers=resume_header, timeout=300,
                        stream=True, verify=False, allow_redirects=True)
    start = time.time()
    f = open(local_fname + '.part', 'ab')
    bytes_in_mb = 1024 * 1024
    for block in sock.iter_content(chunk_size=bytes_in_mb):
        if block:
            f.write(block)
            bytes_recv += len(block)
    f.close()
    ns = time.time() - start
    mb = bytes_recv / float(bytes_in_mb)
    message("%s (%3.2f (MB) in %3.2f (s), or %3.2f (MB/s)) \n" % (filename, mb, ns, mb / ns))
    if bytes_recv >= file_size:
        os.rename(local_fname + '.part', local_fname)


def download_url_wrapper(x):
    try:
        download_url(x)
    except Exception as e:
        message('\n\n *** Failed download %s: %s \n' % (x, str(e)))


def chunkify(iterable, n=1):
    """Yield successive n-sized slices of iterable."""
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

def read_credentials(config_path, section='default'):
    config = ConfigParser.SafeConfigParser()
    config.read(config_path)
    cfg_info = dict()
    for opt in config.options(section):
        cfg_info[opt] = config.get(section, opt)
    return cfg_info


def credentials(username=None):
    """Resolve login info: explicit username, then credentials file, then prompt."""
    if username is None:
        cfgpath = os.getenv('M2M_DOWNLOAD_CREDENTIALS_FILE')
        cfgsection = os.getenv('M2M_DOWNLOAD_CREDENTIALS_PROFILE', 'default')
        if cfgpath:
            return read_credentials(os.path.expanduser(cfgpath), cfgsection)
        else:
            return {'username': raw_input('ERS Username: ')}
    else:
        return {'username': username}

def download_m2m(directory, username=None, products='STANDARD', dataset='ARD_TILE',
                 N=50000, temporal=None, batch=1000, threads=40, fields=None):
    """
    Search for and download Landsat Level-2 products to local directory

    Args:
        directory: Relative path to local directory (will be created)
        username: ERS Username (with full M2M download access) [Optional]
        dataset: EarthExplorer Catalog datasetName [Default: ARD_TILE]
        N: Maximum number of search results to return
        products: Comma-delimited list of download products [Default: STANDARD]
        temporal: Search Date image acquired [Format: %Y-%m-%d or %Y-%m-%d,%Y-%m-%d]
        batch: How many URLs to request before working on downloads
        threads: Number of download threads to launch in parallel
        fields: JSON dataset-specific metadata fields (see #additionalCriteria)
    """
    api_key = EarthExplorer.login(**credentials(username))
    datasets = EarthExplorer.datasets(apiKey=api_key, datasetName=dataset, publicOnly=False)
    matches = [m['datasetName'] for m in datasets]
    if len(matches) > 1 and not any([m == dataset for m in matches]):
        message(['Multiple dataset matches found, please select only 1: ']
                + ['* [%s]: %s' % (m['datasetName'], m['datasetFullName']) for m in datasets],
                stop=True)
    # Build the search request from the optional metadata/temporal criteria
    search = dict(apiKey=api_key, datasetName=dataset, maxResults=N)
    if fields:
        search.update(EarthExplorer.additionalCriteriaValues(api_key, dataset, fields))
    if temporal:
        search.update(EarthExplorer.temporalCriteria(temporal=temporal))
    results = EarthExplorer.search(**search)
    n_results = results['totalHits']
    product_ids = results['results']
    message('Total search results: %d \n' % n_results)
    if len(product_ids) < 1:
        return
    if not os.path.exists(directory):
        os.makedirs(directory)
    product_selection_verified = False
    for pids in chunkify(product_ids, batch):
        pool = Pool(threads)
        entities = [p['entityId'] for p in pids]
        if not product_selection_verified:
            # Check the requested product codes against the first batch only
            product_avail = EarthExplorer.downloadoptions(apiKey=api_key, datasetName=dataset,
                                                          entityIds=entities)
            product_avail = set(p.get('productcode') or p.get('downloadCode')
                                for x in product_avail for p in x['downloadOptions'])
            valid_prods = list(set(products.split(',')) & product_avail)
            if len(valid_prods) < 1:
                message(['"%s" products not available. Choose from:' % products]
                        + ['* %s' % m for m in list(product_avail)], stop=True)
            else:
                product_selection_verified = True
        results = EarthExplorer.download(apiKey=api_key, datasetName=dataset,
                                         products=products.split(','), entityIds=entities)
        urls = [(r['url'], directory) for r in results]
        urls = [u for u in urls if all(u)]
        if urls:
            pool.map_async(download_url_wrapper, urls).get(6000)
        pool.close()
        pool.join()

if __name__ == '__main__':
    download_m2m(**vars(build_command_line_arguments()))
requirements.txt:

requests==2.18.4
@evan-blaisdell

evan-blaisdell commented Jul 29, 2019

I stumbled across your delightful script. I just can't get the Cloud Cover parameter to work, though. Providing the parameter, for example, {"Cloud Cover" : "20"}, always returns zero results. However, providing a nonsense value for cloud cover results in an error and gives me the list of valid values, including 20. Are you still maintaining this? Thanks!

$ python ~/m2m.py -d . -u XXXX --dataset ARD_TILE --products METADATA --temporal "2019-01-01,2019-12-31" --fields '{"Cloud Cover": "asdf"}'
Password (XXXX):
Fields: {u'Cloud Cover': u'asdf'}
"Cloud Cover" invalid value not found: asdf
* Less than 50%: 50
* All: None
* Less than 80%: 80
* Less than 10%: 10
* Less than 40%: 40
* Less than 70%: 70
* Less than 30%: 30
* Less than 60%: 60
* Less than 100%: 100
* Less than 90%: 90
* Less than 20%: 20

$ python ~/m2m.py -d . -u XXXX --dataset ARD_TILE --products METADATA --temporal "2019-01-01,2019-12-31" --fields '{"Cloud Cover": "20"}'
Password (XXXX):
Fields: {u'Cloud Cover': u'20'}
Total search results: 0

@jakebrinkmann
Author

jakebrinkmann commented Jul 30, 2019 via email

@evan-blaisdell

Thanks. I hacked around it a bit and got it to work!

@aklyussef

> Thanks. I hacked around it a bit and got it to work!

Do you mind sharing your solution?

@evan-blaisdell

> > Thanks. I hacked around it a bit and got it to work!
>
> Do you mind sharing your solution?

Hello. The way I got the cloud cover search to work was by simply adding search['maxCloudCover'] = '20' to the search dictionary before line 254 in the original code, instead of adding it to the --fields parameter. I'm sure you could parameterize it more neatly.
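
For reference, a minimal sketch of that workaround: the insertion point is the search dict built inside download_m2m, just before the EarthExplorer.search(**search) call, and '20' is only the example threshold from this thread (a --max-cloud-cover CLI flag would be one hypothetical way to "parameterize it more neatly"):

search = dict(apiKey=api_key, datasetName=dataset, maxResults=N)
# ... temporal/fields criteria added as before ...
search['maxCloudCover'] = '20'   # workaround: cap cloud cover at 20%
results = EarthExplorer.search(**search)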
