#!/usr/bin/env python
# NOTE: the Q-learning arbitration misbehaves; the Q-value dumps below were kept for debugging.
'''
Sample dumps of the q_values table (64 states x 2 strategies) from two runs:
1, 1 [[ 0.94235238 0.4 ]
[ 0. 0. ]
[ 0. 0. ]
[-0.4 -1.35506596]
[ 0. 0. ]
[ 0. 0. ]
[-1.13457664 -1.85922768]
[-1.373824 -1.89873836]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.784 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.67546624]
[-0.4 -1.26544384]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.784 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.64 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0.4 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.64 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.4 ]
[ 0. -0.8704 ]
[ 0. 2.1724221 ]
[ 0. 0. ]
[ 0. 0. ]
[-0.68224 -1.60657824]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.9899223 ]
[-1.84627302 -2.31450877]]
1, 2 [[ 0. 3.19539843]
[ 0. 0. ]
[-0.784 -1.26705434]
[ 0. -0.99921636]
[ 0. 0. ]
[ 0. 0. ]
[-3.14717287 -4.47898616]
[-5.13272036 -5.45997029]
[ 0.4 1.0549504 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.4 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.64 ]
[ 0. -1.2967306 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.4 ]
[ 0. -0.64 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.64 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.784 ]
[ 0.4 0.92224 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.4 ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. 0. ]
[ 0. -0.9899223 ]
[ 0. 3.14102508]
[ 0. 0. ]
[ 0. -0.928 ]
[-1.60660251 -1.71887104]
[ 0. 0. ]
[ 0. 0. ]
[-1.15264 -3.41924685]
[-4.99526105 -5.41065665]]
'''
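# The dumps in the docstring above come from the print(q_values) call in the
# 'qlearning' branch below: each row is one state id (see s_id), and the two
# columns are the Q-values of the 'wall follower' and 'guidance' strategies.
# The leading "1, 1" / "1, 2" labels presumably identify the
# (choice_selection_strategy, q_value_update_strategy) setting of each run.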
import rospy
import math
import random  # used for the random choice of a strategy
from std_msgs.msg import Int16,Float32,Bool,Float32MultiArray,Int16MultiArray
from nav_msgs.msg import Odometry
from fastsim.srv import *
from subsomption.msg import Channel
from sensor_msgs.msg import LaserScan
import sys
import numpy as np
import numpy.random as npr
channel=[]
lasers=LaserScan()
radar = []
odom = Odometry()
bumper_l=0
bumper_r=0
goalx = 300
goaly = 480
def s_id(S):
    state_id = 0
    state_id += int(S[0])
    state_id += int(S[1]) << 1
    state_id += int(S[2]) << 2
    state_id += int(S[3]) << 3
    return state_id
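# Illustration of the encoding (assuming S is the 4-character state built in
# the main loop: three wall bits followed by one radar-sector digit in 0..7,
# which yields 64 possible ids, matching the 64x2 q_values table):
#   s_id('1103') == 1 + (1 << 1) + (0 << 2) + (3 << 3) == 27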
#-------------------------------------------
class CallBack_module_cl(object):
    def __init__(self, num):
        print "Creating callback for "+str(num)
        self.num = num
    def __call__(self, data):
        return callback_module(self.num, data)
#-------------------------------------------
def callback_module(n, data):
    channel[n] = data
    #rospy.loginfo(rospy.get_caller_id()+" n=%d Activated: %d speed_l: %f speed_r: %f",n,data.activated, data.speed_left, data.speed_right)
#-------------------------------------------
def callback_right_bumper(data):
    global bumper_r
    bumper_r = data.data
    #rospy.loginfo(rospy.get_caller_id()+"Right bumper %d",data.data)
#-------------------------------------------
def callback_left_bumper(data):
    global bumper_l
    bumper_l = data.data
    #rospy.loginfo(rospy.get_caller_id()+"Left bumper %d",data.data)
#-------------------------------------------
def callback_lasers(data):
    global lasers
    lasers = data
#-------------------------------------------
def callback_radar(data):
    global radar
    radar = data.data
#-------------------------------------------
def callback_odom(data):
    global odom
    odom = data
# add a radar callback with the proper data type (done above: callback_radar)
#-------------------------------------------
# nbCh is the number of behavioral modules (channels) in competition
# gatingType sets the gating algorithm to be used: 'random', 'randomPersist', 'guidance', 'wallFollower' or 'qlearning'
#-------------------------------------------
def strategy_gating(nbCh, gatingType):
    rospy.init_node('strategy_gating', anonymous=True)
    v = Channel()
    v.activated = False
    v.speed_left = 0
    v.speed_right = 0
    # Parameters of state building
    th_neglectedWall = 65
    angleLMin = 0
    angleLMax = 55
    angleFMin = 56
    angleFMax = 143
    angleRMin = 144
    angleRMax = 199
    alpha = 0.4   # learning rate
    beta = 8      # softmax inverse temperature
    gamma = 0.9   # discount factor (was named 'pouyou')
    # The node publishes movement orders for simu_fastsim:
    pub_l = rospy.Publisher('/simu_fastsim/speed_left', Float32, queue_size=10)
    pub_r = rospy.Publisher('/simu_fastsim/speed_right', Float32, queue_size=10)
    # The node receives order suggestions from the behavioral modules (channels):
    for n in range(nbCh):
        rospy.Subscriber("/navigation_strategies/channel"+str(n), Channel, CallBack_module_cl(n))
    # If necessary, the node receives sensory information from simu_fastsim:
    rospy.Subscriber("/simu_fastsim/laser_scan", LaserScan, callback_lasers)
    rospy.Subscriber("/simu_fastsim/radars", Int16MultiArray, callback_radar)
    rospy.Subscriber("/simu_fastsim/odom", Odometry, callback_odom)
    rospy.Subscriber("/simu_fastsim/right_bumper", Bool, callback_right_bumper)
    rospy.Subscriber("/simu_fastsim/left_bumper", Bool, callback_left_bumper)
    # Targeted operating frequency of the node:
    r = rospy.Rate(10) # 10 Hz
    # Q-learning related variables
    # definition of states at time t and t-1
    S_t = ''
    S_tm1 = ''
    # start time and timing related things
    startT = rospy.get_time()
    rospy.loginfo("Start time "+str(startT))
    trial = 0
    nbTrials = 10
    trialDuration = np.zeros((nbTrials))
    collisions = np.zeros((nbTrials))
    choice = -1
    rew = 0
    i2strat = ['wall follower', 'guidance']
    persist_step = 0
    q_learning_step = 40  # starts above the 20-step threshold, forcing an initial choice/update
    q_values = np.zeros((64, 2))
    choice_selection_strategy = 1
    q_value_update_strategy = 2
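    # As used in the 'qlearning' branch below: choice_selection_strategy == 1
    # re-draws the strategy choice every 20 steps, while == 2 re-draws it
    # whenever S_t == S_tm1 (as currently coded); q_value_update_strategy == 1
    # updates the Q-table every 20 steps, while == 2 updates it only when a
    # non-zero reward has been received.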
    # Main loop:
    while (not rospy.is_shutdown()) and (trial < nbTrials):
        print(trial)
        speed_l = 0
        speed_r = 0
        # processing of the sensory data:
        #------------------------------------------------
        # 1) has the robot found the reward?
        #rospy.loginfo("pose: "+str(odom.pose.pose.position.x)+", "+str(odom.pose.pose.position.y))
        dist2goal = math.sqrt((odom.pose.pose.position.x-goalx)**2+(odom.pose.pose.position.y-goaly)**2)
        #rospy.loginfo(dist2goal)
        # if so, teleport it:
        if (dist2goal < 30):
            rospy.wait_for_service('simu_fastsim/teleport')
            try:
                # teleport robot
                teleport = rospy.ServiceProxy('simu_fastsim/teleport', Teleport)
                x = 300 #20+random.randrange(520)
                y = 40
                th = random.randrange(360) / 360. * 2 * math.pi  # random heading in radians
                resp1 = teleport(x, y, th)
                # store information about the duration of the finishing trial:
                currT = rospy.get_time()
                trialDuration[trial] = currT - startT
                startT = currT
                rospy.loginfo("Trial "+str(trial)+" duration:"+str(trialDuration[trial]))
                trial += 1
                rew = 1
                odom.pose.pose.position.x = x
                odom.pose.pose.position.y = y
            except rospy.ServiceException as e:
                print "Service call failed: %s" % e
        # 2) has the robot bumped into a wall?
        #rospy.loginfo("BUMPERS "+str(bumper_r)+' '+str(bumper_l))
        if bumper_r or bumper_l:
            collisions[trial] += 1
            rew = -1
            #rospy.loginfo("BING! A wall...")
        # 3) build the state, used by learning, from the sensory data
        #rospy.loginfo("Nb laser scans="+str(len(lasers.ranges)))
        if len(lasers.ranges) == 200:
            S_tm1 = S_t
            S_t = ''
            # determine if obstacle on the left:
            wall = '0'
            for l in lasers.ranges[angleLMin:angleLMax]:
                if l < th_neglectedWall:
                    wall = '1'
            S_t += wall
            # determine if obstacle in front:
            wall = '0'
            for l in lasers.ranges[angleFMin:angleFMax]:
                #rospy.loginfo("front:"+str(l))
                if l < th_neglectedWall:
                    wall = '1'
            S_t += wall
            # determine if obstacle on the right:
            wall = '0'
            for l in lasers.ranges[angleRMin:angleRMax]:
                if l < th_neglectedWall:
                    wall = '1'
            S_t += wall
            # check if we are receiving radar measurements
            if len(radar) > 0:
                radar_list = []
                for i in range(len(radar)):
                    radar_list.append(radar[i])
                #rospy.loginfo(str(radar_list))
                print(radar_list)
                S_t += str(radar_list[0])
        #rospy.loginfo("S(t)="+S_t+" ; S(t-1)="+S_tm1)
        # The chosen gating strategy:
        #------------------------------------------------
        if gatingType == 'random':
            choice = random.randrange(nbCh)
            #choice = 1
            rospy.loginfo("Active module: "+i2strat[choice])
            speed_l = channel[choice].speed_left
            speed_r = channel[choice].speed_right
        #------------------------------------------------
        elif gatingType == 'randomPersist':
            # a new random choice is made every 20 steps
            if persist_step < 20:
                persist_step += 1
            else:
                persist_step = 0
                choice = random.randrange(nbCh)
            speed_l = channel[choice].speed_left
            speed_r = channel[choice].speed_right
            rospy.loginfo("Active module: "+i2strat[choice])
        #------------------------------------------------
        elif gatingType == 'guidance':
            choice = 1
            rospy.loginfo("Active module: "+i2strat[choice])
            speed_l = channel[choice].speed_left
            speed_r = channel[choice].speed_right
        #------------------------------------------------
        elif gatingType == 'wallFollower':
            choice = 0  # index of 'wall follower' in i2strat
            rospy.loginfo("Active module: "+i2strat[choice])
            speed_l = channel[choice].speed_left
            speed_r = channel[choice].speed_right
        #------------------------------------------------
        elif gatingType == 'qlearning':
            print(S_t)
            no_state = (S_t == '' or S_tm1 == '')
            sensor_percept = (S_tm1 == S_t)
            if choice == -1:
                choice = 0
            if q_learning_step < 20 or no_state:
                q_learning_step += 1
            # softmax (Boltzmann) selection between the two strategies
            if ((q_learning_step >= 20 and choice_selection_strategy == 1) or (sensor_percept and choice_selection_strategy == 2)) and not no_state:
                norm_factor = sum([np.exp(beta * q_values[s_id(S_t), a]) for a in range(2)])
                probs = [np.exp(beta * q_values[s_id(S_t), a]) / norm_factor for a in range(2)]
                print(probs)
                draw = np.random.uniform()
                choice = 0 if draw < probs[0] else 1
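            # Worked illustration of the softmax above (numbers are purely
            # illustrative, taken from the first row of the dump in the module
            # docstring): with beta = 8 and Q(s) = [0.94, 0.4],
            #   probs ~ [exp(7.5), exp(3.2)] / (exp(7.5) + exp(3.2)) ~ [0.99, 0.01],
            # so the 'wall follower' strategy would be drawn about 99% of the time.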
            # Q-learning update: Q(s,a) += alpha * (r + gamma * max_a' Q(s',a') - Q(s,a))
            if ((q_learning_step >= 20 and q_value_update_strategy == 1) or (not rew == 0 and q_value_update_strategy == 2)) and not no_state:
                print(S_t)
                print("Strategy : ", choice)
                delta = rew + gamma * q_values[s_id(S_t), :].max() - q_values[s_id(S_tm1), choice]
                q_values[s_id(S_tm1), choice] = q_values[s_id(S_tm1), choice] + alpha * delta
                print(q_values)
                rew = 0
            if q_learning_step >= 20:
                q_learning_step = 0
            speed_l = channel[choice].speed_left
            speed_r = channel[choice].speed_right
        #------------------------------------------------
        else:
            rospy.loginfo(gatingType+' unknown.')
            exit()
        #for i in range(nbCh):
        #    channel[i] = v
        pub_l.publish(speed_l)
        pub_r.publish(speed_r)
        r.sleep()
    # Write the per-trial durations and collision counts to log files
    print(trialDuration)
    print(collisions)
    logDuration = open('DureesEssais_a_'+str(alpha)+'_b_'+str(beta)+'_g_'+str(gamma)+'_'+str(startT), 'w')
    logCollision = open('Collisions_a_'+str(alpha)+'_b_'+str(beta)+'_g_'+str(gamma)+'_'+str(startT), 'w')
    for i in range(nbTrials):
        rospy.loginfo('T = '+str(trialDuration[i]))
        logDuration.write(str(i)+' '+str(trialDuration[i])+'\n')
        logCollision.write(str(i)+' '+str(collisions[i])+'\n')
    logDuration.close()
    logCollision.close()
#-------------------------------------------
if __name__ == '__main__':
    nbch = int(sys.argv[1])
    gatingType = sys.argv[2]
    v = Channel()
    v.activated = False
    v.speed_left = 0
    v.speed_right = 0
    channel = [v for i in range(nbch)]
    try:
        strategy_gating(nbch, gatingType)
    except rospy.ROSInterruptException:
        pass
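# Usage sketch (assumes the simu_fastsim simulator and the channel publishers
# are already running; the package and file name are whatever this script is
# saved as, e.g. strategy_gating.py):
#   rosrun <your_package> strategy_gating.py 2 qlearning
# or directly:
#   python strategy_gating.py 2 qlearning
# where the first argument is the number of competing channels and the second
# is the gating type ('random', 'randomPersist', 'guidance', 'wallFollower'
# or 'qlearning').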