Skip to content

Instantly share code, notes, and snippets.

@ipsod
Created January 7, 2024 17:48
Show Gist options
  • Save ipsod/d9aec95cdbb44899366f168323edb2f2 to your computer and use it in GitHub Desktop.
Save ipsod/d9aec95cdbb44899366f168323edb2f2 to your computer and use it in GitHub Desktop.
python trajectory planner WIP
from __future__ import annotations
import enum
from dataclasses import dataclass, field
import pytest # noqa
import numpy as np
from typing import Optional, List, Iterable
NUM_AXES = 3
ZERO = np.array([0] * NUM_AXES)
ONE = np.array([1] * NUM_AXES)
FREE_TURN_ANGLE_THRESHOLD = np.radians(24)
STOP_TURN_ANGLE_THRESHOLD = np.radians(128)
def is_zero(speed: np.array) -> bool:
return np.array_equal(speed, ZERO)
@dataclass
class Command:
point: np.array
speed: np.array
@dataclass
class Block:
p1: np.array
p2: np.array
prev: Optional['Block'] = None
next: Optional['Block'] = None
enter_speed: np.array = field(default_factory=lambda: ZERO)
exit_speed: np.array = field(default_factory=lambda: ZERO)
def __repr__(self):
return f'Block(travel={self.travel}, enter={self.enter_speed}, exit={self.exit_speed})'
@property
def travel(self):
return self.p2 - self.p1
@property
def speed(self):
return (self.enter_speed + self.exit_speed) / 2
def speed_average(self) -> np.array:
return (self.enter_speed + self.exit_speed) / 2
@dataclass
class Path:
CORNER_LENGTH: float = 3
enter_speed: np.array = field(default_factory=lambda: ZERO)
exit_speed: np.array = field(default_factory=lambda: ONE)
accel_limit: np.array = field(default_factory=lambda: ONE)
blocks: List[Block] = field(default_factory=list)
def max_corner_speed(self, block: Block) -> np.ndarray:
if not block.next:
return ZERO
return calculate_max_corner_speed(block.travel, block.next.travel, self.accel_limit)
def look_ahead(self):
for i, block in enumerate(self.blocks):
if not block.next:
block.exit_speed = ZERO # Explicitly setting exit speed to 0 for last block
else:
position_delta = block.next.travel - block.travel
speed_delta = max_speed_delta_for_position_delta(position_delta, self.accel_limit)
# Calculate minimum speed for each axis, considering directionality
min_speed_each_axis = np.where(position_delta >= 0,
np.maximum(block.speed - speed_delta, 0),
np.minimum(block.speed + speed_delta, 0))
max_exit_speed = self.max_corner_speed(block)
print(f"Line {i}: max exit speed: {max_exit_speed}")
# Compare each axis separately and choose the minimum
calculated_exit_speed = (
np.minimum(
np.abs(max_exit_speed),
np.abs(min_speed_each_axis)
)
* np.sign(max_exit_speed)
)
block.exit_speed = calculated_exit_speed
print(f"Line {i}: Calculated exit speed: {calculated_exit_speed}, Enter speed: {block.enter_speed}")
def calculate_max_corner_speed(travel1: np.ndarray, travel2: np.ndarray, max_acceleration: np.ndarray, deviation_factor: float = 0.1) -> np.ndarray:
# Application of GRBL cornering algorithm. See reference: https://mattwidmann.net/notes/grbls-cornering-algorithm/
# Normalize the vectors
e_i = travel1 / np.linalg.norm(travel1)
e_o = travel2 / np.linalg.norm(travel2)
# Calculate the cosine of the angle between vectors
cos_theta = np.dot(e_i, e_o)
# Half-angle sine calculation
sin_half_theta = np.sqrt((1 - cos_theta) / 2)
# Calculate the radius of the approximating circle
r = deviation_factor / (sin_half_theta / (1 - sin_half_theta))
# Calculate the cornering velocity for each axis
v_c = np.sqrt(max_acceleration * r)
# Ensuring the velocity is a vector with the same dimensions as the acceleration limit
v_c = np.minimum(v_c, np.linalg.norm(travel1)) * travel1 / np.linalg.norm(travel1)
return v_c
def compute_circle_radius(theta: float, delta: float) -> float:
# Handling the case when theta is close to 180 degrees (pi radians)
if np.isclose(theta, np.pi, atol=1e-3): # Adjust tolerance as needed
return float('inf')
sin_theta_over_2 = np.sqrt((1 - np.cos(theta)) / 2)
radius = delta * sin_theta_over_2 / (1 - sin_theta_over_2)
print(f"Radius: {radius}")
return delta * sin_theta_over_2 / (1 - sin_theta_over_2)
def angle_between(v1: np.array, v2: np.array) -> float:
if np.allclose(v1, 0) or np.allclose(v2, 0):
return np.pi
dot_product = np.dot(v1, v2)
magnitude_product = np.linalg.norm(v1) * np.linalg.norm(v2)
if np.isclose(magnitude_product, 0):
return 0
cos_theta = dot_product / magnitude_product
return np.arccos(np.clip(cos_theta, -1, 1)) # Clipping for numerical stability
def closest_to_zero(arr1: np.array, arr2: np.array) -> np.array:
return np.where(np.abs(arr1) < np.abs(arr2), arr1, arr2)
def max_speed_delta_for_position_delta(position_delta: np.array, acceleration_limits: np.array) -> np.array:
return np.sqrt(2 * acceleration_limits * np.abs(position_delta))
def invert_blocks(blocks: List[Block]) -> List[Block]:
"""
Inverts the acceleration for each block.
"""
inverted_blocks = []
for block in blocks:
inverted_enter_speed = block.exit_speed # WARN: switched
inverted_exit_speed = block.enter_speed # WARN: switched
# Create a new Block with inverted acceleration
inverted_block = Block(
p1=block.p1,
p2=block.p2,
prev=block.prev,
next=block.next,
enter_speed=inverted_enter_speed,
exit_speed=inverted_exit_speed,
)
inverted_blocks.append(inverted_block)
return inverted_blocks
def initialize_path(commands: List[Command]) -> Path:
# Run acceleration forward, to figure out how to accelerate from stops.
forward_blocks = apply_acceleration(commands)
print("initialize_path forward_blocks", [block for block in forward_blocks])
forward_path = Path(blocks=forward_blocks)
# Run acceleration backward, to figure out when to slow down for stops.
backward_commands = commands[::-1]
backward_blocks = apply_acceleration(backward_commands)[::-1]
inverted_backward_blocks = invert_blocks(backward_blocks)
backward_path = Path(blocks=inverted_backward_blocks)
print("Forward path before processing:")
for block in forward_path.blocks:
print(f"Enter Speed: {block.enter_speed}, Exit Speed: {block.exit_speed}, Block: {block}, ")
print("Backward path before processing:")
for block in backward_path.blocks:
print(f"Enter Speed: {block.enter_speed}, Exit Speed: {block.exit_speed}, Block: {block}, ")
print("ZIPP", list(forward_path.blocks))
print("ZIPP", list(backward_path.blocks))
for f_block, b_block in zip(forward_path.blocks, backward_path.blocks):
f_block.enter_speed = closest_to_zero(f_block.enter_speed, b_block.enter_speed)
f_block.exit_speed = closest_to_zero(f_block.exit_speed, b_block.exit_speed)
assert not np.any(np.isnan(f_block.enter_speed)), "f_block.enter_speed contains nan"
assert not np.any(np.isnan(f_block.exit_speed)), "f_block.exit_speed contains nan"
print(f"Forward path: {forward_path.blocks}")
for block1, block2 in zip(forward_path.blocks[:-1], forward_path.blocks[1:]):
assert np.array_equal(block1.exit_speed, block2.enter_speed)
return forward_path
def apply_acceleration(commands: List[Command], accel_limits=ONE, enter_speed=ZERO) -> List[Block]:
current_speed = enter_speed
blocks = []
for i in range(len(commands) - 1):
p1 = commands[i].point
p2 = commands[i + 1].point
travel = p2 - p1
maximum_speed_delta = max_speed_delta_for_position_delta(travel, accel_limits)
print(f"i={i}, maximum_speed_delta={maximum_speed_delta}")
direction = np.sign(travel)
max_next_speed = current_speed + direction * maximum_speed_delta
print(f"max_next_speed={max_next_speed}, commands[i + 1].cmd_speed={commands[i + 1].speed}")
exit_speed = np.where(direction >= 0,
np.minimum(max_next_speed, commands[i + 1].speed),
np.maximum(max_next_speed, -commands[i + 1].speed))
if i < len(commands) - 2:
travel1 = travel
p3 = commands[i + 2].point
travel2 = p3 - p2
corner_speed = calculate_max_corner_speed(travel1, travel2, accel_limits)
# Limit exit_speed for each axis based on corner speed
exit_speed = closest_to_zero(exit_speed, corner_speed) # TODO: this is wrong
print(f"corner_speed={corner_speed}, limited exit_speed={exit_speed}")
if np.any(np.isnan(exit_speed)):
print(f"Warning: NaN detected in exit_speed at command index {i}")
block = Block(p1=p1, p2=p2, enter_speed=current_speed, exit_speed=exit_speed)
blocks.append(block)
current_speed = exit_speed # Update current speed for the next block
# Set exit speed of the last block to zero
blocks[-1].exit_speed = ZERO
# Link blocks
for i in range(len(blocks) - 1):
blocks[i].next = blocks[i + 1]
blocks[i + 1].prev = blocks[i]
for block in blocks:
print(block)
return blocks
def generate_commands(positions: List[np.array | List], speed: Iterable = ONE) -> List[Command]:
speed = np.array(speed)
positions = np.array(positions)
return [Command(point=point, speed=speed) for point in positions]
"""
TESTS
"""
def test_calculate_max_corner_speed_smooth_turn():
travel1 = np.array([1, 0, 0])
travel2 = np.array([1, 1, 0])
accel_limit = np.array([1, 1, 1])
speed = calculate_max_corner_speed(travel1, travel2, accel_limit)
assert not np.array_equal(speed, np.array([0, 0, 0]))
def test_calculate_max_corner_speed_direction_change():
travel1 = np.array([1, 1, 1])
travel2 = np.array([-1, -1, -1])
accel_limit = np.array([1, 1, 1])
speed = calculate_max_corner_speed(travel1, travel2, accel_limit)
assert np.array_equal(speed, np.array([0, 0, 0]))
def test_calculate_max_corner_speed_straight_line():
travel1 = np.array([1, 1, 1])
travel2 = np.array([-1, -1, -1])
accel_limit = np.array([1, 1, 1])
speed = calculate_max_corner_speed(travel1, travel2, accel_limit)
assert np.array_equal(speed, np.array([0, 0, 0]))
def test_calculate_max_corner_speed_sharp_turn_positive_to_origin():
travel1 = np.array([1, 1, 1])
travel2 = np.array([-1, -1, -1])
accel_limit = np.array([1, 1, 1])
speed = calculate_max_corner_speed(travel1, travel2, accel_limit)
assert np.array_equal(speed, np.array([0, 0, 0]))
def test_calculate_max_corner_speed_sharp_turn_negative_to_origin():
travel1 = np.array([-1, -1, -1])
travel2 = np.array([1, 1, 1])
accel_limit = np.array([1, 1, 1])
speed = calculate_max_corner_speed(travel1, travel2, accel_limit)
assert np.array_equal(speed, np.array([0, 0, 0]))
class StopState(enum.Enum):
IGNORE_FIRST = 100
STOP = 0
SLOW = 1
GO = 2
STOP = StopState.STOP
GO = StopState.GO
SLOW = StopState.SLOW
IGNORE_FIRST = StopState.IGNORE_FIRST
@dataclass
class Check:
position: List[int]
state: StopState
def run_acceleration_tests(checks: List[Check]):
positions = [check.position for check in checks]
stop_states = [check.state for check in checks[1:]] # Ignore first
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert len(blocks) == len(commands) - 1
for stop_state, block in zip(stop_states, blocks):
if stop_state == STOP:
assert is_zero(block.exit_speed)
elif stop_state == GO:
assert not is_zero(block.exit_speed)
elif stop_state == SLOW:
assert not is_zero(block.exit_speed)
# TODO: more
def test_apply_acceleration():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([1, 1, 1], GO),
Check([2, 2, 2], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert not is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO)
assert is_zero(blocks[1].exit_speed) # Check([2, 2, 2], STOP)
def test_apply_acceleration_backward_directions():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([-1, -1, -1], GO),
Check([-2, -2, -2], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert not is_zero(blocks[0].exit_speed) # Check([-1, -1, -1], GO)
assert is_zero(blocks[1].exit_speed) # Check([-2, -2, -2], STOP)
def test_apply_acceleration_mixed_directions():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([1, -1, 0], GO),
Check([-1, 1, 0], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert not is_zero(blocks[0].exit_speed) # Check([1, -1, 0], GO)
assert is_zero(blocks[1].exit_speed) # Check([-1, 1, 0], STOP)
def test_apply_acceleration_with_direction_change():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([1, 1, 1], GO),
Check([0, 0, 0], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO)
assert is_zero(blocks[1].exit_speed) # Check([0, 0, 0], STOP)
def test_apply_acceleration_with_direction_change_backward():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([-1, -1, -1], STOP),
Check([0, 0, 0], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert is_zero(blocks[0].exit_speed) # Check([-1, -1, -1], STOP)
assert is_zero(blocks[1].exit_speed) # Check([0, 0, 0], STOP)
def test_apply_acceleration_with_sharp_corners():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([1, 1, 1], GO),
Check([2, 2, 2], STOP),
Check([-1, -1, -1], STOP),
Check([1, 1, 1], GO),
Check([2, 2, 2], STOP),
Check([0, 0, 0], STOP)
]
positions = [check.position for check in checks]
commands = generate_commands(positions, ONE)
blocks = apply_acceleration(commands)
assert not is_zero(blocks[0].exit_speed) # Check([1, 1, 1], GO)
assert is_zero(blocks[1].exit_speed) # Check([2, 2, 2], STOP)
assert is_zero(blocks[2].exit_speed) # Check([-1, -1, -1], STOP)
assert not is_zero(blocks[3].exit_speed) # Check([1, 1, 1], GO)
assert is_zero(blocks[4].exit_speed) # Check([2, 2, 2], STOP)
assert is_zero(blocks[5].exit_speed) # Check([0, 0, 0], STOP)
def test_path_speed():
checks = [
Check([0, 0, 0], IGNORE_FIRST),
Check([1, 1, 1], GO),
Check([2, 2, 2], STOP),
]
positions = [check.position for check in checks]
commands = generate_commands(positions, speed=ONE)
path = initialize_path(commands)
assert is_zero(path.blocks[0].enter_speed)
assert not is_zero(path.blocks[0].exit_speed)
assert is_zero(path.blocks[-1].exit_speed)
def test_path_speed_backward():
commands = generate_commands(
positions=[[n] * NUM_AXES for n in [0, -1, -2]],
speed=[1, 1, 1]
)
path = initialize_path(commands)
assert is_zero(path.blocks[0].enter_speed)
assert not is_zero(path.blocks[0].exit_speed)
assert is_zero(path.blocks[-1].exit_speed)
def test_path_speed_reverse():
commands = generate_commands(
positions=[[0, 0, 0], [1, 1, 1], [0, 0, 0]],
speed=[1, 1, 1]
)
path = initialize_path(commands)
assert is_zero(path.blocks[0].enter_speed)
assert is_zero(path.blocks[0].exit_speed)
assert is_zero(path.blocks[1].enter_speed)
assert is_zero(path.blocks[1].exit_speed)
def test_path_with_mixed_corners():
commands = generate_commands(
positions=[[n] * NUM_AXES for n in [0, 1, 2, 3, 2, 3, 4]],
speed=[1, 1, 1]
)
path = initialize_path(commands)
expected_travels = [...] # Fill in with expected travel distances for each block
expected_exit_speeds = [...] # Fill in with expected exit speeds for each block
# 0 - 1
assert is_zero(path.blocks[0].enter_speed)
assert not is_zero(path.blocks[0].exit_speed)
# 1 - 2
assert not is_zero(path.blocks[1].exit_speed)
# 2 - 3
assert is_zero(path.blocks[2].exit_speed)
# 3 - 2
assert is_zero(path.blocks[3].exit_speed)
# 2 - 3
assert not is_zero(path.blocks[4].exit_speed)
# 3 - 4
assert is_zero(path.blocks[5].exit_speed)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment