Last active
May 10, 2024 07:09
-
-
Save AMAN021jin/6ccb9f84cb9f5af2aac1d5e42b98a48d to your computer and use it in GitHub Desktop.
Predicting pose using EKF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from math import atan2, sqrt, pow, pi, sin, cos, degrees, radians, copysign | |
import rospy | |
from geometry_msgs.msg import Twist | |
from turtlesim.msg import Pose | |
import numpy as np | |
import time | |
from common_utils import CommonFunctions | |
from ekf import EKF | |
class EKF(): | |
def __init__(self, pose_data: list) -> None: | |
initial_x = float(pose_data[0]) | |
initial_y = float(pose_data[1]) | |
initial_theta = float(pose_data[2]) | |
self.__linear_vel = float(pose_data[3]) | |
self.__angular_vel = float(pose_data[4]) | |
self.__theta = initial_theta | |
self.__x = np.array([initial_x, initial_y, initial_theta, | |
self.__linear_vel, self.__angular_vel]).reshape(5,1) | |
self.__P = np.eye(5) | |
self.__R = np.eye(5) | |
self.__F = np.eye(5) | |
self.__Q = np.eye(5) | |
self.__H = np.eye(5) | |
def predict_step(self, dt: float) -> None: | |
# x at (t+1) = F.X(at t) | |
# P at (t+1) = F.P(at t).F(transpose) + R(noise at t) | |
self.__F = np.array([[1, 0, -dt*self.__linear_vel*(np.sin(self.__theta))], | |
[0, 1, dt*self.__linear_vel*(np.cos(self.__theta))], | |
[0, 0, 1]]) | |
self.__Q = 0.001*self.__F.dot(self.__F.transpose()) | |
predicted_x = self.__F.dot(self.__x) | |
predicted_P = self.__F.dot(self.__P).dot(self.__F.transpose()) + self.__Q | |
self.__x = predicted_x | |
self.__P = predicted_P | |
self.__x[0] += self.__linear_vel * np.cos(self.__x[2]) * dt | |
self.__x[1] += self.__linear_vel * np.sin(self.__x[2]) * dt | |
self.__x[2] += self.__theta * dt | |
return self.__x | |
def update_step(self, measurement): | |
# y = z (at t) - H.(predicted x) | |
# S = H.(predicted P).H(transpose) + Q (measurement noise) | |
# K = (predicted P).H(transpose).S(inverse) | |
# (new x) = (predicted x) + K.y | |
# (new P) = (I - K.H)(predicted P) | |
z = np.array([measurement]).reshape(3,1) | |
y = z - self.__H.dot(self.__x) | |
print(y) | |
print(self.__P.dot(self.__H.transpose())) | |
s = self.__H.dot(self.__P).dot(self.__H.transpose()) + self.__R | |
print(s) | |
k = self.__P.dot(self.__H.transpose()).dot(np.linalg.inv(s)) | |
new_x = self.__x + k.dot(y) | |
new_p = (np.eye(3) - k.dot(self.__H)).dot(self.__P) | |
self.__x = new_x | |
self.__P = new_p | |
return new_x | |
class ChaseTurtleWithPlan(): | |
def __init__(self): | |
rospy.init_node('chasing', anonymous=True) | |
self.__velocity_publisher = rospy.Publisher('/turtle2/cmd_vel', Twist, queue_size=10) | |
self.__real_pose_subscriber = rospy.Subscriber('/rt_real_pose', Pose, self.plan) | |
self.__rate = rospy.Rate(10) | |
self.__pose_subscriber = rospy.Subscriber('/turtle2/pose', Pose, self.__posecallback) | |
self.__pose = Pose() | |
self.__common_functions = CommonFunctions() | |
self.__max_vel = 1 | |
def __posecallback(self, msg): | |
self.__pose = msg | |
def __moveGoal(self, target): | |
vel_msgs = Twist() | |
last_vel = Twist() | |
I_error = 0 | |
last_error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2)) | |
last_angle_err = 0 | |
i_angle_err = 0 | |
t_final = time.time() | |
while not rospy.is_shutdown(): | |
I_error = I_error + last_error | |
rospy.loginfo("I Error is %f ", I_error) | |
error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2)) | |
rospy.loginfo("Error between current pose and the target pose %f", error) | |
D_error = error - last_error | |
angle_error = atan2(target.y - self.__pose.y, target.x - self.__pose.x) | |
if angle_error > pi: | |
angle_error -= 2 * pi | |
elif angle_error < -pi: | |
angle_error += 2 * pi | |
d_angle_err = angle_error - last_angle_err | |
i_angle_err = i_angle_err + last_angle_err | |
rospy.loginfo("Angle Error %f", angle_error) | |
rospy.loginfo("Derivative angle error %f", d_angle_err) | |
# I_error = I_error + error | |
if(error<=0.3): | |
vel_msgs.linear.x = 0 | |
vel_msgs.angular.z = 0 | |
self.__velocity_publisher.publish(vel_msgs) | |
I_error = 0 | |
rospy.loginfo("Goal Reached") | |
break | |
else: | |
if I_error > 50: | |
I_error = 0 | |
vel_msgs.linear.x = kp*(error) + kd * (D_error) + ki * (I_error) | |
vel_msgs.linear.y = 0 | |
vel_msgs.linear.z = 0 | |
vel_msgs.angular.x = 0 | |
vel_msgs.angular.y = 0 | |
signed_angle_diff = angle_error - self.__pose.theta | |
if signed_angle_diff > pi: | |
signed_angle_diff -= 2 * pi | |
elif signed_angle_diff < -pi: | |
signed_angle_diff += 2 * pi | |
vel_msgs.angular.z = 8*(kp*signed_angle_diff + (kd - 0.1)*d_angle_err +\ | |
ki * i_angle_err) | |
rospy.loginfo("Linear Velocity of PT is %f" , vel_msgs.linear.x) | |
rospy.loginfo("Angular velocity of PT is %f ", vel_msgs.angular.z) | |
last_vel,t_final = self.__common_functions.step_vel(self.__velocity_publisher, | |
vel_msgs,last_vel,t_final) | |
self.__velocity_publisher.publish(vel_msgs) | |
self.__rate.sleep() | |
last_error = error | |
last_angle_err = angle_error | |
def distance(self,goal_x, goal_y): | |
return(sqrt(pow((self.__pose.x-goal_x), 2) + pow((self.__pose.y-goal_y), 2))) | |
def plan(self,target): | |
dist_to_target = self.distance(target.x, target.y) | |
print() | |
if(dist_to_target <= 0.5): | |
rospy.loginfo("My current pose is x: %f, y: %f ", self.__pose.x, self.__pose.y) | |
rospy.loginfo("Target pose is x: %f, y: %f ",target.x,target.y) | |
zero_vel=Twist() | |
zero_vel.linear.x = 0 | |
zero_vel.angular.z = 0 | |
self.__velocity_publisher.publish(zero_vel) | |
rospy.loginfo("Distance is less than 0.5 units. Caught Robber Turtle \n") | |
self.__real_pose_subscriber.unregister() | |
rospy.set_param('caughtStatus',True) | |
else: | |
ekf_input = [target.x, target.y, target.theta, | |
target.linear_velocity, target.angular_velocity] | |
ekf = EKF(ekf_input) | |
dt = 0.1 | |
total_time = 10 | |
# steps = int(total_time/dt) | |
# for _ in np.arange(0, total_time, dt): | |
estimated_pose = ekf.predict_step(dt) | |
predicted_pose = Pose() | |
predicted_pose.x = estimated_pose[0] | |
predicted_pose.y = estimated_pose[1] | |
predicted_pose.theta = estimated_pose[2] | |
predicted_pose.linear_velocity = target.linear_velocity | |
predicted_pose.angular_velocity = target.angular_velocity | |
dist_to_target = self.distance(estimated_pose[0], estimated_pose[1]) | |
print(".........distance to target, ", dist_to_target) | |
print("Predicted pose is......................: ", predicted_pose) | |
self.__moveGoal(predicted_pose) | |
if __name__ == '__main__': | |
try: | |
time.sleep(10) | |
x = ChaseTurtleWithPlan() | |
rospy.spin() | |
except rospy.ROSInterruptException: pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from math import atan2, sqrt, pow, pi, sin, cos, degrees, radians, copysign | |
import rospy | |
from geometry_msgs.msg import Twist | |
from turtlesim.msg import Pose | |
import numpy as np | |
import time | |
from common_utils import CommonFunctions | |
from ekf import EKF | |
class EKF(): | |
def __init__(self, pose_data: list) -> None: | |
initial_x = float(pose_data[0]) | |
initial_y = float(pose_data[1]) | |
initial_theta = float(pose_data[2]) | |
self.__linear_vel = float(pose_data[3]) | |
self.__angular_vel = float(pose_data[4]) | |
self.__theta = initial_theta | |
self.__x = np.array([initial_x, initial_y, initial_theta]).reshape(3,1) | |
self.__P = np.eye(3) | |
self.__R = np.eye(3) | |
self.__F = np.eye(3) | |
self.__Q = np.eye(3) | |
self.__H = np.eye(3) | |
def predict_step(self, dt: float) -> None: | |
# x at (t+1) = F.X(at t) | |
# P at (t+1) = F.P(at t).F(transpose) + R(noise at t) | |
self.__F = np.array([[1, 0, -dt*self.__linear_vel*(np.sin(self.__theta))], | |
[0, 1, dt*self.__linear_vel*(np.cos(self.__theta))], | |
[0, 0, 1]]) | |
# self.__Q = 0.001*self.__F.dot(self.__F.transpose()) | |
predicted_x = self.__F.dot(self.__x) | |
predicted_P = self.__F.dot(self.__P).dot(self.__F.transpose()) | |
self.__x = predicted_x | |
self.__P = predicted_P | |
# self.__x[0] += self.__linear_vel * np.cos(self.__x[2]) * dt | |
# self.__x[1] += self.__linear_vel * np.sin(self.__x[2]) * dt | |
# self.__x[2] += self.__theta * dt | |
return self.__x | |
def update_step(self, measurement): | |
# y = z (at t) - H.(predicted x) | |
# S = H.(predicted P).H(transpose) + Q (measurement noise) | |
# K = (predicted P).H(transpose).S(inverse) | |
# (new x) = (predicted x) + K.y | |
# (new P) = (I - K.H)(predicted P) | |
z = np.array([measurement]).reshape(3,1) | |
y = z - self.__H.dot(self.__x) | |
print(y) | |
print(self.__P.dot(self.__H.transpose())) | |
s = self.__H.dot(self.__P).dot(self.__H.transpose()) | |
print(s) | |
k = self.__P.dot(self.__H.transpose()).dot(np.linalg.inv(s)) | |
new_x = self.__x + k.dot(y) | |
new_p = (np.eye(3) - k.dot(self.__H)).dot(self.__P) | |
self.__x = new_x | |
self.__P = new_p | |
return new_x | |
class ChaseTurtleWithPlan(): | |
def __init__(self): | |
rospy.init_node('chasing', anonymous=True) | |
self.__velocity_publisher = rospy.Publisher('/turtle2/cmd_vel', Twist, queue_size=10) | |
self.__real_pose_subscriber = rospy.Subscriber('/rt_real_pose', Pose, self.plan) | |
self.__rate = rospy.Rate(10) | |
self.__pose_subscriber = rospy.Subscriber('/turtle2/pose', Pose, self.__posecallback) | |
self.__pose = Pose() | |
self.__common_functions = CommonFunctions() | |
self.__max_vel = 1 | |
def __posecallback(self, msg): | |
self.__pose = msg | |
def __moveGoal(self, target): | |
vel_msgs = Twist() | |
last_vel = Twist() | |
I_error = 0 | |
last_error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2)) | |
last_angle_err = 0 | |
i_angle_err = 0 | |
t_final = time.time() | |
while not rospy.is_shutdown(): | |
I_error = I_error + last_error | |
rospy.loginfo("I Error is %f ", I_error) | |
error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2)) | |
rospy.loginfo("Error between current pose and the target pose %f", error) | |
D_error = error - last_error | |
angle_error = atan2(target.y - self.__pose.y, target.x - self.__pose.x) | |
if angle_error > pi: | |
angle_error -= 2 * pi | |
elif angle_error < -pi: | |
angle_error += 2 * pi | |
d_angle_err = angle_error - last_angle_err | |
i_angle_err = i_angle_err + last_angle_err | |
rospy.loginfo("Angle Error %f", angle_error) | |
rospy.loginfo("Derivative angle error %f", d_angle_err) | |
# I_error = I_error + error | |
if(error<=0.3): | |
vel_msgs.linear.x = 0 | |
vel_msgs.angular.z = 0 | |
self.__velocity_publisher.publish(vel_msgs) | |
I_error = 0 | |
rospy.loginfo("Goal Reached") | |
break | |
else: | |
if I_error > 50: | |
I_error = 0 | |
vel_msgs.linear.x = kp*(error) + kd * (D_error) + ki * (I_error) | |
vel_msgs.linear.y = 0 | |
vel_msgs.linear.z = 0 | |
vel_msgs.angular.x = 0 | |
vel_msgs.angular.y = 0 | |
signed_angle_diff = angle_error - self.__pose.theta | |
if signed_angle_diff > pi: | |
signed_angle_diff -= 2 * pi | |
elif signed_angle_diff < -pi: | |
signed_angle_diff += 2 * pi | |
vel_msgs.angular.z = 8*(kp*signed_angle_diff + (kd - 0.1)*d_angle_err +\ | |
ki * i_angle_err) | |
rospy.loginfo("Linear Velocity of PT is %f" , vel_msgs.linear.x) | |
rospy.loginfo("Angular velocity of PT is %f ", vel_msgs.angular.z) | |
last_vel,t_final = self.__common_functions.step_vel(self.__velocity_publisher, | |
vel_msgs,last_vel,t_final) | |
self.__velocity_publisher.publish(vel_msgs) | |
self.__rate.sleep() | |
last_error = error | |
last_angle_err = angle_error | |
def distance(self,goal_x, goal_y): | |
return(sqrt(pow((self.__pose.x-goal_x), 2) + pow((self.__pose.y-goal_y), 2))) | |
def plan(self,target): | |
dist_to_target = self.distance(target.x, target.y) | |
print() | |
if(dist_to_target <= 0.5): | |
rospy.loginfo("My current pose is x: %f, y: %f ", self.__pose.x, self.__pose.y) | |
rospy.loginfo("Target pose is x: %f, y: %f ",target.x,target.y) | |
zero_vel=Twist() | |
zero_vel.linear.x = 0 | |
zero_vel.angular.z = 0 | |
self.__velocity_publisher.publish(zero_vel) | |
rospy.loginfo("Distance is less than 0.5 units. Caught Robber Turtle \n") | |
self.__real_pose_subscriber.unregister() | |
rospy.set_param('caughtStatus',True) | |
else: | |
ekf_input = [target.x, target.y, target.theta, | |
target.linear_velocity, target.angular_velocity] | |
ekf = EKF(ekf_input) | |
dt = 0.1 | |
total_time = 10 | |
# steps = int(total_time/dt) | |
# for _ in np.arange(0, total_time, dt): | |
estimated_pose = ekf.predict_step(dt) | |
predicted_pose = Pose() | |
predicted_pose.x = estimated_pose[0] | |
predicted_pose.y = estimated_pose[1] | |
predicted_pose.theta = estimated_pose[2] | |
predicted_pose.linear_velocity = target.linear_velocity | |
predicted_pose.angular_velocity = target.angular_velocity | |
dist_to_target = self.distance(estimated_pose[0], estimated_pose[1]) | |
print(".........distance to target, ", dist_to_target) | |
print("Predicted pose is......................: ", predicted_pose) | |
self.__moveGoal(predicted_pose) | |
if __name__ == '__main__': | |
try: | |
time.sleep(10) | |
x = ChaseTurtleWithPlan() | |
rospy.spin() | |
except rospy.ROSInterruptException: pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment