"""Download Sentinel-2 data from the LSA Data Center.

Workflow: query() saves Atom result pages to a folder, split_query() splits
each page into one XML entry per product, and download() fetches the product
archive (or its quicklook preview) described by each entry.
"""
import typing
import time
import datetime
import pathlib
import xml.etree.ElementTree as ET
import re
import fiona
import geopandas as gpd
from shapely.geometry import Polygon, mapping
import shutil
import subprocess
import requests
"""
LSA helper functions
"""
def _extract_date(product):
    return datetime.datetime.fromisoformat(
        product.getroot().find("./{http://purl.org/dc/elements/1.1/}date").text[:-1]
    )


def _is_s2_product(product):
    return (
        "S2_MSIL2A" in product.getroot().find("./{http://www.w3.org/2005/Atom}id").text
    )


def _extract_footprint(product):
    ns2 = "{http://www.georss.org/georss}"
    ns3 = "{http://www.opengis.net/gml}"
    coords = (
        product.getroot()
        .find(f"./{ns2}where/{ns3}Polygon/{ns3}exterior/{ns3}LinearRing/{ns3}posList")
        .text
    )
    # The posList holds "lat lon" pairs; swap each pair to (lon, lat) for shapely
    return Polygon(
        [
            (float(match[2]), float(match[1]))
            for match in re.finditer(r"(-?\d+\.\d+)\s+(-?\d+\.\d+)", coords)
        ]
    )


def _extract_cloud_coverage(product):
    # The Atom summary embeds an HTML table; the cloud cover value sits in the
    # fourth cell of the "Cloud Coverage (%)" row
    summary = product.getroot().find("./{http://www.w3.org/2005/Atom}summary").text
    root = ET.fromstring("<summary>" + summary.replace("&", "&amp;") + "</summary>")
    text = root.find(".//*[.='Cloud Coverage (%)']/../../td[4]").text
    return float(text if "None" not in text else 0)
def _get_preview_url(input_file):
    root = ET.parse(input_file).getroot()
    # A top-level un-namespaced <id> means the server returned an error document
    if root.find("./id") is not None:
        raise Exception("Error reading .xml file")
    if (
        matched := re.fullmatch(
            r"https://collgs\.lu/catalog/oseo/.*?parentId=(.*?)&uid=(.*?)&.*",
            root.find("./{http://www.w3.org/2005/Atom}id").text,
        )
    ) is None:
        raise Exception("Couldn't match correct id and preview filename")
    parentId, filename = matched.groups()
    return (
        f"https://collgs.lu/catalog/oseo/quicklook?parentId={parentId}&uid={filename}"
    )


def _get_product_identifier(input_file):
    root = ET.parse(input_file).getroot()
    if root.find("./id") is not None:
        raise Exception("Error reading .xml file")
    return root.find("./{http://purl.org/dc/elements/1.1/}identifier").text


def _get_product_url(input_file):
    root = ET.parse(input_file).getroot()
    if root.find("./id") is not None:
        raise Exception("Error reading .xml file")
    if (
        matched := re.fullmatch(
            r"repository/(.*/)(.*\.zip)",
            root.find("./{http://www.w3.org/2005/Atom}link[@rel='enclosure']").attrib[
                "href"
            ],
        )
    ) is None:
        raise Exception("Couldn't match correct path and filename")
    path, filename = matched.groups()
    return f"https://data.collgs.lu/repository_standard/{path}{filename}?"
def _sizeof_fmt(num, suffix="B"):
    for unit in ["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"]:
        if abs(num) < 1024.0:
            return f"{num:3.1f}{unit}{suffix}"
        num /= 1024.0
    return f"{num:.1f}Yi{suffix}"


def _format_shape(shapefile: pathlib.Path) -> str:
    geometry = gpd.read_file(shapefile).to_crs(epsg=4326).geometry[0]
    if geometry.type == "Polygon":
        x, y = geometry.exterior.coords.xy
        return "POLYGON(({}))".format(",".join(f"{x_} {y_}" for x_, y_ in zip(x, y)))
    raise Exception("Invalid shape")


def _format_date(date: datetime.datetime) -> str:
    return date.replace(microsecond=0).isoformat() + ".000Z"
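# Illustrative sanity checks for the two formatting helpers above (example
# values, not from the original gist):
#   _sizeof_fmt(1536)                           -> "1.5KiB"
#   _format_date(datetime.datetime(2022, 1, 1)) -> "2022-01-01T00:00:00.000Z"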
"""
LSA exposed functions
"""
def download(
    username: str,
    password: typing.Annotated[str, ("writeOnly", True)],
    input_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "open"),
        ("filter", "*.xml"),
    ],
    output_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "save"),
        ("filter", "*.zip"),
    ],
    attempts: typing.Annotated[int, ("minimum", 1)] = 3,
    delay: int = 600,
    property: typing.Literal["product", "preview"] = "product",
):
    """Download from LSA Data Center

    Download available satellite products from the LSA Data Center based on
    query results.

    Parameters
    ----------
    username : str
        Username to connect to LSA Data Center
    password : str
        Password to connect to LSA Data Center
    input_file : pathlib.Path
        Product entry XML file, as produced by split_query
    output_file : pathlib.Path
        Path of the downloaded archive/preview
    attempts : int
        Number of attempts for each download
    delay : int
        Delay between attempts, in seconds
    property : typing.Literal
        The product type to download (default=product)
    """
    url = None
    if property == "product":
        url = _get_product_url(input_file)
    elif property == "preview":
        url = _get_preview_url(input_file)
    else:
        raise Exception(f"Unknown type of product to download ({property})")
    chunk_size = 1024 * 1024
    while True:
        try:
            # Attempt to open a session on the LSA data center website
            session = requests.session()
            c = session.get(
                "https://sso.collgs.lu/auth/realms/lucollgs/protocol/openid-connect/auth?client_id=account&response_mode=fragment&response_type=code&redirect_uri=https://collgs.lu/geocatalog.html"
            ).content.decode()
            url_post = (
                re.match('.*action="(.*?)".*', c, re.DOTALL)
                .groups()[0]
                .replace("&amp;", "&")
            )
            login_data = dict(username=username, password=password)
            r = session.post(url_post, data=login_data)
            # If the response still carries the login page DTD, the
            # credentials were rejected
            login_search = re.search("-//W3C//DTD XHTML 1.0 Transitional//EN", r.text)
            if login_search is not None:
                raise Exception("Login unsuccessful, please check username or password")
            print("Login successful")
            # Read size/filename from the headers before streaming the body
            headers = session.get(url, stream=True).headers
            size = (
                0 if "Content-length" not in headers else int(headers["Content-length"])
            )
            if "Content-Disposition" in headers:
                try:
                    # Filename advertised by the server (informational only)
                    filename = re.match(
                        ".*?filename=(.*)", headers["Content-Disposition"]
                    ).groups()[0]
                except Exception:
                    pass
            print(f"Attempting to download {input_file} from {url}")
            with session.get(url, stream=True) as r:
                r.raise_for_status()
                with open(output_file, "wb") as f:
                    cumul = 0
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        cumul_str = _sizeof_fmt(cumul)
                        if size:
                            print(
                                f"Downloading {input_file}: downloaded: {cumul_str} - {cumul / size * 100:.2g}%\033[K",
                                end="\r",
                            )
                        else:
                            print(
                                f"Downloading {input_file}: downloaded: {cumul_str} - ??%\033[K",
                                end="\r",
                            )
                        f.write(chunk)
                        # Count actual bytes received; the last chunk may be
                        # smaller than chunk_size
                        cumul += len(chunk)
            print("\nProduct successfully downloaded\n")
            return
        except Exception as e:
            print(
                f"\nCouldn't download {output_file} from {url} (Caught exception {e})\n"
            )
            if attempts <= 0:
                raise Exception(
                    f"Maximum attempts reached. LSA product {input_file} could not be downloaded"
                )
            attempts -= 1
            # Wait until the next try
            time.sleep(delay)
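# A minimal sketch of fetching just the quicklook preview for one product
# entry ("entry.xml" and "preview.jpg" are hypothetical paths):
#
#   download(
#       username="user",
#       password="pass",
#       input_file=pathlib.Path("entry.xml"),
#       output_file=pathlib.Path("preview.jpg"),
#       property="preview",
#   )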
def to_geojson(
    input_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "open"),
        ("filter", "*.xml"),
    ],
    output_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "save"),
        ("filter", "*.json"),
    ],
):
    """LSA to GeoJSON

    Extract the footprint geometry from an LSA XML file and write it out as
    GeoJSON.

    Parameters
    ----------
    input_file : pathlib.Path
        LSA XML file to extract geometry from
    output_file : pathlib.Path
        GeoJSON generated from the XML file
    """
    poly = _extract_footprint(ET.parse(input_file))
    schema = {
        "geometry": "Polygon",
        "properties": {"id": "int"},
    }
    with fiona.open(output_file, "w", "GeoJSON", schema, crs="epsg:4326") as handle:
        handle.write(
            {
                "geometry": mapping(poly),
                "properties": {"id": 0},
            }
        )
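# Example usage (hypothetical paths): turn one split query entry into a
# GeoJSON footprint that can be inspected in QGIS or with geopandas:
#
#   to_geojson(
#       input_file=pathlib.Path("lsa_product_info/entry.xml"),
#       output_file=pathlib.Path("footprints/entry.json"),
#   )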
def query(
    username: str,
    password: typing.Annotated[str, ("writeOnly", True)],
    output_folder: typing.Annotated[
        pathlib.Path, ("format", "directory-path"), ("acceptMode", "save")
    ],
    product_type: typing.Literal["S2_MSIL1C", "S2_MSIL2A", "S1_SAR_GRD"],
    time_start: datetime.datetime,
    time_end: datetime.datetime,
    footprint: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "open"),
        ("filter", "*.shp"),
    ],
    minimum_cloud_coverage: typing.Annotated[int, ("minimum", 0), ("maximum", 100)] = 0,
    maximum_cloud_coverage: typing.Annotated[
        int, ("minimum", 0), ("maximum", 100)
    ] = 100,
    count: typing.Union[
        typing.Annotated[int, ("minimum", 0)], typing.Literal["all"]
    ] = "all",
    start: typing.Annotated[int, ("minimum", 0)] = 0,
):
    """Query LSA Data Center

    Request available satellite products from the LSA Data Center and save
    query results to a specified folder.

    Parameters
    ----------
    username : str
        Username to connect to LSA Data Center
    password : str
        Password to connect to LSA Data Center
    output_folder : pathlib.Path
        Output folder to store query results
    product_type : typing.Literal
        Type of satellite product to query
    time_start : datetime.datetime
        Start date of the query
    time_end : datetime.datetime
        End date of the query
    footprint : pathlib.Path
        Path to a shapefile of the region to query; its CRS must be EPSG:4326
    minimum_cloud_coverage : int
        Minimum cloud cover percentage in [0, 100]
    maximum_cloud_coverage : int
        Maximum cloud cover percentage in [0, 100]
    count : int
        Quantity of products to query (default=all)
    start : int
        Index of the product to start querying from (default=0)
    """
    parameters = {}
    download_all = count == "all"
    if download_all:
        parameters["count"] = "count=50"
        rows_max = 50
    else:
        parameters["count"] = f"count={count}"
        rows_max = count
    if product_type:
        parameters["product_type"] = f"parentId={product_type}"
    if time_start:
        parameters["time_start"] = f"timeStart={_format_date(time_start)}"
    if time_end:
        parameters["time_end"] = f"timeEnd={_format_date(time_end)}"
    if footprint:
        parameters["footprint"] = f"geometry={_format_shape(footprint)}"
        # TODO: limit the number of points in the footprint (maybe 30?) and
        # raise an error suggesting a smaller, easier-to-read shape
    if minimum_cloud_coverage is not None or maximum_cloud_coverage is not None:
        minimum_cloud_coverage = (
            int(minimum_cloud_coverage) if minimum_cloud_coverage is not None else 0
        )
        maximum_cloud_coverage = (
            int(maximum_cloud_coverage) if maximum_cloud_coverage is not None else 100
        )
        parameters[
            "cloud_cover_percentage"
        ] = f"cloudCover=%5B{minimum_cloud_coverage},{maximum_cloud_coverage}%5D"
    search_parameters = "&".join(parameters.values())
    output_folder.mkdir(parents=True, exist_ok=True)
    rows = min(rows_max, 50)
    start = start or 0
    page = 0
    while True:
        # startIndex is 1-based on the server side
        query_url = f"https://collgs.lu/catalog/oseo/search?{search_parameters}&startIndex={start + 1}&httpAccept=atom"
        output_file = output_folder / f"lsa_query_{page}-{start}-{rows}.xml"
        cmd = [
            "wget",
            "--no-check-certificate",
            f"--user={username}",
            f"--password={password}",
            f"--output-document={output_file}",
            f"{query_url}",
        ]
        attempt = 0
        while True:
            print(
                f"Offset {start}{'' if not attempt else attempt}: launching {' '.join(cmd)}"
            )
            try:
                p = subprocess.Popen(cmd)
                p.wait()
                time.sleep(2)
                break
            except Exception:
                print("Too many requests, trying again in 10 seconds...")
                print(f"On attempt {attempt}...")
                time.sleep(10)
                attempt += 1
        rows_total = int(
            ET.parse(output_file)
            .getroot()
            .find("{http://a9.com/-/spec/opensearch/1.1/}totalResults")
            .text
        )
        start += 50
        page += 1
        rows = min(rows_total - start, 50)
        if not download_all:
            rows = min(rows_max - start, rows)
        if rows <= 0:
            break
    print("Query ended!")
def split_query(
    input_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "open"),
        ("filter", "*.xml"),
    ],
    output_folder: typing.Annotated[
        pathlib.Path, ("format", "directory-path"), ("acceptMode", "save")
    ],
):
    """Split LSA query

    Split LSA Data Center query results into individual XML entries.

    Parameters
    ----------
    input_file : pathlib.Path
        LSA Data Center XML query file to split
    output_folder : pathlib.Path
        Output folder to store the split XML files
    """
    output_folder.mkdir(parents=True, exist_ok=True)
    entries = (
        ET.parse(input_file).getroot().findall("./{http://www.w3.org/2005/Atom}entry")
    )
    for entry in entries:
        # The product uid embedded in the entry's Atom id becomes the filename
        title = re.fullmatch(
            r"https://collgs\.lu/catalog/oseo/.*?&uid=(.*?)&.*",
            entry.find("./{http://www.w3.org/2005/Atom}id").text,
        ).groups()[0]
        filename = output_folder / f"{title}.xml"
        with open(filename, "wb") as f:
            ET.ElementTree(element=entry).write(f)
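# Each entry is written to output_folder as "<uid>.xml", e.g. (hypothetical)
# "S2A_MSIL2A_20220101T104441_N0301_R008_T31UGR_20220101T120000.xml".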
def to_epsg(
    input_file: typing.Annotated[
        pathlib.Path,
        ("format", "file-path"),
        ("acceptMode", "open"),
        ("filter", "*.xml"),
    ]
) -> str:
    """LSA Info File to EPSG

    Get the EPSG code of the given LSA XML info file.

    Parameters
    ----------
    input_file : pathlib.Path
        LSA info XML file

    Returns
    -------
    epsg : str
        The EPSG code of the file
    """
    identifier = _get_product_identifier(input_file)
    # The MGRS latitude band letter decides the hemisphere:
    # bands N-X are northern (EPSG 326xx), C-M are southern (EPSG 327xx)
    tile = identifier.split("_")[5]
    epsg = ("326" if tile[3] > "M" else "327") + tile[1:3]
    return epsg
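# Worked example (identifier is illustrative):
#   "S2B_MSIL2A_20220103T103329_N0301_R108_T32UMU_20220103T123401"
#   -> split("_")[5] == "T32UMU"; band "U" > "M" selects the northern
#      hemisphere prefix "326", zone "32", so to_epsg returns "32632"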
if __name__ == "__main__":
    username = "<your_username>"
    password = "<your_password>"
    lsa_query_results_folder = pathlib.Path("./lsa_query_files")
    lsa_product_info_folder = pathlib.Path("./lsa_product_info")
    sentinel_products_folder = pathlib.Path("./s2_products")
    lsa_query_results_folder.mkdir(parents=True, exist_ok=True)
    lsa_product_info_folder.mkdir(parents=True, exist_ok=True)
    sentinel_products_folder.mkdir(parents=True, exist_ok=True)
    date_start = "2022-01-01"
    date_end = "2022-01-16"
    footprint_file = pathlib.Path("roi/roi_all_convex_4326.shp")
    # First, query product info into the folder where you want it to be
    query(
        username=username,
        password=password,
        output_folder=lsa_query_results_folder,
        product_type="S2_MSIL2A",
        time_start=datetime.datetime.fromisoformat(date_start),
        time_end=datetime.datetime.fromisoformat(date_end),
        footprint=pathlib.Path(footprint_file),
    )
    # Split each query file into one XML entry per product
    for input_file in lsa_query_results_folder.glob("*.xml"):
        split_query(
            input_file=input_file,
            output_folder=lsa_product_info_folder,
        )
    # Download each product
    for input_file in lsa_product_info_folder.glob("*.xml"):
        download(
            username=username,
            password=password,
            input_file=input_file,
            output_file=sentinel_products_folder
            / pathlib.Path(input_file.with_suffix(".zip").name),
        )