Created
January 7, 2024 17:48
-
-
Save ipsod/d9aec95cdbb44899366f168323edb2f2 to your computer and use it in GitHub Desktop.
python trajectory planner WIP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import enum | |
from dataclasses import dataclass, field | |
import pytest # noqa | |
import numpy as np | |
from typing import Optional, List, Iterable | |
NUM_AXES = 3 | |
ZERO = np.array([0] * NUM_AXES) | |
ONE = np.array([1] * NUM_AXES) | |
FREE_TURN_ANGLE_THRESHOLD = np.radians(24) | |
STOP_TURN_ANGLE_THRESHOLD = np.radians(128) | |
def is_zero(speed: np.array) -> bool: | |
return np.array_equal(speed, ZERO) | |
@dataclass | |
class Command: | |
point: np.array | |
speed: np.array | |
@dataclass | |
class Block: | |
p1: np.array | |
p2: np.array | |
prev: Optional['Block'] = None | |
next: Optional['Block'] = None | |
enter_speed: np.array = field(default_factory=lambda: ZERO) | |
exit_speed: np.array = field(default_factory=lambda: ZERO) | |
def __repr__(self): | |
return f'Block(travel={self.travel}, enter={self.enter_speed}, exit={self.exit_speed})' | |
@property | |
def travel(self): | |
return self.p2 - self.p1 | |
@property | |
def speed(self): | |
return (self.enter_speed + self.exit_speed) / 2 | |
def speed_average(self) -> np.array: | |
return (self.enter_speed + self.exit_speed) / 2 | |
@dataclass | |
class Path: | |
CORNER_LENGTH: float = 3 | |
enter_speed: np.array = field(default_factory=lambda: ZERO) | |
exit_speed: np.array = field(default_factory=lambda: ONE) | |
accel_limit: np.array = field(default_factory=lambda: ONE) | |
blocks: List[Block] = field(default_factory=list) | |
def max_corner_speed(self, block: Block) -> np.ndarray: | |
if not block.next: | |
return ZERO | |
return calculate_max_corner_speed(block.travel, block.next.travel, self.accel_limit) | |
def look_ahead(self): | |
for i, block in enumerate(self.blocks): | |
if not block.next: | |
block.exit_speed = ZERO # Explicitly setting exit speed to 0 for last block | |
else: | |
position_delta = block.next.travel - block.travel | |
speed_delta = max_speed_delta_for_position_delta(position_delta, self.accel_limit) | |
# Calculate minimum speed for each axis, considering directionality | |
min_speed_each_axis = np.where(position_delta >= 0, | |
np.maximum(block.speed - speed_delta, 0), | |
np.minimum(block.speed + speed_delta, 0)) | |
max_exit_speed = self.max_corner_speed(block) | |
print(f"Line {i}: max exit speed: {max_exit_speed}") | |
# Compare each axis separately and choose the minimum | |
calculated_exit_speed = ( | |
np.minimum( | |
np.abs(max_exit_speed), | |
np.abs(min_speed_each_axis) | |
) | |
* np.sign(max_exit_speed) | |
) | |
block.exit_speed = calculated_exit_speed | |
print(f"Line {i}: Calculated exit speed: {calculated_exit_speed}, Enter speed: {block.enter_speed}") | |
def calculate_max_corner_speed(travel1: np.ndarray, travel2: np.ndarray, max_acceleration: np.ndarray, deviation_factor: float = 0.1) -> np.ndarray: | |
# Application of GRBL cornering algorithm. See reference: https://mattwidmann.net/notes/grbls-cornering-algorithm/ | |
# Normalize the vectors | |
e_i = travel1 / np.linalg.norm(travel1) | |
e_o = travel2 / np.linalg.norm(travel2) | |
# Calculate the cosine of the angle between vectors | |
cos_theta = np.dot(e_i, e_o) | |
# Half-angle sine calculation | |
sin_half_theta = np.sqrt((1 - cos_theta) / 2) | |
# Calculate the radius of the approximating circle | |
r = deviation_factor / (sin_half_theta / (1 - sin_half_theta)) | |
# Calculate the cornering velocity for each axis | |
v_c = np.sqrt(max_acceleration * r) | |
# Ensuring the velocity is a vector with the same dimensions as the acceleration limit | |
v_c = np.minimum(v_c, np.linalg.norm(travel1)) * travel1 / np.linalg.norm(travel1) | |
return v_c | |
def compute_circle_radius(theta: float, delta: float) -> float: | |
# Handling the case when theta is close to 180 degrees (pi radians) | |
if np.isclose(theta, np.pi, atol=1e-3): # Adjust tolerance as needed | |
return float('inf') | |
sin_theta_over_2 = np.sqrt((1 - np.cos(theta)) / 2) | |
radius = delta * sin_theta_over_2 / (1 - sin_theta_over_2) | |
print(f"Radius: {radius}") | |
return delta * sin_theta_over_2 / (1 - sin_theta_over_2) | |
def angle_between(v1: np.array, v2: np.array) -> float: | |
if np.allclose(v1, 0) or np.allclose(v2, 0): | |
return np.pi | |
dot_product = np.dot(v1, v2) | |
magnitude_product = np.linalg.norm(v1) * np.linalg.norm(v2) | |
if np.isclose(magnitude_product, 0): | |
return 0 | |
cos_theta = dot_product / magnitude_product | |
return np.arccos(np.clip(cos_theta, -1, 1)) # Clipping for numerical stability | |
def closest_to_zero(arr1: np.array, arr2: np.array) -> np.array: | |
return np.where(np.abs(arr1) < np.abs(arr2), arr1, arr2) | |
def max_speed_delta_for_position_delta(position_delta: np.array, acceleration_limits: np.array) -> np.array: | |
return np.sqrt(2 * acceleration_limits * np.abs(position_delta)) | |
def invert_blocks(blocks: List[Block]) -> List[Block]: | |
""" | |
Inverts the acceleration for each block. | |
""" | |
inverted_blocks = [] | |
for block in blocks: | |
inverted_enter_speed = block.exit_speed # WARN: switched | |
inverted_exit_speed = block.enter_speed # WARN: switched | |
# Create a new Block with inverted acceleration | |
inverted_block = Block( | |
p1=block.p1, | |
p2=block.p2, | |
prev=block.prev, | |
next=block.next, | |
enter_speed=inverted_enter_speed, | |
exit_speed=inverted_exit_speed, | |
) | |
inverted_blocks.append(inverted_block) | |
return inverted_blocks | |
def initialize_path(commands: List[Command]) -> Path: | |
# Run acceleration forward, to figure out how to accelerate from stops. | |
forward_blocks = apply_acceleration(commands) | |
print("initialize_path forward_blocks", [block for block in forward_blocks]) | |
forward_path = Path(blocks=forward_blocks) | |
# Run acceleration backward, to figure out when to slow down for stops. | |
backward_commands = commands[::-1] | |
backward_blocks = apply_acceleration(backward_commands)[::-1] | |
inverted_backward_blocks = invert_blocks(backward_blocks) | |
backward_path = Path(blocks=inverted_backward_blocks) | |
print("Forward path before processing:") | |
for block in forward_path.blocks: | |
print(f"Enter Speed: {block.enter_speed}, Exit Speed: {block.exit_speed}, Block: {block}, ") | |
print("Backward path before processing:") | |
for block in backward_path.blocks: | |
print(f"Enter Speed: {block.enter_speed}, Exit Speed: {block.exit_speed}, Block: {block}, ") | |
print("ZIPP", list(forward_path.blocks)) | |
print("ZIPP", list(backward_path.blocks)) | |
for f_block, b_block in zip(forward_path.blocks, backward_path.blocks): | |
f_block.enter_speed = closest_to_zero(f_block.enter_speed, b_block.enter_speed) | |
f_block.exit_speed = closest_to_zero(f_block.exit_speed, b_block.exit_speed) | |
assert not np.any(np.isnan(f_block.enter_speed)), "f_block.enter_speed contains nan" | |
assert not np.any(np.isnan(f_block.exit_speed)), "f_block.exit_speed contains nan" | |
print(f"Forward path: {forward_path.blocks}") | |
for block1, block2 in zip(forward_path.blocks[:-1], forward_path.blocks[1:]): | |
assert np.array_equal(block1.exit_speed, block2.enter_speed) | |
return forward_path | |
def apply_acceleration(commands: List[Command], accel_limits=ONE, enter_speed=ZERO) -> List[Block]: | |
current_speed = enter_speed | |
blocks = [] | |
for i in range(len(commands) - 1): | |
p1 = commands[i].point | |
p2 = commands[i + 1].point | |
travel = p2 - p1 | |
maximum_speed_delta = max_speed_delta_for_position_delta(travel, accel_limits) | |
print(f"i={i}, maximum_speed_delta={maximum_speed_delta}") | |
direction = np.sign(travel) | |
max_next_speed = current_speed + direction * maximum_speed_delta | |
print(f"max_next_speed={max_next_speed}, commands[i + 1].cmd_speed={commands[i + 1].speed}") | |
exit_speed = np.where(direction >= 0, | |
np.minimum(max_next_speed, commands[i + 1].speed), | |
np.maximum(max_next_speed, -commands[i + 1].speed)) | |
if i < len(commands) - 2: | |
travel1 = travel | |
p3 = commands[i + 2].point | |
travel2 = p3 - p2 | |
corner_speed = calculate_max_corner_speed(travel1, travel2, accel_limits) | |
# Limit exit_speed for each axis based on corner speed | |
exit_speed = closest_to_zero(exit_speed, corner_speed) # TODO: this is wrong | |
print(f"corner_speed={corner_speed}, limited exit_speed={exit_speed}") | |
if np.any(np.isnan(exit_speed)): | |
print(f"Warning: NaN detected in exit_speed at command index {i}") | |
block = Block(p1=p1, p2=p2, enter_speed=current_speed, exit_speed=exit_speed) | |
blocks.append(block) | |
current_speed = exit_speed # Update current speed for the next block | |
# Set exit speed of the last block to zero | |
blocks[-1].exit_speed = ZERO | |
# Link blocks | |
for i in range(len(blocks) - 1): | |
blocks[i].next = blocks[i + 1] | |
blocks[i + 1].prev = blocks[i] | |
for block in blocks: | |
print(block) | |
return blocks | |
def generate_commands(positions: List[np.array | List], speed: Iterable = ONE) -> List[Command]: | |
speed = np.array(speed) | |
positions = np.array(positions) | |
return [Command(point=point, speed=speed) for point in positions] | |
""" | |
TESTS | |
""" | |
def test_calculate_max_corner_speed_smooth_turn(): | |
travel1 = np.array([1, 0, 0]) | |
travel2 = np.array([1, 1, 0]) | |
accel_limit = np.array([1, 1, 1]) | |
speed = calculate_max_corner_speed(travel1, travel2, accel_limit) | |
assert not np.array_equal(speed, np.array([0, 0, 0])) | |
def test_calculate_max_corner_speed_direction_change(): | |
travel1 = np.array([1, 1, 1]) | |
travel2 = np.array([-1, -1, -1]) | |
accel_limit = np.array([1, 1, 1]) | |
speed = calculate_max_corner_speed(travel1, travel2, accel_limit) | |
assert np.array_equal(speed, np.array([0, 0, 0])) | |
def test_calculate_max_corner_speed_straight_line(): | |
travel1 = np.array([1, 1, 1]) | |
travel2 = np.array([-1, -1, -1]) | |
accel_limit = np.array([1, 1, 1]) | |
speed = calculate_max_corner_speed(travel1, travel2, accel_limit) | |
assert np.array_equal(speed, np.array([0, 0, 0])) | |
def test_calculate_max_corner_speed_sharp_turn_positive_to_origin(): | |
travel1 = np.array([1, 1, 1]) | |
travel2 = np.array([-1, -1, -1]) | |
accel_limit = np.array([1, 1, 1]) | |
speed = calculate_max_corner_speed(travel1, travel2, accel_limit) | |
assert np.array_equal(speed, np.array([0, 0, 0])) | |
def test_calculate_max_corner_speed_sharp_turn_negative_to_origin(): | |
travel1 = np.array([-1, -1, -1]) | |
travel2 = np.array([1, 1, 1]) | |
accel_limit = np.array([1, 1, 1]) | |
speed = calculate_max_corner_speed(travel1, travel2, accel_limit) | |
assert np.array_equal(speed, np.array([0, 0, 0])) | |
class StopState(enum.Enum): | |
IGNORE_FIRST = 100 | |
STOP = 0 | |
SLOW = 1 | |
GO = 2 | |
STOP = StopState.STOP | |
GO = StopState.GO | |
SLOW = StopState.SLOW | |
IGNORE_FIRST = StopState.IGNORE_FIRST | |
@dataclass | |
class Check: | |
position: List[int] | |
state: StopState | |
def run_acceleration_tests(checks: List[Check]): | |
positions = [check.position for check in checks] | |
stop_states = [check.state for check in checks[1:]] # Ignore first | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert len(blocks) == len(commands) - 1 | |
for stop_state, block in zip(stop_states, blocks): | |
if stop_state == STOP: | |
assert is_zero(block.exit_speed) | |
elif stop_state == GO: | |
assert not is_zero(block.exit_speed) | |
elif stop_state == SLOW: | |
assert not is_zero(block.exit_speed) | |
# TODO: more | |
def test_apply_acceleration(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([1, 1, 1], GO), | |
Check([2, 2, 2], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert not is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO) | |
assert is_zero(blocks[1].exit_speed) # Check([2, 2, 2], STOP) | |
def test_apply_acceleration_backward_directions(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([-1, -1, -1], GO), | |
Check([-2, -2, -2], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert not is_zero(blocks[0].exit_speed) # Check([-1, -1, -1], GO) | |
assert is_zero(blocks[1].exit_speed) # Check([-2, -2, -2], STOP) | |
def test_apply_acceleration_mixed_directions(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([1, -1, 0], GO), | |
Check([-1, 1, 0], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert not is_zero(blocks[0].exit_speed) # Check([1, -1, 0], GO) | |
assert is_zero(blocks[1].exit_speed) # Check([-1, 1, 0], STOP) | |
def test_apply_acceleration_with_direction_change(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([1, 1, 1], GO), | |
Check([0, 0, 0], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO) | |
assert is_zero(blocks[1].exit_speed) # Check([0, 0, 0], STOP) | |
def test_apply_acceleration_with_direction_change_backward(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([-1, -1, -1], STOP), | |
Check([0, 0, 0], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert is_zero(blocks[0].exit_speed) # Check([-1, -1, -1], STOP) | |
assert is_zero(blocks[1].exit_speed) # Check([0, 0, 0], STOP) | |
def test_apply_acceleration_with_sharp_corners(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([1, 1, 1], GO), | |
Check([2, 2, 2], STOP), | |
Check([-1, -1, -1], STOP), | |
Check([1, 1, 1], GO), | |
Check([2, 2, 2], STOP), | |
Check([0, 0, 0], STOP) | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, ONE) | |
blocks = apply_acceleration(commands) | |
assert not is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO) | |
assert is_zero(blocks[1].exit_speed) # Check([2, 2, 2], STOP) | |
assert is_zero(blocks[2].exit_speed) # Check([-1, -1, -1], STOP) | |
assert not is_zero(blocks[3].exit_speed) # Check([1, 1, 1], GO) | |
assert is_zero(blocks[4].exit_speed) # Check([2, 2, 2], STOP) | |
assert is_zero(blocks[5].exit_speed) # Check([0, 0, 0], STOP) | |
def test_path_speed(): | |
checks = [ | |
Check([0, 0, 0], IGNORE_FIRST), | |
Check([1, 1, 1], GO), | |
Check([2, 2, 2], STOP), | |
] | |
positions = [check.position for check in checks] | |
commands = generate_commands(positions, speed=ONE) | |
path = initialize_path(commands) | |
assert is_zero(path.blocks[0].enter_speed) | |
assert not is_zero(path.blocks[0].exit_speed) | |
assert is_zero(path.blocks[-1].exit_speed) | |
def test_path_speed_backward(): | |
commands = generate_commands( | |
positions=[[n] * NUM_AXES for n in [0, -1, -2]], | |
speed=[1, 1, 1] | |
) | |
path = initialize_path(commands) | |
assert is_zero(path.blocks[0].enter_speed) | |
assert not is_zero(path.blocks[0].exit_speed) | |
assert is_zero(path.blocks[-1].exit_speed) | |
def test_path_speed_reverse(): | |
commands = generate_commands( | |
positions=[[0, 0, 0], [1, 1, 1], [0, 0, 0]], | |
speed=[1, 1, 1] | |
) | |
path = initialize_path(commands) | |
assert is_zero(path.blocks[0].enter_speed) | |
assert is_zero(path.blocks[0].exit_speed) | |
assert is_zero(path.blocks[1].enter_speed) | |
assert is_zero(path.blocks[1].exit_speed) | |
def test_path_with_mixed_corners(): | |
commands = generate_commands( | |
positions=[[n] * NUM_AXES for n in [0, 1, 2, 3, 2, 3, 4]], | |
speed=[1, 1, 1] | |
) | |
path = initialize_path(commands) | |
expected_travels = [...] # Fill in with expected travel distances for each block | |
expected_exit_speeds = [...] # Fill in with expected exit speeds for each block | |
# 0 - 1 | |
assert is_zero(path.blocks[0].enter_speed) | |
assert not is_zero(path.blocks[0].exit_speed) | |
# 1 - 2 | |
assert not is_zero(path.blocks[1].exit_speed) | |
# 2 - 3 | |
assert is_zero(path.blocks[2].exit_speed) | |
# 3 - 2 | |
assert is_zero(path.blocks[3].exit_speed) | |
# 2 - 3 | |
assert not is_zero(path.blocks[4].exit_speed) | |
# 3 - 4 | |
assert is_zero(path.blocks[5].exit_speed) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment