{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "7b75ec23-3572-4d49-b7d3-88bdc3cedb5b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"# require spatial partitioning features from dask-geopandas main\n",
"%pip install -qU git+https://github.com/geopandas/dask-geopandas httpx pystac-client gevent requests"
]
},
{
"cell_type": "markdown",
"id": "ddd03f09-d76c-4906-ab3c-447e5340503a",
"metadata": {},
"source": [
"# STAC query perf\n",
"\n",
"This notebook looks into the performance of the STAC section from Caleb's mosaiks tutorial. The problem: given a bunch of (semi-randomly distributed) Points, find the \"best\" STAC item, where\n",
"\n",
"1. The STAC item covers the point\n",
"2. probably other stuff... haven't checked.\n",
"\n",
"And do it quickly. Our baseline is ~30 minutes.\n",
"\n",
"The dataset is 100,000 points in the US."
]
},
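{
"cell_type": "markdown",
"id": "added-coverage-note",
"metadata": {},
"source": [
"For intuition, criterion 1 is just a point-in-polygon test. The sketch below is illustrative only: the item footprint and the point are made up, but it shows the kind of check implied by \"the STAC item covers the point\", using shapely."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-coverage-sketch",
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only: a made-up STAC item footprint (GeoJSON) and point,\n",
"# showing the \"item covers the point\" test with shapely.\n",
"import shapely.geometry\n",
"\n",
"item_geometry = {\n",
"    \"type\": \"Polygon\",\n",
"    \"coordinates\": [[[-106, 35], [-105, 35], [-105, 36], [-106, 36], [-106, 35]]],\n",
"}\n",
"point = shapely.geometry.Point(-105.5, 35.5)\n",
"\n",
"print(shapely.geometry.shape(item_geometry).covers(point))  # True"
]
},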
{
"cell_type": "code",
"execution_count": 1,
"id": "22d75e27-b488-4dc9-a68b-b388abac21b1",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/chuck/src/chuckwondo/gists/.venv/lib/python3.10/site-packages/dask/dataframe/core.py:7086: FutureWarning: Meta is not valid, `map_partitions` and `map_overlap` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.\n",
" warnings.warn(\n"
]
}
],
"source": [
"import gevent\n",
"from gevent import monkey\n",
"\n",
"monkey.patch_socket()\n",
"monkey.patch_ssl()\n",
"\n",
"import requests\n",
"import pandas as pd\n",
"import dask_geopandas\n",
"import geopandas\n",
"import shapely.geometry\n",
"import pystac_client\n",
"import dask\n",
"import tlz\n",
"import dask.array\n",
"import httpx\n",
"import asyncio\n",
"\n",
"df = pd.read_csv(\n",
" \"https://files.codeocean.com/files/verified/fa908bbc-11f9-4421-8bd3-72a4bf00427f_v2.0/data/int/applications/population/outcomes_sampled_population_CONTUS_16_640_UAR_100000_0.csv?download\", # noqa: E501\n",
" index_col=0,\n",
" na_values=[-999],\n",
")\n",
"points = df.dropna()[[\"lon\", \"lat\"]]\n",
"population = df.dropna()[\"population\"]\n",
"\n",
"gdf = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.lon, df.lat))\n",
"\n",
"ddf = dask_geopandas.from_geopandas(gdf, npartitions=1)\n",
"hd = ddf.hilbert_distance().compute()\n",
"gdf[\"hd\"] = hd\n",
"gdf = gdf.sort_values(\"hd\")\n",
"\n",
"NPARTITIONS=250\n",
"dgdf = dask_geopandas.from_geopandas(gdf, npartitions=NPARTITIONS, sort=False)\n",
"hulls = dgdf.map_partitions(lambda x: shapely.geometry.mapping(x.unary_union.convex_hull), meta=object).compute()"
]
},
{
"cell_type": "markdown",
"id": "a824a2f8-4323-4c27-a66f-4e12f30f1b16",
"metadata": {},
"source": [
"dask-geopandas implements spatial partitioning. This will assign an integer to each Point so that nearby points in lat / lon are nearby in hilbert space. We'll sort by that."
]
},
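{
"cell_type": "markdown",
"id": "added-partition-check-note",
"metadata": {},
"source": [
"A quick sanity check on that partitioning (just inspecting the `dgdf` and `hulls` computed above): the partitions should hold roughly equal numbers of points, and each hull should be a GeoJSON-style mapping that we can pass as a STAC `intersects` parameter."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-partition-check",
"metadata": {},
"outputs": [],
"source": [
"# Sanity-check sketch: assumes `dgdf` and `hulls` from the cell above.\n",
"sizes = dgdf.map_partitions(len).compute()\n",
"print(sizes.min(), sizes.max())  # points per partition should be roughly balanced\n",
"print(len(hulls))  # one convex hull per partition (NPARTITIONS)\n",
"print(hulls.to_list()[0][\"type\"])  # GeoJSON geometry type, e.g. \"Polygon\""
]
},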
{
"cell_type": "code",
"execution_count": 7,
"id": "5babc208-dac4-4d6c-8c12-8c0c81b553bf",
"metadata": {},
"outputs": [],
"source": [
"async def query(intersects, max_connections=20):\n",
" search_start = \"2018-01-01\"\n",
" search_end = \"2019-12-31\"\n",
" catalog = pystac_client.Client.open(\"https://planetarycomputer.microsoft.com/api/stac/v1\")\n",
"\n",
" # The time frame in which we search for non-cloudy imagery\n",
" search = catalog.search(\n",
" collections=[\"sentinel-2-l2a\"],\n",
" intersects=intersects,\n",
" datetime=[search_start, search_end],\n",
" query={\"eo:cloud_cover\": {\"lt\": 10}},\n",
" limit=500\n",
" )\n",
" parameters = search.get_parameters()\n",
" results = []\n",
" timeout = httpx.Timeout(None, connect=20, read=120)\n",
" \n",
" if isinstance(max_connections, int):\n",
" max_connections = asyncio.Semaphore(max_connections)\n",
"\n",
" async with httpx.AsyncClient(timeout=timeout) as client:\n",
" async with max_connections:\n",
" r = await client.post(search.url, json=parameters)\n",
" resp = r.json()\n",
" results.extend(resp[\"features\"])\n",
" next_link = [x for x in resp[\"links\"] if x[\"rel\"] == \"next\"]\n",
" if next_link:\n",
" next_link, = next_link\n",
"\n",
" while next_link:\n",
" async with max_connections:\n",
" r = await client.post(next_link[\"href\"], json=next_link[\"body\"])\n",
" resp = r.json()\n",
" results.extend(resp[\"features\"])\n",
" \n",
" next_link = [x for x in resp[\"links\"] if x[\"rel\"] == \"next\"]\n",
" if next_link:\n",
" next_link, = next_link\n",
"\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "43f5f87b-9ee3-48be-9749-db93c1137036",
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import time"
]
},
{
"cell_type": "markdown",
"id": "5f214e45-fc50-4f17-bb7d-587333955c57",
"metadata": {},
"source": [
"Let's get a sense for single-threaded, serial perf"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "dd5257e6-0cb4-433c-b001-be87604b3174",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"82.00\n"
]
}
],
"source": [
"N = 20\n",
"t0 = time.time()\n",
"for hull in hulls.to_list()[:N]:\n",
" _ = await query(hull)\n",
"t1 = time.time()\n",
"print(f\"{t1 - t0:0.2f}\")"
]
},
{
"cell_type": "markdown",
"id": "6212370d-cc5a-4190-ad42-9038a8a70a8c",
"metadata": {},
"source": [
"And now for concurrent perf;"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "5ca4bb9a-297b-474b-88a1-5037a1587245",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"23.88\n"
]
}
],
"source": [
"t0 = time.time()\n",
"\n",
"result = await asyncio.gather(\n",
" *[query(hull) for hull in hulls.to_list()[:N]]\n",
")\n",
"\n",
"t1 = time.time()\n",
"print(f\"{t1 - t0:0.2f}\")"
]
},
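{
"cell_type": "markdown",
"id": "added-shared-semaphore-note",
"metadata": {},
"source": [
"One caveat about the gather above: `query` turns an integer `max_connections` into a fresh `asyncio.Semaphore` per call, so the limit of 20 applies per hull rather than across all hulls. If you want a single global cap on in-flight requests, you can pass one shared semaphore to every call. A sketch (the cap of 20 is arbitrary; assumes `query`, `hulls`, and `N` from above):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-shared-semaphore",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: share one semaphore across all query() calls so that at most 20\n",
"# requests are in flight in total, regardless of how many hulls we query.\n",
"sem = asyncio.Semaphore(20)\n",
"\n",
"shared_results = await asyncio.gather(\n",
"    *[query(hull, max_connections=sem) for hull in hulls.to_list()[:N]]\n",
")\n",
"print(sum(len(x) for x in shared_results))"
]
},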
{
"cell_type": "code",
"execution_count": 27,
"id": "9a9666d0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"20"
]
},
"execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(result)"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "5fc53386",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"22003"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sum(len(x) for x in result)"
]
},
{
"cell_type": "markdown",
"id": "2aaa9dc3-ef9c-4c8f-a664-210d2d3309b0",
"metadata": {},
"source": [
"So a 5x speedup by moving from serial to concurrent."
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "eb77dfaf",
"metadata": {},
"outputs": [],
"source": [
"def sync_query(intersects):\n",
" search_start = \"2018-01-01\"\n",
" search_end = \"2019-12-31\"\n",
" catalog = pystac_client.Client.open(\"https://planetarycomputer.microsoft.com/api/stac/v1\")\n",
"\n",
" # The time frame in which we search for non-cloudy imagery\n",
" search = catalog.search(\n",
" collections=[\"sentinel-2-l2a\"],\n",
" intersects=intersects,\n",
" datetime=[search_start, search_end],\n",
" query={\"eo:cloud_cover\": {\"lt\": 10}},\n",
" limit=500\n",
" )\n",
" parameters = search.get_parameters()\n",
" results = []\n",
" \n",
" with requests.Session() as session:\n",
" r = session.post(search.url, json=parameters)\n",
" resp = r.json()\n",
" results.extend(resp[\"features\"])\n",
" next_link = [x for x in resp[\"links\"] if x[\"rel\"] == \"next\"]\n",
" if next_link:\n",
" next_link, = next_link\n",
"\n",
" while next_link:\n",
" r = session.post(next_link[\"href\"], json=next_link[\"body\"])\n",
" resp = r.json()\n",
" results.extend(resp[\"features\"])\n",
" \n",
" next_link = [x for x in resp[\"links\"] if x[\"rel\"] == \"next\"]\n",
" if next_link:\n",
" next_link, = next_link\n",
"\n",
" return results\n"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "f09b3063",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"16.91\n"
]
}
],
"source": [
"t0 = time.time()\n",
"\n",
"jobs = [gevent.spawn(sync_query, hull) for hull in hulls.to_list()[:N]]\n",
"\n",
"sync_results = gevent.wait(jobs)\n",
"\n",
"t1 = time.time()\n",
"print(f\"{t1 - t0:0.2f}\")\n"
]
},
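{
"cell_type": "markdown",
"id": "added-gevent-pool-note",
"metadata": {},
"source": [
"The gevent version above spawns all of the greenlets at once. If you also want to bound concurrency on this path, roughly mirroring `max_connections` in the async version, gevent's `Pool` can do that. A sketch (the pool size of 20 is arbitrary; assumes `sync_query`, `hulls`, and `N` from above):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-gevent-pool",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: cap the number of concurrently running greenlets with a gevent Pool.\n",
"from gevent.pool import Pool\n",
"\n",
"pool = Pool(20)  # at most 20 greenlets run at once\n",
"pooled_jobs = [pool.spawn(sync_query, hull) for hull in hulls.to_list()[:N]]\n",
"gevent.joinall(pooled_jobs)\n",
"print(sum(len(job.value) for job in pooled_jobs))"
]
},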
{
"cell_type": "code",
"execution_count": 19,
"id": "bfc30db0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"20"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"len(sync_results)"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "caafe8ff",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"22003"
]
},
"execution_count": 25,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sum(len(x.value) for x in sync_results)"
]
}
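,
{
"cell_type": "markdown",
"id": "added-result-comparison-note",
"metadata": {},
"source": [
"As a final sanity check, the asyncio and gevent approaches should return the same items. A small sketch comparing item ids from `result` and `sync_results` (the sets can differ slightly if the catalog changed between the two runs):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-result-comparison",
"metadata": {},
"outputs": [],
"source": [
"# Sketch: compare the STAC item ids fetched by the two approaches.\n",
"async_ids = {item[\"id\"] for group in result for item in group}\n",
"gevent_ids = {item[\"id\"] for job in sync_results for item in job.value}\n",
"print(len(async_ids), len(gevent_ids), async_ids == gevent_ids)"
]
}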
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}