Last active
June 21, 2023 08:33
-
-
Save plouvart/c14a6d60bf3840794e008fb7046e41a0 to your computer and use it in GitHub Desktop.
Download Sentinel2 data from LSA
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
import time | |
import datetime | |
import pathlib | |
import xml.etree.ElementTree as ET | |
import re | |
import fiona | |
import geopandas as gpd | |
from shapely.geometry import Polygon | |
import shutil | |
import subprocess | |
import requests | |
""" | |
LSA helper functions | |
""" | |
def _extract_date(product): | |
return datetime.datetime.fromisoformat( | |
product.getroot().find("./{http://purl.org/dc/elements/1.1/}date").text[:-1] | |
) | |
def _is_s2_product(product): | |
return ( | |
"S2_MSIL2A" in product.getroot().find("./{http://www.w3.org/2005/Atom}id").text | |
) | |
def _extract_footprint(product): | |
ns2 = "{http://www.georss.org/georss}" | |
ns3 = "{http://www.opengis.net/gml}" | |
coords = ( | |
product.getroot() | |
.find(f"./{ns2}where/{ns3}Polygon/{ns3}exterior/{ns3}LinearRing/{ns3}posList") | |
.text | |
) | |
return Polygon( | |
[ | |
(float(identified[2]), float(identified[1])) | |
for identified in re.finditer("(-?\d+\.\d+)\s+(-?\d+\.\d+)", coords) | |
] | |
) | |
def _extract_cloud_coverage(product): | |
summary = product.getroot().find("./{http://www.w3.org/2005/Atom}summary").text | |
root = ET.fromstring("<summary>" + summary.replace("&", "&") + "</summary>") | |
text = root.find(".//*[.='Cloud Coverage (%)']/../../td[4]").text | |
return float(text if "None" not in text else 0) | |
def _get_preview_url(input_file): | |
root = ET.parse(input_file).getroot() | |
if root.find("./id"): | |
raise Exception("Error reading .xml file") | |
if ( | |
matched := re.fullmatch( | |
"https://collgs\.lu/catalog/oseo/.*?parentId=(.*?)&uid=(.*?)&.*", | |
root.find("./{http://www.w3.org/2005/Atom}id").text, | |
) | |
) is None: | |
raise Exception("Couldn't match correct id and preview filename") | |
parentId, filename = matched.groups() | |
return ( | |
f"https://collgs.lu/catalog/oseo/quicklook?parentId={parentId}&uid={filename}" | |
) | |
def _get_product_identifier(input_file): | |
root = ET.parse(input_file).getroot() | |
if root.find("./id"): | |
raise Exception("Error reading .xml file") | |
return root.find("./{http://purl.org/dc/elements/1.1/}identifier").text | |
def _get_product_url(input_file): | |
root = ET.parse(input_file).getroot() | |
if root.find("./id"): | |
raise Exception("Error reading .xml file") | |
if ( | |
matched := re.fullmatch( | |
"repository/(.*/)(.*\.zip)", | |
root.find("./{http://www.w3.org/2005/Atom}link[@rel='enclosure']").attrib[ | |
"href" | |
], | |
) | |
) is None: | |
raise Exception("Couldn't match correct path and filename") | |
path, filename = matched.groups() | |
return f"https://data.collgs.lu/repository_standard/{path}{filename}?" | |
def _sizeof_fmt(num, suffix="B"): | |
for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]: | |
if abs(num) < 1024.0: | |
return f"{num:3.1f}{unit}{suffix}" | |
num /= 1024.0 | |
return f"{num:.1f}Yi{suffix}" | |
def _format_shape(shapefile: pathlib.Path) -> str: | |
geometry = gpd.read_file(shapefile).to_crs(epsg=4326).geometry[0] | |
print(geometry) | |
type_ = geometry.type | |
if type_ == "Polygon": | |
x, y = geometry.exterior.coords.xy | |
return "POLYGON(({}))".format(",".join([f"{x_} {y_}" for x_, y_ in zip(x, y)])) | |
raise Exception("Invalid shape") | |
def _format_date(date: datetime.datetime) -> str: | |
return date.replace(microsecond=0).isoformat() + ".000Z" | |
""" | |
LSA exposed functions | |
""" | |
def download( | |
username: str, | |
password: typing.Annotated[str, ("writeOnly", True)], | |
input_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "open"), | |
("filter", "*.xml"), | |
], | |
output_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "save"), | |
("filter", "*.zip"), | |
], | |
attempts: typing.Annotated[int, ("minimum", 1)] = 3, | |
delay: int = 600, | |
property: typing.Literal["product", "preview"] = "product", | |
): | |
"""Download from LSA Data Center | |
Download available satellite products from LSA Data center based on query results. | |
Parameters | |
---------- | |
username : str | |
Username to connect to LSA Data Center | |
password : str | |
Password to connect to LSA Data Center | |
input_file : pathlib.Path | |
Input file | |
output_file : pathlib.Path | |
Output archive/preview downloaded | |
attempts : int | |
Number of attempts for each download | |
delay : int | |
Delay between each attempts in seconds | |
property : typing.Literal | |
The product type to download (default=product) | |
""" | |
url = None | |
if property == "product": | |
url = _get_product_url(input_file) | |
elif property == "preview": | |
url = _get_preview_url(input_file) | |
else: | |
raise Exception(f"Unknown type of product to download ({property})") | |
chunk_size = 1024 * 1024 | |
while 1: | |
try: | |
# Attempt to open a session on LSA data center website | |
session = requests.session() | |
c = session.get( | |
"https://sso.collgs.lu/auth/realms/lucollgs/protocol/openid-connect/auth?client_id=account&response_mode=fragment&response_type=code&redirect_uri=https://collgs.lu/geocatalog.html" | |
).content.decode() | |
url_post = ( | |
re.match('.*action="(.*?)".*', c, re.DOTALL) | |
.groups()[0] | |
.replace("&", "&") | |
) | |
login_data = dict(username=username, password=password) | |
r = session.post(url_post, data=login_data) | |
login_search = re.search("-//W3C//DTD XHTML 1.0 Transitional//EN", r.text) | |
# Check login was successful | |
if not login_search == None: | |
raise Exception("Login unsuccessful, please check username or password") | |
elif login_search == None: | |
print("Login successful") | |
# GET session | |
headers = session.get(url, stream=True).headers | |
size = ( | |
0 if "Content-length" not in headers else int(headers["Content-length"]) | |
) | |
if "Content-Disposition" in headers: | |
try: | |
filename = re.match( | |
".*?filename=(.*)", headers["Content-Disposition"] | |
).groups()[0] | |
except: | |
pass | |
print(f"Attempting to download {input_file} from {url}") | |
with session.get(url, stream=True) as r: | |
r.raise_for_status() | |
with open(output_file, "wb") as f: | |
cumul = 0 | |
for chunk in r.iter_content(chunk_size=chunk_size): | |
cumul_str = _sizeof_fmt(cumul) | |
if size: | |
print( | |
f"Downloading {input_file}: downloaded: {cumul_str} - {cumul / size * 100:.2g}%\033[K", | |
end="\r", | |
) | |
else: | |
print( | |
f"Downloading {input_file}: downloaded: {cumul_str} - ??%\033[K", | |
end="\r", | |
) | |
f.write(chunk) | |
cumul += chunk_size | |
print("\nProduct successfuly downloaded\n") | |
return | |
except Exception as e: | |
print( | |
f"\nCouldn't download {output_file} from {url} (Caught exception {e})\n" | |
) | |
if attempts <= 0: | |
raise Exception( | |
f"Maximum attempts reached. LSA product {input_file} could not be downloaded" | |
) | |
attempts -= 1 | |
# Wait until next try | |
time.sleep(delay) | |
def to_geojson( | |
input_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "open"), | |
("filter", "*.xml"), | |
], | |
output_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "save"), | |
("filter", "*.json"), | |
], | |
): | |
"""LSA to GeoJSON | |
Extract geometry from LSA XML file and create a GeoJSON from it | |
Parameters | |
---------- | |
input_file: typing.Annotated[pathlib.Path] | |
LSA XML file to extract geometry from | |
output_file: typing.Annotated[pathlib.Path] | |
GeoJSON generated from the XML file | |
""" | |
poly = _extract_footprint(ET.parse(input_file)) | |
schema = { | |
"geometry": "Polygon", | |
"properties": {"id": "int"}, | |
} | |
with fiona.open(output_file, "w", "GeoJSON", schema, crs="epsg:4326") as handle: | |
handle.write( | |
{ | |
"geometry": mapping(poly), | |
"properties": {"id": 0}, | |
} | |
) | |
return | |
def query( | |
username: str, | |
password: typing.Annotated[str, ("writeOnly", True)], | |
output_folder: typing.Annotated[ | |
pathlib.Path, ("format", "directory-path"), ("acceptMode", "save") | |
], | |
product_type: typing.Literal["S2_MSIL1C", "S2_MSIL2A", "S1_SAR_GRD"], | |
time_start: datetime.datetime, | |
time_end: datetime.datetime, | |
footprint: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "open"), | |
("filter", "*.shp"), | |
], | |
minimum_cloud_coverage: typing.Annotated[int, ("minimum", 0), ("maximum", 100)] = 0, | |
maximum_cloud_coverage: typing.Annotated[ | |
int, ("minimum", 0), ("maximum", 100) | |
] = 100, | |
count: typing.Union[ | |
typing.Annotated[int, ("minimum", 0)], typing.Literal["all"] | |
] = "all", | |
start: typing.Annotated[int, ("minimum", 0)] = 0, | |
): | |
"""Query LSA Data Center | |
Request available satellite products from LSA Data Center and save query results to a specified folder. | |
Parameters | |
---------- | |
username : str | |
Username to connect to LSA Data Center | |
password : str | |
Password to connect to LSA Data Center | |
output_folder : pathlib.Path | |
Output folder to store query results | |
product_type : typing.Literal | |
Type of satellite product to query | |
time_start : datetime.datetime | |
Start date of the query | |
time_end : datetime.datetime | |
End date of the query | |
footprint : pathlib.Path | |
Path to a shapefile of the region to query where the CRS must be EPSG:4326 | |
minimum_cloud_coverage : int | |
Minimum cloud cover percentage in [0, 100] | |
maximum_cloud_coverage : int | |
Maximum cloud cover percentage in [0, 100] | |
count : int | |
Quantity of products to query (default=all) | |
start : int | |
Index of the product to start querying from (default=0) | |
""" | |
print(output_folder, output_folder.is_dir(), output_folder.name) | |
parameters = {} | |
download_all = count == "all" | |
if count != "all": | |
parameters["count"] = f"count={count}" | |
rows_max = count | |
else: | |
parameters["count"] = f"count=50" | |
rows_max = 50 | |
if product_type: | |
parameters["product_type"] = f"parentId={product_type}" | |
if time_start: | |
parameters["time_start"] = f"timeStart={_format_date(time_start)}" | |
if time_end: | |
parameters["time_end"] = f"timeEnd={_format_date(time_end)}" | |
if footprint: | |
parameters["footprint"] = f"geometry={_format_shape(footprint)}" | |
# put a limit in the number of points of the footprint. Maybe 30? | |
# raise error to input something easier to read/smaller | |
if minimum_cloud_coverage is not None or maximum_cloud_coverage is not None: | |
minimum_cloud_coverage = ( | |
int(minimum_cloud_coverage) if minimum_cloud_coverage is not None else 0 | |
) | |
maximum_cloud_coverage = ( | |
int(maximum_cloud_coverage) if maximum_cloud_coverage is not None else 100 | |
) | |
parameters[ | |
"cloud_cover_percentage" | |
] = f"cloudCover=%5B{minimum_cloud_coverage},{maximum_cloud_coverage}%5D" | |
search_parameters = "&".join([value for value in parameters.values()]) | |
output_folder.mkdir(parents=True, exist_ok=True) | |
rows = min(rows_max, 50) | |
start = start or 0 | |
page = 0 | |
blank = False | |
while True: | |
extra_parameters = {} | |
extra_parameters["start"] = f"startIndex={start+1}" | |
search_extra_parameters = "&".join( | |
[value for value in extra_parameters.values()] | |
) | |
query = f"https://collgs.lu/catalog/oseo/search?{search_parameters}&{search_extra_parameters}&httpAccept=atom" | |
output_file = output_folder / f"lsa_lsa_query_query_{page}-{start}-{rows}.xml" | |
cmd = [ | |
"wget", | |
"--no-check-certificate", | |
f"--user={username}", | |
f"--password={password}", | |
f"--output-document={output_file}", | |
f"{query}", | |
] | |
print(" ".join(cmd)) | |
if not blank: | |
attempt = 0 | |
while True: | |
print( | |
f"Offset {start}{'' if not attempt else attempt}: launching {' '.join(cmd)}" | |
) | |
try: | |
p = subprocess.Popen(cmd) | |
p.wait() | |
time.sleep(2) | |
break | |
except: | |
print("Too many requests, trying again in 10 seconds...") | |
print(f"On attempt {attempt}...") | |
time.sleep(10) | |
attempt += 1 | |
else: | |
break | |
rows_total = int( | |
ET.parse(output_file) | |
.getroot() | |
.find("{http://a9.com/-/spec/opensearch/1.1/}totalResults") | |
.text | |
) | |
start += 50 | |
page += 1 | |
rows = min(rows_total - start, 50) | |
if not download_all: | |
rows = min(rows_max - start, rows) | |
if rows <= 0: | |
break | |
print("Query ended!") | |
def split_query( | |
input_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "open"), | |
("filter", "*.xml"), | |
], | |
output_folder: typing.Annotated[ | |
pathlib.Path, ("format", "directory-path"), ("acceptMode", "save") | |
], | |
): | |
"""Split LSA query | |
Split LSA Data Center query results into individual XML entries. | |
Parameters | |
---------- | |
input_file : pathlib.Path | |
LSA Data Center XML query file for splitting | |
output_folder : pathlib.Path | |
Output folder to store split XML files | |
""" | |
output_folder.mkdir(parents=True, exist_ok=True) | |
entries = ( | |
ET.parse(input_file).getroot().findall("./{http://www.w3.org/2005/Atom}entry") | |
) | |
for entry in entries: | |
title = re.fullmatch( | |
"https://collgs\.lu/catalog/oseo/.*?&uid=(.*?)&.*", | |
entry.find("./{http://www.w3.org/2005/Atom}id").text, | |
).groups()[0] | |
filename = output_folder / f"{title}.xml" | |
with open(filename, "wb") as f: | |
ET.ElementTree(element=entry).write(f) | |
def to_epsg( | |
input_file: typing.Annotated[ | |
pathlib.Path, | |
("format", "file-path"), | |
("acceptMode", "open"), | |
("filter", "*.xml"), | |
] | |
) -> str: | |
"""LSA Info File to EPSG | |
Get the EPSG of the given LSA xml info file. | |
Parameters | |
---------- | |
input_file: pathlib.Path | |
LSA info xml file | |
Returns | |
------- | |
epsg: str | |
The EPSG of the file | |
""" | |
identifier = _get_product_identifier(input_file) | |
# 326 == North, 327 == South | |
epsg = ("326" if identifier.split("_")[5][3] > "M" else "327") + identifier.split( | |
"_" | |
)[5][1:3] | |
return epsg | |
if __name__ == "__main__": | |
username = "<your_username>" | |
password = "<password_test>" | |
lsa_query_results_folder = pathlib.Path("./lsa_query_files") | |
lsa_product_info_folder = pathlib.Path("./lsa_product_info") | |
sentinel_products_folder = pathlib.Path("./s2_products") | |
lsa_query_results_folder.mkdir(parents=True, exist_ok=True) | |
lsa_product_info_folder.mkdir(parents=True, exist_ok=True) | |
sentinel_products_folder.mkdir(parents=True, exist_ok=True) | |
date_start = "2022-01-01" | |
date_end = "2022-01-16" | |
footprint_file = pathlib.Path("roi/roi_all_convex_4326.shp") | |
# First query info into a foyou want info to be ! | |
query( | |
username=username, | |
password=password, | |
output_folder=lsa_query_results_folder, | |
product_type="S2_MSIL2A", | |
time_start=datetime.datetime.fromisoformat(date_start), | |
time_end=datetime.datetime.fromisoformat(date_end), | |
footprint=pathlib.Path(footprint_file), | |
) | |
# Split each query file | |
for input_file in lsa_query_results_folder.glob("*.xml"): | |
split_query( | |
input_file=input_file, | |
output_folder=lsa_product_info_folder, | |
) | |
# Download each product | |
for input_file in lsa_product_info_folder.glob("*.xml"): | |
download( | |
username=username, | |
password=password, | |
input_file=input_file, | |
output_file=sentinel_products_folder | |
/ pathlib.Path(input_file.with_suffix(".zip").name), | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment