Skip to content

Instantly share code, notes, and snippets.

@chuckwondo
Created September 27, 2023 16:19
Show Gist options
  • Save chuckwondo/6e16cbbc44f8b0e0be41f493c4511796 to your computer and use it in GitHub Desktop.
Save chuckwondo/6e16cbbc44f8b0e0be41f493c4511796 to your computer and use it in GitHub Desktop.
Concurrent pystac-client search queries

Concurrent pystac-client Search Queries

Motivation: Implement async requests

This is a very simple performance comparison between approaches for running search queries via pystac-client:

  • sequential (baseline)
  • asyncio with HTTPX library
  • greenlets with gevent library

In an effort to aid comparison (not only of performance, but also of code differences), the code is intentionally simple, and has been constructed such that diffing the files should make the changes required to go from the baseline to the 2 concurrent approaches readily apparent.

The code here is based upon the code in the notebook in this other gist: https://gist.github.com/TomAugspurger/50c3573d39213a2cb450d02074e4db01

Setup

python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
python -m pip install dask-geopandas gevent httpx pystac-client requests scalene

Generate polygons (written to hulls.json) to use for search intersection by the 3 approaches listed above:

python hulls.py

Produce Performance Profiles

We'll use Scalene so we can get more information than simply elapsed times. The general form is as follows:

scalene --cli QUERY_SCRIPT [--- N]

where QUERY_SCRIPT is one of:

  • query.py (sequential)
  • query-asyncio.py (HTTPX)
  • query-gevent.py (gevent)

and N is an integer specifying the number of queries to execute (default: 10).

Note that there are 3 dashes (not 2) preceding N, as required by Scalene.

Further, as hulls.py is written here, only 250 polygons are generated, so the maximum value for N is 250.

For example, to run each query implementation, running 50 queries for each:

scalene --cli query.py --- 50
scalene --cli query-asyncio.py --- 50
scalene --cli query-gevent.py --- 50

For other output options, including web output, run scalene -h.

import dask_geopandas as dgpd
import geopandas as gpd
import pandas as pd
import shapely.geometry
# Number of partitions, and therefore of hull polygons written to hulls.json.
N_HULLS = 250

# Remote CSV of points; only its `lon` and `lat` columns are used below.
POINTS_URL = (
    "https://files.codeocean.com/files/verified/"
    "fa908bbc-11f9-4421-8bd3-72a4bf00427f_v2.0/data/int/applications/population/"
    "outcomes_sampled_population_CONTUS_16_640_UAR_100000_0.csv?download"
)

df = pd.read_csv(POINTS_URL, index_col=0, na_values=["-999"])
points = gpd.points_from_xy(df.lon, df.lat)
gdf = gpd.GeoDataFrame(df, geometry=points)

# Compute every row's Hilbert distance on a single partition, then use it as
# the sort key (presumably so each later partition holds nearby points).
ddf = dgpd.from_geopandas(gdf, npartitions=1)
hilbert = ddf.hilbert_distance().compute()
ordered = gdf.assign(hd=hilbert).sort_values("hd")

# sort=False: presumably keeps the `hd` ordering instead of re-sorting by index.
dgdf = dgpd.from_geopandas(ordered, npartitions=N_HULLS, sort=False)

# One geometry mapping per partition: the convex hull of the union of the
# partition's points.
hulls = dgdf.map_partitions(
    lambda part: shapely.geometry.mapping(part.unary_union.convex_hull)
).compute()
hulls.to_json("hulls.json", orient="records", indent=4)
import asyncio
from typing import Any, Mapping, Sequence, TypeAlias, TypedDict
import httpx
import pandas as pd
import pystac_client
from pystac_client.item_search import IntersectsLike
Feature: TypeAlias = Mapping[str, Any]
class Link(TypedDict, total=True):
href: str
body: Mapping[str, Any]
async def fetch_features(
    link: Link,
    client: httpx.AsyncClient,
    concurrency: asyncio.BoundedSemaphore,
) -> Sequence[Feature]:
    """Collect the features from every page of a POST search.

    Starting at ``link``, POST the link's body to its href, gather the
    ``features`` of the JSON response, then continue with the response's
    ``rel == "next"`` link until a page has none.

    Args:
        link: First request to issue.
        client: HTTP client used for every page request.
        concurrency: Semaphore acquired around each page request.

    Returns:
        The features of all pages, in the order the pages were received.

    Raises:
        httpx.HTTPStatusError: If a page request returns a 4xx/5xx status.
    """
    next_link: Link | None = link
    features: list[Feature] = []

    while next_link:
        async with concurrency:
            response = await client.post(next_link["href"], json=next_link["body"])

        # Report a failed request as an HTTP error instead of letting it show
        # up later as a KeyError or JSON decode error on the error payload.
        response.raise_for_status()
        payload = response.json()
        features.extend(payload["features"])

        # A page without a "next" link (or without links at all) is the last.
        next_link = next(
            (
                candidate
                for candidate in payload.get("links", ())
                if candidate.get("rel") == "next"
            ),
            None,
        )

    return features
async def query(
    intersects: IntersectsLike,
    concurrency: asyncio.BoundedSemaphore,
) -> Sequence[Feature]:
    """Fetch all features matching a fixed search that intersects a geometry.

    pystac-client is used only to build the search URL and request body; the
    page requests themselves are issued through an ``httpx.AsyncClient``.

    Args:
        intersects: Geometry passed as the search's ``intersects`` filter.
        concurrency: Semaphore forwarded to ``fetch_features``.

    Returns:
        Every feature returned by the search, across all pages.
    """
    # Client.open is a synchronous call that performs HTTP requests. Running
    # it in a worker thread keeps the event loop free, so that gathered
    # queries do not open their catalogs one after another.
    catalog = await asyncio.to_thread(
        pystac_client.Client.open,
        "https://planetarycomputer.microsoft.com/api/stac/v1",
    )
    search = catalog.search(
        collections=["sentinel-2-l2a"],
        intersects=intersects,
        datetime=["2018-01-01", "2019-12-31"],
        query={"eo:cloud_cover": {"lt": 10}},
        limit=500,
    )
    first_page: Link = {"href": search.url, "body": search.get_parameters()}

    # No overall or pool timeout; 20 s to connect and 120 s per read.
    timeout = httpx.Timeout(None, connect=20, read=120)

    async with httpx.AsyncClient(timeout=timeout) as client:
        return await fetch_features(first_page, client, concurrency)
async def main(n: int) -> None:
    """Run the first ``n`` hull queries concurrently and print the feature total.

    Args:
        n: Upper bound on the number of polygons taken from ``hulls.json``.
    """
    hulls = pd.read_json("hulls.json", orient="records", typ="series")[:n]

    # A single semaphore with 20 slots is shared by every query.
    limiter = asyncio.BoundedSemaphore(20)
    pending = [query(hull, limiter) for hull in hulls]
    per_query = await asyncio.gather(*pending)

    features = []
    for batch in per_query:
        features.extend(batch)

    print(len(features))
if __name__ == "__main__":
    import sys

    # The optional first command-line argument is the query count (default 10).
    n = int(sys.argv[1]) if len(sys.argv) != 1 else 10
    asyncio.run(main(n))
import gevent
import gevent.lock
import gevent.monkey
# Patch the standard socket and ssl modules with gevent's versions. This runs
# before the remaining imports below (including `requests`).
gevent.monkey.patch_socket()
gevent.monkey.patch_ssl()
from typing import Any, Iterable, Mapping, Sequence, TypeAlias, TypedDict
import pandas as pd
import requests
import pystac_client
from pystac_client.item_search import IntersectsLike
Feature: TypeAlias = Mapping[str, Any]
class Link(TypedDict, total=True):
href: str
body: Mapping[str, Any]
def fetch_features(
    link: Link,
    session: requests.Session,
    concurrency: gevent.lock.BoundedSemaphore,
    **post_kwargs: Any,
) -> Iterable[Feature]:
    """Yield the features from every page of a POST search.

    Starting at ``link``, POST the link's body to its href, yield the
    ``features`` of the JSON response, then continue with the response's
    ``rel == "next"`` link until a page has none.

    Args:
        link: First request to issue.
        session: Session used for every page request.
        concurrency: Semaphore acquired around each page request.
        **post_kwargs: Extra keyword arguments forwarded to ``session.post``.

    Yields:
        Each feature, in the order the pages were received.

    Raises:
        requests.HTTPError: If a page request returns a 4xx/5xx status.
    """
    next_link: Link | None = link

    while next_link:
        with concurrency:
            response = session.post(
                next_link["href"], json=next_link["body"], **post_kwargs
            )

        # Report a failed request as an HTTP error instead of letting it show
        # up later as a KeyError or JSON decode error on the error payload.
        response.raise_for_status()
        payload = response.json()
        yield from payload["features"]

        # A page without a "next" link (or without links at all) is the last.
        next_link = next(
            (
                candidate
                for candidate in payload.get("links", ())
                if candidate.get("rel") == "next"
            ),
            None,
        )
def query(
    intersects: IntersectsLike,
    concurrency: gevent.lock.BoundedSemaphore,
) -> Sequence[Feature]:
    """Fetch all features matching a fixed search that intersects a geometry.

    pystac-client is used only to build the search URL and request body; the
    page requests themselves go through a ``requests.Session``.

    Args:
        intersects: Geometry passed as the search's ``intersects`` filter.
        concurrency: Semaphore forwarded to ``fetch_features``.

    Returns:
        A tuple of every feature returned by the search, across all pages.
    """
    api_url = "https://planetarycomputer.microsoft.com/api/stac/v1"
    search_options = {
        "collections": ["sentinel-2-l2a"],
        "intersects": intersects,
        "datetime": ["2018-01-01", "2019-12-31"],
        "query": {"eo:cloud_cover": {"lt": 10}},
        "limit": 500,
    }
    search = pystac_client.Client.open(api_url).search(**search_options)

    with requests.Session() as session:
        first_page: Link = {"href": search.url, "body": search.get_parameters()}
        # timeout=(20, 120) is forwarded unchanged to session.post.
        pages = fetch_features(first_page, session, concurrency, timeout=(20, 120))
        # The generator must be drained while the session is still open.
        return tuple(pages)
def main(n: int) -> None:
    """Run the first ``n`` hull queries in greenlets and print the feature total.

    Args:
        n: Upper bound on the number of polygons taken from ``hulls.json``.
    """
    hulls = pd.read_json("hulls.json", orient="records", typ="series")[:n]

    # A single semaphore with 20 slots is shared by every greenlet.
    limiter = gevent.lock.BoundedSemaphore(20)
    jobs = [gevent.spawn(query, hull, limiter) for hull in hulls]
    finished = gevent.wait(jobs)

    features = []
    for job in finished:
        features.extend(job.get())

    print(len(features))
if __name__ == "__main__":
    import sys

    # The optional first command-line argument is the query count (default 10).
    n = int(sys.argv[1]) if len(sys.argv) != 1 else 10
    main(n)
from typing import Any, Iterable, Mapping, Sequence, TypeAlias, TypedDict
import pandas as pd
import requests
import pystac_client
from pystac_client.item_search import IntersectsLike
Feature: TypeAlias = Mapping[str, Any]
class Link(TypedDict, total=True):
href: str
body: Mapping[str, Any]
def fetch_features(
    link: Link,
    session: requests.Session,
    **post_kwargs: Any,
) -> Iterable[Feature]:
    """Yield the features from every page of a POST search.

    Starting at ``link``, POST the link's body to its href, yield the
    ``features`` of the JSON response, then continue with the response's
    ``rel == "next"`` link until a page has none.

    Args:
        link: First request to issue.
        session: Session used for every page request.
        **post_kwargs: Extra keyword arguments forwarded to ``session.post``.

    Yields:
        Each feature, in the order the pages were received.

    Raises:
        requests.HTTPError: If a page request returns a 4xx/5xx status.
    """
    next_link: Link | None = link

    while next_link:
        response = session.post(
            next_link["href"], json=next_link["body"], **post_kwargs
        )

        # Report a failed request as an HTTP error instead of letting it show
        # up later as a KeyError or JSON decode error on the error payload.
        response.raise_for_status()
        payload = response.json()
        yield from payload["features"]

        # A page without a "next" link (or without links at all) is the last.
        next_link = next(
            (
                candidate
                for candidate in payload.get("links", ())
                if candidate.get("rel") == "next"
            ),
            None,
        )
def query(
    intersects: IntersectsLike,
) -> Sequence[Feature]:
    """Fetch all features matching a fixed search that intersects a geometry.

    pystac-client is used only to build the search URL and request body; the
    page requests themselves go through a ``requests.Session``.

    Args:
        intersects: Geometry passed as the search's ``intersects`` filter.

    Returns:
        A tuple of every feature returned by the search, across all pages.
    """
    api_url = "https://planetarycomputer.microsoft.com/api/stac/v1"
    search_options = {
        "collections": ["sentinel-2-l2a"],
        "intersects": intersects,
        "datetime": ["2018-01-01", "2019-12-31"],
        "query": {"eo:cloud_cover": {"lt": 10}},
        "limit": 500,
    }
    search = pystac_client.Client.open(api_url).search(**search_options)

    with requests.Session() as session:
        first_page: Link = {"href": search.url, "body": search.get_parameters()}
        # timeout=(20, 120) is forwarded unchanged to session.post.
        pages = fetch_features(first_page, session, timeout=(20, 120))
        # The generator must be drained while the session is still open.
        return tuple(pages)
def main(n: int) -> None:
    """Run the first ``n`` hull queries one after another and print the feature total.

    Args:
        n: Upper bound on the number of polygons taken from ``hulls.json``.
    """
    hulls = pd.read_json("hulls.json", orient="records", typ="series")[:n]

    # Each query runs to completion before the next one starts.
    per_query = [query(hull) for hull in hulls]

    features = []
    for batch in per_query:
        features.extend(batch)

    print(len(features))
if __name__ == "__main__":
    import sys

    # The optional first command-line argument is the query count (default 10).
    n = int(sys.argv[1]) if len(sys.argv) != 1 else 10
    main(n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment