Created
August 21, 2017 23:21
-
-
Save akihikoy/650b8ee4699da98fc9f766107484398c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Following codes are an example of processing marker tracker topic from FingerVision. | |
I just copied them from my system, which might be incomplete, but you could find the idea of the calculation. | |
The main function to process the marker tracker topic is BlobMoves. | |
(see `def BlobMoves(t,l,msg,side)`) | |
t: Interface of core tools (ROS connection, robot controller, etc.); you can replace t by adequate functions. | |
l: A container object. The simplest implementation is like: | |
class Container(): pass | |
l= Container() | |
#now you can add any members by | |
l.foo= 1.2 | |
l.bar= [1,2,3] | |
BlobMoves stores the computed data in l so that you can access to them with l. | |
You can replace l by a dictionary or a list. | |
msg: Topic from the marker tracker. | |
side: We assume that two FingerVision sensors are attached on a parallel gripper. side is a flag to select one of them. | |
side should be RIGHT or LEFT, RIGHT=0, LEFT=1 | |
Subscribe a topic from FingerVision with the callback using BlobMoves. | |
Set up l: | |
l= TContainer() | |
l.seq= [[],[]] | |
l.force= [None,None] | |
l.recording= False | |
Right finger: | |
Topic: '/visual_skin_node_ay1/blob_moves_usbcam2fay12_r' | |
Type: lfd_vision.msg.BlobMoves | |
Callback: lambda msg,l=l:BlobMoves(t,l,msg,RIGHT) | |
Left finger: | |
Topic: '/visual_skin_node_ay1/blob_moves_usbcam2fay12_l' | |
Type: lfd_vision.msg.BlobMoves | |
Callback: lambda msg,l=l:BlobMoves(t,l,msg,LEFT) | |
Note: | |
- VizForce is a visualization code for RViz (just comment out VizForce(...) in BlobMoves). | |
- np = numpy | |
''' | |
#Similar to a median of a position pos but | |
#return one of median(pos) and median(1-pos) | |
#that has a bigger absolute value. | |
def XMedian(array,pos=0.75): | |
if len(array)==0: return None | |
a_sorted= copy.deepcopy(array) | |
a_sorted.sort() | |
a1= a_sorted[int(len(a_sorted)*pos)] | |
a2= a_sorted[int(len(a_sorted)*(1.0-pos))] | |
return a1 if abs(a1)>abs(a2) else a2 | |
def VizForce(t,force,side): | |
viz= t.viz.finger_force | |
x_w= t.robot.FK(x_ext=[0.0,(-0.05,0.05)[side],0.07, 0,0,0,1],arm=RIGHT) | |
x_e= t.robot.FK(x_ext=t.GetAttr('wrist_r','lx'),arm=RIGHT) | |
def VizVector(t,vec,width,col,mid): | |
x_f= Vec([0.0]*7) | |
x_f[:3]= x_w[:3] | |
ex= Vec(vec) | |
f_norm= Norm(ex) | |
if f_norm<1.0e-6: | |
viz.AddArrow(x_f, scale=[0.0,0.0,0.0], mid=mid) | |
return | |
ex= ex/f_norm | |
ey= GetOrthogonalAxisOf(ex, preferable=([0.0,1.0,0.0]), fault=[1.0,0.0,0.0]) | |
ez= np.cross(ex, ey) | |
x_f[3:]= RotToQ(ExyzToRot(ex,ey,ez)) | |
viz.AddArrow(x_f, scale=[0.02*f_norm,width,width], rgb=col, alpha=0.7, mid=mid) | |
VizVector(t,Transform(x_e[3:],force[:3]),width=0.010,col=[1.0,0.0,0.0],mid=2*side+0) | |
VizVector(t,Transform(x_e[3:],force[3:]),width=0.005,col=[0.0,1.0,0.0],mid=2*side+1) | |
def BlobMoves(t,l,msg,side): | |
filter_len= 20 | |
cx= msg.width/2 | |
cy= msg.height/2 | |
div= (msg.width+msg.height)/2 | |
def convert(mv): | |
fscale= [1.0,1.0,2.5] | |
tscale= 2.0 | |
rxy= ((mv.Pox-cx)/div, (mv.Poy-cy)/div) | |
fxy= (mv.DPx, mv.DPy) | |
fz= max(0.0,mv.DS) | |
if side==RIGHT: | |
f= [+(fxy[0]*fscale[0]), -(fz*fscale[2]), -(fxy[1]*fscale[1])] | |
r= [+rxy[0], 0.0, -rxy[1]] | |
elif side==LEFT: | |
f= [-(fxy[0]*fscale[0]), +(fz*fscale[2]), -(fxy[1]*fscale[1])] | |
r= [-rxy[0], 0.0, -rxy[1]] | |
tau= np.cross(r,f)*tscale | |
return f+tau.tolist() | |
force_array= [convert(mv) for mv in msg.data] | |
#print force_array | |
l.seq[side].append(force_array) | |
if len(l.seq[side])>filter_len: l.seq[side].pop() | |
force= [XMedian([force[d] for force in force_array],0.8) for d in xrange(6)] | |
l.force[side]= force | |
#print force | |
VizForce(t,force,side) | |
def serialize(data): | |
return ' '.join((ToStr((mv.Pox,mv.Poy,mv.So,mv.DPx,mv.DPy,mv.DS)) for mv in msg.data)) | |
if l.recording: | |
t_curr= rospy.Time.now().to_nsec() | |
l.fp_avr[side].write('{time} {force}\n'.format( | |
time=t_curr, | |
force=ToStr(force))) | |
l.fp_all[side].write('{time} {width} {height} {data}\n'.format( | |
time=t_curr, | |
width=msg.width,height=msg.height, | |
data=serialize(msg.data))) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment