Created
July 19, 2023 20:34
-
-
Save robotsorcerer/13df022dcc2099ff86e894d0668cac4f to your computer and use it in GitHub Desktop.
Msg Filters Subscribers in ros 2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
__all__ = ["SubscribeRegress"] | |
import os | |
import sys | |
import time | |
import rospy | |
import logging | |
import pickle | |
import rospkg | |
import threading | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import matplotlib.gridspec as gridspec | |
from message_filters import TimeSynchronizer, Subscriber, ApproximateTimeSynchronizer | |
from geometry_msgs.msg import WrenchStamped | |
from rospy.rostime import Duration | |
from gazebo_msgs.srv import GetLinkState, GetLinkStateResponse | |
from gazebo_msgs.srv import GetJointProperties, GetJointPropertiesResponse | |
from tf.transformations import euler_from_quaternion | |
sys.path.append( os.path.dirname( os.path.dirname( os.path.abspath(__file__) ) ) ) | |
from scripts.wrench_viz import WrenchVisualizer | |
rospack = rospkg.RosPack() | |
data_dir = rospack.get_path('param_est') + '/data' | |
if not os.path.exists(data_dir): | |
os.makedirs(data_dir) | |
LOGGER = logging.getLogger(__name__) | |
class SubscribeRegress(): | |
""" | |
Calculates the regression paramters for | |
equation (4) in the paper. | |
""" | |
def __init__(self, rate, verbose=False, data_collect=False, \ | |
freedoms=6, do_plot=True): | |
super(SubscribeRegress, self).__init__() | |
self.verbose = verbose | |
self.base_name = "stewart" | |
zeros3 = np.zeros((3,1)) | |
joint = dict(omega_prev=zeros3,adot_prev=zeros3) | |
self.params = { | |
"chain1": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
"chain2": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
"chain3": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
"chain4": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
"chain5": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
"chain6": dict(u_joint=joint,p_joint=joint,s_joint=joint), | |
} | |
self.gravity_vec = np.array(([[0, 0, -9.8]])).T | |
self.freedoms = freedoms # total degree of freedoms for the robot | |
if do_plot: | |
fig = plt.figure(figsize=(16, 9)) | |
gs = gridspec.GridSpec(1, 1, figure=fig) | |
plt.ion() | |
self.viz = WrenchVisualizer(fig, gs[0], pause_time=1e-4, labels=['Wrench: force, torque'] | |
_fontdict=fontdict) | |
# for data collection | |
self.data_collect_mode = data_collect | |
if self.data_collect_mode: | |
joints = {"wrench_s": np.zeros((6,1)), "wrench_u":np.zeros((6,1)), "wrench_p": np.zeros((6,1)), | |
"wrench_piston_butt": np.zeros((6,1))} | |
self.wrenches = {f"chain{i+1}":joints for i in range(6)} | |
self.data_collect_mode = data_collect | |
self.counter = 0 | |
def subscribers(self): | |
""" | |
Thread this loop from a calling Class's member function | |
""" | |
"Intermediary links" | |
# piston prismatic joints parent:: piston_cylinder, child: shaft | |
prismatic_sub_1 = Subscriber(rf"/{self.base_name}/piston1_prismatic_wrench", WrenchStamped) | |
prismatic_sub_2 = Subscriber(rf"/{self.base_name}/piston2_prismatic_wrench", WrenchStamped) | |
prismatic_sub_3 = Subscriber(rf"/{self.base_name}/piston3_prismatic_wrench", WrenchStamped) | |
prismatic_sub_4 = Subscriber(rf"/{self.base_name}/piston4_prismatic_wrench", WrenchStamped) | |
prismatic_sub_5 = Subscriber(rf"/{self.base_name}/piston5_prismatic_wrench", WrenchStamped) | |
prismatic_sub_6 = Subscriber(rf"/{self.base_name}/piston6_prismatic_wrench", WrenchStamped) | |
# piston_bottom_pitch_joints:: parent: butt ball link, child piston_cyl_link | |
prismatic_pitch_sub_1 = Subscriber(rf"/{self.base_name}/piston1_butt_pitch_wrench", WrenchStamped) | |
prismatic_pitch_sub_2 = Subscriber(rf"/{self.base_name}/piston2_butt_pitch_wrench", WrenchStamped) | |
prismatic_pitch_sub_3 = Subscriber(rf"/{self.base_name}/piston3_butt_pitch_wrench", WrenchStamped) | |
prismatic_pitch_sub_4 = Subscriber(rf"/{self.base_name}/piston4_butt_pitch_wrench", WrenchStamped) | |
prismatic_pitch_sub_5 = Subscriber(rf"/{self.base_name}/piston5_butt_pitch_wrench", WrenchStamped) | |
prismatic_pitch_sub_6 = Subscriber(rf"/{self.base_name}/piston6_butt_pitch_wrench", WrenchStamped) | |
"End links" | |
# piston_top_ball_joints:: parent top_ball_link, child: piston_shaft_link | |
top_ball_sub_1 = Subscriber(rf"/{self.base_name}/piston1_top_ball_wrench", WrenchStamped) | |
top_ball_sub_2 = Subscriber(rf"/{self.base_name}/piston2_top_ball_wrench", WrenchStamped) | |
top_ball_sub_3 = Subscriber(rf"/{self.base_name}/piston3_top_ball_wrench", WrenchStamped) | |
top_ball_sub_4 = Subscriber(rf"/{self.base_name}/piston4_top_ball_wrench", WrenchStamped) | |
top_ball_sub_5 = Subscriber(rf"/{self.base_name}/piston5_top_ball_wrench", WrenchStamped) | |
top_ball_sub_6 = Subscriber(rf"/{self.base_name}/piston6_top_ball_wrench", WrenchStamped) | |
# butt_ball_joints:: parent:: parent base_link, child butt_ball_link | |
butt_ball_sub_1 = Subscriber(rf"/{self.base_name}/butt_ball1_wrench", WrenchStamped) | |
butt_ball_sub_2 = Subscriber(rf"/{self.base_name}/butt_ball2_wrench", WrenchStamped) | |
butt_ball_sub_3 = Subscriber(rf"/{self.base_name}/butt_ball3_wrench", WrenchStamped) | |
butt_ball_sub_4 = Subscriber(rf"/{self.base_name}/butt_ball4_wrench", WrenchStamped) | |
butt_ball_sub_5 = Subscriber(rf"/{self.base_name}/butt_ball5_wrench", WrenchStamped) | |
butt_ball_sub_6 = Subscriber(rf"/{self.base_name}/butt_ball6_wrench", WrenchStamped) | |
# end joints: top S-joint and butt U-joint | |
self.approx_ts = ApproximateTimeSynchronizer( | |
[prismatic_sub_1, prismatic_sub_2, \ | |
prismatic_sub_3, prismatic_sub_4, \ | |
prismatic_sub_5, prismatic_sub_6,\ | |
prismatic_pitch_sub_1, prismatic_pitch_sub_2, \ | |
prismatic_pitch_sub_3, prismatic_pitch_sub_4, \ | |
prismatic_pitch_sub_5, prismatic_pitch_sub_6, \ | |
top_ball_sub_1, top_ball_sub_2, top_ball_sub_3, \ | |
top_ball_sub_4, top_ball_sub_5, top_ball_sub_6, \ | |
butt_ball_sub_1, butt_ball_sub_2, \ | |
butt_ball_sub_3, butt_ball_sub_4, \ | |
butt_ball_sub_5, butt_ball_sub_6, | |
], 10, slop=0.51) | |
self.approx_ts.registerCallback(self.cb) | |
rospy.spin() | |
def cb(self, p1_prism, p2_prism, p3_prism, \ | |
p4_prism, p5_prism, p6_prism, \ | |
p1_prism_pitch, p2_prism_pitch, p3_prism_pitch,\ | |
p4_prism_pitch, p5_prism_pitch, p6_prism_pitch, \ | |
p1_top_ball, p2_top_ball, p3_top_ball,\ | |
p4_top_ball, p5_top_ball, p6_top_ball, \ | |
l1_butt_ball, l2_butt_ball, l3_butt_ball, \ | |
l4_butt_ball, l5_butt_ball, l6_butt_ball): | |
""" | |
In order: piston prismatic joints | |
piston_bottom_pitch_joints | |
piston_top_ball_joints | |
butt_ball_joints | |
""" | |
start = rospy.get_rostime().to_sec() | |
"process the intermediary twelve wrenches" | |
# piston prismatic joints:: parent: cylinder, child: shaft | |
self.params["chain1"]["wrench_p"] = self.gather_wrench(p1_prism.wrench) | |
self.params["chain2"]["wrench_p"] = self.gather_wrench(p2_prism.wrench) | |
self.params["chain3"]["wrench_p"] = self.gather_wrench(p3_prism.wrench) | |
self.params["chain4"]["wrench_p"] = self.gather_wrench(p4_prism.wrench) | |
self.params["chain5"]["wrench_p"] = self.gather_wrench(p5_prism.wrench) | |
self.params["chain6"]["wrench_p"] = self.gather_wrench(p6_prism.wrench) | |
# piston_bottom_pitch_joints:: parent: butt ball link, child piston_cyl_link | |
self.params["chain1"]["wrench_piston_butt"] = self.gather_wrench(p1_prism_pitch.wrench) | |
self.params["chain2"]["wrench_piston_butt"] = self.gather_wrench(p2_prism_pitch.wrench) | |
self.params["chain3"]["wrench_piston_butt"] = self.gather_wrench(p3_prism_pitch.wrench) | |
self.params["chain4"]["wrench_piston_butt"] = self.gather_wrench(p4_prism_pitch.wrench) | |
self.params["chain5"]["wrench_piston_butt"] = self.gather_wrench(p5_prism_pitch.wrench) | |
self.params["chain6"]["wrench_piston_butt"] = self.gather_wrench(p6_prism_pitch.wrench) | |
"process the end twelve wrenches" | |
# piston_top_ball_joints:: parent top_ball_link, child: piston_shaft_link | |
self.params["chain1"]["wrench_s"] = self.gather_wrench(p1_top_ball.wrench) | |
self.params["chain2"]["wrench_s"] = self.gather_wrench(p2_top_ball.wrench) | |
self.params["chain3"]["wrench_s"] = self.gather_wrench(p3_top_ball.wrench) | |
self.params["chain4"]["wrench_s"] = self.gather_wrench(p4_top_ball.wrench) | |
self.params["chain5"]["wrench_s"] = self.gather_wrench(p5_top_ball.wrench) | |
self.params["chain6"]["wrench_s"] = self.gather_wrench(p6_top_ball.wrench) | |
# butt_ball_joints:: parent:: parent base_link, child butt_ball_link | |
self.params["chain1"]["wrench_u"] = self.gather_wrench(l1_butt_ball.wrench) | |
self.params["chain2"]["wrench_u"] = self.gather_wrench(l2_butt_ball.wrench) | |
self.params["chain3"]["wrench_u"] = self.gather_wrench(l3_butt_ball.wrench) | |
self.params["chain4"]["wrench_u"] = self.gather_wrench(l4_butt_ball.wrench) | |
self.params["chain5"]["wrench_u"] = self.gather_wrench(l5_butt_ball.wrench) | |
self.params["chain6"]["wrench_u"] = self.gather_wrench(l6_butt_ball.wrench) | |
# Calculate and store the transformations | |
self.params["chain1"]["Tu"] = self.transmit_mat((rf"butt_ball1_link", rf"piston1_cylinder_link")) | |
self.params["chain1"]["Tp"] = self.transmit_mat((rf"piston1_cylinder_link", rf"piston1_shaft_link")) | |
self.params["chain1"]["Tp"] *= self.transmit_mat((rf"piston1_shaft_link", rf"top_ball1_link")) | |
self.params["chain1"]["Ts"] = self.transmit_mat((rf"top_ball1_link", rf"platform_link")) | |
self.params["chain2"]["Tu"] = self.transmit_mat((rf"butt_ball2_link", rf"piston2_cylinder_link")) | |
self.params["chain2"]["Tp"] = self.transmit_mat((rf"piston2_cylinder_link", rf"piston2_shaft_link")) | |
self.params["chain2"]["Tp"] *= self.transmit_mat((rf"piston2_shaft_link", rf"top_ball2_link")) | |
self.params["chain2"]["Ts"] = self.transmit_mat((rf"top_ball2_link", rf"platform_link")) | |
self.params["chain3"]["Tu"] = self.transmit_mat((rf"butt_ball3_link", rf"piston3_cylinder_link")) | |
self.params["chain3"]["Tp"] = self.transmit_mat((rf"piston3_cylinder_link", rf"piston3_shaft_link")) | |
self.params["chain3"]["Tp"] *= self.transmit_mat((rf"piston3_shaft_link", rf"top_ball3_link")) | |
self.params["chain3"]["Ts"] = self.transmit_mat((rf"top_ball3_link", rf"platform_link")) | |
self.params["chain4"]["Tu"] = self.transmit_mat((rf"butt_ball4_link", rf"piston4_cylinder_link")) | |
self.params["chain4"]["Tp"] = self.transmit_mat((rf"piston4_cylinder_link", rf"piston4_shaft_link")) | |
self.params["chain4"]["Tp"] *= self.transmit_mat((rf"piston4_shaft_link", rf"top_ball4_link")) | |
self.params["chain4"]["Ts"] = self.transmit_mat((rf"top_ball4_link", rf"platform_link")) | |
self.params["chain5"]["Tu"] = self.transmit_mat((rf"butt_ball5_link", rf"piston5_cylinder_link")) | |
self.params["chain5"]["Tp"] = self.transmit_mat((rf"piston5_cylinder_link", rf"piston5_shaft_link")) | |
self.params["chain5"]["Tp"] *= self.transmit_mat((rf"piston5_shaft_link", rf"top_ball5_link")) | |
self.params["chain5"]["Ts"] = self.transmit_mat((rf"top_ball5_link", rf"platform_link")) | |
self.params["chain6"]["Tu"] = self.transmit_mat((rf"butt_ball6_link", rf"piston6_cylinder_link")) | |
self.params["chain6"]["Tp"] = self.transmit_mat((rf"piston6_cylinder_link", rf"piston6_shaft_link")) | |
self.params["chain6"]["Tp"] *= self.transmit_mat((rf"piston6_shaft_link", rf"top_ball6_link")) | |
self.params["chain6"]["Ts"] = self.transmit_mat((rf"top_ball6_link", rf"platform_link")) | |
# Get the regression matrices Ξ¦π’ Ξ¦π Ξ¦π | |
chain1_mat = self.regress_mat(self.params["chain1"], 1) | |
self.params["chain1"].update(chain1_mat) | |
chain2_mat = self.regress_mat(self.params["chain2"], 2) | |
self.params["chain2"].update(chain2_mat) | |
chain3_mat = self.regress_mat(self.params["chain3"], 3) | |
self.params["chain3"].update(chain3_mat) | |
chain4_mat = self.regress_mat(self.params["chain4"], 4) | |
self.params["chain4"].update(chain4_mat) | |
chain5_mat = self.regress_mat(self.params["chain5"], 5) | |
self.params["chain5"].update(chain5_mat) | |
chain6_mat = self.regress_mat(self.params["chain6"], 6) | |
self.params["chain6"].update(chain6_mat) | |
if self.data_collect_mode: | |
self.wrenches["chain1"]["wrench_s"] = np.append(self.wrenches["chain1"]["wrench_s"], self.params["chain1"]["wrench_s"], axis=1) | |
self.wrenches["chain1"]["wrench_u"] = np.append(self.wrenches["chain1"]["wrench_u"], self.params["chain1"]["wrench_u"], axis=1) | |
self.wrenches["chain1"]["wrench_p"] = np.append(self.wrenches["chain1"]["wrench_p"], self.params["chain1"]["wrench_p"], axis=1) | |
self.wrenches["chain1"]["wrench_piston_butt"] = np.append(self.wrenches["chain1"]["wrench_piston_butt"] , \ | |
self.params["chain1"]["wrench_piston_butt"], axis=1) | |
# with open(os.path.join(data_dir, 'wrenches.pkl'), 'ab') as f: | |
# pickle.dump(self.wrenches, f) | |
if self.counter > 500: | |
print("Reached critical level") | |
np.savez(os.path.join(data_dir, 'wrenches.npz'), wrench_1s=self.wrenches["chain1"]["wrench_s"],\ | |
wrench_1p=self.wrenches["chain1"]["wrench_p"], wrench_1u=self.wrenches["chain1"]["wrench_u"]) | |
rospy.signal_shutdown("Shutting down node") | |
self.counter += 1 | |
def numpize(self, vector_msg): | |
numpy_array = np.array([[vector_msg.x, vector_msg.y, vector_msg.z]], dtype=np.float64).T | |
return numpy_array | |
def gather_wrench(self, data, zero_force=False): | |
""" | |
For the passive joints, they only torque about their normals | |
so that the force generated is null. Use flag zero_force to | |
set these passive joints' forces to null. | |
""" | |
if zero_force: | |
force = np.zeros((3,1), dtype=np.float64) | |
else: | |
force=self.numpize(data.force) | |
torque=self.numpize(data.torque) | |
wrench = np.vstack((force, torque)) | |
wrench = self.do_low_pass_filter(wrench) | |
return wrench | |
def do_low_pass_filter(self, wrench): | |
assert wrench.size == 6, "wrench to be filtered must be of size 6." | |
filt_param = 0.99 | |
wrench = filt_param*wrench + (1.0-filt_param)*wrench | |
return wrench | |
def get_link_state(self, *msg): | |
rospy.wait_for_service('/gazebo/get_link_state') | |
try: | |
get_link = rospy.ServiceProxy('/gazebo/get_link_state', GetLinkState) | |
resp = get_link(msg[0][0], msg[0][1]) | |
return GetLinkStateResponse(resp.link_state, resp.success, resp.status_message) | |
except rospy.ServiceException as e: | |
LOGGER.warning("Service call failed on: %s"%e) | |
def get_joint_info(self, msg): | |
rospy.wait_for_service('/gazebo/get_joint_properties') | |
try: | |
get_joint = rospy.ServiceProxy('/gazebo/get_joint_properties', GetJointProperties) | |
resp = get_joint(msg) | |
return GetJointPropertiesResponse(resp.type, resp.damping, resp.position, \ | |
resp.rate, resp.success, resp.status_message) | |
except rospy.ServiceException as e: | |
LOGGER.warning("Service call failed on: %s"%e) | |
def process_link_state(self, resp): | |
if resp.success: | |
pos = resp.link_state.pose.position | |
quat = resp.link_state.pose.orientation | |
linvel = resp.link_state.twist.linear | |
angvel = resp.link_state.twist.angular | |
x = np.array(([[pos.x, pos.y, pos.z]])).T | |
rpy = euler_from_quaternion([quat.x, quat.y, quat.z, quat.w]) | |
rpy = np.asarray(rpy).reshape(3,1) | |
xdot = np.array(([[linvel.x, linvel.y, linvel.z]])).T | |
omega = np.array(([[angvel.x, angvel.y, angvel.z]])).T | |
return x, rpy, xdot, omega | |
else: | |
LOGGER.warn("Message not received") | |
def process_joint_state(self, resp): | |
if resp.success: | |
return resp.rate | |
else: | |
LOGGER.warn("Message not received") | |
def skew_symmetric(self, vec): | |
assert vec.size==3, "vector for skew-symmetric matrix must be of size 3." | |
vector = np.array(([[ 0, -vec[2,0], vec[1,0]], | |
[vec[2,0], 0, -vec[0,0]], | |
[-vec[1,0], vec[0,0], 0], | |
])) | |
return vector | |
def regress_mat(self, chain, chain_num): | |
""" | |
Solve for \Phi from equation (9) in paper. | |
""" | |
"get ππ’ ππ ππ " | |
"get ππ’ first" | |
start = rospy.get_rostime().to_sec() | |
# angular velocity and acceleration of the u joint link: π πdot | |
_, π, _, πdot = self.process_link_state(self.get_link_state((rf"butt_ball{chain_num}_link", "base_link"))) | |
skew_π = self.skew_symmetric(π) | |
skew_πdot = self.skew_symmetric(πdot) | |
dot_prod_π = self.dot_prod_omega(π) | |
dot_prod_πdot = self.dot_prod_omega(πdot) | |
# get ai_ddot from next link higher up | |
adot = self.process_joint_state(self.get_joint_info(rf"piston{chain_num}_prismatic_joint")) | |
Ξ = rospy.get_rostime().to_sec() - start | |
Ξ = Ξ if Ξ > 0.01 else 1 | |
addot = (adot - chain["u_joint"]["adot_prev"])/Ξ | |
chain["u_joint"]["a_ddot_prev"] = addot | |
r1 = np.concatenate((addot - self.gravity_vec, skew_πdot+(skew_π@skew_π), np.zeros((3,self.freedoms)) ), axis=1) | |
r2 = np.concatenate(( np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πdot+skew_π@dot_prod_π ), axis=1) | |
# print(r1.shape, r2.shape) | |
ππ’ = np.concatenate((r1, r2), axis=0) | |
"calculate ππ" | |
start = rospy.get_rostime().to_sec() | |
# angular velocity and acceleration of the p joint link: π πdot | |
_, π, _, πdot = self.process_link_state(self.get_link_state((rf"piston{chain_num}_cylinder_link", "base_link"))) | |
skew_π = self.skew_symmetric(π) | |
skew_πdot = self.skew_symmetric(πdot) | |
dot_prod_π = self.dot_prod_omega(π) | |
dot_prod_πdot = self.dot_prod_omega(πdot) | |
# get ai_ddot from next link higher up | |
adot = self.process_joint_state(self.get_joint_info(rf"piston{chain_num}_top_ball_joint")) | |
Ξ = rospy.get_rostime().to_sec() - start | |
Ξ = Ξ if Ξ > 0.01 else 1 | |
addot = (adot - chain["p_joint"]["adot_prev"])/Ξ | |
chain["p_joint"]["a_ddot_prev"] = addot | |
r1 = np.concatenate((addot - self.gravity_vec, skew_πdot+(skew_π@skew_π), np.zeros((3,self.freedoms))), axis=1) | |
r2 = np.concatenate((np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πdot+skew_π@dot_prod_π), axis=1) | |
ππ = np.concatenate((r1, r2), axis=0) | |
"calculate ππ " | |
start = rospy.get_rostime().to_sec() | |
# angular velocity and acceleration of the s joint link: π πdot | |
_, π, _, πdot = self.process_link_state(self.get_link_state((rf"top_ball{chain_num}_link", "base_link"))) | |
skew_π = self.skew_symmetric(π) | |
skew_πdot = self.skew_symmetric(πdot) | |
dot_prod_π = self.dot_prod_omega(π) | |
dot_prod_πdot = self.dot_prod_omega(πdot) | |
# get ai_ddot from next link higher up | |
adot = self.process_joint_state(self.get_joint_info(rf"top_ball{chain_num}_joint")) | |
Ξ = rospy.get_rostime().to_sec() - start | |
Ξ = Ξ if Ξ > 0.01 else 1 | |
addot = (adot - chain["s_joint"]["adot_prev"])/Ξ | |
chain["s_joint"]["a_ddot_prev"] = addot | |
r1 = np.concatenate((addot - self.gravity_vec, skew_πdot+(skew_π@skew_π), np.zeros((3,self.freedoms))), axis=1) | |
r2 = np.concatenate((np.zeros((3,1)), self.skew_symmetric(self.gravity_vec-addot), dot_prod_πdot+skew_π@dot_prod_π), axis=1) | |
ππ = np.concatenate((r1, r2), axis=0) | |
" stack the wrench vector for this chain:" | |
chain["wrench_vec"] = np.concatenate((chain["wrench_u"], chain["wrench_p"], chain["wrench_s"]), axis=0) | |
"stack the regression matrix for this chain" | |
zeros6_10 = np.zeros((6,10),dtype=np.float64) | |
r1 = np.concatenate((chain["Tu"]@ππ’, chain["Tu"]@chain["Tp"]@ππ, chain["Tu"]@chain["Tp"]@chain["Ts"]@ππ ), axis=1) | |
r2 = np.concatenate((zeros6_10, chain["Tp"]@ππ, chain["Tp"]@chain["Ts"]@ππ ), axis=1) | |
r3 = np.concatenate((zeros6_10, zeros6_10, chain["Ts"]@ππ ), axis=1) | |
Ξ¦ = np.concatenate(( r1, r2, r3), axis=0) | |
# update regression matrix | |
"chain for results storage" | |
chain["Ξ¦_mat"] = Ξ¦ | |
return chain | |
def transmit_mat(self, msg): | |
""" | |
x: linear position vector, shape -- 3 x 1 | |
rpy: angular position vector, shape - 3 | |
For quarternion vector multiplication see: http://docs.ros.org/en/melodic/api/tf2/html/Quaternion_8h_source.html#l00373 | |
return Quaternion( q.w() * w.x() + q.y() * w.z() - q.z() * w.y(), | |
q.w() * w.y() + q.z() * w.x() - q.x() * w.z(), | |
q.w() * w.z() + q.x() * w.y() - q.y() * w.x(), | |
-q.x() * w.x() - q.y() * w.y() - q.z() * w.z()); | |
""" | |
resp = self.get_link_state(msg) | |
x, rpy, xdot, omega = self.process_link_state(resp) | |
# print("rpy: ", rpy) | |
assert x.size == 3, "position vector must be of size 3" | |
assert rpy.size == 3, "angular vector must be of size 3" | |
x_skew = self.skew_symmetric(x) | |
rot = self.rotation_from_rpy(rpy.squeeze()) | |
T1 = np.hstack((rot, np.zeros((3,3)))) | |
T2 = np.hstack((x_skew@rot, rot)) | |
T = np.vstack((T1, T2)) | |
return T | |
def decompose_params(self, chain): | |
if not "Ξ" in chain.keys(): | |
# batch ident | |
Ξu = chain["ΞPrev"][:10] | |
Ξp = chain["ΞPrev"][10:20] | |
Ξs = chain["ΞPrev"][20:] | |
else: | |
Ξu = chain["Ξ"][:10] | |
Ξp = chain["Ξ"][10:20] | |
Ξs = chain["Ξ"][20:] | |
mass_u = Ξu[0,0]; mass_p = Ξp[0,0]; mass_s = Ξs[0,0]; | |
"calculate the coms in the body frame" | |
com_u = Ξu[1:4]/mass_u; com_p = Ξp[1:4]/mass_p; com_s = Ξs[1:4]/mass_s; | |
"I^wedge in top frame" | |
Iwedge_u = Ξu[4:]; Iwedge_p = Ξp[4:]; Iwedge_s = Ξs[4:]; | |
"calculate the inertials in the body frame" | |
I_u = self.full_inertia(Iwedge_u) - mass_u*((com_u.T@com_u)*np.eye(3) - com_u@com_u.T); | |
I_p = self.full_inertia(Iwedge_p) - mass_p*((com_p.T@com_p)*np.eye(3) - com_p@com_p.T); | |
I_s = self.full_inertia(Iwedge_s) - mass_s*((com_s.T@com_s)*np.eye(3) - com_s@com_s.T); | |
return dict(I_u=I_u, I_p=I_p, I_s=I_s,\ | |
m_u=mass_u, m_p=mass_p, m_s=mass_s,\ | |
com_u=com_u,com_p=com_p,com_s=com_s) | |
def rotation_from_rpy(self, ang): | |
assert ang.size==3, "rpy vector must be of size 3." | |
row1 = [np.cos(ang[2])*np.cos(ang[1]), -np.sin(ang[2])*np.cos(ang[0]) + np.cos(ang[2])*np.sin(ang[1])*np.sin(ang[0]), np.sin(ang[2])*np.sin(ang[0])+np.cos(ang[2])*np.sin(ang[1])*np.cos(ang[0])] | |
row2 = [np.sin(ang[2])*np.cos(ang[1]), np.cos(ang[2])*np.cos(ang[0]) + np.sin(ang[2])*np.sin(ang[1])*np.sin(ang[0]), -np.cos(ang[2])*np.sin(ang[0])+np.sin(ang[2])*np.sin(ang[1])*np.cos(ang[0])], | |
row3 = [-np.sin(ang[1]),np.cos(ang[1])*np.sin(ang[0]),np.cos(ang[1])*np.cos(ang[2])] | |
rot_mat = np.vstack(([row1, row2, row3])) | |
return rot_mat | |
def dot_prod_omega(self, vec): | |
assert vec.size==3, "vector for skew-symmetric matrix must be of size 3." | |
vector = np.array(([[ vec[0,0], vec[1,0], vec[2,0], 0, 0, 0], | |
[0, vec[0,0], 0, vec[1,0], vec[2,0], 0], | |
[0, 0, vec[0,0], 0, vec[1,0], vec[2,0]], | |
])) | |
return vector | |
def full_inertia(self, I): | |
assert I.size==self.freedoms, "inertia vector must be 6X1" | |
inertial_mat = np.array([I[0], I[3], I[-1]]) * np.eye(3) | |
inertial_mat[0,1:] = [I[1], I[2]] | |
inertial_mat[1:,0] = [I[1], I[2]] | |
inertial_mat[2,1] = I[-2] | |
inertial_mat[1,2] = I[-2] | |
return inertial_mat | |
# if __name__ == "__main__": | |
# | |
# rospy.init_node("estimator_node", anonymous=True, disable_signals=True) | |
# subreg = SubscribeRegress(30, verbose=True) | |
# | |
# # collect some wrenches and 0plot | |
# try: | |
# while not rospy.is_shutdown(): | |
# subreg.subscribers() | |
# except rospy.ROSInterruptException: | |
# rospy.logfatal("shutting down ros") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment