@igorbasko01
A OneEuroFilter implementation in Python
import math

import numpy as np

# Imports needed by the MediaPipe callback further below.
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from mediapipe.tasks.python.vision import PoseLandmarkerResult

class LowPassFilter:
    """Simple exponential smoothing filter used as a building block of the 1€ filter."""

    def __init__(self, alpha):
        self.alpha = alpha
        # Holds the previous filter output (None until the first sample is seen).
        self.last_raw_value = None

    def apply_with_alpha(self, value, alpha):
        if self.last_raw_value is None:
            # First sample: nothing to smooth against yet.
            self.last_raw_value = value
        else:
            self.last_raw_value = alpha * value + (1 - alpha) * self.last_raw_value
        return self.last_raw_value

    def apply(self, value):
        return self.apply_with_alpha(value, self.alpha)

class OneEuroFilter:
    def __init__(self, frequency, min_cutoff=1.0, beta=0.0, derivate_cutoff=1.0, to_print=False):
        if frequency <= 0:
            raise ValueError("Frequency should be > 0")
        if min_cutoff <= 0:
            raise ValueError("Min cutoff should be > 0")
        if derivate_cutoff <= 0:
            raise ValueError("Derivate cutoff should be > 0")
        self.frequency = frequency
        self.min_cutoff = min_cutoff
        self.beta = beta
        self.derivate_cutoff = derivate_cutoff
        self.x = LowPassFilter(self.alpha(self.min_cutoff))
        self.dx = LowPassFilter(self.alpha(self.derivate_cutoff))
        # 0 marks "no sample seen yet", so the first apply() call keeps the
        # configured frequency instead of deriving one from the timestamps.
        self.last_time = 0
        self.to_print = to_print

    def alpha(self, cutoff):
        # Smoothing factor for a given cutoff frequency at the current sample rate.
        te = 1.0 / self.frequency
        tau = 1.0 / (2 * math.pi * cutoff)
        return 1.0 / (1.0 + tau / te)

    def apply(self, value, timestamp, value_scale=1.0):
        """Filter a single value. `timestamp` is expected in milliseconds."""
        new_timestamp = timestamp
        if self.last_time >= new_timestamp:
            print("New timestamp is equal or less than the last one.")
            return value
        # Update the sampling frequency (Hz) from the millisecond timestamp delta.
        if self.last_time != 0 and new_timestamp != 0:
            self.frequency = 1000.0 / (new_timestamp - self.last_time)
        self.last_time = new_timestamp
        # Estimate the variation per second; zero until a previous sample exists.
        if self.x.last_raw_value is None:
            dvalue = 0.0
        else:
            dvalue = (value - self.x.last_raw_value) * value_scale * self.frequency
        edvalue = self.dx.apply_with_alpha(dvalue, self.alpha(self.derivate_cutoff))
        # Speed-adaptive cutoff: more smoothing when slow, less lag when fast.
        cutoff = self.min_cutoff + self.beta * abs(edvalue)
        result = self.x.apply_with_alpha(value, self.alpha(cutoff))
        if self.to_print:
            print(f"original: {value}, new: {result}")
        return result
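
# --- Hedged usage sketch (not part of the original gist) ---------------------
# A minimal sanity check of OneEuroFilter on a noisy 1-D signal, assuming
# ~30 fps millisecond timestamps like the MediaPipe callback below. The
# function name `_demo_one_euro_filter` and the filter parameters used here
# are illustrative only.
def _demo_one_euro_filter():
    demo_filter = OneEuroFilter(frequency=30, min_cutoff=1.0, beta=0.01, to_print=True)
    rng = np.random.default_rng(0)
    for n in range(90):
        timestamp_ms = int((n + 1) * 1000 / 30)             # ~33 ms per frame
        clean = math.sin(2 * math.pi * 0.5 * n / 30)        # 0.5 Hz sine wave
        noisy = clean + rng.normal(0.0, 0.05)               # add jitter
        demo_filter.apply(noisy, timestamp_ms)               # prints original vs. filtered
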
# Define the filter parameters
min_cutoff = 0.05
beta = 80.0
derivate_cutoff = 1.0

# Create an array to hold the filters: one per landmark coordinate.
num_landmarks = 33
num_coordinates = 3  # x, y, z

# Nested list comprehension building a (num_landmarks x num_coordinates) grid
# of filters; only the very first filter prints its input/output values.
filters = np.array([[
    OneEuroFilter(frequency=30, min_cutoff=min_cutoff, beta=beta, derivate_cutoff=derivate_cutoff,
                  to_print=(i == 0 and j == 0))
    for j in range(num_coordinates)]
    for i in range(num_landmarks)])

# Holds the most recent annotated frame produced by the result callback.
global_annotated_image = None


def get_object_scale(landmarks):
    # Rough object size: the mean of the bounding-box width and height of the
    # normalized landmarks, used to scale velocities inside the 1€ filter.
    xs = [landmark.x for landmark in landmarks]
    ys = [landmark.y for landmark in landmarks]
    x_min = min(xs)
    x_max = max(xs)
    y_min = min(ys)
    y_max = max(ys)
    object_width = x_max - x_min
    object_height = y_max - y_min
    return (object_width + object_height) / 2.0

def default_inference_draw(result: PoseLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    # Result callback for a PoseLandmarker in live-stream mode: smooth every
    # landmark coordinate with its own OneEuroFilter, then draw the result.
    global global_annotated_image
    pose_landmarks_list = result.pose_landmarks
    annotated_image = np.copy(output_image.numpy_view())
    for idx in range(len(pose_landmarks_list)):
        pose_landmarks = pose_landmarks_list[idx]
        object_scale = get_object_scale(pose_landmarks)
        pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        pose_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(
                x=filters[i][0].apply(landmark.x, timestamp_ms, object_scale),
                y=filters[i][1].apply(landmark.y, timestamp_ms, object_scale),
                z=filters[i][2].apply(landmark.z, timestamp_ms, object_scale))
            for i, landmark in enumerate(pose_landmarks)
        ])
        solutions.drawing_utils.draw_landmarks(
            annotated_image,
            pose_landmarks_proto,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style()
        )
    global_annotated_image = np.copy(annotated_image)
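
# --- Hedged wiring sketch (not part of the original gist) --------------------
# One possible way to hook default_inference_draw up to a MediaPipe
# PoseLandmarker running in live-stream mode. The model path
# 'pose_landmarker.task' is a placeholder for a model file downloaded
# separately, and OpenCV (cv2) is assumed to be available for capture/display.
if __name__ == "__main__":
    import time

    import cv2
    from mediapipe.tasks.python import BaseOptions
    from mediapipe.tasks.python.vision import (PoseLandmarker,
                                                PoseLandmarkerOptions,
                                                RunningMode)

    options = PoseLandmarkerOptions(
        base_options=BaseOptions(model_asset_path='pose_landmarker.task'),
        running_mode=RunningMode.LIVE_STREAM,
        result_callback=default_inference_draw)

    cap = cv2.VideoCapture(0)
    with PoseLandmarker.create_from_options(options) as landmarker:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
            # Monotonic millisecond timestamps keep the filters' frequency in Hz.
            landmarker.detect_async(mp_image, int(time.monotonic() * 1000))
            if global_annotated_image is not None:
                cv2.imshow('smoothed pose',
                           cv2.cvtColor(global_annotated_image, cv2.COLOR_RGB2BGR))
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    cap.release()
    cv2.destroyAllWindows()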