density-ingest (gist by @sixy6e, last active March 12, 2024)

Sample code for ingesting multibeam bathymetry data into TileDB, to be used as input for per-cell density calculations.
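datasets.txt — the listing of STAC metadata documents consumed by the scripts below. Each line is an S3 object key; the ingestion script prepends s3:// when reading: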
ausseabed-pl019-ingested-data/L2/0331_NETasmania_Bergersen_10G/ga_ausseabed_419ae2f5-48c1-4d81-bd1f-18bf80e658cd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/0337_NETasmania_Challenge072012/ga_ausseabed_db85ad19-235e-4ec9-a0c7-d5dbc4cb635b_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T01-em122/ga_ausseabed_0a505d7e-8e6d-4dfc-9560-8d0833508f29_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T01-em710/ga_ausseabed_25625f0b-ea69-46f5-bceb-edfc6791374e_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2015_T02-em122/ga_ausseabed_ac199f29-20eb-4e9c-956f-c562e069f3c1_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_E02-em122/ga_ausseabed_24b4168e-05ae-4061-abb1-da0b6876305b_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_T02-em122/ga_ausseabed_cfcffeff-abec-4736-b380-a19c189ef52f_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2016_T02-em710/ga_ausseabed_83ebeb43-4a11-4c85-9596-e2e2867b53f7_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2017_V03-em122/ga_ausseabed_38e02f71-29c5-454c-9788-6c45d06d21bd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2017_V03-em710/ga_ausseabed_b3d4643d-6b32-4e99-94bf-b5eacd26ed99_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T01-em122/ga_ausseabed_89348d5b-19f0-4b1b-b946-90cdfe73bdfd_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T02-em122/ga_ausseabed_06470ef3-7a88-4097-89a1-ee3cea817241_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_T02-em710/ga_ausseabed_e229a7ca-d65b-4bb7-9bd8-22274f07890f_stac-metadata.geojson
ausseabed-pl019-ingested-data/L2/IN2018_V04-em122/ga_ausseabed_2ee400ee-d8ce-4fda-a15c-e494d0facb9a_stac-metadata.geojson
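The ingestion script below works through that listing: for each STAC document it opens the referenced bathymetry_data TileDB array, queries the points falling within the region-of-interest bounds, and appends them in chunks to a single sparse output array. Each survey is written at its own TileDB timestamp (1, 2, 3, ...), and a summary of which survey landed at which timestamp is stored as metadata on the output array.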
import json

import numpy
import tiledb
import boto3
import click
import structlog
from shapely import geometry

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="ISO"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(sort_keys=True),
    ],
)

LOG = structlog.get_logger()
# lon/lat polygon defining the region of interest
ROI = geometry.shape(
    {
        "coordinates": (
            (
                (148.526, -40.153),
                (149.483, -40.153),
                (149.483, -40.716),
                (148.526, -40.716),
                (148.526, -40.153),
            ),
        ),
        "type": "Polygon",
    }
)

STAC_LISTING = "datasets.txt"
OUT_URI = "s3://ardc-gmrt-test-data/density-mapping/gmrt-roi-point-cloud.tiledb"
def read_stac_doc(uri, ctx):
    """Read a STAC metadata document via TileDB's VFS."""
    vfs = tiledb.vfs.VFS(ctx=ctx)
    with vfs.open(uri) as src:
        md = json.load(src)
    return md
def context(profile):
    """Build a TileDB context with credentials from the given AWS profile."""
    session = boto3.Session(profile_name=profile)
    creds = session.get_credentials()
    # region = "us-east-1" if profile == "pl019-data" else "ap-southeast-2"
    # region = "" if profile == "pl019-data" else "ap-southeast-2"
    # was having issues when passing None to the config;
    # it results in the string 'None', which is not correct
    if profile == "pl019-data":
        config = tiledb.Config(
            {
                "vfs.s3.aws_access_key_id": creds.access_key,
                "vfs.s3.aws_secret_access_key": creds.secret_key,
            }
        )
    else:
        config = tiledb.Config(
            {
                "vfs.s3.aws_access_key_id": creds.access_key,
                "vfs.s3.aws_secret_access_key": creds.secret_key,
                "vfs.s3.region": "ap-southeast-2",
                "vfs.s3.aws_session_token": creds.token,
            }
        )
    ctx = tiledb.Ctx(config=config)
    return ctx
def lonlat_domain():
    """Set the array's lon/lat domain."""
    index_filters = tiledb.FilterList([tiledb.ZstdFilter(level=16)])
    xdim = tiledb.Dim(
        "X",
        domain=(None, None),
        tile=1000,
        dtype=numpy.float64,
        filters=index_filters,
    )
    ydim = tiledb.Dim(
        "Y",
        domain=(None, None),
        tile=1000,
        dtype=numpy.float64,
        filters=index_filters,
    )
    domain = tiledb.Domain(xdim, ydim)
    return domain
def xyz_schema(ctx=None):
    """Define the sparse XYZ point cloud array schema."""
    domain = lonlat_domain()  # only 2 dims for the GMRT project
    attributes = [
        tiledb.Attr("Z", dtype=numpy.float32, filters=[tiledb.ZstdFilter(level=16)]),
    ]
    schema = tiledb.ArraySchema(
        domain=domain,
        sparse=True,
        attrs=attributes,
        cell_order="hilbert",
        tile_order="row-major",
        capacity=100_000,
        allows_duplicates=True,
        ctx=ctx,
    )
    return schema
def create_mbes_array(array_uri, schema, ctx=None):
    """Create the TileDB array."""
    # with tiledb.scope_ctx(ctx):
    #     tiledb.Array.create(array_uri, schema)
    tiledb.Array.create(array_uri, schema, ctx=ctx, overwrite=True)
def append_ping_dataframe(dataframe, array_uri, ctx=None, chunks=100_000, timestamp=None):
    """
    Append the ping dataframe read from a GSF file.
    Only to be used with sparse arrays.
    """
    kwargs = {
        "mode": "append",
        "sparse": True,
        "ctx": ctx,
        "timestamp": timestamp,
    }
    idxs = [
        (start, start + chunks) for start in numpy.arange(0, dataframe.shape[0], chunks)
    ]
    for idx in idxs:
        subset = dataframe[idx[0] : idx[1]]
        tiledb.dataframe_.from_pandas(array_uri, subset, **kwargs)
@click.command()
@click.option("--profile-name", type=str, required=True, help="AWS profile to use for writing")
def main(profile_name):
    """Main entry point."""
    with open(STAC_LISTING, "r") as src:
        stac_docs = [d.strip() for d in src.readlines()]

    pl019_ctx = context("pl019-data")
    ctx = context(profile_name)
    vfs = tiledb.vfs.VFS(ctx=ctx)

    # need to remove the array if we already have one established
    if vfs.is_dir(OUT_URI):
        LOG.info("Removing previous array", array_uri=OUT_URI)
        vfs.remove_dir(OUT_URI)

    LOG.info("Creating array", array_uri=OUT_URI)
    schema = xyz_schema(ctx)
    tiledb.Array.create(OUT_URI, schema)
    # tiledb.Array.create(OUT_URI, schema, overwrite=True)
    # create_mbes_array(OUT_URI, schema, ctx)

    minx, miny, maxx, maxy = ROI.bounds
    n_points = 0
    versioning_md = []

    for i, doc_name in enumerate(stac_docs):
        tstamp = i + 1
        stac_uri = f"s3://{doc_name}"
        LOG.info("Processing STAC document:", stac_uri=stac_uri, tiledb_timestamp=tstamp)

        md = read_stac_doc(stac_uri, pl019_ctx)
        array_uri = md["assets"]["bathymetry_data"]["href"]
        survey_points = 0

        with tiledb.open(array_uri, ctx=pl019_ctx) as ds:
            LOG.info("Querying points:", array_uri=array_uri)
            query = ds.query(attrs=["Z"], return_incomplete=True)
            for df in query.df[minx:maxx, miny:maxy]:
                n_points += df.shape[0]
                survey_points += df.shape[0]
                append_ping_dataframe(df, OUT_URI, ctx=ctx, timestamp=tstamp)

        versioning_md.append(
            {
                "dataset_uri": stac_uri,
                "tiledb_timestamp": tstamp,
                "points_contributed": survey_points,
            }
        )
        LOG.info("Survey ingested:", stac_uri=stac_uri, points_ingested=f"{survey_points:_}")

    # record which survey was written at which timestamp on the array itself
    with tiledb.open(OUT_URI, "w", ctx=ctx) as outds:
        outds.meta["tiledb_timestamps"] = json.dumps(versioning_md)

    LOG.info("Finished point cloud ingestion", uri=OUT_URI, total_points=f"{n_points:_}")


if __name__ == "__main__":
    main()
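To run the ingestion, invoke the script with the AWS profile that has write access, e.g. python ingest.py --profile-name <profile> (the module name here is hypothetical; the gist does not name the file). Because each survey is appended at its own TileDB timestamp, one survey's points can later be read back in isolation. A minimal sketch, assuming the array above has been ingested and TileDB can reach the bucket with ambient credentials:

# Sketch: time-travel read of one survey's contribution, using the
# versioning metadata the ingestion script stored on the array.
import json
import tiledb

URI = "s3://ardc-gmrt-test-data/density-mapping/gmrt-roi-point-cloud.tiledb"

with tiledb.open(URI) as ds:
    versions = json.loads(ds.meta["tiledb_timestamps"])

target = versions[0]  # first survey ingested (written at timestamp 1)
ts = target["tiledb_timestamp"]

# opening with timestamp=(ts, ts) restricts reads to fragments written at that timestamp
with tiledb.open(URI, timestamp=(ts, ts)) as ds:
    df = ds.df[:]
    print(target["dataset_uri"], df.shape[0])

The next snippet plots each source dataset's footprint clipped to the ROI, as a sanity check of coverage before gridding.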
import json

import geopandas
import matplotlib.pyplot as plt
import s3fs
from shapely import geometry

roi = geometry.shape(
    {
        "coordinates": (
            (
                (148.526, -40.153),
                (149.483, -40.153),
                (149.483, -40.716),
                (148.526, -40.716),
                (148.526, -40.153),
            ),
        ),
        "type": "Polygon",
    }
)

fs = s3fs.S3FileSystem(profile="pl019-data")

with open("datasets.txt") as src:
    datasets = [i.strip() for i in src.readlines()]

geoms = []
for uri in datasets:
    with fs.open(uri) as src:
        md = json.load(src)
    geoms.append(roi.intersection(geometry.shape(md["geometry"])))

basenames = [k.split("/")[2] for k in datasets]
gdf = geopandas.GeoDataFrame(
    {"uri": datasets, "geometry": geoms, "name": basenames}, crs="epsg:4326"
)
gdf.plot(
    column="name",
    categorical=True,
    legend=True,
    legend_kwds={"loc": "center left", "bbox_to_anchor": (1, 0.5), "fmt": "{:.0f}"},
    cmap="tab20",
)
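The final script expands each tile description in roi-tiles-info.json into a pair of PDAL pipeline files: one gridding per-cell point counts (density), one gridding per-cell Z statistics. The tile file itself is not included in the gist; judging from the keys the script reads, each entry presumably looks something like the following (field names are real, values are purely illustrative):

# Illustrative entry only -- inferred from the keys the script below reads;
# real values come from whatever tiling step produced roi-tiles-info.json.
tile_info = {
    "tile_idx": [0, 0],  # tile column/row, used in output names
    # only transform[1] is read (the cell resolution); the rest is assumed
    # to follow the GDAL geotransform layout
    "transform": [14300000.0, 100.0, 0.0, -4800000.0, 0.0, -100.0],
    "bbox": [[14300000.0, -4900000.0], [14400000.0, -4800000.0]],  # [[origin_x, origin_y], ...]
    "width": 1000,   # raster width in cells
    "height": 1000,  # raster height in cells
}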
import json

import structlog

LOG = structlog.get_logger()

ROI_TILES = "roi-tiles-info.json"
ARRAY_URI = "s3://ardc-gmrt-test-data/density-mapping/gmrt-roi-point-cloud.tiledb/"
OUT_CRS = "EPSG:6933"
TILEDB_CONFIG = "tdb-config.cfg"
def reader_pipe():
    """PDAL reader stage: stream points from the TileDB point cloud array."""
    pipe = {
        "type": "readers.tiledb",
        "strict": False,
        "array_name": ARRAY_URI,
        "config_file": TILEDB_CONFIG,
        "override_srs": "EPSG:4326",
    }
    return pipe
def proj_pipe():
    """PDAL filter stage: reproject into the output CRS."""
    pipe = {
        "type": "filters.reprojection",
        "out_srs": OUT_CRS,
    }
    return pipe
def writer_pipe_density(tile_info):
    """PDAL writer stage: per-cell point counts (density) as a TileDB raster."""
    cell = f"{tile_info['tile_idx'][0]}_{tile_info['tile_idx'][1]}"
    res = tile_info["transform"][1]
    out_uri = f"s3://ardc-gmrt-test-data/density-mapping/cell-{cell}-density-{res}.tiledb"
    pipe = {
        "type": "writers.gdal",
        "binmode": True,
        "filename": out_uri,
        "gdaldriver": "TileDB",
        "resolution": res,
        "origin_x": tile_info["bbox"][0][0],
        "origin_y": tile_info["bbox"][0][1],
        "width": tile_info["width"],
        "height": tile_info["height"],
        "override_srs": OUT_CRS,
        "nodata": 0,
        "output_type": [
            "count",
        ],
        "gdalopts": [
            "COMPRESSION=ZSTD",
            "COMPRESSION_LEVEL=16",
            "BLOCKXSIZE=256",
            "BLOCKYSIZE=256",
            f"TILEDB_CONFIG={TILEDB_CONFIG}",
        ],
        "data_type": "uint32",
    }
    return pipe
def writer_pipe_stats(tile_info):
    """PDAL writer stage: per-cell min/max/mean/stdev of Z as a TileDB raster."""
    cell = f"{tile_info['tile_idx'][0]}_{tile_info['tile_idx'][1]}"
    res = tile_info["transform"][1]
    out_uri = f"s3://ardc-gmrt-test-data/density-mapping/cell-{cell}-stats-{res}.tiledb"
    pipe = {
        "type": "writers.gdal",
        "binmode": True,
        "filename": out_uri,
        "gdaldriver": "TileDB",
        "resolution": res,
        "origin_x": tile_info["bbox"][0][0],
        "origin_y": tile_info["bbox"][0][1],
        "width": tile_info["width"],
        "height": tile_info["height"],
        "override_srs": OUT_CRS,
        "nodata": -9999,
        "output_type": [
            "min",
            "max",
            "mean",
            "stdev",
        ],
        "gdalopts": [
            "COMPRESSION=ZSTD",
            "COMPRESSION_LEVEL=16",
            "BLOCKXSIZE=256",
            "BLOCKYSIZE=256",
            f"TILEDB_CONFIG={TILEDB_CONFIG}",
        ],
        "data_type": "float32",
    }
    return pipe
def main():
    with open(ROI_TILES, "r") as src:
        tiles_info = json.load(src)

    for tile in tiles_info:
        LOG.info("Processing tile:", **tile)
        cell = f"{tile['tile_idx'][0]}_{tile['tile_idx'][1]}"
        res = tile["transform"][1]

        pipeline = [
            reader_pipe(),
            proj_pipe(),
            writer_pipe_density(tile),
        ]
        out_fname = f"cell-{cell}-pdal-pipeline-{res}-density.json"
        with open(out_fname, "w") as out_src:
            json.dump(pipeline, out_src, indent=4)

        pipeline = [
            reader_pipe(),
            proj_pipe(),
            writer_pipe_stats(tile),
        ]
        out_fname = f"cell-{cell}-pdal-pipeline-{res}-stats.json"
        with open(out_fname, "w") as out_src:
            json.dump(pipeline, out_src, indent=4)


if __name__ == "__main__":
    main()
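Each generated JSON file is a complete PDAL pipeline, so it can be executed with the PDAL CLI (pdal pipeline <file>). A minimal driver sketch, assuming pdal is on PATH and the pipelines were written to the current directory; this loop is not part of the gist:

# Hypothetical driver: run every generated pipeline via the PDAL CLI.
import glob
import subprocess

for fname in sorted(glob.glob("cell-*-pdal-pipeline-*.json")):
    subprocess.run(["pdal", "pipeline", fname], check=True)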