Created
March 28, 2022 19:19
-
-
Save nilsleh/f89abf7d24db74e0ce18a602c92ccc30 to your computer and use it in GitHub Desktop.
Split large area into smaller manageable polygons to download via Planetary Computer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import planetary_computer as pc | |
import rasterio | |
import stackstac | |
from pystac_client import Client | |
from shapely.geometry import mapping, shape | |
from shapely.geometry.polygon import Polygon | |
# 10m resolution in lat/lon degrees | |
RES_DEGREE = 0.0000898315209823975 | |
def polygon_grid(area_polygon, size, save_path, plot=False): | |
bounds = area_polygon.bounds | |
xmin = bounds[0] | |
xmax = bounds[2] | |
ymin = bounds[1] | |
ymax = bounds[3] | |
num_xs = int((xmax - xmin) // size) | |
num_ys = int((ymax - ymin) // size) | |
xs = [xmin + n * size for n in range(num_xs + 1)] | |
ys = [ymin + n * size for n in range(num_ys + 1)] | |
proposed_xs = [xs[0], xs[-1], xs[-1], xs[0], xs[0]] | |
proposed_ys = [ys[0], ys[0], ys[-1], ys[-1], ys[0]] | |
results = [] | |
for idx in range(len(xs) - 1): | |
for idy in range(len(ys) - 1): | |
top_left = [xs[idx], ys[idy]] | |
top_right = [xs[idx + 1], ys[idy]] | |
bottom_left = [xs[idx], ys[idy + 1]] | |
bottom_right = [xs[idx + 1], ys[idy + 1]] | |
sub_poly = Polygon([top_left, top_right, bottom_right, bottom_left]) | |
if area_polygon.contains(sub_poly): | |
results.append(sub_poly) | |
# plot results | |
if plot: | |
fig = plt.figure() | |
# plot entire area_polygon | |
x, y = area_polygon.exterior.xy | |
plt.plot(x, y, c="blue", marker="o") | |
for g in results: | |
x, y = g.exterior.xy | |
plt.plot(x, y, c="orange") | |
plt.plot(proposed_xs, proposed_ys, c="red", marker="o") | |
plt.axis("equal") | |
plt.show() | |
feature_collection = {"type": "FeatureCollection", "features": []} | |
for r in results: | |
feature_collection["features"].append(mapping(r)) | |
with open(save_path, "w") as f: | |
f.write(json.dumps(feature_collection)) | |
return feature_collection | |
def create_file(data_array, path: str, crs: int, bands) -> None: | |
"""Create a tif file from a retrieved data array. | |
Args: | |
path: path where to save tif file | |
data_array: np array of shape bands X height X width | |
crs: crs of array | |
""" | |
profile = {} | |
profile["driver"] = "GTiff" | |
profile["dtype"] = np.int32 | |
profile["crs"] = "epsg:{}".format(str(crs)) | |
profile["count"] = data_array.shape[0] | |
profile["height"] = data_array.shape[1] | |
profile["width"] = data_array.shape[2] | |
profile["transform"] = rasterio.transform.from_bounds(0, 0, 1, 1, 1, 1) | |
src = rasterio.open(path, "w", **profile) | |
src.write(data_array) | |
src.descriptions = tuple(bands) | |
def retrieve_imagery(aoi_path): | |
with open(aoi_path, "r") as f: | |
content = json.load(f) | |
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1") | |
time_of_interest = "2017-01-01/2017-12-31" | |
bands = ["B04", "B03", "B02", "B08"] | |
for feature in content["features"]: | |
c = feature["coordinates"][0] | |
c1 = [x[0] for x in c] # first coordinate | |
c2 = [x[1] for x in c] | |
bounds = (min(c1), min(c2), max(c1), max(c2)) | |
search = catalog.search( | |
collections=["sentinel-2-l2a"], | |
intersects=feature, | |
datetime=time_of_interest, | |
query={"eo:cloud_cover": {"lt": 20}}, | |
) | |
items = list(search.get_items()) | |
signed_items = [pc.sign(item).to_dict() for item in items] | |
data = ( | |
stackstac.stack( | |
signed_items, | |
assets=bands, # red, green, blue, near_infrared | |
epsg=4326, | |
resolution=RES_DEGREE, | |
bounds_latlon=bounds, | |
) | |
.where(lambda x: x > 0, other=np.nan) # sentinel-2 uses 0 as nodata | |
.assign_coords(band=lambda x: x.common_name.rename("band")) # use common names | |
) | |
median = data.median(dim="time").compute(scheduler="single-threaded") | |
img_id = signed_items[0]["id"] + ".tif" | |
save_path = os.path.join("data", "forest", img_id) | |
create_file(median, save_path, crs=4326, bands=bands) | |
def main(): | |
area_polygon = { | |
"coordinates": [ | |
[ | |
[9.5141602, 49.9477957], | |
[9.5167351, 49.8975095], | |
[9.6647072, 49.8936391], | |
[9.6662521, 49.9485689], | |
[9.5141602, 49.9477957], | |
] | |
], | |
"type": "Polygon", | |
} | |
geom = shape(area_polygon) | |
desired_chipsize = 200 # in pixels so 200x200 at 10m resolution | |
desired_chipsize_degrees = desired_chipsize * RES_DEGREE | |
poly_box_path = os.path.join("data", "aois.geojson") | |
_ = polygon_grid(geom, desired_chipsize_degrees, save_path=poly_box_path) | |
retrieve_imagery(poly_box_path) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment