Skip to content

Instantly share code, notes, and snippets.

@AMAN021jin
Last active May 10, 2024 07:09
Show Gist options
  • Save AMAN021jin/6ccb9f84cb9f5af2aac1d5e42b98a48d to your computer and use it in GitHub Desktop.
Save AMAN021jin/6ccb9f84cb9f5af2aac1d5e42b98a48d to your computer and use it in GitHub Desktop.
Predicting pose using EKF
#!/usr/bin/env python3
from math import atan2, sqrt, pow, pi, sin, cos, degrees, radians, copysign
import rospy
from geometry_msgs.msg import Twist
from turtlesim.msg import Pose
import numpy as np
import time
from common_utils import CommonFunctions
from ekf import EKF
class EKF():
def __init__(self, pose_data: list) -> None:
initial_x = float(pose_data[0])
initial_y = float(pose_data[1])
initial_theta = float(pose_data[2])
self.__linear_vel = float(pose_data[3])
self.__angular_vel = float(pose_data[4])
self.__theta = initial_theta
self.__x = np.array([initial_x, initial_y, initial_theta,
self.__linear_vel, self.__angular_vel]).reshape(5,1)
self.__P = np.eye(5)
self.__R = np.eye(5)
self.__F = np.eye(5)
self.__Q = np.eye(5)
self.__H = np.eye(5)
def predict_step(self, dt: float) -> None:
# x at (t+1) = F.X(at t)
# P at (t+1) = F.P(at t).F(transpose) + R(noise at t)
self.__F = np.array([[1, 0, -dt*self.__linear_vel*(np.sin(self.__theta))],
[0, 1, dt*self.__linear_vel*(np.cos(self.__theta))],
[0, 0, 1]])
self.__Q = 0.001*self.__F.dot(self.__F.transpose())
predicted_x = self.__F.dot(self.__x)
predicted_P = self.__F.dot(self.__P).dot(self.__F.transpose()) + self.__Q
self.__x = predicted_x
self.__P = predicted_P
self.__x[0] += self.__linear_vel * np.cos(self.__x[2]) * dt
self.__x[1] += self.__linear_vel * np.sin(self.__x[2]) * dt
self.__x[2] += self.__theta * dt
return self.__x
def update_step(self, measurement):
# y = z (at t) - H.(predicted x)
# S = H.(predicted P).H(transpose) + Q (measurement noise)
# K = (predicted P).H(transpose).S(inverse)
# (new x) = (predicted x) + K.y
# (new P) = (I - K.H)(predicted P)
z = np.array([measurement]).reshape(3,1)
y = z - self.__H.dot(self.__x)
print(y)
print(self.__P.dot(self.__H.transpose()))
s = self.__H.dot(self.__P).dot(self.__H.transpose()) + self.__R
print(s)
k = self.__P.dot(self.__H.transpose()).dot(np.linalg.inv(s))
new_x = self.__x + k.dot(y)
new_p = (np.eye(3) - k.dot(self.__H)).dot(self.__P)
self.__x = new_x
self.__P = new_p
return new_x
class ChaseTurtleWithPlan():
def __init__(self):
rospy.init_node('chasing', anonymous=True)
self.__velocity_publisher = rospy.Publisher('/turtle2/cmd_vel', Twist, queue_size=10)
self.__real_pose_subscriber = rospy.Subscriber('/rt_real_pose', Pose, self.plan)
self.__rate = rospy.Rate(10)
self.__pose_subscriber = rospy.Subscriber('/turtle2/pose', Pose, self.__posecallback)
self.__pose = Pose()
self.__common_functions = CommonFunctions()
self.__max_vel = 1
def __posecallback(self, msg):
self.__pose = msg
def __moveGoal(self, target):
vel_msgs = Twist()
last_vel = Twist()
I_error = 0
last_error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2))
last_angle_err = 0
i_angle_err = 0
t_final = time.time()
while not rospy.is_shutdown():
I_error = I_error + last_error
rospy.loginfo("I Error is %f ", I_error)
error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2))
rospy.loginfo("Error between current pose and the target pose %f", error)
D_error = error - last_error
angle_error = atan2(target.y - self.__pose.y, target.x - self.__pose.x)
if angle_error > pi:
angle_error -= 2 * pi
elif angle_error < -pi:
angle_error += 2 * pi
d_angle_err = angle_error - last_angle_err
i_angle_err = i_angle_err + last_angle_err
rospy.loginfo("Angle Error %f", angle_error)
rospy.loginfo("Derivative angle error %f", d_angle_err)
# I_error = I_error + error
if(error<=0.3):
vel_msgs.linear.x = 0
vel_msgs.angular.z = 0
self.__velocity_publisher.publish(vel_msgs)
I_error = 0
rospy.loginfo("Goal Reached")
break
else:
if I_error > 50:
I_error = 0
vel_msgs.linear.x = kp*(error) + kd * (D_error) + ki * (I_error)
vel_msgs.linear.y = 0
vel_msgs.linear.z = 0
vel_msgs.angular.x = 0
vel_msgs.angular.y = 0
signed_angle_diff = angle_error - self.__pose.theta
if signed_angle_diff > pi:
signed_angle_diff -= 2 * pi
elif signed_angle_diff < -pi:
signed_angle_diff += 2 * pi
vel_msgs.angular.z = 8*(kp*signed_angle_diff + (kd - 0.1)*d_angle_err +\
ki * i_angle_err)
rospy.loginfo("Linear Velocity of PT is %f" , vel_msgs.linear.x)
rospy.loginfo("Angular velocity of PT is %f ", vel_msgs.angular.z)
last_vel,t_final = self.__common_functions.step_vel(self.__velocity_publisher,
vel_msgs,last_vel,t_final)
self.__velocity_publisher.publish(vel_msgs)
self.__rate.sleep()
last_error = error
last_angle_err = angle_error
def distance(self,goal_x, goal_y):
return(sqrt(pow((self.__pose.x-goal_x), 2) + pow((self.__pose.y-goal_y), 2)))
def plan(self,target):
dist_to_target = self.distance(target.x, target.y)
print()
if(dist_to_target <= 0.5):
rospy.loginfo("My current pose is x: %f, y: %f ", self.__pose.x, self.__pose.y)
rospy.loginfo("Target pose is x: %f, y: %f ",target.x,target.y)
zero_vel=Twist()
zero_vel.linear.x = 0
zero_vel.angular.z = 0
self.__velocity_publisher.publish(zero_vel)
rospy.loginfo("Distance is less than 0.5 units. Caught Robber Turtle \n")
self.__real_pose_subscriber.unregister()
rospy.set_param('caughtStatus',True)
else:
ekf_input = [target.x, target.y, target.theta,
target.linear_velocity, target.angular_velocity]
ekf = EKF(ekf_input)
dt = 0.1
total_time = 10
# steps = int(total_time/dt)
# for _ in np.arange(0, total_time, dt):
estimated_pose = ekf.predict_step(dt)
predicted_pose = Pose()
predicted_pose.x = estimated_pose[0]
predicted_pose.y = estimated_pose[1]
predicted_pose.theta = estimated_pose[2]
predicted_pose.linear_velocity = target.linear_velocity
predicted_pose.angular_velocity = target.angular_velocity
dist_to_target = self.distance(estimated_pose[0], estimated_pose[1])
print(".........distance to target, ", dist_to_target)
print("Predicted pose is......................: ", predicted_pose)
self.__moveGoal(predicted_pose)
if __name__ == '__main__':
try:
time.sleep(10)
x = ChaseTurtleWithPlan()
rospy.spin()
except rospy.ROSInterruptException: pass
#!/usr/bin/env python3
from math import atan2, sqrt, pow, pi, sin, cos, degrees, radians, copysign
import rospy
from geometry_msgs.msg import Twist
from turtlesim.msg import Pose
import numpy as np
import time
from common_utils import CommonFunctions
from ekf import EKF
class EKF():
def __init__(self, pose_data: list) -> None:
initial_x = float(pose_data[0])
initial_y = float(pose_data[1])
initial_theta = float(pose_data[2])
self.__linear_vel = float(pose_data[3])
self.__angular_vel = float(pose_data[4])
self.__theta = initial_theta
self.__x = np.array([initial_x, initial_y, initial_theta]).reshape(3,1)
self.__P = np.eye(3)
self.__R = np.eye(3)
self.__F = np.eye(3)
self.__Q = np.eye(3)
self.__H = np.eye(3)
def predict_step(self, dt: float) -> None:
# x at (t+1) = F.X(at t)
# P at (t+1) = F.P(at t).F(transpose) + R(noise at t)
self.__F = np.array([[1, 0, -dt*self.__linear_vel*(np.sin(self.__theta))],
[0, 1, dt*self.__linear_vel*(np.cos(self.__theta))],
[0, 0, 1]])
# self.__Q = 0.001*self.__F.dot(self.__F.transpose())
predicted_x = self.__F.dot(self.__x)
predicted_P = self.__F.dot(self.__P).dot(self.__F.transpose())
self.__x = predicted_x
self.__P = predicted_P
# self.__x[0] += self.__linear_vel * np.cos(self.__x[2]) * dt
# self.__x[1] += self.__linear_vel * np.sin(self.__x[2]) * dt
# self.__x[2] += self.__theta * dt
return self.__x
def update_step(self, measurement):
# y = z (at t) - H.(predicted x)
# S = H.(predicted P).H(transpose) + Q (measurement noise)
# K = (predicted P).H(transpose).S(inverse)
# (new x) = (predicted x) + K.y
# (new P) = (I - K.H)(predicted P)
z = np.array([measurement]).reshape(3,1)
y = z - self.__H.dot(self.__x)
print(y)
print(self.__P.dot(self.__H.transpose()))
s = self.__H.dot(self.__P).dot(self.__H.transpose())
print(s)
k = self.__P.dot(self.__H.transpose()).dot(np.linalg.inv(s))
new_x = self.__x + k.dot(y)
new_p = (np.eye(3) - k.dot(self.__H)).dot(self.__P)
self.__x = new_x
self.__P = new_p
return new_x
class ChaseTurtleWithPlan():
def __init__(self):
rospy.init_node('chasing', anonymous=True)
self.__velocity_publisher = rospy.Publisher('/turtle2/cmd_vel', Twist, queue_size=10)
self.__real_pose_subscriber = rospy.Subscriber('/rt_real_pose', Pose, self.plan)
self.__rate = rospy.Rate(10)
self.__pose_subscriber = rospy.Subscriber('/turtle2/pose', Pose, self.__posecallback)
self.__pose = Pose()
self.__common_functions = CommonFunctions()
self.__max_vel = 1
def __posecallback(self, msg):
self.__pose = msg
def __moveGoal(self, target):
vel_msgs = Twist()
last_vel = Twist()
I_error = 0
last_error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2))
last_angle_err = 0
i_angle_err = 0
t_final = time.time()
while not rospy.is_shutdown():
I_error = I_error + last_error
rospy.loginfo("I Error is %f ", I_error)
error = sqrt(pow((self.__pose.x-target.x), 2) + pow((self.__pose.y - target.y), 2))
rospy.loginfo("Error between current pose and the target pose %f", error)
D_error = error - last_error
angle_error = atan2(target.y - self.__pose.y, target.x - self.__pose.x)
if angle_error > pi:
angle_error -= 2 * pi
elif angle_error < -pi:
angle_error += 2 * pi
d_angle_err = angle_error - last_angle_err
i_angle_err = i_angle_err + last_angle_err
rospy.loginfo("Angle Error %f", angle_error)
rospy.loginfo("Derivative angle error %f", d_angle_err)
# I_error = I_error + error
if(error<=0.3):
vel_msgs.linear.x = 0
vel_msgs.angular.z = 0
self.__velocity_publisher.publish(vel_msgs)
I_error = 0
rospy.loginfo("Goal Reached")
break
else:
if I_error > 50:
I_error = 0
vel_msgs.linear.x = kp*(error) + kd * (D_error) + ki * (I_error)
vel_msgs.linear.y = 0
vel_msgs.linear.z = 0
vel_msgs.angular.x = 0
vel_msgs.angular.y = 0
signed_angle_diff = angle_error - self.__pose.theta
if signed_angle_diff > pi:
signed_angle_diff -= 2 * pi
elif signed_angle_diff < -pi:
signed_angle_diff += 2 * pi
vel_msgs.angular.z = 8*(kp*signed_angle_diff + (kd - 0.1)*d_angle_err +\
ki * i_angle_err)
rospy.loginfo("Linear Velocity of PT is %f" , vel_msgs.linear.x)
rospy.loginfo("Angular velocity of PT is %f ", vel_msgs.angular.z)
last_vel,t_final = self.__common_functions.step_vel(self.__velocity_publisher,
vel_msgs,last_vel,t_final)
self.__velocity_publisher.publish(vel_msgs)
self.__rate.sleep()
last_error = error
last_angle_err = angle_error
def distance(self,goal_x, goal_y):
return(sqrt(pow((self.__pose.x-goal_x), 2) + pow((self.__pose.y-goal_y), 2)))
def plan(self,target):
dist_to_target = self.distance(target.x, target.y)
print()
if(dist_to_target <= 0.5):
rospy.loginfo("My current pose is x: %f, y: %f ", self.__pose.x, self.__pose.y)
rospy.loginfo("Target pose is x: %f, y: %f ",target.x,target.y)
zero_vel=Twist()
zero_vel.linear.x = 0
zero_vel.angular.z = 0
self.__velocity_publisher.publish(zero_vel)
rospy.loginfo("Distance is less than 0.5 units. Caught Robber Turtle \n")
self.__real_pose_subscriber.unregister()
rospy.set_param('caughtStatus',True)
else:
ekf_input = [target.x, target.y, target.theta,
target.linear_velocity, target.angular_velocity]
ekf = EKF(ekf_input)
dt = 0.1
total_time = 10
# steps = int(total_time/dt)
# for _ in np.arange(0, total_time, dt):
estimated_pose = ekf.predict_step(dt)
predicted_pose = Pose()
predicted_pose.x = estimated_pose[0]
predicted_pose.y = estimated_pose[1]
predicted_pose.theta = estimated_pose[2]
predicted_pose.linear_velocity = target.linear_velocity
predicted_pose.angular_velocity = target.angular_velocity
dist_to_target = self.distance(estimated_pose[0], estimated_pose[1])
print(".........distance to target, ", dist_to_target)
print("Predicted pose is......................: ", predicted_pose)
self.__moveGoal(predicted_pose)
if __name__ == '__main__':
try:
time.sleep(10)
x = ChaseTurtleWithPlan()
rospy.spin()
except rospy.ROSInterruptException: pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment