@wingedsheep
Created May 22, 2016 08:36
Deep Q learning for the lunar lander
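
# deepq.py: the DeepQ agent (imported by the training script below)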
# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2
# import other stuff
import random
import numpy as np
from memory import Memory

class DeepQ:
    def __init__(self, inputs, outputs, memorySize, discountFactor, learningRate, learnStart):
        self.input_size = inputs
        self.output_size = outputs
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learnStart = learnStart
        self.learningRate = learningRate

    def initNetworks(self, hiddenLayers):
        model = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.model = model

        targetModel = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.targetModel = targetModel
    def createRegularizedModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        bias = True
        dropout = 0
        regularizationFactor = 0.01
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            if regularizationFactor > 0:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            else:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            for index in range(1, len(hiddenLayers)-1):
                layerSize = hiddenLayers[index]
                if regularizationFactor > 0:
                    model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                else:
                    model.add(Dense(layerSize, init='lecun_uniform', bias=bias))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
                if dropout > 0:
                    model.add(Dropout(dropout))
            model.add(Dense(self.output_size, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def createModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform'))
            model.add(Activation("linear"))
        else:
            model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform'))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            for index in range(1, len(hiddenLayers)-1):
                layerSize = hiddenLayers[index]
                model.add(Dense(layerSize, init='lecun_uniform'))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
            model.add(Dense(self.output_size, init='lecun_uniform'))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer ", i, ": ", weights
            i += 1

    def backupNetwork(self, model, backup):
        weightMatrix = []
        for layer in model.layers:
            weights = layer.get_weights()
            weightMatrix.append(weights)
        i = 0
        for layer in backup.layers:
            weights = weightMatrix[i]
            layer.set_weights(weights)
            i += 1

    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)
    # calculate the target function
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # select the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action

    def selectActionByProbability(self, qValues, bias):
        qValueSum = 0
        shiftBy = 0
        for value in qValues:
            if value + shiftBy < 0:
                shiftBy = - (value + shiftBy)
        shiftBy += 1e-06

        for value in qValues:
            qValueSum += (value + shiftBy) ** bias

        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        qValueProbabilities[len(qValueProbabilities) - 1] = 1

        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if rand <= value:
                return i
            i += 1
    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)

    def learnOnMiniBatch(self, miniBatchSize, useTargetNetwork=True):
        if self.memory.getCurrentSize() > self.learnStart:
            miniBatch = self.memory.getMiniBatch(miniBatchSize)
            X_batch = np.empty((0, self.input_size), dtype=np.float64)
            Y_batch = np.empty((0, self.output_size), dtype=np.float64)
            for sample in miniBatch:
                isFinal = sample['isFinal']
                state = sample['state']
                action = sample['action']
                reward = sample['reward']
                newState = sample['newState']

                qValues = self.getQValues(state)
                if useTargetNetwork:
                    qValuesNewState = self.getTargetQValues(newState)
                else:
                    qValuesNewState = self.getQValues(newState)
                targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)

                X_batch = np.append(X_batch, np.array([state.copy()]), axis=0)
                Y_sample = qValues.copy()
                Y_sample[action] = targetValue
                Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
                if isFinal:
                    X_batch = np.append(X_batch, np.array([newState.copy()]), axis=0)
                    Y_batch = np.append(Y_batch, np.array([[reward]*self.output_size]), axis=0)
            self.model.fit(X_batch, Y_batch, batch_size=len(miniBatch), nb_epoch=1, verbose=0)
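
# Training script: runs the DeepQ agent on the LunarLander environment
# (imports DeepQ from deepq.py, which in turn uses Memory from memory.py).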
# import the gym stuff
import gym
# import other stuff
import random
import numpy as np
# import own classes
from deepq import DeepQ

print(gym.envs.registry.all())

env = gym.make('LunarLander-v1')

epochs = 1000
steps = 1000
updateTargetNetwork = 10000
explorationRate = 1
minibatch_size = 128
learnStart = 128
learningRate = 0.00025
discountFactor = 0.99
memorySize = 1000000

last100Scores = [0] * 100
last100ScoresIndex = 0
last100Filled = False

renderPerXEpochs = 50
shouldRender = False

deepQ = DeepQ(len(env.observation_space.high), env.action_space.n, memorySize, discountFactor, learningRate, learnStart)
deepQ.initNetworks([30, 30, 30])

stepCounter = 0

# number of reruns
for epoch in xrange(epochs):
    observation = env.reset()
    print explorationRate
    # number of timesteps
    totalReward = 0
    for t in xrange(steps):
        if epoch % renderPerXEpochs == 0 and shouldRender:
            env.render()
        qValues = deepQ.getQValues(observation)

        action = deepQ.selectAction(qValues, explorationRate)

        newObservation, reward, done, info = env.step(action)
        totalReward += reward

        deepQ.addMemory(observation, action, reward, newObservation, done)

        if stepCounter >= learnStart:
            if stepCounter <= updateTargetNetwork:
                deepQ.learnOnMiniBatch(minibatch_size, False)
            else:
                deepQ.learnOnMiniBatch(minibatch_size, True)

        observation = newObservation

        if done:
            last100Scores[last100ScoresIndex] = totalReward
            last100ScoresIndex += 1
            if last100ScoresIndex >= 100:
                last100Filled = True
                last100ScoresIndex = 0
            if not last100Filled:
                print "Episode ", epoch, " finished after {} timesteps".format(t+1), " with total reward", totalReward
            else:
                print "Episode ", epoch, " finished after {} timesteps".format(t+1), " with total reward", totalReward, " last 100 average: ", (sum(last100Scores)/len(last100Scores))
            break

        stepCounter += 1
        if stepCounter % updateTargetNetwork == 0:
            deepQ.updateTargetNetwork()
            print "updating target network"

    explorationRate *= 0.995
    explorationRate = max(0.05, explorationRate)
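
# memory.py: the experience replay buffer used by DeepQ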
import numpy as np
import random

class Memory:
    def __init__(self, size):
        self.size = size
        self.currentPosition = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.newStates = []
        self.finals = []

    def getMiniBatch(self, size):
        indices = random.sample(np.arange(len(self.states)), min(size, len(self.states)))
        miniBatch = []
        for index in indices:
            miniBatch.append({'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]})
        return miniBatch

    def getCurrentSize(self):
        return len(self.states)

    def getMemory(self, index):
        return {'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]}

    def addMemory(self, state, action, reward, newState, isFinal):
        if self.currentPosition >= self.size - 1:
            self.currentPosition = 0
        if len(self.states) > self.size:
            self.states[self.currentPosition] = state
            self.actions[self.currentPosition] = action
            self.rewards[self.currentPosition] = reward
            self.newStates[self.currentPosition] = newState
            self.finals[self.currentPosition] = isFinal
        else:
            self.states.append(state)
            self.actions.append(action)
            self.rewards.append(reward)
            self.newStates.append(newState)
            self.finals.append(isFinal)
        self.currentPosition += 1
@wingedsheep (Author)

I used the same Q-learning algorithm as I used to solve the cartpole problem (https://gym.openai.com/evaluations/eval_nI8cryNQaKlFKv592N7hQ).
The only issue is that it doesn't seem to learn how to land, only how to hover in a steady position. There is still some work left.
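
For reference, the same DeepQ and Memory classes can be pointed at cartpole by changing only the environment name and the network size. A minimal sketch, assuming CartPole-v0 and the settings below (the linked evaluation may have used different hyperparameters):

import gym
from deepq import DeepQ

env = gym.make('CartPole-v0')

# same agent class as above; the hyperparameters here are illustrative assumptions
deepQ = DeepQ(len(env.observation_space.high), env.action_space.n,
              memorySize=1000000, discountFactor=0.99,
              learningRate=0.00025, learnStart=128)
deepQ.initNetworks([30, 30])

explorationRate = 1
for epoch in xrange(300):
    observation = env.reset()
    for t in xrange(200):
        qValues = deepQ.getQValues(observation)
        action = deepQ.selectAction(qValues, explorationRate)
        newObservation, reward, done, info = env.step(action)
        deepQ.addMemory(observation, action, reward, newObservation, done)
        deepQ.learnOnMiniBatch(128, False)
        observation = newObservation
        if done:
            break
    explorationRate = max(0.05, explorationRate * 0.995)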

@arpan-dhatt

I would give double and dueling DQN a try; they are very easy to implement. I am also having trouble getting mine to work, even with those two improvements, and I even added prioritized replay. I believe hyperparameter tuning is the reason why yours and mine are not working as well as we expect.
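
Double DQN, for example, only changes how the target is computed: the online network picks the argmax action in the next state and the target network evaluates it, which reduces overestimation of the max Q value. A rough sketch against the DeepQ class above (DoubleDeepQ and calculateDoubleTarget are hypothetical names, not part of this gist):

import numpy as np
from deepq import DeepQ

class DoubleDeepQ(DeepQ):
    # Double-DQN target: the online network selects the best next action,
    # the target network supplies its value estimate.
    def calculateDoubleTarget(self, qValuesNewState, targetQValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        bestAction = np.argmax(qValuesNewState)
        return reward + self.discountFactor * targetQValuesNewState[bestAction]

# Inside learnOnMiniBatch the per-sample target would then be computed as:
#   qValuesNewState = self.getQValues(newState)
#   targetQValuesNewState = self.getTargetQValues(newState)
#   targetValue = self.calculateDoubleTarget(qValuesNewState, targetQValuesNewState, reward, isFinal)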
