Skip to content

Instantly share code, notes, and snippets.

@robotsorcerer
Created July 19, 2023 20:34
Show Gist options
  • Save robotsorcerer/13df022dcc2099ff86e894d0668cac4f to your computer and use it in GitHub Desktop.
Save robotsorcerer/13df022dcc2099ff86e894d0668cac4f to your computer and use it in GitHub Desktop.
Msg Filters Subscribers in ros 2
#!/usr/bin/env python3
__all__ = ["SubscribeRegress"]
import os
import sys
import time
import rospy
import logging
import pickle
import rospkg
import threading
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from message_filters import TimeSynchronizer, Subscriber, ApproximateTimeSynchronizer
from geometry_msgs.msg import WrenchStamped
from rospy.rostime import Duration
from gazebo_msgs.srv import GetLinkState, GetLinkStateResponse
from gazebo_msgs.srv import GetJointProperties, GetJointPropertiesResponse
from tf.transformations import euler_from_quaternion
sys.path.append( os.path.dirname( os.path.dirname( os.path.abspath(__file__) ) ) )
from scripts.wrench_viz import WrenchVisualizer
rospack = rospkg.RosPack()
data_dir = rospack.get_path('param_est') + '/data'
if not os.path.exists(data_dir):
os.makedirs(data_dir)
LOGGER = logging.getLogger(__name__)
class SubscribeRegress():
"""
Calculates the regression paramters for
equation (4) in the paper.
"""
def __init__(self, rate, verbose=False, data_collect=False, \
freedoms=6, do_plot=True):
super(SubscribeRegress, self).__init__()
self.verbose = verbose
self.base_name = "stewart"
zeros3 = np.zeros((3,1))
joint = dict(omega_prev=zeros3,adot_prev=zeros3)
self.params = {
"chain1": dict(u_joint=joint,p_joint=joint,s_joint=joint),
"chain2": dict(u_joint=joint,p_joint=joint,s_joint=joint),
"chain3": dict(u_joint=joint,p_joint=joint,s_joint=joint),
"chain4": dict(u_joint=joint,p_joint=joint,s_joint=joint),
"chain5": dict(u_joint=joint,p_joint=joint,s_joint=joint),
"chain6": dict(u_joint=joint,p_joint=joint,s_joint=joint),
}
self.gravity_vec = np.array(([[0, 0, -9.8]])).T
self.freedoms = freedoms # total degree of freedoms for the robot
if do_plot:
fig = plt.figure(figsize=(16, 9))
gs = gridspec.GridSpec(1, 1, figure=fig)
plt.ion()
self.viz = WrenchVisualizer(fig, gs[0], pause_time=1e-4, labels=['Wrench: force, torque']
_fontdict=fontdict)
# for data collection
self.data_collect_mode = data_collect
if self.data_collect_mode:
joints = {"wrench_s": np.zeros((6,1)), "wrench_u":np.zeros((6,1)), "wrench_p": np.zeros((6,1)),
"wrench_piston_butt": np.zeros((6,1))}
self.wrenches = {f"chain{i+1}":joints for i in range(6)}
self.data_collect_mode = data_collect
self.counter = 0
def subscribers(self):
"""
Thread this loop from a calling Class's member function
"""
"Intermediary links"
# piston prismatic joints parent:: piston_cylinder, child: shaft
prismatic_sub_1 = Subscriber(rf"/{self.base_name}/piston1_prismatic_wrench", WrenchStamped)
prismatic_sub_2 = Subscriber(rf"/{self.base_name}/piston2_prismatic_wrench", WrenchStamped)
prismatic_sub_3 = Subscriber(rf"/{self.base_name}/piston3_prismatic_wrench", WrenchStamped)
prismatic_sub_4 = Subscriber(rf"/{self.base_name}/piston4_prismatic_wrench", WrenchStamped)
prismatic_sub_5 = Subscriber(rf"/{self.base_name}/piston5_prismatic_wrench", WrenchStamped)
prismatic_sub_6 = Subscriber(rf"/{self.base_name}/piston6_prismatic_wrench", WrenchStamped)
# piston_bottom_pitch_joints:: parent: butt ball link, child piston_cyl_link
prismatic_pitch_sub_1 = Subscriber(rf"/{self.base_name}/piston1_butt_pitch_wrench", WrenchStamped)
prismatic_pitch_sub_2 = Subscriber(rf"/{self.base_name}/piston2_butt_pitch_wrench", WrenchStamped)
prismatic_pitch_sub_3 = Subscriber(rf"/{self.base_name}/piston3_butt_pitch_wrench", WrenchStamped)
prismatic_pitch_sub_4 = Subscriber(rf"/{self.base_name}/piston4_butt_pitch_wrench", WrenchStamped)
prismatic_pitch_sub_5 = Subscriber(rf"/{self.base_name}/piston5_butt_pitch_wrench", WrenchStamped)
prismatic_pitch_sub_6 = Subscriber(rf"/{self.base_name}/piston6_butt_pitch_wrench", WrenchStamped)
"End links"
# piston_top_ball_joints:: parent top_ball_link, child: piston_shaft_link
top_ball_sub_1 = Subscriber(rf"/{self.base_name}/piston1_top_ball_wrench", WrenchStamped)
top_ball_sub_2 = Subscriber(rf"/{self.base_name}/piston2_top_ball_wrench", WrenchStamped)
top_ball_sub_3 = Subscriber(rf"/{self.base_name}/piston3_top_ball_wrench", WrenchStamped)
top_ball_sub_4 = Subscriber(rf"/{self.base_name}/piston4_top_ball_wrench", WrenchStamped)
top_ball_sub_5 = Subscriber(rf"/{self.base_name}/piston5_top_ball_wrench", WrenchStamped)
top_ball_sub_6 = Subscriber(rf"/{self.base_name}/piston6_top_ball_wrench", WrenchStamped)
# butt_ball_joints:: parent:: parent base_link, child butt_ball_link
butt_ball_sub_1 = Subscriber(rf"/{self.base_name}/butt_ball1_wrench", WrenchStamped)
butt_ball_sub_2 = Subscriber(rf"/{self.base_name}/butt_ball2_wrench", WrenchStamped)
butt_ball_sub_3 = Subscriber(rf"/{self.base_name}/butt_ball3_wrench", WrenchStamped)
butt_ball_sub_4 = Subscriber(rf"/{self.base_name}/butt_ball4_wrench", WrenchStamped)
butt_ball_sub_5 = Subscriber(rf"/{self.base_name}/butt_ball5_wrench", WrenchStamped)
butt_ball_sub_6 = Subscriber(rf"/{self.base_name}/butt_ball6_wrench", WrenchStamped)
# end joints: top S-joint and butt U-joint
self.approx_ts = ApproximateTimeSynchronizer(
[prismatic_sub_1, prismatic_sub_2, \
prismatic_sub_3, prismatic_sub_4, \
prismatic_sub_5, prismatic_sub_6,\
prismatic_pitch_sub_1, prismatic_pitch_sub_2, \
prismatic_pitch_sub_3, prismatic_pitch_sub_4, \
prismatic_pitch_sub_5, prismatic_pitch_sub_6, \
top_ball_sub_1, top_ball_sub_2, top_ball_sub_3, \
top_ball_sub_4, top_ball_sub_5, top_ball_sub_6, \
butt_ball_sub_1, butt_ball_sub_2, \
butt_ball_sub_3, butt_ball_sub_4, \
butt_ball_sub_5, butt_ball_sub_6,
], 10, slop=0.51)
self.approx_ts.registerCallback(self.cb)
rospy.spin()
def cb(self, p1_prism, p2_prism, p3_prism, \
p4_prism, p5_prism, p6_prism, \
p1_prism_pitch, p2_prism_pitch, p3_prism_pitch,\
p4_prism_pitch, p5_prism_pitch, p6_prism_pitch, \
p1_top_ball, p2_top_ball, p3_top_ball,\
p4_top_ball, p5_top_ball, p6_top_ball, \
l1_butt_ball, l2_butt_ball, l3_butt_ball, \
l4_butt_ball, l5_butt_ball, l6_butt_ball):
"""
In order: piston prismatic joints
piston_bottom_pitch_joints
piston_top_ball_joints
butt_ball_joints
"""
start = rospy.get_rostime().to_sec()
"process the intermediary twelve wrenches"
# piston prismatic joints:: parent: cylinder, child: shaft
self.params["chain1"]["wrench_p"] = self.gather_wrench(p1_prism.wrench)
self.params["chain2"]["wrench_p"] = self.gather_wrench(p2_prism.wrench)
self.params["chain3"]["wrench_p"] = self.gather_wrench(p3_prism.wrench)
self.params["chain4"]["wrench_p"] = self.gather_wrench(p4_prism.wrench)
self.params["chain5"]["wrench_p"] = self.gather_wrench(p5_prism.wrench)
self.params["chain6"]["wrench_p"] = self.gather_wrench(p6_prism.wrench)
# piston_bottom_pitch_joints:: parent: butt ball link, child piston_cyl_link
self.params["chain1"]["wrench_piston_butt"] = self.gather_wrench(p1_prism_pitch.wrench)
self.params["chain2"]["wrench_piston_butt"] = self.gather_wrench(p2_prism_pitch.wrench)
self.params["chain3"]["wrench_piston_butt"] = self.gather_wrench(p3_prism_pitch.wrench)
self.params["chain4"]["wrench_piston_butt"] = self.gather_wrench(p4_prism_pitch.wrench)
self.params["chain5"]["wrench_piston_butt"] = self.gather_wrench(p5_prism_pitch.wrench)
self.params["chain6"]["wrench_piston_butt"] = self.gather_wrench(p6_prism_pitch.wrench)
"process the end twelve wrenches"
# piston_top_ball_joints:: parent top_ball_link, child: piston_shaft_link
self.params["chain1"]["wrench_s"] = self.gather_wrench(p1_top_ball.wrench)
self.params["chain2"]["wrench_s"] = self.gather_wrench(p2_top_ball.wrench)
self.params["chain3"]["wrench_s"] = self.gather_wrench(p3_top_ball.wrench)
self.params["chain4"]["wrench_s"] = self.gather_wrench(p4_top_ball.wrench)
self.params["chain5"]["wrench_s"] = self.gather_wrench(p5_top_ball.wrench)
self.params["chain6"]["wrench_s"] = self.gather_wrench(p6_top_ball.wrench)
# butt_ball_joints:: parent:: parent base_link, child butt_ball_link
self.params["chain1"]["wrench_u"] = self.gather_wrench(l1_butt_ball.wrench)
self.params["chain2"]["wrench_u"] = self.gather_wrench(l2_butt_ball.wrench)
self.params["chain3"]["wrench_u"] = self.gather_wrench(l3_butt_ball.wrench)
self.params["chain4"]["wrench_u"] = self.gather_wrench(l4_butt_ball.wrench)
self.params["chain5"]["wrench_u"] = self.gather_wrench(l5_butt_ball.wrench)
self.params["chain6"]["wrench_u"] = self.gather_wrench(l6_butt_ball.wrench)
# Calculate and store the transformations
self.params["chain1"]["Tu"] = self.transmit_mat((rf"butt_ball1_link", rf"piston1_cylinder_link"))
self.params["chain1"]["Tp"] = self.transmit_mat((rf"piston1_cylinder_link", rf"piston1_shaft_link"))
self.params["chain1"]["Tp"] *= self.transmit_mat((rf"piston1_shaft_link", rf"top_ball1_link"))
self.params["chain1"]["Ts"] = self.transmit_mat((rf"top_ball1_link", rf"platform_link"))
self.params["chain2"]["Tu"] = self.transmit_mat((rf"butt_ball2_link", rf"piston2_cylinder_link"))
self.params["chain2"]["Tp"] = self.transmit_mat((rf"piston2_cylinder_link", rf"piston2_shaft_link"))
self.params["chain2"]["Tp"] *= self.transmit_mat((rf"piston2_shaft_link", rf"top_ball2_link"))
self.params["chain2"]["Ts"] = self.transmit_mat((rf"top_ball2_link", rf"platform_link"))
self.params["chain3"]["Tu"] = self.transmit_mat((rf"butt_ball3_link", rf"piston3_cylinder_link"))
self.params["chain3"]["Tp"] = self.transmit_mat((rf"piston3_cylinder_link", rf"piston3_shaft_link"))
self.params["chain3"]["Tp"] *= self.transmit_mat((rf"piston3_shaft_link", rf"top_ball3_link"))
self.params["chain3"]["Ts"] = self.transmit_mat((rf"top_ball3_link", rf"platform_link"))
self.params["chain4"]["Tu"] = self.transmit_mat((rf"butt_ball4_link", rf"piston4_cylinder_link"))
self.params["chain4"]["Tp"] = self.transmit_mat((rf"piston4_cylinder_link", rf"piston4_shaft_link"))
self.params["chain4"]["Tp"] *= self.transmit_mat((rf"piston4_shaft_link", rf"top_ball4_link"))
self.params["chain4"]["Ts"] = self.transmit_mat((rf"top_ball4_link", rf"platform_link"))
self.params["chain5"]["Tu"] = self.transmit_mat((rf"butt_ball5_link", rf"piston5_cylinder_link"))
self.params["chain5"]["Tp"] = self.transmit_mat((rf"piston5_cylinder_link", rf"piston5_shaft_link"))
self.params["chain5"]["Tp"] *= self.transmit_mat((rf"piston5_shaft_link", rf"top_ball5_link"))
self.params["chain5"]["Ts"] = self.transmit_mat((rf"top_ball5_link", rf"platform_link"))
self.params["chain6"]["Tu"] = self.transmit_mat((rf"butt_ball6_link", rf"piston6_cylinder_link"))
self.params["chain6"]["Tp"] = self.transmit_mat((rf"piston6_cylinder_link", rf"piston6_shaft_link"))
self.params["chain6"]["Tp"] *= self.transmit_mat((rf"piston6_shaft_link", rf"top_ball6_link"))
self.params["chain6"]["Ts"] = self.transmit_mat((rf"top_ball6_link", rf"platform_link"))
# Get the regression matrices Φ𝑒 Φ𝑝 Φ𝑠
chain1_mat = self.regress_mat(self.params["chain1"], 1)
self.params["chain1"].update(chain1_mat)
chain2_mat = self.regress_mat(self.params["chain2"], 2)
self.params["chain2"].update(chain2_mat)
chain3_mat = self.regress_mat(self.params["chain3"], 3)
self.params["chain3"].update(chain3_mat)
chain4_mat = self.regress_mat(self.params["chain4"], 4)
self.params["chain4"].update(chain4_mat)
chain5_mat = self.regress_mat(self.params["chain5"], 5)
self.params["chain5"].update(chain5_mat)
chain6_mat = self.regress_mat(self.params["chain6"], 6)
self.params["chain6"].update(chain6_mat)
if self.data_collect_mode:
self.wrenches["chain1"]["wrench_s"] = np.append(self.wrenches["chain1"]["wrench_s"], self.params["chain1"]["wrench_s"], axis=1)
self.wrenches["chain1"]["wrench_u"] = np.append(self.wrenches["chain1"]["wrench_u"], self.params["chain1"]["wrench_u"], axis=1)
self.wrenches["chain1"]["wrench_p"] = np.append(self.wrenches["chain1"]["wrench_p"], self.params["chain1"]["wrench_p"], axis=1)
self.wrenches["chain1"]["wrench_piston_butt"] = np.append(self.wrenches["chain1"]["wrench_piston_butt"] , \
self.params["chain1"]["wrench_piston_butt"], axis=1)
# with open(os.path.join(data_dir, 'wrenches.pkl'), 'ab') as f:
# pickle.dump(self.wrenches, f)
if self.counter > 500:
print("Reached critical level")
np.savez(os.path.join(data_dir, 'wrenches.npz'), wrench_1s=self.wrenches["chain1"]["wrench_s"],\
wrench_1p=self.wrenches["chain1"]["wrench_p"], wrench_1u=self.wrenches["chain1"]["wrench_u"])
rospy.signal_shutdown("Shutting down node")
self.counter += 1
def numpize(self, vector_msg):
numpy_array = np.array([[vector_msg.x, vector_msg.y, vector_msg.z]], dtype=np.float64).T
return numpy_array
def gather_wrench(self, data, zero_force=False):
"""
For the passive joints, they only torque about their normals
so that the force generated is null. Use flag zero_force to
set these passive joints' forces to null.
"""
if zero_force:
force = np.zeros((3,1), dtype=np.float64)
else:
force=self.numpize(data.force)
torque=self.numpize(data.torque)
wrench = np.vstack((force, torque))
wrench = self.do_low_pass_filter(wrench)
return wrench
def do_low_pass_filter(self, wrench):
assert wrench.size == 6, "wrench to be filtered must be of size 6."
filt_param = 0.99
wrench = filt_param*wrench + (1.0-filt_param)*wrench
return wrench
def get_link_state(self, *msg):
rospy.wait_for_service('/gazebo/get_link_state')
try:
get_link = rospy.ServiceProxy('/gazebo/get_link_state', GetLinkState)
resp = get_link(msg[0][0], msg[0][1])
return GetLinkStateResponse(resp.link_state, resp.success, resp.status_message)
except rospy.ServiceException as e:
LOGGER.warning("Service call failed on: %s"%e)
def get_joint_info(self, msg):
rospy.wait_for_service('/gazebo/get_joint_properties')
try:
get_joint = rospy.ServiceProxy('/gazebo/get_joint_properties', GetJointProperties)
resp = get_joint(msg)
return GetJointPropertiesResponse(resp.type, resp.damping, resp.position, \
resp.rate, resp.success, resp.status_message)
except rospy.ServiceException as e:
LOGGER.warning("Service call failed on: %s"%e)
def process_link_state(self, resp):
if resp.success:
pos = resp.link_state.pose.position
quat = resp.link_state.pose.orientation
linvel = resp.link_state.twist.linear
angvel = resp.link_state.twist.angular
x = np.array(([[pos.x, pos.y, pos.z]])).T
rpy = euler_from_quaternion([quat.x, quat.y, quat.z, quat.w])
rpy = np.asarray(rpy).reshape(3,1)
xdot = np.array(([[linvel.x, linvel.y, linvel.z]])).T
omega = np.array(([[angvel.x, angvel.y, angvel.z]])).T
return x, rpy, xdot, omega
else:
LOGGER.warn("Message not received")
def process_joint_state(self, resp):
if resp.success:
return resp.rate
else:
LOGGER.warn("Message not received")
def skew_symmetric(self, vec):
assert vec.size==3, "vector for skew-symmetric matrix must be of size 3."
vector = np.array(([[ 0, -vec[2,0], vec[1,0]],
[vec[2,0], 0, -vec[0,0]],
[-vec[1,0], vec[0,0], 0],
]))
return vector
def regress_mat(self, chain, chain_num):
"""
Solve for \Phi from equation (9) in paper.
"""
"get πœ™π‘’ πœ™π‘ πœ™π‘ "
"get πœ™π‘’ first"
start = rospy.get_rostime().to_sec()
# angular velocity and acceleration of the u joint link: πœ” πœ”dot
_, πœ”, _, πœ”dot = self.process_link_state(self.get_link_state((rf"butt_ball{chain_num}_link", "base_link")))
skew_πœ” = self.skew_symmetric(πœ”)
skew_πœ”dot = self.skew_symmetric(πœ”dot)
dot_prod_πœ” = self.dot_prod_omega(πœ”)
dot_prod_πœ”dot = self.dot_prod_omega(πœ”dot)
# get ai_ddot from next link higher up
adot = self.process_joint_state(self.get_joint_info(rf"piston{chain_num}_prismatic_joint"))
Ξ” = rospy.get_rostime().to_sec() - start
Ξ” = Ξ” if Ξ” > 0.01 else 1
addot = (adot - chain["u_joint"]["adot_prev"])/Ξ”
chain["u_joint"]["a_ddot_prev"] = addot
r1 = np.concatenate((addot - self.gravity_vec, skew_πœ”dot+(skew_πœ”@skew_πœ”), np.zeros((3,self.freedoms)) ), axis=1)
r2 = np.concatenate(( np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πœ”dot+skew_πœ”@dot_prod_πœ” ), axis=1)
# print(r1.shape, r2.shape)
πœ™π‘’ = np.concatenate((r1, r2), axis=0)
"calculate πœ™π‘"
start = rospy.get_rostime().to_sec()
# angular velocity and acceleration of the p joint link: πœ” πœ”dot
_, πœ”, _, πœ”dot = self.process_link_state(self.get_link_state((rf"piston{chain_num}_cylinder_link", "base_link")))
skew_πœ” = self.skew_symmetric(πœ”)
skew_πœ”dot = self.skew_symmetric(πœ”dot)
dot_prod_πœ” = self.dot_prod_omega(πœ”)
dot_prod_πœ”dot = self.dot_prod_omega(πœ”dot)
# get ai_ddot from next link higher up
adot = self.process_joint_state(self.get_joint_info(rf"piston{chain_num}_top_ball_joint"))
Ξ” = rospy.get_rostime().to_sec() - start
Ξ” = Ξ” if Ξ” > 0.01 else 1
addot = (adot - chain["p_joint"]["adot_prev"])/Ξ”
chain["p_joint"]["a_ddot_prev"] = addot
r1 = np.concatenate((addot - self.gravity_vec, skew_πœ”dot+(skew_πœ”@skew_πœ”), np.zeros((3,self.freedoms))), axis=1)
r2 = np.concatenate((np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πœ”dot+skew_πœ”@dot_prod_πœ”), axis=1)
πœ™π‘ = np.concatenate((r1, r2), axis=0)
"calculate πœ™π‘ "
start = rospy.get_rostime().to_sec()
# angular velocity and acceleration of the s joint link: πœ” πœ”dot
_, πœ”, _, πœ”dot = self.process_link_state(self.get_link_state((rf"top_ball{chain_num}_link", "base_link")))
skew_πœ” = self.skew_symmetric(πœ”)
skew_πœ”dot = self.skew_symmetric(πœ”dot)
dot_prod_πœ” = self.dot_prod_omega(πœ”)
dot_prod_πœ”dot = self.dot_prod_omega(πœ”dot)
# get ai_ddot from next link higher up
adot = self.process_joint_state(self.get_joint_info(rf"top_ball{chain_num}_joint"))
Ξ” = rospy.get_rostime().to_sec() - start
Ξ” = Ξ” if Ξ” > 0.01 else 1
addot = (adot - chain["s_joint"]["adot_prev"])/Ξ”
chain["s_joint"]["a_ddot_prev"] = addot
r1 = np.concatenate((addot - self.gravity_vec, skew_πœ”dot+(skew_πœ”@skew_πœ”), np.zeros((3,self.freedoms))), axis=1)
r2 = np.concatenate((np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πœ”dot+skew_πœ”@dot_prod_πœ”), axis=1)
πœ™π‘  = np.concatenate((r1, r2), axis=0)
" stack the wrench vector for this chain:"
chain["wrench_vec"] = np.concatenate((chain["wrench_u"], chain["wrench_p"], chain["wrench_s"]), axis=0)
"stack the regression matrix for this chain"
zeros6_10 = np.zeros((6,10),dtype=np.float64)
r1 = np.concatenate((chain["Tu"]@πœ™π‘’, chain["Tu"]@chain["Tp"]@πœ™π‘, chain["Tu"]@chain["Tp"]@chain["Ts"]@πœ™π‘ ), axis=1)
r2 = np.concatenate((zeros6_10, chain["Tp"]@πœ™π‘, chain["Tp"]@chain["Ts"]@πœ™π‘ ), axis=1)
r3 = np.concatenate((zeros6_10, zeros6_10, chain["Ts"]@πœ™π‘ ), axis=1)
Ξ¦ = np.concatenate(( r1, r2, r3), axis=0)
# update regression matrix
"chain for results storage"
chain["Ξ¦_mat"] = Ξ¦
return chain
def transmit_mat(self, msg):
"""
x: linear position vector, shape -- 3 x 1
rpy: angular position vector, shape - 3
For quarternion vector multiplication see: http://docs.ros.org/en/melodic/api/tf2/html/Quaternion_8h_source.html#l00373
return Quaternion( q.w() * w.x() + q.y() * w.z() - q.z() * w.y(),
q.w() * w.y() + q.z() * w.x() - q.x() * w.z(),
q.w() * w.z() + q.x() * w.y() - q.y() * w.x(),
-q.x() * w.x() - q.y() * w.y() - q.z() * w.z());
"""
resp = self.get_link_state(msg)
x, rpy, xdot, omega = self.process_link_state(resp)
# print("rpy: ", rpy)
assert x.size == 3, "position vector must be of size 3"
assert rpy.size == 3, "angular vector must be of size 3"
x_skew = self.skew_symmetric(x)
rot = self.rotation_from_rpy(rpy.squeeze())
T1 = np.hstack((rot, np.zeros((3,3))))
T2 = np.hstack((x_skew@rot, rot))
T = np.vstack((T1, T2))
return T
def decompose_params(self, chain):
if not "Θ" in chain.keys():
# batch ident
Θu = chain["ΘPrev"][:10]
Θp = chain["ΘPrev"][10:20]
Θs = chain["ΘPrev"][20:]
else:
Θu = chain["Θ"][:10]
Θp = chain["Θ"][10:20]
Θs = chain["Θ"][20:]
mass_u = Θu[0,0]; mass_p = Θp[0,0]; mass_s = Θs[0,0];
"calculate the coms in the body frame"
com_u = Θu[1:4]/mass_u; com_p = Θp[1:4]/mass_p; com_s = Θs[1:4]/mass_s;
"I^wedge in top frame"
Iwedge_u = Θu[4:]; Iwedge_p = Θp[4:]; Iwedge_s = Θs[4:];
"calculate the inertials in the body frame"
I_u = self.full_inertia(Iwedge_u) - mass_u*((com_u.T@com_u)*np.eye(3) - com_u@com_u.T);
I_p = self.full_inertia(Iwedge_p) - mass_p*((com_p.T@com_p)*np.eye(3) - com_p@com_p.T);
I_s = self.full_inertia(Iwedge_s) - mass_s*((com_s.T@com_s)*np.eye(3) - com_s@com_s.T);
return dict(I_u=I_u, I_p=I_p, I_s=I_s,\
m_u=mass_u, m_p=mass_p, m_s=mass_s,\
com_u=com_u,com_p=com_p,com_s=com_s)
def rotation_from_rpy(self, ang):
assert ang.size==3, "rpy vector must be of size 3."
row1 = [np.cos(ang[2])*np.cos(ang[1]), -np.sin(ang[2])*np.cos(ang[0]) + np.cos(ang[2])*np.sin(ang[1])*np.sin(ang[0]), np.sin(ang[2])*np.sin(ang[0])+np.cos(ang[2])*np.sin(ang[1])*np.cos(ang[0])]
row2 = [np.sin(ang[2])*np.cos(ang[1]), np.cos(ang[2])*np.cos(ang[0]) + np.sin(ang[2])*np.sin(ang[1])*np.sin(ang[0]), -np.cos(ang[2])*np.sin(ang[0])+np.sin(ang[2])*np.sin(ang[1])*np.cos(ang[0])],
row3 = [-np.sin(ang[1]),np.cos(ang[1])*np.sin(ang[0]),np.cos(ang[1])*np.cos(ang[2])]
rot_mat = np.vstack(([row1, row2, row3]))
return rot_mat
def dot_prod_omega(self, vec):
assert vec.size==3, "vector for skew-symmetric matrix must be of size 3."
vector = np.array(([[ vec[0,0], vec[1,0], vec[2,0], 0, 0, 0],
[0, vec[0,0], 0, vec[1,0], vec[2,0], 0],
[0, 0, vec[0,0], 0, vec[1,0], vec[2,0]],
]))
return vector
def full_inertia(self, I):
assert I.size==self.freedoms, "inertia vector must be 6X1"
inertial_mat = np.array([I[0], I[3], I[-1]]) * np.eye(3)
inertial_mat[0,1:] = [I[1], I[2]]
inertial_mat[1:,0] = [I[1], I[2]]
inertial_mat[2,1] = I[-2]
inertial_mat[1,2] = I[-2]
return inertial_mat
# if __name__ == "__main__":
#
# rospy.init_node("estimator_node", anonymous=True, disable_signals=True)
# subreg = SubscribeRegress(30, verbose=True)
#
# # collect some wrenches and 0plot
# try:
# while not rospy.is_shutdown():
# subreg.subscribers()
# except rospy.ROSInterruptException:
# rospy.logfatal("shutting down ros")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment