@wingedsheep
Created May 21, 2016 14:29
Q-learning CartPole with target network and experience replay

cartpole_runnner.py

# import the gym stuff
import gym
# import other stuff
import random
import numpy as np
# import own classes
from deepq import DeepQ

env = gym.make('CartPole-v0')

epochs = 1000
steps = 100000
updateTargetNetwork = 10000
explorationRate = 1
minibatch_size = 128
learnStart = 128
learningRate = 0.00025
discountFactor = 0.99
memorySize = 1000000

last100Scores = [0] * 100
last100ScoresIndex = 0
last100Filled = False

deepQ = DeepQ(4, 2, memorySize, discountFactor, learningRate, learnStart)
deepQ.initNetworks([30, 30, 30])

stepCounter = 0

# number of reruns
for epoch in xrange(epochs):
    observation = env.reset()
    print explorationRate
    # number of timesteps
    for t in xrange(steps):
        # env.render()
        qValues = deepQ.getQValues(observation)
        action = deepQ.selectAction(qValues, explorationRate)
        newObservation, reward, done, info = env.step(action)

        if (t >= 199):
            print "reached the end! :D"
            done = True
            reward = 200
        if done and t < 199:
            print "decrease reward"
            reward -= 200

        deepQ.addMemory(observation, action, reward, newObservation, done)

        if stepCounter >= learnStart:
            if stepCounter <= updateTargetNetwork:
                deepQ.learnOnMiniBatch(minibatch_size, False)
            else:
                deepQ.learnOnMiniBatch(minibatch_size, True)

        observation = newObservation

        if done:
            last100Scores[last100ScoresIndex] = t
            last100ScoresIndex += 1
            if last100ScoresIndex >= 100:
                last100Filled = True
                last100ScoresIndex = 0
            if not last100Filled:
                print "Episode ", epoch, " finished after {} timesteps".format(t+1)
            else:
                print "Episode ", epoch, " finished after {} timesteps".format(t+1), " last 100 average: ", (sum(last100Scores) / len(last100Scores))
            break

        stepCounter += 1
        if stepCounter % updateTargetNetwork == 0:
            deepQ.updateTargetNetwork()
            print "updating target network"

    explorationRate *= 0.995
    # explorationRate -= (2.0/epochs)
    explorationRate = max(0.05, explorationRate)

deepq.py

# import os
# os.environ["THEANO_FLAGS"] = "mode=FAST_RUN,device=gpu,floatX=float32"
# import theano

# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2
# import other stuff
import random
import numpy as np
# import own classes
from memory import Memory


class DeepQ:
    def __init__(self, inputs, outputs, memorySize, discountFactor, learningRate, learnStart):
        self.input_size = inputs
        self.output_size = outputs
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learnStart = learnStart
        self.learningRate = learningRate

    def initNetworks(self, hiddenLayers):
        model = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.model = model

        targetModel = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.targetModel = targetModel

    def createRegularizedModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        bias = True
        dropout = 0
        regularizationFactor = 0.01
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            if regularizationFactor > 0:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            else:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            if (activationType == "LeakyReLU"):
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            for index in range(1, len(hiddenLayers)-1):
                layerSize = hiddenLayers[index]
                if regularizationFactor > 0:
                    model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                else:
                    model.add(Dense(layerSize, init='lecun_uniform', bias=bias))
                if (activationType == "LeakyReLU"):
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
                if dropout > 0:
                    model.add(Dropout(dropout))
            model.add(Dense(self.output_size, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model

    def createModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform'))
            model.add(Activation("linear"))
        else:
            model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform'))
            if (activationType == "LeakyReLU"):
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            for index in range(1, len(hiddenLayers)-1):
                layerSize = hiddenLayers[index]
                model.add(Dense(layerSize, init='lecun_uniform'))
                if (activationType == "LeakyReLU"):
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
            model.add(Dense(self.output_size, init='lecun_uniform'))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model

    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer ", i, ": ", weights
            i += 1

    def backupNetwork(self, model, backup):
        weightMatrix = []
        for layer in model.layers:
            weights = layer.get_weights()
            weightMatrix.append(weights)
        i = 0
        for layer in backup.layers:
            weights = weightMatrix[i]
            layer.set_weights(weights)
            i += 1

    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)

    # calculate the target function
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # select the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action

    def selectActionByProbability(self, qValues, bias):
        qValueSum = 0
        shiftBy = 0
        for value in qValues:
            if value + shiftBy < 0:
                shiftBy = - (value + shiftBy)
        shiftBy += 1e-06

        for value in qValues:
            qValueSum += (value + shiftBy) ** bias

        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        qValueProbabilities[len(qValueProbabilities) - 1] = 1

        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if (rand <= value):
                return i
            i += 1

    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)

    def learnOnMiniBatch(self, miniBatchSize, useTargetNetwork=True):
        if self.memory.getCurrentSize() > self.learnStart:
            miniBatch = self.memory.getMiniBatch(miniBatchSize)
            X_batch = np.empty((0, self.input_size), dtype=np.float64)
            Y_batch = np.empty((0, self.output_size), dtype=np.float64)
            for sample in miniBatch:
                isFinal = sample['isFinal']
                state = sample['state']
                action = sample['action']
                reward = sample['reward']
                newState = sample['newState']

                qValues = self.getQValues(state)
                if useTargetNetwork:
                    qValuesNewState = self.getTargetQValues(newState)
                else:
                    qValuesNewState = self.getQValues(newState)
                targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)

                X_batch = np.append(X_batch, np.array([state.copy()]), axis=0)
                Y_sample = qValues.copy()
                Y_sample[action] = targetValue
                Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
                if isFinal:
                    X_batch = np.append(X_batch, np.array([newState.copy()]), axis=0)
                    Y_batch = np.append(Y_batch, np.array([[reward]*self.output_size]), axis=0)
            self.model.fit(X_batch, Y_batch, batch_size=len(miniBatch), nb_epoch=1, verbose=0)

memory.py

import numpy as np
import random


class Memory:
    def __init__(self, size):
        self.size = size
        self.currentPosition = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.newStates = []
        self.finals = []

    def getMiniBatch(self, size):
        indices = random.sample(np.arange(len(self.states)), min(size, len(self.states)))
        miniBatch = []
        for index in indices:
            miniBatch.append({'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]})
        return miniBatch

    def getCurrentSize(self):
        return len(self.states)

    def getMemory(self, index):
        return {'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]}

    def addMemory(self, state, action, reward, newState, isFinal):
        if (self.currentPosition >= self.size - 1):
            self.currentPosition = 0
        if (len(self.states) > self.size):
            self.states[self.currentPosition] = state
            self.actions[self.currentPosition] = action
            self.rewards[self.currentPosition] = reward
            self.newStates[self.currentPosition] = newState
            self.finals[self.currentPosition] = isFinal
        else:
            self.states.append(state)
            self.actions.append(action)
            self.rewards.append(reward)
            self.newStates.append(newState)
            self.finals.append(isFinal)

        self.currentPosition += 1

@wingedsheep (Author)

This is a deep Q-learning approach, inspired by the DeepMind paper (https://github.com/kuz/DeepMind-Atari-Deep-Q-Learner).
It uses a replay memory to store its experiences and learns on minibatches sampled randomly from that replay memory.
Furthermore, I used a target network, as also described in the DeepMind article.
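
In essence, the learning targets for a minibatch are computed from the target network rather than the online network; here is a condensed sketch of what learnOnMiniBatch and calculateTarget above do, reusing the names defined in the runner (it only shows the target computation, not the model fit):

# condensed sketch of the target computation in learnOnMiniBatch above
miniBatch = deepQ.memory.getMiniBatch(minibatch_size)
for sample in miniBatch:
    qValues = deepQ.getQValues(sample['state'])                   # online network
    qValuesNewState = deepQ.getTargetQValues(sample['newState'])  # target network
    target = sample['reward']
    if not sample['isFinal']:
        target += discountFactor * np.max(qValuesNewState)
    qValues[sample['action']] = target  # only the taken action gets the new target value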

Things that are slightly different are that I used a larger batch size (128) and a regularization factor.

When the agent does not reach the 200 steps, I give a reward penalty of -200. I don't really like this addition, but because the algorithm isn't allowed an infinite (or higher) number of steps, it needs some feedback on when it has succeeded.
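
For reference, this is the part of the runner loop above that implements it:

if (t >= 199):           # survived the full 200 steps
    done = True
    reward = 200
if done and t < 199:     # episode ended before 200 steps
    reward -= 200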

@platers commented May 25, 2016

Do you know how much the target network helps the training?
I'm currently attempting this without a target network and the network fails to train, even after thousands of epochs. My memorySize is also only about 1000. Any tips are appreciated!

@vmayoral

Thanks @wingedsheep for putting this together. Your code was a nice help for my RL series: https://github.com/vmayoral/basic_reinforcement_learning.

One thing I noted while reading your code is that there's a bug at https://gist.github.com/wingedsheep/4199594b02138dd427c22a540d6d6b8d#file-deepq-py-L84 that prevents the network architecture from being created as specified at https://gist.github.com/wingedsheep/4199594b02138dd427c22a540d6d6b8d#file-cartpole_runnner-py-L26. Removing the -1 fixes it.
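
In other words, a sketch of the suggested change in the hidden-layer loop of createModel (the same applies to createRegularizedModel):

# with hiddenLayers = [30, 30, 30], range(1, len(hiddenLayers)-1) only yields [1],
# so the third hidden layer is never added; dropping the -1 builds all of them
for index in range(1, len(hiddenLayers)):
    layerSize = hiddenLayers[index]
    model.add(Dense(layerSize, init='lecun_uniform'))
    model.add(Activation(activationType))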

@vmayoral

"When the agent does not reach the 200 steps, I give a reward penalty of -200. I don't really like this addition, but because the algorithm isn't allowed an infinite (or higher) number of steps, it needs some feedback on when it has succeeded."

I'd be interested in hearing your opinion about the exact outcome of modifying the rewards. I did some experimental testing with the default rewards and the modified ones you propose, and found that the defaults perform better.
