Skip to content

Instantly share code, notes, and snippets.

@vincentsarago
Created November 30, 2022 15:11
Show Gist options
  • Save vincentsarago/b00f50f8b66ab5ccbd4449f6a1bd8b71 to your computer and use it in GitHub Desktop.
Save vincentsarago/b00f50f8b66ab5ccbd4449f6a1bd8b71 to your computer and use it in GitHub Desktop.
"""Rasterize Points
requirements:
- rasterio
- sparse
"""
import typing
import math
import warnings
import numpy
import rasterio
from rasterio.crs import CRS
from rasterio.dtypes import _gdal_typename
from rasterio.enums import Resampling
from rasterio.shutil import copy
from rasterio.transform import Affine, from_bounds
from rasterio.warp import transform as transform_points
from sparse import COO
in_crs = CRS.from_epsg(4326)
out_crs = CRS.from_epsg(3857)
# Point values in form of value, lon, lat
point_values: typing.List[typing.Tuple[float, float, float]]
# unpack points
values, lon, lat = zip(*point_values)
# Translate the lat/lon coordinates to `out_crs`
x, y = transform_points(in_crs, out_crs, lon, lat)
nodata_value: typing.Any = 0
# Either define the output resolution (in out_crs projection) or the output shape
out_shape: typing.Optional[typing.Tuple[int, int]]
resolution: typing.Optional[float]
xmin = numpy.amin(x)
xmax = numpy.amax(x)
ymin = numpy.amin(y)
ymax = numpy.amax(y)
bounds = [xmin, ymin, xmax, ymax]
if out_shape is not None:
height, width = out_shape
resolution = max(
(bounds[2] - bounds[0]) / width,
(bounds[3] - bounds[1]) / height,
)
elif resolution is not None:
width = round((bounds[2] - bounds[0]) / resolution) + 1
height = round((bounds[3] - bounds[1]) / resolution) + 1
else:
raise Exception("missing output resolution or shape")
# Output Affine Transform
transform = Affine.translation(bounds[0] - resolution / 2, bounds[1] - resolution / 2) * Affine.scale(resolution, resolution)
# Get Row/Col indexes for each point
rows, cols = rasterio.transform.rowcol(transform, x, y, op=math.floor) # return rows, cols
# Rasterize Dense Matrix using Sparse COO
data = COO((rows, cols), values, fill_value=nodata_value).todense()
# Make sure width/height match array shapes
assert (height, width) == data.shape
# Create the Output Raster
info = {
"DATAPOINTER": data.__array_interface__["data"][0],
"PIXELS": width,
"LINES": height,
"BANDS": 1,
"DATATYPE": _gdal_typename(data.dtype.name),
}
strides = data.__array_interface__.get("strides", None)
if strides is not None:
if len(strides) == 2:
lineoffset, pixeloffset = strides
info.update(LINEOFFSET=lineoffset, PIXELOFFSET=pixeloffset)
else:
bandoffset, lineoffset, pixeloffset = strides
info.update(
BANDOFFSET=bandoffset,
LINEOFFSET=lineoffset,
PIXELOFFSET=pixeloffset,
)
dataset_options = ",".join(f"{name}={val}" for name, val in info.items())
datasetname = f"MEM:::{dataset_options}"
with warnings.catch_warnings():
warnings.simplefilter("ignore", rasterio.errors.NotGeoreferencedWarning)
with rasterio.open(datasetname, "r+") as src:
src.crs = out_crs
src.transform = transform
src.nodata = nodata_value
dst_profile = {
"driver": "COG",
"blocksize": 256,
"compress": "DEFLATE",
"sparse_ok": "TRUE",
"nodata": nodata_value,
}
fout = f"cog.tif"
copy(src, fout, **dst_profile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment