Skip to content

Instantly share code, notes, and snippets.

@nilsleh
Created March 28, 2022 19:19
Show Gist options
  • Save nilsleh/f89abf7d24db74e0ce18a602c92ccc30 to your computer and use it in GitHub Desktop.
Save nilsleh/f89abf7d24db74e0ce18a602c92ccc30 to your computer and use it in GitHub Desktop.
Split large area into smaller manageable polygons to download via Planetary Computer
import json
import os
import matplotlib.pyplot as plt
import numpy as np
import planetary_computer as pc
import rasterio
import stackstac
from pystac_client import Client
from shapely.geometry import mapping, shape
from shapely.geometry.polygon import Polygon
# 10m resolution in lat/lon degrees
RES_DEGREE = 0.0000898315209823975
def polygon_grid(area_polygon, size, save_path, plot=False):
bounds = area_polygon.bounds
xmin = bounds[0]
xmax = bounds[2]
ymin = bounds[1]
ymax = bounds[3]
num_xs = int((xmax - xmin) // size)
num_ys = int((ymax - ymin) // size)
xs = [xmin + n * size for n in range(num_xs + 1)]
ys = [ymin + n * size for n in range(num_ys + 1)]
proposed_xs = [xs[0], xs[-1], xs[-1], xs[0], xs[0]]
proposed_ys = [ys[0], ys[0], ys[-1], ys[-1], ys[0]]
results = []
for idx in range(len(xs) - 1):
for idy in range(len(ys) - 1):
top_left = [xs[idx], ys[idy]]
top_right = [xs[idx + 1], ys[idy]]
bottom_left = [xs[idx], ys[idy + 1]]
bottom_right = [xs[idx + 1], ys[idy + 1]]
sub_poly = Polygon([top_left, top_right, bottom_right, bottom_left])
if area_polygon.contains(sub_poly):
results.append(sub_poly)
# plot results
if plot:
fig = plt.figure()
# plot entire area_polygon
x, y = area_polygon.exterior.xy
plt.plot(x, y, c="blue", marker="o")
for g in results:
x, y = g.exterior.xy
plt.plot(x, y, c="orange")
plt.plot(proposed_xs, proposed_ys, c="red", marker="o")
plt.axis("equal")
plt.show()
feature_collection = {"type": "FeatureCollection", "features": []}
for r in results:
feature_collection["features"].append(mapping(r))
with open(save_path, "w") as f:
f.write(json.dumps(feature_collection))
return feature_collection
def create_file(data_array, path: str, crs: int, bands) -> None:
"""Create a tif file from a retrieved data array.
Args:
path: path where to save tif file
data_array: np array of shape bands X height X width
crs: crs of array
"""
profile = {}
profile["driver"] = "GTiff"
profile["dtype"] = np.int32
profile["crs"] = "epsg:{}".format(str(crs))
profile["count"] = data_array.shape[0]
profile["height"] = data_array.shape[1]
profile["width"] = data_array.shape[2]
profile["transform"] = rasterio.transform.from_bounds(0, 0, 1, 1, 1, 1)
src = rasterio.open(path, "w", **profile)
src.write(data_array)
src.descriptions = tuple(bands)
def retrieve_imagery(aoi_path):
with open(aoi_path, "r") as f:
content = json.load(f)
catalog = Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
time_of_interest = "2017-01-01/2017-12-31"
bands = ["B04", "B03", "B02", "B08"]
for feature in content["features"]:
c = feature["coordinates"][0]
c1 = [x[0] for x in c] # first coordinate
c2 = [x[1] for x in c]
bounds = (min(c1), min(c2), max(c1), max(c2))
search = catalog.search(
collections=["sentinel-2-l2a"],
intersects=feature,
datetime=time_of_interest,
query={"eo:cloud_cover": {"lt": 20}},
)
items = list(search.get_items())
signed_items = [pc.sign(item).to_dict() for item in items]
data = (
stackstac.stack(
signed_items,
assets=bands, # red, green, blue, near_infrared
epsg=4326,
resolution=RES_DEGREE,
bounds_latlon=bounds,
)
.where(lambda x: x > 0, other=np.nan) # sentinel-2 uses 0 as nodata
.assign_coords(band=lambda x: x.common_name.rename("band")) # use common names
)
median = data.median(dim="time").compute(scheduler="single-threaded")
img_id = signed_items[0]["id"] + ".tif"
save_path = os.path.join("data", "forest", img_id)
create_file(median, save_path, crs=4326, bands=bands)
def main():
area_polygon = {
"coordinates": [
[
[9.5141602, 49.9477957],
[9.5167351, 49.8975095],
[9.6647072, 49.8936391],
[9.6662521, 49.9485689],
[9.5141602, 49.9477957],
]
],
"type": "Polygon",
}
geom = shape(area_polygon)
desired_chipsize = 200 # in pixels so 200x200 at 10m resolution
desired_chipsize_degrees = desired_chipsize * RES_DEGREE
poly_box_path = os.path.join("data", "aois.geojson")
_ = polygon_grid(geom, desired_chipsize_degrees, save_path=poly_box_path)
retrieve_imagery(poly_box_path)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment