"""First attempt at implementing Q-learning with function approximation on the Mountain Car environment."""
# In[]
import gym
import numpy as np
import theano
import theano.tensor as T
import lasagne
import sklearn.preprocessing
from sklearn.kernel_approximation import RBFSampler
from sklearn.pipeline import FeatureUnion
# In[]
class ValueFunctionApproximator:
    """Q-value network fed with RBF features of the (scaled) environment state.

    States are standardised, expanded through three RBF samplers of 100
    components each, and passed to a small dense Lasagne network with one
    output per action.
    """

    # Three RBF samplers with 100 components each -> width of the network input.
    N_FEATURES = 300

    def __init__(self, env, batch_size, learning_rate):
        """Fit the feature pipeline on sampled observations and build the net.

        Args:
            env: Gym environment with a discrete action space and a box
                observation space.
            batch_size: Stored on the instance; not otherwise used here.
            learning_rate: Step size for the RMSProp updates.
        """
        self.nA = env.action_space.n
        self.sS = env.observation_space.shape[0]
        self.batch_size = batch_size
        # Kept as a shared variable so it can be handed to the update rule
        # symbolically.
        self.learning_rate = theano.shared(np.float32(learning_rate))
        self.env = env

        observation_examples = np.array(
            [env.observation_space.sample() for _ in range(100000)])

        # Fit feature scaler
        self.scaler = sklearn.preprocessing.StandardScaler()
        self.scaler.fit(observation_examples)

        # Fit feature extractor (on scaled data, matching how it is used later)
        self.feature_map = FeatureUnion([
            ("rbf1", RBFSampler(n_components=100, gamma=1., random_state=1)),
            ("rbf01", RBFSampler(n_components=100, gamma=0.1, random_state=1)),
            ("rbf10", RBFSampler(n_components=100, gamma=10, random_state=1)),
        ])
        self.feature_map.fit(self.scaler.transform(observation_examples))

        self._init_model()

    def _init_model(self):
        """Build the network and compile the predict / train functions."""
        self.nn_x, self.nn_z = T.matrices('x', 'z')
        self.nn_lh1 = lasagne.layers.InputLayer(shape=(None, self.N_FEATURES),
                                                input_var=self.nn_x)
        # NOTE(review): the hidden layers rely on Lasagne's default
        # nonlinearity and initialisation -- confirm these are the intended
        # settings.
        self.nn_lh2 = lasagne.layers.DenseLayer(self.nn_lh1, 512)
        self.nn_lh3 = lasagne.layers.DenseLayer(self.nn_lh2, 256)
        # Linear output: Q-values are unbounded regression targets.
        self.nn_ly = lasagne.layers.DenseLayer(
            self.nn_lh3, self.nA,
            nonlinearity=lasagne.nonlinearities.linear)
        self.nn_y = lasagne.layers.get_output(self.nn_ly)
        self.f_predict = theano.function([self.nn_x], self.nn_y)

        self.nn_params = lasagne.layers.get_all_params(
            self.nn_ly, unwrap_shared=False, trainable=True)
        self.nn_cost = T.sum(
            lasagne.objectives.squared_error(self.nn_y, self.nn_z))
        self.nn_updates = lasagne.updates.rmsprop(
            self.nn_cost, self.nn_params, learning_rate=self.learning_rate)
        self.f_train = theano.function([self.nn_x, self.nn_z],
                                       [self.nn_y, self.nn_cost],
                                       updates=self.nn_updates)

    def _scale_state(self, s_float32):
        """Standardise a 2-D batch of states with the fitted scaler."""
        return self.scaler.transform(s_float32)

    def _featurize(self, states):
        """Turn one state or a batch of states into float32 network input.

        Shared by ``predict`` and ``train`` so both feed the network
        identically computed features.

        Raises:
            RuntimeError: If ``states`` is neither 1-D nor 2-D.
        """
        batch = np.array(states)
        if len(batch.shape) == 1:
            batch = np.expand_dims(batch, axis=0)
        if len(batch.shape) != 2:
            raise RuntimeError('Input should be an 2d-array or row-vector.')
        batch = self._scale_state(batch)
        batch = self.feature_map.transform(batch)
        return batch.astype(np.float32)

    def predict(self, s):
        """Return the network output for state(s) ``s``, one row per state.

        Raises:
            RuntimeError: If ``s`` is neither 1-D nor 2-D.
        """
        return self.f_predict(self._featurize(s))

    def train(self, states, actions, rewards):
        """Run one update step and return ``[network output, summed cost]``.

        Args:
            states: One state or a batch of states.
            actions: Despite the name, the regression targets for the network
                output (cast to float32 and compared against it with squared
                error).
            rewards: Unused; kept for interface compatibility.

        Raises:
            RuntimeError: If ``states`` is neither 1-D nor 2-D.
        """
        features = self._featurize(states)
        targets = np.array(actions).astype(np.float32)
        return self.f_train(features, targets)
# In[]
class Agent:
    """Epsilon-greedy agent backed by a ``ValueFunctionApproximator``."""

    def __init__(self, env, eps=1.0, learning_rate=0.1):
        """Create the agent and its value function.

        Args:
            env: Gym environment with a discrete action space.
            eps: Probability of taking a uniformly random action in ``act``.
            learning_rate: Forwarded to the value function approximator.
        """
        self.nA = env.action_space.n
        self.eps = eps
        self.value_function = ValueFunctionApproximator(env, 32, learning_rate)

    def q_values(self, s):
        """Return the value-function prediction for state(s) ``s``."""
        return self.value_function.predict(s)

    def act(self, s):
        """Pick an action for state ``s``: random with probability ``eps``,
        otherwise the argmax of the predicted values.

        Returns:
            The chosen action index as a plain ``int`` on the greedy path.
        """
        if np.random.random() < self.eps:
            return np.random.randint(0, self.nA)
        # int() so the greedy branch does not hand back a NumPy scalar.
        return int(np.argmax(self.q_values(s)))

    def estimate(self, s, a):
        """Return the predicted value of action ``a`` in the single state ``s``."""
        return self.q_values(s)[0][a]

    def learn(self, s, targets):
        """Fit the value function so its output for ``s`` moves toward ``targets``."""
        # The third (rewards) argument of train() is unused by this caller.
        self.value_function.train(s, targets, [])
# In[]
class ReplayMemory:
    """Bounded FIFO store of transitions that builds Q-learning targets."""

    def __init__(self, agent, capacity):
        """
        Args:
            agent: Object exposing ``q_values(states)``; used to compute
                targets in ``sample``.
            capacity: Maximum number of transitions kept.
        """
        self.agent = agent
        self.capacity = capacity
        self.memory = []

    def append(self, s, a, r, sp):
        """Store a transition (state, action, reward, next state).

        The oldest entries are dropped once ``capacity`` is exceeded.
        """
        self.memory.append([s, a, r, sp])
        overflow = len(self.memory) - self.capacity
        if overflow > 0:
            del self.memory[:overflow]

    def sample(self, batch_size, discount=1.0):
        """Draw a random batch (with replacement) and compute training targets.

        Args:
            batch_size: Requested batch size; capped at the number of stored
                transitions.
            discount: Discount factor applied to the best next-state value.

        Returns:
            ``(states, q_targets)``: the sampled states and the agent's current
            Q-values for them, with the entry of the action actually taken
            replaced by the bootstrapped target.

        Raises:
            ValueError: If the memory is empty.
        """
        if not self.memory:
            raise ValueError('Cannot sample from an empty replay memory.')

        batch_size = min(batch_size, len(self.memory))
        choices = np.random.choice(len(self.memory), batch_size)
        batch = [self.memory[i] for i in choices]
        s = np.array([transition[0] for transition in batch])
        a = np.array([transition[1] for transition in batch])
        r = np.array([transition[2] for transition in batch])
        sp = np.array([transition[3] for transition in batch])

        # Use the agent this memory was constructed with, not a module global.
        q_vals = np.array(self.agent.q_values(s))
        best_next = np.amax(self.agent.q_values(sp), axis=1)
        # Bootstrapping is switched off for positive rewards.
        # NOTE(review): no done flag is stored, so terminal transitions are
        # apparently inferred from the reward sign -- confirm this matches the
        # environment's reward scheme.
        target = r + (r <= 0).astype(int) * discount * best_next
        q_vals[np.arange(len(choices)), a] = target
        return s, q_vals
# In[]
# Environment shared by the training loop and the later evaluation run.
env_name = 'MountainCar-v0'
env = gym.make(env_name)
# In[]
# Discount factor handed to the replay memory when it builds targets.
discount = 1.0 # 1.0
done = False
# Agent starts with 50% random actions; the memory keeps the most recent
# 100000 transitions.
agent = Agent(env, eps=0.5, learning_rate=0.0001)
memory = ReplayMemory(agent, capacity=100000)
# In[] Main Q Learning loop
n_episodes = 100
max_steps_per_episode = 20000
for episode in range(n_episodes):
    steps = 0
    s = env.reset()
    done = False
    while not done:
        a = agent.act(s)
        sp, r, done, info = env.step(a)
        memory.append(s, a, r, sp)

        # Start learning only once some experience has been collected; each
        # step then fits the agent on a random replay batch.
        if len(memory.memory) > 128:
            mem_states, mem_targets = memory.sample(64, discount)
            agent.learn(mem_states, mem_targets)

        if steps % 50 == 0:
            print('Episode {}, Step {}, eps {}'.format(episode, steps, agent.eps))

        # Hitting the step cap ends the episode too, so the loop cannot run
        # unbounded if the environment never reports done.
        done = done or steps > max_steps_per_episode
        if done:
            print("Episode finished after {} timesteps".format(steps))

        s = sp
        steps += 1
        # Slow, step-based exploration decay within long episodes.
        if agent.eps >= 0.01 and steps % 10000 == 0:
            agent.eps *= 0.9

    # Exploration decay once per episode.
    # NOTE(review): assumed to sit at episode level rather than inside the
    # step loop -- confirm.
    if agent.eps >= 0.0:
        agent.eps *= 0.9
# In[] Act
monitoring = True
render = True
monitor_name = './' + env_name + '-' + 'qlearning' + '-experiment'
if monitoring:
    env.monitor.start(monitor_name, force=True)

for e in range(150):
    s = env.reset()
    # `episode` counts the steps taken in evaluation episode `e`.
    episode = 0
    done = False
    # The agent acts with whatever exploration rate training left in
    # agent.eps; it is not forced to greedy here.
    while not done and episode < 500:
        if render:
            env.render()
        a = agent.act(s)
        sp, r, done, info = env.step(a)
        s = sp
        episode += 1
    print('episode {} finished in {} steps'.format(e, episode))

if monitoring:
    # Close the monitor opened above so the recording is finalised.
    # NOTE(review): confirm against the installed gym version's monitor API.
    env.monitor.close()
# In[]
if monitoring:
    # Nothing was recorded when monitoring is off, so only upload in that case.
    gym.upload(monitor_name, api_key='0000000000000000000000', ignore_open_monitors=True)
