Skip to content

Instantly share code, notes, and snippets.

@TomAugspurger
Created June 6, 2023 02:00
Show Gist options
  • Save TomAugspurger/ae9bc974a5c28e44bc95234c72750c47 to your computer and use it in GitHub Desktop.
Save TomAugspurger/ae9bc974a5c28e44bc95234c72750c47 to your computer and use it in GitHub Desktop.
import dask.dataframe as dd
import adlfs
import geopandas
import dask_geopandas
import pyarrow.fs
import pyproj
import json
import dask.distributed
from typing import Dict, Any, Union, Optional
def add_geo_metadata(
    prefix: str,
    geometry_name: str = "geometry",
    geoparquet_version: str = "1.0.0-beta.1",
    storage_options: Optional[Dict[str, Any]] = None,
    schema: Optional[Dict] = None,
    crs: Any = "WGS 84",
    geometry_types: Optional[list] = None,
) -> str:
    """Add GeoParquet ``geo`` metadata to every file found under *prefix*.

    Each file returned by ``fs.find(prefix)`` on an Azure Blob filesystem is
    read as a Parquet table. If its schema metadata has no ``b"geo"`` key, a
    GeoParquet metadata document is attached and the file is rewritten in
    place at the same path. The document describes ``geometry_name``: WKB
    encoding, CRS, geometry types and, when finite, the bounding box. Files
    that already carry ``b"geo"`` are left untouched.

    Parameters
    ----------
    prefix:
        Storage path whose files (found recursively via ``fs.find``) are
        processed.
    geometry_name:
        Name of the WKB-encoded geometry column; also recorded as the
        ``primary_column``.
    geoparquet_version:
        Value written to the ``version`` field of the metadata.
    storage_options:
        Keyword arguments forwarded to ``adlfs.AzureBlobFileSystem``.
    schema:
        Optional JSON Schema. When given, each generated metadata document
        is validated with ``jsonschema.validate`` before the file is written.
    crs:
        Anything accepted by ``pyproj.CRS``; stored as PROJJSON.
        Defaults to ``"WGS 84"`` (the previously hard-coded value).
    geometry_types:
        Value of the ``geometry_types`` field. Defaults to ``["Polygon"]``
        (the previously hard-coded value).

    Returns
    -------
    str
        ``prefix``, unchanged.

    Raises
    ------
    ImportError
        If ``schema`` is given and ``jsonschema`` is not installed.
    jsonschema.ValidationError
        If ``schema`` is given and a generated document fails validation.
    """
    import math

    # The module imports only ``pyarrow.fs``, which does not guarantee that
    # the ``pyarrow.parquet`` submodule is loaded.
    import pyarrow.parquet as pq

    validate = None
    if schema:
        # Lazy import: jsonschema is needed only when validation is requested.
        # (Previously it was referenced without being imported -> NameError.)
        from jsonschema import validate

    storage_options = storage_options or {}
    fs = adlfs.AzureBlobFileSystem(**storage_options)
    # Use one arrow-wrapped filesystem for both reading and writing.
    arrow_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(fs))

    # Loop-invariant: serialize the CRS once rather than once per file.
    crs_json = json.loads(pyproj.CRS(crs).to_json())
    geometry_types = ["Polygon"] if geometry_types is None else list(geometry_types)

    # NOTE(review): every file under ``prefix`` is processed, including any
    # non-Parquet side files -- confirm the prefix holds only data files.
    for path in fs.find(prefix):
        table = pq.read_table(path, filesystem=arrow_fs)
        # ``schema.metadata`` is None when a file has no key/value metadata.
        existing = table.schema.metadata or {}
        if b"geo" in existing:
            # Already present
            continue

        column_meta = {
            "encoding": "WKB",
            "crs": crs_json,
            "geometry_types": geometry_types,
        }
        bbox = geopandas.array.from_wkb(
            table[geometry_name].to_numpy()
        ).total_bounds.tolist()
        # An empty or all-null column yields NaN bounds, which would serialize
        # as invalid JSON ``NaN``; bbox is optional, so omit it in that case.
        if all(math.isfinite(v) for v in bbox):
            column_meta["bbox"] = bbox

        geo_metadata = {
            "version": geoparquet_version,
            "primary_column": geometry_name,
            "columns": {geometry_name: column_meta},
        }
        if validate is not None:
            validate(geo_metadata, schema)

        metadata = {**existing, b"geo": json.dumps(geo_metadata).encode()}
        new_table = table.replace_schema_metadata(metadata)
        # TODO: robust error handling here
        pq.write_table(new_table, path, filesystem=arrow_fs)
    return prefix
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment