Skip to content

Instantly share code, notes, and snippets.

@kontgis
Last active August 16, 2020 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kontgis/dd52b4f1a8597c6767e075da6eb54740 to your computer and use it in GitHub Desktop.
Save kontgis/dd52b4f1a8597c6767e075da6eb54740 to your computer and use it in GitHub Desktop.
Using Workflows on Descartes Labs platform to generate training data
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import descarteslabs as dl\n",
"import descarteslabs.workflows as wf\n",
"\n",
"import shapely.geometry\n",
"import shapely.ops\n",
"import shapely.prepared\n",
"import rasterio.features\n",
"import ipyleaflet\n",
"from tqdm.notebook import tqdm"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sac = shapely.geometry.shape(\n",
" dl.places.shape(\"north-america_united-states_california_sacramento-valley\").geometry\n",
")\n",
"sj = shapely.geometry.shape(\n",
" dl.places.shape(\n",
" \"north-america_united-states_california_san-joaquin-valley\"\n",
" ).geometry\n",
")\n",
"central_valley_aoi = sac.union(sj)\n",
"\n",
"tiles = dl.scenes.DLTile.from_shape(\n",
" central_valley_aoi, resolution=10, tilesize=500, pad=0\n",
")\n",
"\n",
"start_datetime = \"2019-01-01\"\n",
"end_datetime = \"2019-03-28\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Pull out some steps we'll repeat into functions for readability"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def cloud_masked_daily_product(\n",
" product_id: str, start_datetime: str, end_datetime: str\n",
") -> wf.ImageCollection:\n",
" \"Get a product by ID, masked by the DL cloud mask and mosaicked by day\"\n",
" ic = wf.ImageCollection.from_id(product_id, start_datetime, end_datetime)\n",
" cloudmask = (\n",
" wf.ImageCollection.from_id(\n",
" product_id + \":dlcloud:v1\", start_datetime, end_datetime\n",
" ).pick_bands(\"valid_cloudfree\")\n",
" == 0\n",
" )\n",
"\n",
" # Make an ImageCollectionGroupby object, for quicker lookups from `ic` by date (you can use it like a dict)\n",
" ic_date_groupby = ic.groupby(dates=(\"year\", \"month\", \"day\"))\n",
" # For each cloudmask date, pick the corresponding image from `ic` by date, mosaic both, and mask them.\n",
" # (Not all scenes have cloudmasks processed, so this ensures we only return scenes that do.)\n",
" return cloudmask.groupby(dates=(\"year\", \"month\", \"day\")).map(\n",
" lambda ymd, mask_imgs: ic_date_groupby[ymd].mosaic().mask(mask_imgs.mosaic())\n",
" )\n",
"\n",
"def ndvi(ic: wf.ImageCollection) -> wf.ImageCollection:\n",
" nir, red = ic.unpack_bands(\"nir red\")\n",
" ndvi = (nir - red) / (nir + red)\n",
" return ndvi.rename_bands(\"ndvi\")\n",
"\n",
"def isin(ic: wf.ImageCollection, values: list) -> wf.ImageCollection:\n",
" \"Like np.isin, for Workflows\"\n",
" assert len(values) > 0\n",
" result = False\n",
" for value in values:\n",
" result = result | (ic == value)\n",
" return result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Define our cloud- and crop-masked imagery stacks using Workflows"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_daily = cloud_masked_daily_product(\n",
" \"landsat:LC08:01:T1:TOAR\", start_datetime, end_datetime\n",
").pick_bands(\"red green blue nir swir1\")\n",
"l8_with_ndvi = l8_daily.concat_bands(ndvi(l8_daily))\n",
"\n",
"s2_daily = cloud_masked_daily_product(\"sentinel-2:L1C\", start_datetime, end_datetime).pick_bands(\n",
" \"red green blue nir swir1\"\n",
")\n",
"s2_with_ndvi = s2_daily.concat_bands(ndvi(s2_daily))\n",
"\n",
"s1 = wf.ImageCollection.from_id(\n",
" \"sentinel-1:GRD\", start_datetime=start_datetime, end_datetime=end_datetime\n",
").pick_bands(\"vh vv\")\n",
"s1_daily = s1.groupby(dates=(\"year\", \"month\", \"day\")).mosaic()\n",
"\n",
"cdl = wf.ImageCollection.from_id(\n",
" \"usda:cdl:v1\", start_datetime=\"2016-12-01\", end_datetime=\"2020-01-01\"\n",
").pick_bands(\"class\")\n",
"\n",
"grains_oils_grass_beans = [1,2,3,4,5,6,10,11,12,13,21,22,23,24,25,26,27,28,29,\n",
" 30,31,32,33,34,35,36,37,38,39,41,42,43,44,45,46,51,\n",
" 52,53,225,226,228,230,232,234,235,236,237,238,239,240,241,254]\n",
"\n",
"deli_crops = [14, 48, 49, 50, 54, 55, 57, 206, 207, 208, 209, 213, 214, 216,\n",
" 219, 221, 222, 224, 227, 229, 231, 242, 243, 244, 245, 246, 247,\n",
" 248, 249, 250]\n",
"\n",
"tree_crops = [66, 67, 68, 69, 72, 74, 75, 76, 77, 204, 210, 211, 212, 215, 217,\n",
" 218,220, 223]\n",
"\n",
"crops_list = deli_crops + tree_crops\n",
"\n",
"is_crops = isin(cdl, crops_list)\n",
"is_crops_19 = is_crops[-1]\n",
"\n",
"four_year_combo = is_crops.sum(axis=\"images\") + is_crops_19 # double-weight 2019\n",
"four_year_binary = four_year_combo >= 2\n",
"cdl_mask = ~four_year_binary\n",
"\n",
"l8_masked = l8_with_ndvi.mask(cdl_mask)\n",
"s2_masked = s2_with_ndvi.mask(cdl_mask)\n",
"s1_masked = s1_daily.mask(cdl_mask)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's first visualize the layers on a map for quick validation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.center = central_valley_aoi.centroid.coords[0][::-1] # flip to lat, lon order for ipyleaflet\n",
"wf.map.zoom = 10\n",
"wf.map\n",
"# right-click on the map > new view for output, then reposition as you like"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"four_year_combo.visualize(\"four_year_combo\", colormap=\"Blues\", scales=[0, 5])\n",
"four_year_binary.visualize(\"four_year_binary\", colormap=\"Greens\", scales=[0, 1])\n",
"\n",
"l8_with_ndvi.pick_bands(\"red green blue\").mosaic().visualize(\"l8\", scales=[[0, 0.4], [0, 0.4], [0, 0.4]])\n",
"s2_with_ndvi.pick_bands(\"red green blue\").mosaic().visualize(\"s2\", scales=[[0, 0.4], [0, 0.4], [0, 0.4]])\n",
"s1_daily.pick_bands(\"vv vh vv\").mosaic().visualize(\"s1\", scales=[[0, 0.5], [0, 0.1], [0, 0.5]])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Add all tiles too for reference, and pick one random one to experiment with"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.add_layer(\n",
" ipyleaflet.GeoJSON(name=\"Tiles\", data=shapely.geometry.GeometryCollection([t.geometry for t in tiles]).__geo_interface__)\n",
")\n",
"wf.map.control_other_layers = True"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tile = tiles[-1000] # pick one that isn't all masked out"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.add_layer(\n",
" ipyleaflet.GeoJSON(name=\"One tile\", data=tile.__geo_interface__, style={\"color\": \"red\", \"fillColor\": \"red\"})\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wf.map.center = tile.geometry.centroid.coords[0][::-1]\n",
"wf.map.zoom = 12"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Compute the data and look at it\n",
"\n",
"By passing all three things we want to `wf.compute` in a list, the backend can compute them in parallel, and only compute the `cdl_mask` once, which is reused for all three."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_data, s2_data, s1_data = wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"l8_data.shape, s2_data.shape, s1_data.shape"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dl.scenes.display(*l8_data[:, :3], size=3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Notice that a large number of our tiles don't contain any of the crops we're looking for—let's get rid of those\n",
"\n",
"Unfortunately, the Platform doesn't have built-in functionality to do this yet. But by computing our `four_year_binary` mask and using `rasterio` to vectorize it, and then intersecting the crop mask with our tiles, we can do this locally."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# make a GeoContext for the whole central valley.\n",
"# rather than thinking through the resolution, we'll just tell the platform to fit it within a 2048x2048 array.\n",
"central_valley_ctx = dl.scenes.AOI(central_valley_aoi, shape=(2048, 2048), crs=\"EPSG:4326\")\n",
"all_cdl = four_year_binary.compute(central_valley_ctx)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"dl.scenes.display(all_cdl.ndarray, size=5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Workflows gives us back the geotransform for the GeoContext we used,\n",
"# which we'll need to pass on to rasterio so it can convert from pixel coordinates to lat-lon coordinates\n",
"all_cdl.geocontext[\"gdal_geotrans\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# vectorize the whole raster, only keeping the shapes with a pixel value of 1 (i.e. pixels that meet our CDL criteria)\n",
"shapes = list(\n",
" geom for geom, value in\n",
" rasterio.features.shapes(\n",
" all_cdl.ndarray.astype(\"uint8\"), transform=rasterio.transform.Affine.from_gdal(*all_cdl.geocontext[\"gdal_geotrans\"])\n",
" )\n",
" if value == 1\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"len(shapes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"shapes[:2]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# union all of the individual shapes into one, and simplify a little (we don't need that many points!)\n",
"all_valid = shapely.ops.unary_union([shapely.geometry.shape(s) for s in shapes]).simplify(0.3)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"type(all_valid)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# now, we're going to intersect our tiles with the `all_valid` geometry. it's slightly faster if we use a shapely prepared geometry.\n",
"all_valid_prepped = shapely.prepared.prep(all_valid)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"valid_tiles = [t for t in tqdm(tiles) if all_valid_prepped.intersects(t.geometry)]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# nice, that cut out more than half of the tiles we don't need!\n",
"len(valid_tiles) / len(tiles)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# display the new valid tiles on the map\n",
"wf.map.add_layer(\n",
" ipyleaflet.GeoJSON(name=\"Valid Tiles\", data=shapely.geometry.GeometryCollection([t.geometry for t in valid_tiles]).__geo_interface__, style={\"color\": \"green\", \"fillColor\": \"green\"})\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Compute at scale: submit all the tiles to run, and process them\n",
"\n",
"We could just do\n",
"```python\n",
"for tile in valid_tiles:\n",
" l8_data, s2_data, s1_data = wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile)\n",
" ...\n",
"```\n",
"\n",
"but that would be relatively slow, since we'd be computing each tile one-at-a-time. Instead, we can submit _every_ tile to the Platform to compute at once, in parallel, and then process each one as it completes.\n",
"\n",
"Right now, this requires a bit of boilerplate code, but eventually we'll have all this built into the client, so it'll be a bit easier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# we'll just run on a few tiles initially to test\n",
"tiles_to_run = valid_tiles[:10]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# submit all the tiles to run, asynchronously! (note the `block=False`, which makes it immediately return a `wf.Job` object)\n",
"jobs = [wf.compute([l8_masked.ndarray, s2_masked.ndarray, s1_masked.ndarray], tile, block=False) for tile in tqdm(tiles_to_run)]\n",
"job_to_tile = {j: t for j, t in zip(jobs, tiles_to_run)}\n",
"# note: you could make submission significantly faster with `concurrent.futures.ThreadPoolExecutor.map`"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# some code to wait for jobs, and process them in whatever order they complete.\n",
"# (unfortunately this isn't built into the platform yet, but it will be eventually.)\n",
"\n",
"from typing import Iterator, Sequence\n",
"import time\n",
"\n",
"import descarteslabs.workflows as wf\n",
"\n",
"\n",
"def as_completed(jobs: Sequence[wf.Job], interval_sec: int = 5) -> Iterator[wf.Job]:\n",
" \"\"\"\n",
" Iterator over Jobs that yields each Job when it completes.\n",
" \n",
" Parameters\n",
" ----------\n",
" jobs: Sequence[wf.Job]\n",
" Jobs to wait for\n",
" interval_sec: int, optional, default 5\n",
" Wait at least this many seconds between polling for job updates.\n",
" \n",
" Yields\n",
" ------\n",
" job: wf.Job\n",
" A completed job (either succeeded or failed).\n",
" \"\"\"\n",
" jobs = list(jobs)\n",
" while len(jobs) > 0:\n",
" loop_start = time.perf_counter()\n",
"\n",
" i = 0\n",
" while i < len(jobs):\n",
" job = jobs[i]\n",
" if not job.done: # in case it's already loaded\n",
" try:\n",
" job.refresh()\n",
" except Exception:\n",
" continue # be resilient to transient errors for now\n",
"\n",
" if job.done:\n",
" yield job\n",
" del jobs[i] # \"advances\" i\n",
" else:\n",
" i += 1\n",
"\n",
" loop_duration = time.perf_counter() - loop_start\n",
" if len(jobs) > 0 and loop_duration < interval_sec:\n",
" time.sleep(interval_sec - loop_duration)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def handle_result(tile, l8_data, s2_data, s1_data):\n",
" print(tile.key, l8_data.shape)\n",
" # TODO: do whatever you'd do with the result!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"failed = []\n",
"for job in tqdm(as_completed(jobs), total=len(jobs)):\n",
" if job.error is not None:\n",
" failed.append(job)\n",
" print(job.error)\n",
" else:\n",
" l8_data, s2_data, s1_data = job.result(progress_bar=False)\n",
" tile = job_to_tile[job]\n",
" handle_result(tile, l8_data, s2_data, s1_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# if you need to rerun any failures, you can just do\n",
"# new_jobs = [job.resubmit() for job in failed]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment