@sixy6e · Last active September 24, 2021 12:33
QA algorithm extraction prototype
```
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
```
A toy prototype of a Python module containing the algorithm components from the [hyo2_qc](https://github.com/hydroffice/hyo2_qc)
GUI toolkit.
Only the fliers and gridqa submodules were extracted.
The base algorithms haven't been changed, but variable names were made more descriptive (at least those I could interpret).
Most functions were converted to straight numpy, as there was no need for them to be in Cython.
Some components use numexpr to reduce memory, since some of the array expressions are lengthy and would otherwise create
a lot of temporary arrays behind the scenes.
A couple of functions still need per-pixel logic; these have been rewritten in plain Python and get compiled by
[Numba](https://numba.pydata.org/). This should give the same performance, if not better, while removing the need to know Cython. A minimal sketch of both approaches follows.
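To make the numexpr and Numba points concrete, here is a minimal sketch; it is not code from hyo2_qc, the function names are made up, and the depth-threshold expression is just the S-44 style formula that appears in the fliers checks:

```python
import numexpr
import numpy
from numba import jit


def tvu_threshold(depth: numpy.ndarray) -> numpy.ndarray:
    # numexpr evaluates the whole expression in a single pass,
    # avoiding the temporary arrays plain numpy would allocate
    return numexpr.evaluate("(0.25 + (0.013 * depth) ** 2) ** 0.5")


@jit(nopython=True, cache=True)
def count_nan_neighbours(bathy: numpy.ndarray) -> numpy.ndarray:
    # per-pixel logic written as plain Python loops;
    # Numba compiles this on first call, giving Cython-like speed
    rows, cols = bathy.shape
    out = numpy.zeros((rows, cols), dtype=numpy.uint8)
    for row in range(1, rows - 1):
        for col in range(1, cols - 1):
            count = 0
            for dr in (-1, 0, 1):
                for dc in (-1, 0, 1):
                    if dr == 0 and dc == 0:
                        continue
                    if numpy.isnan(bathy[row + dr, col + dc]):
                        count += 1
            out[row, col] = count
    return out
```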
```python
from datetime import datetime, timedelta
from pathlib import Path
from enum import Enum
import numpy
import pdb
import attr
from typing import List
CHECKSUM_BIT = 0x80000000
NANO_SECONDS_SF = 1e-9
class RecordTypes(Enum):
"""The various record type contained within the GSF file."""
GSF_HEADER = 1
GSF_SWATH_BATHYMETRY_PING = 2
GSF_SOUND_VELOCITY_PROFILE = 3
GSF_PROCESSING_PARAMETERS = 4
GSF_SENSOR_PARAMETERS = 5
GSF_COMMENT = 6
GSF_HISTORY = 7
GSF_NAVIGATION_ERROR = 8
GSF_SWATH_BATHY_SUMMARY = 9
GSF_SINGLE_BEAM_PING = 10
GSF_HV_NAVIGATION_ERROR = 11
GSF_ATTITUDE = 12
@attr.s()
class RecordCount:
record_type: RecordTypes = attr.ib()
count: int = attr.ib(default=0)
@attr.s()
class Record:
record_type: RecordTypes = attr.ib()
data_size: int = attr.ib()
checksum_flag: bool = attr.ib()
index: int = attr.ib()
@attr.s()
class FileRecordIndex:
record_type: RecordTypes = attr.ib()
record_count: int = attr.ib(init=False)
data_size: List[int] = attr.ib(repr=False)
checksum_flag: List[bool] = attr.ib(repr=False)
indices: List[int] = attr.ib(repr=False)
def __attrs_post_init__(self):
self.record_count = len(self.indices)
def record(self, index):
result = Record(
record_type=self.record_type,
data_size=self.data_size[index],
checksum_flag=self.checksum_flag[index],
index=self.indices[index]
)
return result
def file_info(stream):
fname = Path(stream.name)
fsize = fname.stat().st_size
current_pos = stream.tell()
stream.seek(0)
results = {rtype: {"indices": [], "data_size": [], "checksum_flag": []} for rtype in RecordTypes}
while stream.tell() < fsize:
data_size, record_id, flag = record_info(stream)
results[RecordTypes(record_id)]["indices"].append(stream.tell())
results[RecordTypes(record_id)]["data_size"].append(data_size)
results[RecordTypes(record_id)]["checksum_flag"].append(flag)
_ = numpy.fromfile(stream, f"S{data_size}", count=1)
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
stream.seek(current_pos)
r_index = [
FileRecordIndex(
record_type=rtype,
data_size=results[rtype]["data_size"],
checksum_flag=results[rtype]["checksum_flag"],
indices=results[rtype]["indices"]
)
for rtype in RecordTypes
]
return r_index
def count_records(stream):
fname = Path(stream.name)
fsize = fname.stat().st_size
current_pos = stream.tell()
    stream.seek(0)
results = {rtype: {"count": 0, "indices": [], "data_size": [], "checksum_flag": []} for rtype in RecordTypes}
idx = 0
while stream.tell() < fsize:
data_size, record_id, flag = record_info(stream)
results[RecordTypes(record_id)]["count"] += 1
results[RecordTypes(record_id)]["indices"].append(stream.tell())
results[RecordTypes(record_id)]["data_size"].append(data_size)
results[RecordTypes(record_id)]["checksum_flag"].append(flag)
_ = numpy.fromfile(stream, f"S{data_size}", count=1)
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
idx = stream.tell()
stream.seek(current_pos)
for record_type in results:
results[record_type]["indices"] = numpy.array(results[record_type]["indices"])
return results
def record_info(stream):
data_size = numpy.fromfile(stream, ">u4", count=1)[0]
record_identifier = numpy.fromfile(stream, ">i4", count=1)[0]
checksum_flag = bool(record_identifier & CHECKSUM_BIT)
return data_size, record_identifier, checksum_flag
def read_header(stream, data_size, checksum_flag):
if checksum_flag:
checksum = numpy.fromfile(stream, ">i4", count=1)[0]
data = numpy.fromfile(stream, f"S{data_size}", count=1)[0]
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def processing_parameters(stream, data_size, checksum_flag):
if checksum_flag:
checksum = numpy.fromfile(stream, ">i4", count=1)[0]
params = dict()
params["TIME"] = numpy.fromfile(stream, ">i2", count=4)
params["NUM_PARAMS"] = numpy.fromfile(stream, ">i2", count=1)[0]
for i in range(params["NUM_PARAMS"]):
param_size = numpy.fromfile(stream, ">i2", count=1)[0]
data = numpy.fromfile(stream, f"S{param_size}", count=1)[0]
key, value = data.decode("utf-8").strip().split("=")
params[key] = value
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return params
def _proc_param_parser(value):
"""Convert any strings that have known types such as bools, floats."""
    if isinstance(value, datetime):  # nothing to do, already parsed
return value
booleans = {
"yes": True,
"no": False,
"true": True,
"false": False,
}
if "," in value: # dealing with an array
array = value.split(",")
if "." in value: # assumption on period being a decimal point
parsed = numpy.array(array, dtype="float").tolist()
else:
# should be dealing with an array of "UNKNWN" or "UNKNOWN"
parsed = ["unknown"]*len(array)
elif "." in value: # assumption on period being a decimal point
parsed = float(value)
elif value.lower() in booleans:
parsed = booleans[value.lower()]
elif value.lower() in ["unknwn", "unknown"]:
parsed = "unknown"
else: # most likely an integer or generic string
try:
parsed = int(value)
except ValueError:
parsed = value.lower()
return parsed
def _standardise_proc_param_keys(key):
"""Convert to lowercase, replace any spaces with underscore."""
return key.lower().replace(" ", "_")
def processing_parameters2(stream, data_size, checksum_flag):
params = dict()
idx = 0
blob = stream.readline(data_size)
if checksum_flag:
checksum = numpy.frombuffer(blob, ">i4", count=1)[0]
idx += 4
dtype = numpy.dtype(
[
("time_seconds", ">i4"),
("time_nano_seconds", ">i4"),
("num_params", ">i2"),
]
)
data = numpy.frombuffer(blob, dtype, count=1)
params["time_seconds"] = int(data["time_seconds"][0])
params["time_nano_seconds"] = int(data["time_nano_seconds"][0])
params["num_params"] = data["num_params"][0]
idx += 10
for i in range(params["num_params"]):
param_size = numpy.frombuffer(blob[idx:], ">i2", count=1)[0]
idx += 2
data = numpy.frombuffer(blob[idx:], f"S{param_size}", count=1)[0]
idx += param_size
key, value = data.decode("utf-8").strip().split("=")
if key == "REFERENCE TIME":
value = datetime.strptime(value, "%Y/%j %H:%M:%S")
params["processed_datetime"] = value + timedelta(
seconds=params["time_seconds"],
milliseconds=params["time_nano_seconds"] * 1e-6
)
params[_standardise_proc_param_keys(key)] = _proc_param_parser(value)
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return params
def attitude(stream, data_size, checksum_flag):
if checksum_flag:
checksum = numpy.fromfile(stream, ">i4", count=1)[0]
base_time = numpy.fromfile(stream, ">i4", count=2)
acq_time = datetime.utcfromtimestamp(base_time[0] + NANO_SECONDS_SF * base_time[1])
num_measurements = numpy.fromfile(stream, ">i2", count=1)[0]
data = {
"attitude_time": [],
"pitch": [],
"roll": [],
"heave": [],
"heading": [],
}
dtype = numpy.dtype(
[
("attitude_time", ">i2"),
("pitch", ">i2"),
("roll", ">i2"),
("heave", ">i2"),
("heading", ">u2"),
]
)
for i in range(num_measurements):
blob = numpy.fromfile(stream, dtype, count=1)[0]
# pdb.set_trace()
# data["attitude_time"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100)
# data["pitch"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100)
# data["roll"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100)
# data["heave"].append(numpy.fromfile(stream, ">i2", count=1)[0] / 100)
# data["heading"].append(numpy.fromfile(stream, ">u2", count=1)[0] / 100)
data["attitude_time"].append(
acq_time + timedelta(seconds=blob["attitude_time"] / 1000)
)
data["pitch"].append(blob["pitch"] / 100)
data["roll"].append(blob["roll"] / 100)
data["heave"].append(blob["heave"] / 100)
data["heading"].append(blob["heading"] / 100)
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def attitude2(stream, data_size, checksum_flag):
# using stream.readline() would stop at the first new line
# using stream.readlines() can read more than data_size
blob = numpy.fromfile(stream, f"S{data_size}", count=1)[0]
idx = 0
if checksum_flag:
checksum = numpy.frombuffer(blob, ">i4", count=1)[0]
idx += 4
base_time = numpy.frombuffer(blob[idx:], ">i4", count=2)
idx += 8
acq_time = datetime.utcfromtimestamp(base_time[0] + NANO_SECONDS_SF * base_time[1])
num_measurements = numpy.frombuffer(blob[idx:], ">i2", count=1)[0]
idx += 2
data = {
"attitude_time": [],
"pitch": [],
"roll": [],
"heave": [],
"heading": [],
}
dtype = numpy.dtype(
[
("attitude_time", ">i2"),
("pitch", ">i2"),
("roll", ">i2"),
("heave", ">i2"),
("heading", ">u2"),
]
)
for i in range(num_measurements):
numpy_blob = numpy.frombuffer(blob[idx:], dtype, count=1)[0]
idx += 10
data["attitude_time"].append(
acq_time + timedelta(seconds=numpy_blob["attitude_time"] / 1000)
)
data["pitch"].append(numpy_blob["pitch"] / 100)
data["roll"].append(numpy_blob["roll"] / 100)
data["heave"].append(numpy_blob["heave"] / 100)
data["heading"].append(numpy_blob["heading"] / 100)
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def read_svp(stream, data_size, flag):
dtype = numpy.dtype(
[
("obs_seconds", ">u4"),
("obs_nano", ">u4"),
("app_seconds", ">u4"),
("app_nano", ">u4"),
("lon", ">i4"),
("lat", ">i4"),
("num_points", ">u4"),
]
)
# obs_time = numpy.fromfile(stream, ">u4", count=2)
# app_time = numpy.fromfile(stream, ">u4", count=2)
# time = numpy.fromfile(stream, ">u4", count=4)
# lon, lat = numpy.fromfile(stream, ">i4", count=2)
# num_points = numpy.fromfile(stream, ">u4", count=1)[0]
blob = numpy.fromfile(stream, dtype, count=1)
# pdb.set_trace()
svp = numpy.fromfile(stream, ">u4", count=2 * blob["num_points"][0]) / 100
data = {
"obs_time2": datetime.utcfromtimestamp(
blob["obs_seconds"][0] + NANO_SECONDS_SF * blob["obs_nano"][0]
),
"app_time2": datetime.utcfromtimestamp(
blob["app_seconds"][0] + NANO_SECONDS_SF * blob["app_nano"][0]
),
"lon": blob["lon"][0],
"lat": blob["lat"][0],
"num_points": blob["num_points"][0],
"svp": svp.reshape((blob["num_points"][0], 2)),
}
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def swath_bathymetry_summary(stream, data_size, flag):
dtype = numpy.dtype(
[
("time_first_ping_seconds", ">i4"),
("time_first_ping_nano_seconds", ">i4"),
("time_last_ping_seconds", ">i4"),
("time_last_ping_nano_seconds", ">i4"),
("min_latitude", ">i4"),
("min_longitude", ">i4"),
("max_latitude", ">i4"),
("max_longitude", ">i4"),
("min_depth", ">i4"),
("max_depth", ">i4"),
]
)
blob = numpy.fromfile(stream, dtype, count=1)
data = {
"time_first_ping": datetime.utcfromtimestamp(
blob["time_first_ping_seconds"][0] + NANO_SECONDS_SF * blob["time_first_ping_nano_seconds"][0]
),
"time_last_ping": datetime.utcfromtimestamp(
blob["time_last_ping_seconds"][0] + NANO_SECONDS_SF * blob["time_last_ping_nano_seconds"][0]
),
"min_latitude": blob["min_latitude"][0] / 10_000_000,
"min_longitude": blob["min_longitude"][0] / 10_000_000,
"max_latitude": blob["max_latitude"][0] / 10_000_000,
"max_longitude": blob["max_longitude"][0] / 10_000_000,
"min_depth": blob["min_depth"][0] / 100,
"max_depth": blob["max_depth"][0] / 100,
}
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def comment(stream, data_size, flag):
dtype = numpy.dtype(
[
("time_comment_seconds", ">i4"),
("time_comment_nano_seconds", ">i4"),
("comment_length", ">i4"),
]
)
blob = numpy.fromfile(stream, dtype, count=1)
# pdb.set_trace()
length = blob["comment_length"][0] # + 1
data = {
"time_comment_made": datetime.utcfromtimestamp(
blob["time_comment_seconds"][0] + NANO_SECONDS_SF * blob["time_comment_nano_seconds"][0]
),
"length": length,
"comment": numpy.fromfile(stream, f"S{length}", count=1)[0],
}
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def comment2(stream, data_size, flag):
dtype = numpy.dtype(
[
("time_comment_seconds", ">i4"),
("time_comment_nano_seconds", ">i4"),
("comment_length", ">i4"),
]
)
blob = stream.readline(data_size)
decoded = numpy.frombuffer(blob, dtype, count=1)
length = decoded["comment_length"][0]
data = {
"time_comment_made": datetime.utcfromtimestamp(
decoded["time_comment_seconds"][0] + NANO_SECONDS_SF * decoded["time_comment_nano_seconds"][0]
),
"length": length,
"comment": blob[12:].decode().strip().rstrip("\x00"),
}
_ = numpy.fromfile(stream, "B", count=stream.tell() % 4)
return data
def bathymetry_ping(stream, data_size, flag):
dtype = numpy.dtype(
[
("time_ping_seconds", ">i4"),
("time_ping_nano_seconds", ">i4"),
("longitude", ">i4"),
("latitude", ">i4"),
("number_beams", ">i2"),
("centre_beam", ">i2"),
("ping_flags", ">i2"),
("reserved", ">i2"),
("tide_corrector", ">i2"),
("depth_corrector", ">i4"),
("heading", ">i2"),
("pitch", ">i2"),
("roll", ">i2"),
("heave", ">i2"),
("course", ">i2"),
("speed", ">i2"),
("height", ">i4"),
("separation", ">i4"),
("gps_tide_corrector", ">i4"),
]
)
blob = numpy.fromfile(stream, dtype, count=1)
_ = numpy.fromfile(stream, "B", count=2) # spare space
# this isn't the end of this record. there are sub-records to decode
subrecord_hdr = numpy.fromfile(stream, ">i4", count=1)
subrecord_id = (subrecord_hdr & 0xFF000000) >> 24
subrecord_size = subrecord_hdr & 0x00FFFFFF
# reading a scale factor record
num_factors = numpy.fromfile(stream, ">i4", count=1)[0]
sfs = []
for i in range(num_factors):
sfs.append(numpy.fromfile(stream, ">i4", count=3))
subrecords = [] # shouldn't be any duplicate id's
for i in range(num_factors):
subhdr = numpy.fromfile(stream, ">i4", count=1)[0]
subid = (subhdr & 0xFF000000) >> 24
subsz = subhdr & 0x00FFFFFF
size = subsz // blob["number_beams"][0]
dtype = f">i{size}"
if dtype == ">i0":
return blob, sfs, subrecords
        # pdb.set_trace()
subrecords.append((subid, numpy.fromfile(stream, dtype, count=blob["number_beams"][0])))
# current test this is the depth subrecord
# subhdr = numpy.fromfile(stream, ">i4", count=1)
# sid = (subhdr & 0xFF000000) >> 24
# ssz = subhdr & 0x00FFFFFF
# dsize = ssz // blob["number_beams"]
# dtype = f">i{dsize}"
# depth = numpy.fromfile(stream, dtype, count=blob["number_beams"])
return blob, sfs, subrecords
def run(fname):
stream = open(fname, "rb")
data_size, record_id, flag = record_info(stream)
header = read_header(stream, data_size, flag)
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4)
data_size, record_id, flag = record_info(stream)
params = processing_parameters(stream, data_size, flag)
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4)
# pdb.set_trace()
att = []
data_size, record_id, flag = record_info(stream)
    while record_id == RecordTypes.GSF_ATTITUDE.value:  # 12
att.append(attitude(stream, data_size, flag))
fill = numpy.fromfile(stream, "B", count=stream.tell() % 4)
data_size, record_id, flag = record_info(stream)
svp = read_svp(stream, data_size, flag)
    # pdb.set_trace()
    return header, params, att, svp
```
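A hedged usage sketch for the reader above; `sample.gsf` is a placeholder path, and the functions and classes above are assumed to be in scope:

```python
with open("sample.gsf", "rb") as stream:
    r_index = file_info(stream)  # index every record in the file
    for file_record in r_index:
        print(file_record)  # record_type and record_count per type
    # locate and decode the first swath bathymetry summary record
    summary_index = [
        fri for fri in r_index
        if fri.record_type == RecordTypes.GSF_SWATH_BATHY_SUMMARY
    ][0]
    if summary_index.record_count:
        rec = summary_index.record(0)
        stream.seek(rec.index)
        summary = swath_bathymetry_summary(stream, rec.data_size, rec.checksum_flag)
        print(summary["min_depth"], summary["max_depth"])
```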
```python
import numpy
from scipy import ndimage
import numexpr
from numba import jit
import structlog
_LOG = structlog.get_logger()
def laplacian_operator(lap: numpy.ndarray, flag_grid: numpy.ndarray, threshold: float):
"""
Laplacian operator check.
Notes:
The flag_grid is modified inplace which is fine,
unless <ndarray>.flags.writeable is False
Locations could be returned, or we return None and write results into the
flag_grid.
The Cython code also logs the location of each pixel flagged by the check
    numexpr is used to reduce the memory footprint from the temporary arrays
    that would otherwise get created. We get a speed benefit too.
"""
    # (sixy6e) weird logic if threshold is positive, e.g.:
# lap = [[-9, -5, -4],
# [ 9, 5, 4],
# [-1, 0, 1]]
# th = 5
# would result in the entire array being true.
# Is this the intended behaviour???
locations = numexpr.evaluate("(lap < threshold) | (lap > -threshold)")
flag_grid[locations] = 1 # check number 1
# log the locations
# if really desired, this could be done differently,
# even though the locations are written as GeoPoints later on ...
for row, col in zip(*numpy.where(locations)): # numpy.where is slow but fits the need
_LOG.info("laplacian operator check (#1)", row=row, col=col)
def gaussian_curvature(
gauss_curve: numpy.ndarray, flag_grid: numpy.ndarray, threshold: float
):
"""
Gaussian curvature check.
Notes:
Similar notes in regards to modification in-place as the laplacian operator
check.
The operation could be done in 1 line, but the original code logs the locations
of the flags.
"""
locations = gauss_curve > threshold
flag_grid[locations] = 2 # check number 2
for row, col in zip(*numpy.where(locations)):
_LOG.info("gaussian curvature check (#2)", row=row, col=col)
@jit(nopython=True, cache=True)
def adjacent_cells(
bathy: numpy.ndarray,
flag_grid: numpy.ndarray,
threshold: float,
percent_1: float,
percent_2: float,
):
""""""
rows, cols = bathy.shape
# the grid is traversed row by row
for row in range(rows): # we get the row
# Historically, we were skipping the first and the last row
# if (row == 0) or (row == rows - 1):
# continue
for col in range(cols): # we get the column
# (sixy6e) why not simply loop over [1, n-1]???
if (col == 0) or (col == cols - 1):
continue
if flag_grid[row, col] != 0: # avoid existing flagged nodes
continue
# for each node in the grid, the depth is retrieved
depth_node = bathy[row, col]
# any further calculation is skipped in case of a no-data value
if numpy.isnan(depth_node):
continue
neighbour_count = 0 # initialize the number of neighbors
diff_pos_count = (
0 # initialize the number of neighbors with positive depth diff
)
diff_neg_count = (
0 # initialize the number of neighbors with negative depth diff
)
# ----------- left node -----------
if col > 0: # if we are not on the first column
# attempt to retrieve depth
if flag_grid[row, col - 1] != 0:
continue
depth_neighbour = bathy[row, col - 1]
if numpy.isnan(depth_neighbour) and col > 1:
if flag_grid[row, col - 2] != 0:
continue
depth_neighbour = bathy[row, col - 2]
if numpy.isnan(depth_neighbour) and col > 2:
if flag_grid[row, col - 3] != 0:
continue
depth_neighbour = bathy[row, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- right node -----------
if col < cols - 1: # if we are not on the last column
# attempt to retrieve depth
if flag_grid[row, col + 1] != 0:
continue
depth_neighbour = bathy[row, col + 1]
if numpy.isnan(depth_neighbour) and (col < cols - 2):
if flag_grid[row, col + 2] != 0:
continue
depth_neighbour = bathy[row, col + 2]
if numpy.isnan(depth_neighbour) and (col < cols - 3):
if flag_grid[row, col + 3] != 0:
continue
depth_neighbour = bathy[row, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- bottom node -----------
if row > 0: # if we are not on the first row
# attempt to retrieve depth
if flag_grid[row - 1, col] != 0:
continue
depth_neighbour = bathy[row - 1, col]
if numpy.isnan(depth_neighbour) and row > 1:
if flag_grid[row - 2, col] != 0:
continue
depth_neighbour = bathy[row - 2, col]
if numpy.isnan(depth_neighbour) and row > 2:
if flag_grid[row - 3, col] != 0:
continue
depth_neighbour = bathy[row - 3, col]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- top node -----------
if row < rows - 1: # if we are not on the last row
# attempt to retrieve depth
if flag_grid[row + 1, col] != 0:
continue
depth_neighbour = bathy[row + 1, col]
if numpy.isnan(depth_neighbour) and (row < rows - 2):
if flag_grid[row + 2, col] != 0:
continue
depth_neighbour = bathy[row + 2, col]
if numpy.isnan(depth_neighbour) and (row < rows - 3):
if flag_grid[row + 3, col] != 0:
continue
depth_neighbour = bathy[row + 3, col]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- bottom-left node -----------
if (row > 0) and (col > 0): # if we are not on the first row and col
# attempt to retrieve depth
if flag_grid[row - 1, col - 1] != 0:
continue
depth_neighbour = bathy[row - 1, col - 1]
if numpy.isnan(depth_neighbour) and row > 1 and col > 1:
if flag_grid[row - 2, col - 2] != 0:
continue
depth_neighbour = bathy[row - 2, col - 2]
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2:
# if flag_grid[row - 3, col - 3] != 0:
# continue
# depth_neighbour = bathy[row - 3, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- top-right node -----------
if (row < rows - 1) and (
col < cols - 1
): # if we are not on the last row and col
# attempt to retrieve depth
if flag_grid[row + 1, col + 1] != 0:
continue
depth_neighbour = bathy[row + 1, col + 1]
if numpy.isnan(depth_neighbour) and (row < rows - 2) and (col < cols - 2):
if flag_grid[row + 2, col + 2] != 0:
continue
depth_neighbour = bathy[row + 2, col + 2]
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and (col < cols - 3):
# if flag_grid[row + 3, col + 3] != 0:
# continue
# depth_neighbour = bathy[row + 3, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- bottom-right node -----------
if (row > 0) and (
col < cols - 1
): # if we are not on the first row and last col
# attempt to retrieve depth
if flag_grid[row - 1, col + 1] != 0:
continue
depth_neighbour = bathy[row - 1, col + 1]
if numpy.isnan(depth_neighbour) and row > 1 and (col < cols - 2):
if flag_grid[row - 2, col + 2] != 0:
continue
depth_neighbour = bathy[row - 2, col + 2]
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2:
# if flag_grid[row - 3, col + 3] != 0:
# continue
# depth_neighbour = bathy[row - 3, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
# ----------- top-left node -----------
if (row < rows - 1) and (
col > 0
): # if we are not on the last row and first col
# attempt to retrieve depth
if flag_grid[row + 1, col - 1] != 0:
continue
depth_neighbour = bathy[row + 1, col - 1]
if numpy.isnan(depth_neighbour) and (row < rows - 2) and col > 1:
                    if flag_grid[row + 2, col - 2] != 0:
continue
depth_neighbour = bathy[row + 2, col - 2]
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and col > 2:
# if flag_grid[row + 3, col - 3] != 0:
# continue
# depth_neighbour = bathy[row + 3, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_node - depth_neighbour > threshold:
diff_pos_count += 1
if depth_node - depth_neighbour < -threshold:
diff_neg_count += 1
if neighbour_count == 0:
continue
            # (sixy6e) this section prevents simply looping over [1, n-1]
# calculate the ratio among flagged and total neighbors, then use it to
# decide if a flier
if (row == 0) or (col == 0) or (row == (rows - 1)) or (col == (cols - 1)):
thr = 1.0
elif neighbour_count <= 4:
thr = percent_1
else:
thr = percent_2
pos_ratio = diff_pos_count / neighbour_count
if pos_ratio >= thr:
flag_grid[row, col] = 3 # check #3
_LOG.info(
"adjacency check #3",
row=row,
col=col,
diff_pos_count=diff_pos_count,
neighbour_count=neighbour_count,
pos_ratio=pos_ratio,
thr=thr,
)
continue
neg_ratio = diff_neg_count / neighbour_count
if neg_ratio >= thr:
flag_grid[row, col] = 3 # check #3
_LOG.info(
"adjacency check #3",
row=row,
col=col,
diff_neg_count=diff_neg_count,
neighbour_count=neighbour_count,
neg_ratio=neg_ratio,
thr=thr,
)
continue
@jit(nopython=True, cache=True)
def noisy_edges(bathy: numpy.ndarray, flag_grid: numpy.ndarray, dist: int, cf: float):
""""""
rows, cols = bathy.shape
# the grid is traversed row by row
for row in range(rows): # we get the row
# (sixy6e) why not simply loop over [1, n-1]???
if (row == 0) or (row == rows - 1):
continue
for col in range(cols): # we get the column
# (sixy6e) why not simply loop over [1, n-1]???
if (col == 0) or (col == cols - 1):
continue
if flag_grid[row, col] != 0: # avoid existing flagged nodes
continue
# for each node in the grid, the depth is retrieved
depth_node = bathy[row, col]
# any further calculation is skipped in case of a no-data value
if numpy.isnan(depth_node):
continue
neighbour_count = 0
neighbour_diff = 0.0
min_depth = -9999.9
max_diff = 0.0
# ----------- left node -----------
# attempt to retrieve depth
if flag_grid[row, col - 1] != 0:
continue
depth_neighbour = bathy[row, col - 1]
if numpy.isnan(depth_neighbour) and col > 1:
if flag_grid[row, col - 2] != 0:
continue
depth_neighbour = bathy[row, col - 2]
if numpy.isnan(depth_neighbour) and col > 2 and dist > 2:
if flag_grid[row, col - 3] != 0:
continue
depth_neighbour = bathy[row, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- right node -----------
# attempt to retrieve depth
if flag_grid[row, col + 1] != 0:
continue
depth_neighbour = bathy[row, col + 1]
if numpy.isnan(depth_neighbour) and (col < cols - 2):
if flag_grid[row, col + 2] != 0:
continue
depth_neighbour = bathy[row, col + 2]
if numpy.isnan(depth_neighbour) and (col < cols - 3) and dist > 2:
if flag_grid[row, col + 3] != 0:
continue
depth_neighbour = bathy[row, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- bottom node -----------
# attempt to retrieve depth
if flag_grid[row - 1, col] != 0:
continue
depth_neighbour = bathy[row - 1, col]
            if numpy.isnan(depth_neighbour) and row > 1:
if flag_grid[row - 2, col] != 0:
continue
depth_neighbour = bathy[row - 2, col]
if numpy.isnan(depth_neighbour) and row > 2 and dist > 2:
if flag_grid[row - 3, col] != 0:
continue
depth_neighbour = bathy[row - 3, col]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- top node -----------
# attempt to retrieve depth
if flag_grid[row + 1, col] != 0:
continue
depth_neighbour = bathy[row + 1, col]
if numpy.isnan(depth_neighbour) and (row < rows - 2):
if flag_grid[row + 2, col] != 0:
continue
depth_neighbour = bathy[row + 2, col]
if numpy.isnan(depth_neighbour) and (row < rows - 3) and dist > 2:
if flag_grid[row + 3, col] != 0:
continue
depth_neighbour = bathy[row + 3, col]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- bottom-left node -----------
# attempt to retrieve depth
if flag_grid[row - 1, col - 1] != 0:
continue
depth_neighbour = bathy[row - 1, col - 1]
if numpy.isnan(depth_neighbour) and row > 1 and col > 1 and dist > 2:
if flag_grid[row - 2, col - 2] != 0:
continue
depth_neighbour = bathy[row - 2, col - 2]
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2:
# if flag_grid[row - 3, col - 3] != 0:
# continue
# depth_neighbour = bathy[row - 3, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- top-right node -----------
# attempt to retrieve depth
if flag_grid[row + 1, col + 1] != 0:
continue
depth_neighbour = bathy[row + 1, col + 1]
if (
numpy.isnan(depth_neighbour)
and (row < rows - 2)
and (col < cols - 2)
and dist > 2
):
if flag_grid[row + 2, col + 2] != 0:
continue
depth_neighbour = bathy[row + 2, col + 2]
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and (col < cols - 3):
# if flag_grid[row + 3, col + 3] != 0:
# continue
# depth_neighbour = bathy[row + 3, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- bottom-right node ------------
# attempt to retrieve depth
if flag_grid[row - 1, col + 1] != 0:
continue
depth_neighbour = bathy[row - 1, col + 1]
if numpy.isnan(depth_neighbour) and row > 1 and (col < cols - 2) and dist > 2:
if flag_grid[row - 2, col + 2] != 0:
continue
depth_neighbour = bathy[row - 2, col + 2]
# if numpy.isnan(depth_neighbour) and row > 2 and col > 2:
# if flag_grid[row - 3, col + 3] != 0:
# continue
# depth_neighbour = bathy[row - 3, col + 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
# ----------- top-left node-----------
# attempt to retrieve depth
if flag_grid[row + 1, col - 1] != 0:
continue
depth_neighbour = bathy[row + 1, col - 1]
if numpy.isnan(depth_neighbour) and (row < rows - 2) and col > 1 and dist > 2:
if flag_grid[row + 2, col - 2] != 0:
continue
depth_neighbour = bathy[row + 2, col - 2]
# if numpy.isnan(depth_neighbour) and (row < rows - 3) and col > 2:
# if flag_grid[row + 3, col - 3] != 0:
# continue
# depth_neighbour = bathy[row + 3, col - 3]
# evaluate depth difference
if not numpy.isnan(depth_neighbour):
neighbour_count += 1
if depth_neighbour > min_depth:
min_depth = depth_neighbour
neighbour_diff = abs(depth_node - depth_neighbour)
if neighbour_diff > max_diff:
max_diff = neighbour_diff
if neighbour_count == 0:
continue
if neighbour_count > 6:
continue
if min_depth >= -100.0:
threshold = (0.25 + (0.013 * -min_depth) ** 2) ** 0.5
else:
threshold = (1.0 + (0.023 * -min_depth) ** 2) ** 0.5
if max_diff > cf * threshold:
flag_grid[row, col] = 6 # check #6
_LOG.info(
"noisy neighbour (check #6)",
row=row,
col=col,
neighbour_count=neighbour_count,
max_diff=max_diff,
min_depth=min_depth,
threshold=threshold,
)
@jit(nopython=True, cache=True)
def _small_groups(
img_labels: numpy.ndarray,
bathy: numpy.ndarray,
flag_grid: numpy.ndarray,
th: float,
area_limit: float,
check_slivers: bool,
check_isolated: bool,
sizes: numpy.ndarray,
):
""""""
rows, cols = img_labels.shape
for i in range(sizes.shape[0]):
# check only small groups
if sizes[i] > area_limit:
continue
        i += 1  # convert the 0-based index into the 1-based label id used by img_labels
conn_count = 0
find = False
for row in range(4, rows - 4): # skip bbox boundaries
for col in range(4, cols - 4): # skip bbox boundaries
# skip if the cell does not belong to the current small group
if img_labels[row, col] != i:
continue
last_row, last_col = row, col
nb_rs = []
nb_cs = []
# check for a valid connection to a grid body
nb_rs.append(row + 1) # n1
nb_rs.append(row - 1) # n2
nb_rs.append(row - 1) # n3
nb_rs.append(row + 1) # n4
nb_cs.append(col + 1) # n1
nb_cs.append(col + 1) # n2
nb_cs.append(col - 1) # n3
nb_cs.append(col - 1) # n4
nb_rs.append(row + 2) # n5
nb_rs.append(row + 2) # n6
nb_rs.append(row + 0) # n7
nb_rs.append(row - 2) # n8
nb_rs.append(row - 2) # n9
nb_rs.append(row - 2) # n10
nb_rs.append(row + 0) # n11
nb_rs.append(row + 2) # n12
nb_cs.append(col + 0) # n5
nb_cs.append(col + 2) # n6
nb_cs.append(col + 2) # n7
nb_cs.append(col + 2) # n8
nb_cs.append(col + 0) # n9
nb_cs.append(col - 2) # n10
nb_cs.append(col - 2) # n11
nb_cs.append(col - 2) # n12
nb_rs.append(row + 3) # n13
nb_rs.append(row + 3) # n14
nb_rs.append(row + 0) # n15
nb_rs.append(row - 3) # n16
nb_rs.append(row - 3) # n17
nb_rs.append(row - 3) # n18
nb_rs.append(row + 0) # n19
nb_rs.append(row + 3) # n20
nb_cs.append(col + 0) # n13
nb_cs.append(col + 3) # n14
nb_cs.append(col + 3) # n15
nb_cs.append(col + 3) # n16
nb_cs.append(col + 0) # n17
nb_cs.append(col - 3) # n18
nb_cs.append(col - 3) # n19
nb_cs.append(col - 3) # n20
nb_rs.append(row + 4) # n21
nb_rs.append(row + 4) # n22
nb_rs.append(row + 0) # n23
nb_rs.append(row - 4) # n24
nb_rs.append(row - 4) # n25
nb_rs.append(row - 4) # n26
nb_rs.append(row + 0) # n27
nb_rs.append(row + 4) # n28
nb_cs.append(col + 0) # n21
nb_cs.append(col + 4) # n22
nb_cs.append(col + 4) # n23
nb_cs.append(col + 4) # n24
nb_cs.append(col + 0) # n25
nb_cs.append(col - 4) # n26
nb_cs.append(col - 4) # n27
nb_cs.append(col - 4) # n28
nbs_sz = len(nb_rs)
for ni in range(nbs_sz):
nl = img_labels[nb_rs[ni], nb_cs[ni]]
if (nl != 0) and (nl != i) and (sizes[nl - 1] > area_limit):
conn_count += 1
find = True
if (
abs(bathy[row, col] - bathy[nb_rs[ni], nb_cs[ni]]) > th
) and check_slivers:
flag_grid[row, col] = 4 # check #4
_LOG.info("check (#4)", ni=ni + 1, row=row, col=col)
break
if find:
break
if find:
break
# it is an isolated group
if (
(last_row > 4)
and (last_row < rows - 4)
and (last_col > 4)
and (last_col < cols - 4)
):
if (conn_count == 0) and check_isolated:
flag_grid[last_row, last_col] = 5 # check #5
_LOG.info("isolated group (#5)", last_row=last_row, last_col=last_col)
def small_groups(
grid_bin: numpy.ndarray,
bathy: numpy.ndarray,
flag_grid: numpy.ndarray,
th: float,
area_limit: float,
check_slivers: bool,
check_isolated: bool,
):
""""""
rows, cols = grid_bin.shape
img_labels, n_labels = ndimage.label(grid_bin)
sizes = ndimage.sum(grid_bin, img_labels, range(1, n_labels + 1))
_small_groups(
img_labels, bathy, flag_grid, th, area_limit, check_slivers, check_isolated, sizes
    )
```
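A hedged sketch of chaining the checks above over a toy grid; the thresholds, percentages and grid values are illustrative, not defaults from hyo2_qc, and the functions above are assumed to be in scope:

```python
import numpy
from scipy import ndimage

# toy bathymetry grid: constant depth with a NaN hole and a suspect spike
bathy = -20.0 * numpy.ones((50, 50), dtype="float64")
bathy[25, 25] = -5.0  # potential flier
bathy[10, 10] = numpy.nan
flag_grid = numpy.zeros(bathy.shape, dtype="int8")

# check #1; note the sign convention flagged in laplacian_operator's
# comments: a negative threshold flags |laplacian| > 4 here
laplacian_operator(ndimage.laplace(bathy), flag_grid, -4.0)
# per-pixel adjacency check (#3)
adjacent_cells(bathy, flag_grid, 2.0, 0.7, 0.9)
# noisy edges (#6)
noisy_edges(bathy, flag_grid, 2, 1.0)
# small isolated groups and slivers (#4 and #5)
grid_bin = (~numpy.isnan(bathy)).astype("int32")
small_groups(grid_bin, bathy, flag_grid, 2.0, 9.0, True, True)
print(numpy.unique(flag_grid))
```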
```python
import numpy
import numexpr
import structlog
_LOG = structlog.get_logger()
def calc_tvu_qc(
bathy: numpy.ndarray,
product_uncertainty: numpy.ndarray,
pu_nodata: float,
tvu_qc: numpy.ndarray,
):
"""
    IHO S-44 TVU QC calculations.
    TVU: total vertical uncertainty.
"""
    # no-data handling: either NaN bathy or the product uncertainty fill value
    invalid = numpy.isnan(bathy) | (product_uncertainty == pu_nodata)
    tvu_qc[invalid] = numpy.nan
    idx = (bathy <= 100.0) & ~invalid
# subsetted arrays
bathy_subs = bathy[idx] # pylint: disable=unused-variable
pr_unc_subs = product_uncertainty[idx] # pylint: disable=unused-variable
# using numexpr to reduce the temp arrays that are required for this long expression
expression = "pr_unc_subs / ((0.25 + (0.013 * bathy_subs) ** 2) ** 0.5)"
tvu_qc[idx] = numexpr.evaluate(expression)
    # deeper than 100.0, excluding no-data cells
    idx = (bathy > 100.0) & ~invalid
bathy_subs = bathy[idx] # noqa F841
pr_unc_subs = product_uncertainty[idx] # noqa F841
expression = "pr_unc_subs / ((1.0 + (0.23 * bathy_subs) ** 2) ** 0.5)"
tvu_qc[idx] = numexpr.evaluate(expression)
def calc_tvu_qc_a1(
bathy: numpy.ndarray,
product_uncertainty: numpy.ndarray,
pu_nodata: float,
tvu_qc: numpy.ndarray,
):
"""
CATZOC A1 TVU Calculations
"""
    idx = numpy.isnan(bathy) | (product_uncertainty == pu_nodata)
tvu_qc[idx] = numpy.nan
# inverse, and subsetted arrays
idx = ~idx
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable
# again using numexpr as these calcs will be float64
expression = "pr_unc_subs / (0.5 + (0.01 * bathy_subs))"
tvu_qc[idx] = numexpr.evaluate(expression)
def calc_tvu_qc_a2b(
bathy: numpy.ndarray,
product_uncertainty: numpy.ndarray,
pu_nodata: float,
tvu_qc: numpy.ndarray,
):
"""
CATZOC A2/B TVU Calculations
"""
    idx = numpy.isnan(bathy) | (product_uncertainty == pu_nodata)
tvu_qc[idx] = numpy.nan
# inverse, and subsetted arrays
idx = ~idx
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable
# again using numexpr as these calcs will be float64
expression = "pr_unc_subs / (1.0 + (0.02 * bathy_subs))"
tvu_qc[idx] = numexpr.evaluate(expression)
def calc_tvu_qc_c(
bathy: numpy.ndarray,
product_uncertainty: numpy.ndarray,
pu_nodata: float,
tvu_qc: numpy.ndarray,
):
"""
CATZOC C TVU Calculations
"""
    idx = numpy.isnan(bathy) | (product_uncertainty == pu_nodata)
tvu_qc[idx] = numpy.nan
# inverse, and subsetted arrays
idx = ~idx
bathy_subs = bathy[idx] # noqa F841 # pylint: disable=unused-variable
pr_unc_subs = product_uncertainty[idx] # noqa F841 # pylint: disable=unused-variable
# again using numexpr as these calcs will be float64
expression = "pr_unc_subs / (2.0 + (0.05 * bathy_subs))"
    tvu_qc[idx] = numexpr.evaluate(expression)
```
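A hedged usage sketch for the TVU QC helpers above; the arrays and the nodata value are made up, and the functions above are assumed to be in scope:

```python
import numpy

bathy = numpy.array([[10.0, 50.0], [150.0, numpy.nan]])  # depths in metres
uncertainty = numpy.array([[0.3, 0.5], [1.5, -999.0]])   # -999.0 marks no-data
tvu_qc = numpy.empty_like(bathy)

# ratio of the product uncertainty to the IHO S-44 allowance;
# values above 1.0 exceed the allowance
calc_tvu_qc(bathy, uncertainty, -999.0, tvu_qc)
print(tvu_qc)
```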