Created
November 30, 2022 15:11
-
-
Save vincentsarago/b00f50f8b66ab5ccbd4449f6a1bd8b71 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Rasterize Points | |
requirements: | |
- rasterio | |
- sparse | |
""" | |
import typing | |
import math | |
import warnings | |
import numpy | |
import rasterio | |
from rasterio.crs import CRS | |
from rasterio.dtypes import _gdal_typename | |
from rasterio.enums import Resampling | |
from rasterio.shutil import copy | |
from rasterio.transform import Affine, from_bounds | |
from rasterio.warp import transform as transform_points | |
from sparse import COO | |
in_crs = CRS.from_epsg(4326) | |
out_crs = CRS.from_epsg(3857) | |
# Point values in form of value, lon, lat | |
point_values: typing.List[typing.Tuple[float, float, float]] | |
# unpack points | |
values, lon, lat = zip(*point_values) | |
# Translate the lat/lon coordinates to `out_crs` | |
x, y = transform_points(in_crs, out_crs, lon, lat) | |
nodata_value: typing.Any = 0 | |
# Either define the output resolution (in out_crs projection) or the output shape | |
out_shape: typing.Optional[typing.Tuple[int, int]] | |
resolution: typing.Optional[float] | |
xmin = numpy.amin(x) | |
xmax = numpy.amax(x) | |
ymin = numpy.amin(y) | |
ymax = numpy.amax(y) | |
bounds = [xmin, ymin, xmax, ymax] | |
if out_shape is not None: | |
height, width = out_shape | |
resolution = max( | |
(bounds[2] - bounds[0]) / width, | |
(bounds[3] - bounds[1]) / height, | |
) | |
elif resolution is not None: | |
width = round((bounds[2] - bounds[0]) / resolution) + 1 | |
height = round((bounds[3] - bounds[1]) / resolution) + 1 | |
else: | |
raise Exception("missing output resolution or shape") | |
# Output Affine Transform | |
transform = Affine.translation(bounds[0] - resolution / 2, bounds[1] - resolution / 2) * Affine.scale(resolution, resolution) | |
# Get Row/Col indexes for each point | |
rows, cols = rasterio.transform.rowcol(transform, x, y, op=math.floor) # return rows, cols | |
# Rasterize Dense Matrix using Sparse COO | |
data = COO((rows, cols), values, fill_value=nodata_value).todense() | |
# Make sure width/height match array shapes | |
assert (height, width) == data.shape | |
# Create the Output Raster | |
info = { | |
"DATAPOINTER": data.__array_interface__["data"][0], | |
"PIXELS": width, | |
"LINES": height, | |
"BANDS": 1, | |
"DATATYPE": _gdal_typename(data.dtype.name), | |
} | |
strides = data.__array_interface__.get("strides", None) | |
if strides is not None: | |
if len(strides) == 2: | |
lineoffset, pixeloffset = strides | |
info.update(LINEOFFSET=lineoffset, PIXELOFFSET=pixeloffset) | |
else: | |
bandoffset, lineoffset, pixeloffset = strides | |
info.update( | |
BANDOFFSET=bandoffset, | |
LINEOFFSET=lineoffset, | |
PIXELOFFSET=pixeloffset, | |
) | |
dataset_options = ",".join(f"{name}={val}" for name, val in info.items()) | |
datasetname = f"MEM:::{dataset_options}" | |
with warnings.catch_warnings(): | |
warnings.simplefilter("ignore", rasterio.errors.NotGeoreferencedWarning) | |
with rasterio.open(datasetname, "r+") as src: | |
src.crs = out_crs | |
src.transform = transform | |
src.nodata = nodata_value | |
dst_profile = { | |
"driver": "COG", | |
"blocksize": 256, | |
"compress": "DEFLATE", | |
"sparse_ok": "TRUE", | |
"nodata": nodata_value, | |
} | |
fout = f"cog.tif" | |
copy(src, fout, **dst_profile) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment