Using Workflows on the Descartes Labs platform to generate training data
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import descarteslabs as dl\n",
"import descarteslabs.workflows as wf\n",
"\n",
"import shapely.geometry\n",
"import shapely.ops\n",
"import shapely.prepared\n",
"import rasterio.features\n", | |
"import ipyleaflet\n", | |
"from tqdm.notebook import tqdm" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"sac = shapely.geometry.shape(\n", | |
" dl.places.shape(\"north-america_united-states_california_sacramento-valley\").geometry\n", | |
")\n", | |
"sj = shapely.geometry.shape(\n", | |
" dl.places.shape(\n", | |
" \"north-america_united-states_california_san-joaquin-valley\"\n", | |
" ).geometry\n", | |
")\n", | |
"central_valley_aoi = sac.union(sj)\n", | |
"\n", | |
"tiles = dl.scenes.DLTile.from_shape(\n", | |
" central_valley_aoi, resolution=10, tilesize=500, pad=0\n", | |
")\n", | |
"\n", | |
"start_datetime = \"2019-01-01\"\n", | |
"end_datetime = \"2019-03-28\"" | |
] | |
}, | |
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Factor some repeated steps into functions for readability"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def cloud_masked_daily_product(\n",
"    product_id: str, start_datetime: str, end_datetime: str\n",
") -> wf.ImageCollection:\n",
"    \"Get a product by ID, masked by the DL cloud mask and mosaicked by day\"\n",
"    ic = wf.ImageCollection.from_id(product_id, start_datetime, end_datetime)\n",
"    cloudmask = (\n",
"        wf.ImageCollection.from_id(\n",
"            product_id + \":dlcloud:v1\", start_datetime, end_datetime\n",
"        ).pick_bands(\"valid_cloudfree\")\n",
"        == 0\n",
"    )\n",
"\n",
"    # Make an ImageCollectionGroupby object, for quicker lookups from `ic` by date (you can use it like a dict)\n",
"    ic_date_groupby = ic.groupby(dates=(\"year\", \"month\", \"day\"))\n",
"    # For each cloudmask date, pick the corresponding image from `ic` by date, mosaic both, and mask them.\n",
"    # (Not all scenes have cloudmasks processed, so this ensures we only return scenes that do.)\n",
"    return cloudmask.groupby(dates=(\"year\", \"month\", \"day\")).map(\n",
"        lambda ymd, mask_imgs: ic_date_groupby[ymd].mosaic().mask(mask_imgs.mosaic())\n",
"    )\n",
"\n",
"def ndvi(ic: wf.ImageCollection) -> wf.ImageCollection:\n",
"    nir, red = ic.unpack_bands(\"nir red\")\n",
"    ndvi = (nir - red) / (nir + red)\n",
"    return ndvi.rename_bands(\"ndvi\")\n",
"\n",
"def isin(ic: wf.ImageCollection, values: list) -> wf.ImageCollection:\n",
"    \"Like np.isin, for Workflows\"\n",
"    assert len(values) > 0\n",
"    result = False\n",
"    for value in values:\n",
"        result = result | (ic == value)\n",
"    return result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define our cloud- and crop-masked imagery stacks using Workflows"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_daily = cloud_masked_daily_product(\n",
"    \"landsat:LC08:01:T1:TOAR\", start_datetime, end_datetime\n",
").pick_bands(\"red green blue nir swir1\")\n",
"l8_with_ndvi = l8_daily.concat_bands(ndvi(l8_daily))\n",
"\n",
"s2_daily = cloud_masked_daily_product(\"sentinel-2:L1C\", start_datetime, end_datetime).pick_bands(\n",
"    \"red green blue nir swir1\"\n",
")\n",
"s2_with_ndvi = s2_daily.concat_bands(ndvi(s2_daily))\n",
"\n",
"s1 = wf.ImageCollection.from_id(\n",
"    \"sentinel-1:GRD\", start_datetime=start_datetime, end_datetime=end_datetime\n",
").pick_bands(\"vh vv\")\n",
"s1_daily = s1.groupby(dates=(\"year\", \"month\", \"day\")).mosaic()\n",
"\n",
"cdl = wf.ImageCollection.from_id(\n",
"    \"usda:cdl:v1\", start_datetime=\"2016-12-01\", end_datetime=\"2020-01-01\"\n",
").pick_bands(\"class\")\n",
"\n",
"grains_oils_grass_beans = [1,2,3,4,5,6,10,11,12,13,21,22,23,24,25,26,27,28,29,\n",
"                           30,31,32,33,34,35,36,37,38,39,41,42,43,44,45,46,51,\n",
"                           52,53,225,226,228,230,232,234,235,236,237,238,239,240,241,254]\n",
"\n",
"deli_crops = [14, 48, 49, 50, 54, 55, 57, 206, 207, 208, 209, 213, 214, 216,\n",
"              219, 221, 222, 224, 227, 229, 231, 242, 243, 244, 245, 246, 247,\n",
"              248, 249, 250]\n",
"\n",
"tree_crops = [66, 67, 68, 69, 72, 74, 75, 76, 77, 204, 210, 211, 212, 215, 217,\n",
"              218, 220, 223]\n",
"\n",
"crops_list = deli_crops + tree_crops\n",
"\n",
"is_crops = isin(cdl, crops_list)\n",
"is_crops_19 = is_crops[-1]\n",
"\n",
"four_year_combo = is_crops.sum(axis=\"images\") + is_crops_19  # double-weight 2019\n",
"four_year_binary = four_year_combo >= 2\n",
"cdl_mask = ~four_year_binary\n",
"\n",
"l8_masked = l8_with_ndvi.mask(cdl_mask)\n",
"s2_masked = s2_with_ndvi.mask(cdl_mask)\n",
"s1_masked = s1_daily.mask(cdl_mask)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's first visualize the layers on a map for quick validation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.center = central_valley_aoi.centroid.coords[0][::-1]  # flip to lat, lon order for ipyleaflet\n",
"wf.map.zoom = 10\n",
"wf.map\n",
"# in JupyterLab, right-click the map output > \"Create New View for Output\", then reposition the pane as you like"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"four_year_combo.visualize(\"four_year_combo\", colormap=\"Blues\", scales=[0, 5])\n",
"four_year_binary.visualize(\"four_year_binary\", colormap=\"Greens\", scales=[0, 1])\n",
"\n",
"l8_with_ndvi.pick_bands(\"red green blue\").mosaic().visualize(\"l8\", scales=[[0, 0.4], [0, 0.4], [0, 0.4]])\n",
"s2_with_ndvi.pick_bands(\"red green blue\").mosaic().visualize(\"s2\", scales=[[0, 0.4], [0, 0.4], [0, 0.4]])\n",
"s1_daily.pick_bands(\"vv vh vv\").mosaic().visualize(\"s1\", scales=[[0, 0.5], [0, 0.1], [0, 0.5]])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Also add all the tiles for reference, and pick one to experiment with"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.add_layer(\n",
"    ipyleaflet.GeoJSON(name=\"Tiles\", data=shapely.geometry.GeometryCollection([t.geometry for t in tiles]).__geo_interface__)\n",
")\n",
"wf.map.control_other_layers = True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tile = tiles[-1000]  # pick one that isn't all masked out"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.add_layer(\n",
"    ipyleaflet.GeoJSON(name=\"One tile\", data=tile.__geo_interface__, style={\"color\": \"red\", \"fillColor\": \"red\"})\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.center = tile.geometry.centroid.coords[0][::-1]\n",
"wf.map.zoom = 12"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Compute the data and look at it\n",
"\n",
"By passing all three arrays we want to `wf.compute` as a list, the backend can compute them in parallel and compute the shared `cdl_mask` only once, reusing it for all three."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_data, s2_data, s1_data = wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_data.shape, s2_data.shape, s1_data.shape"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dl.scenes.display(*l8_data[:, :3], size=3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Notice that a large number of our tiles don't contain any of the crops we're looking for, so let's get rid of those\n",
"\n",
"Unfortunately, the Platform doesn't have built-in functionality for this yet. But we can do it locally: compute the `four_year_binary` mask, vectorize it with `rasterio`, and intersect the resulting crop geometry with our tiles."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# make a GeoContext for the whole central valley.\n",
"# rather than thinking through the resolution, we'll just tell the platform to fit it within a 2048x2048 array.\n",
"central_valley_ctx = dl.scenes.AOI(central_valley_aoi, shape=(2048, 2048), crs=\"EPSG:4326\")\n",
"all_cdl = four_year_binary.compute(central_valley_ctx)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dl.scenes.display(all_cdl.ndarray, size=5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Workflows gives us back the geotransform for the GeoContext we used,\n",
"# which we'll need to pass on to rasterio so it can convert from pixel coordinates to lat-lon coordinates\n",
"all_cdl.geocontext[\"gdal_geotrans\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# vectorize the whole raster, only keeping the shapes with a pixel value of 1 (i.e. pixels that meet our CDL criteria)\n",
"shapes = list(\n",
"    geom for geom, value in\n",
"    rasterio.features.shapes(\n",
"        all_cdl.ndarray.astype(\"uint8\"), transform=rasterio.transform.Affine.from_gdal(*all_cdl.geocontext[\"gdal_geotrans\"])\n",
"    )\n",
"    if value == 1\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"len(shapes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"shapes[:2]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# union all of the individual shapes into one, and simplify a little (we don't need that many points!)\n",
"all_valid = shapely.ops.unary_union([shapely.geometry.shape(s) for s in shapes]).simplify(0.3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"type(all_valid)"
]
},
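{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an optional sanity check (not part of the original notebook), we can drop the vectorized, simplified geometry onto the map and compare it visually against the `four_year_binary` layer. The layer name and style below are arbitrary choices."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# optional: eyeball the vectorized crop mask against the CDL layers on the map\n",
"wf.map.add_layer(\n",
"    ipyleaflet.GeoJSON(name=\"Vectorized crop mask\", data=all_valid.__geo_interface__, style={\"color\": \"orange\", \"fillColor\": \"orange\"})\n",
")"
]
},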
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# now, we're going to intersect our tiles with the `all_valid` geometry. it's slightly faster if we use a shapely prepared geometry.\n",
"all_valid_prepped = shapely.prepared.prep(all_valid)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"valid_tiles = [t for t in tqdm(tiles) if all_valid_prepped.intersects(t.geometry)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# nice, that cut out more than half of the tiles (the ones we don't need)!\n",
"len(valid_tiles) / len(tiles)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# display the new valid tiles on the map\n",
"wf.map.add_layer(\n",
"    ipyleaflet.GeoJSON(name=\"Valid Tiles\", data=shapely.geometry.GeometryCollection([t.geometry for t in valid_tiles]).__geo_interface__, style={\"color\": \"green\", \"fillColor\": \"green\"})\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute at scale: submit all the tiles to run, and process them\n",
"\n",
"We could just do\n",
"```python\n",
"for tile in valid_tiles:\n",
"    l8_data, s2_data, s1_data = wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile)\n",
"    ...\n",
"```\n",
"\n",
"but that would be relatively slow, since we'd be computing the tiles one at a time. Instead, we can submit _every_ tile to the Platform at once, compute them in parallel, and process each one as it completes.\n",
"\n",
"Right now this requires a bit of boilerplate code, but eventually it will all be built into the client, making it easier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# we'll just run on a few tiles initially to test\n",
"tiles_to_run = valid_tiles[:10]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# submit all the tiles to run, asynchronously! (note the `block=False`, which makes it immediately return a `wf.Job` object)\n",
"jobs = [wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile, block=False) for tile in tqdm(tiles_to_run)]\n",
"job_to_tile = {j: t for j, t in zip(jobs, tiles_to_run)}\n",
"# note: you could make submission significantly faster with `concurrent.futures.ThreadPoolExecutor.map` (sketched in the next cell)"
]
},
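{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here's a minimal sketch of that faster submission, using `concurrent.futures.ThreadPoolExecutor` to issue the non-blocking `wf.compute` calls from several threads. The `submit_tile` helper and the pool size of 8 are illustrative assumptions, not part of the original notebook; uncomment the pool lines to use it instead of the plain loop above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# sketch: submit jobs from a thread pool instead of a plain loop\n",
"# (`submit_tile` and `max_workers=8` are illustrative assumptions)\n",
"import concurrent.futures\n",
"\n",
"def submit_tile(tile):\n",
"    # block=False returns a wf.Job immediately, so each call is just a quick request\n",
"    return wf.compute(\n",
"        [l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile, block=False\n",
"    )\n",
"\n",
"# with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:\n",
"#     jobs = list(tqdm(pool.map(submit_tile, tiles_to_run), total=len(tiles_to_run)))\n",
"# job_to_tile = {j: t for j, t in zip(jobs, tiles_to_run)}"
]
},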
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# some code to wait for jobs, and process them in whatever order they complete.\n",
"# (unfortunately this isn't built into the platform yet, but it will be eventually.)\n",
"\n",
"from typing import Iterator, Sequence\n",
"import time\n",
"\n",
"import descarteslabs.workflows as wf\n",
"\n",
"\n",
"def as_completed(jobs: Sequence[wf.Job], interval_sec: int = 5) -> Iterator[wf.Job]:\n",
"    \"\"\"\n",
"    Iterator over Jobs that yields each Job when it completes.\n",
"\n",
"    Parameters\n",
"    ----------\n",
"    jobs: Sequence[wf.Job]\n",
"        Jobs to wait for\n",
"    interval_sec: int, optional, default 5\n",
"        Wait at least this many seconds between polling for job updates.\n",
"\n",
"    Yields\n",
"    ------\n",
"    job: wf.Job\n",
"        A completed job (either succeeded or failed).\n",
"    \"\"\"\n",
"    jobs = list(jobs)\n",
"    while len(jobs) > 0:\n",
"        loop_start = time.perf_counter()\n",
"\n",
"        i = 0\n",
"        while i < len(jobs):\n",
"            job = jobs[i]\n",
"            if not job.done:  # in case it's already loaded\n",
"                try:\n",
"                    job.refresh()\n",
"                except Exception:\n",
"                    i += 1  # move on; retry this job on the next polling pass\n",
"                    continue  # be resilient to transient errors for now\n",
"\n",
"            if job.done:\n",
"                yield job\n",
"                del jobs[i]  # \"advances\" i\n",
"            else:\n",
"                i += 1\n",
"\n",
"        loop_duration = time.perf_counter() - loop_start\n",
"        if len(jobs) > 0 and loop_duration < interval_sec:\n",
"            time.sleep(interval_sec - loop_duration)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def handle_result(tile, l8_data, s2_data, s1_data):\n",
"    print(tile.key, l8_data.shape)\n",
"    # TODO: do whatever you'd do with the result! (one possibility is sketched in the next cell)"
]
},
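{
"cell_type": "markdown",
"metadata": {},
"source": [
"For example, `handle_result` could write each tile's arrays to a compressed `.npz` \"training chip\" on disk. This is just one hedged possibility: the `save_training_chip` helper, the `training_data/` directory, and the filename scheme below are illustrative assumptions, not part of the original workflow."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# example sketch: persist each tile's arrays as a compressed .npz file\n",
"# (the output directory and naming scheme are illustrative assumptions)\n",
"import os\n",
"import numpy as np\n",
"\n",
"def save_training_chip(tile, l8_data, s2_data, s1_data, out_dir=\"training_data\"):\n",
"    os.makedirs(out_dir, exist_ok=True)\n",
"    out_path = os.path.join(out_dir, tile.key.replace(\":\", \"_\") + \".npz\")\n",
"    # the arrays have different band counts, so store them under separate keys;\n",
"    # fill any mask explicitly, since np.savez would otherwise drop it\n",
"    np.savez_compressed(\n",
"        out_path,\n",
"        l8=np.ma.filled(l8_data, np.nan),\n",
"        s2=np.ma.filled(s2_data, np.nan),\n",
"        s1=np.ma.filled(s1_data, np.nan),\n",
"    )\n",
"    return out_path"
]
},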
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"failed = []\n",
"for job in tqdm(as_completed(jobs), total=len(jobs)):\n",
"    if job.error is not None:\n",
"        failed.append(job)\n",
"        print(job.error)\n",
"    else:\n",
"        l8_data, s2_data, s1_data = job.result(progress_bar=False)\n",
"        tile = job_to_tile[job]\n",
"        handle_result(tile, l8_data, s2_data, s1_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# if you need to rerun any failures, you can just do\n",
"# new_jobs = [job.resubmit() for job in failed]"
]
},
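{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of what a full retry pass might look like, tying the resubmitted jobs back to their tiles and running them through the same `as_completed` / `handle_result` machinery. Like the cell above, it's left commented out, since it only applies when some jobs failed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# hedged sketch of a retry pass, reusing the helpers defined above\n",
"# (assumes `failed` is non-empty; mirrors the processing loop above)\n",
"# new_jobs = [job.resubmit() for job in failed]\n",
"# job_to_tile.update({new: job_to_tile[old] for old, new in zip(failed, new_jobs)})\n",
"# for job in tqdm(as_completed(new_jobs), total=len(new_jobs)):\n",
"#     if job.error is None:\n",
"#         handle_result(job_to_tile[job], *job.result(progress_bar=False))"
]
}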
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
} |