@wingedsheep
Created May 23, 2016 05:49
# --- deepq.py ---
# import os
# os.environ["THEANO_FLAGS"] = "mode=FAST_RUN,device=gpu,floatX=float32"
# import theano

# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2

# import other stuff
import random
import numpy as np
from memory import Memory

class DeepQ:
    def __init__(self, inputs, outputs, memorySize, discountFactor, learningRate, learnStart):
        self.input_size = inputs
        self.output_size = outputs
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learnStart = learnStart
        self.learningRate = learningRate

    def initNetworks(self, hiddenLayers):
        model = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.model = model

        targetModel = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.targetModel = targetModel
    def createRegularizedModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        bias = True
        dropout = 0
        regularizationFactor = 0.01
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            if regularizationFactor > 0:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            else:
                model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            # add the remaining hidden layers
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                if regularizationFactor > 0:
                    model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                else:
                    model.add(Dense(layerSize, init='lecun_uniform', bias=bias))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
                if dropout > 0:
                    model.add(Dropout(dropout))
            model.add(Dense(self.output_size, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def createModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform'))
            model.add(Activation("linear"))
        else:
            model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform'))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            # add the remaining hidden layers
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                model.add(Dense(layerSize, init='lecun_uniform'))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))
            model.add(Dense(self.output_size, init='lecun_uniform'))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer ", i, ": ", weights
            i += 1

    def backupNetwork(self, model, backup):
        weightMatrix = []
        for layer in model.layers:
            weights = layer.get_weights()
            weightMatrix.append(weights)
        i = 0
        for layer in backup.layers:
            weights = weightMatrix[i]
            layer.set_weights(weights)
            i += 1

    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)
    # calculate the target function
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # select the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action
    def selectActionByProbability(self, qValues, bias):
        qValueSum = 0
        shiftBy = 0
        for value in qValues:
            if value + shiftBy < 0:
                shiftBy = - (value + shiftBy)
        shiftBy += 1e-06

        for value in qValues:
            qValueSum += (value + shiftBy) ** bias

        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        qValueProbabilities[len(qValueProbabilities) - 1] = 1

        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if rand <= value:
                return i
            i += 1
    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)

    def learnOnMiniBatch(self, miniBatchSize, useTargetNetwork=True):
        if self.memory.getCurrentSize() > self.learnStart:
            miniBatch = self.memory.getMiniBatch(miniBatchSize)
            X_batch = np.empty((0, self.input_size), dtype=np.float64)
            Y_batch = np.empty((0, self.output_size), dtype=np.float64)
            for sample in miniBatch:
                isFinal = sample['isFinal']
                state = sample['state']
                action = sample['action']
                reward = sample['reward']
                newState = sample['newState']

                qValues = self.getQValues(state)
                if useTargetNetwork:
                    qValuesNewState = self.getTargetQValues(newState)
                else:
                    qValuesNewState = self.getQValues(newState)
                targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)

                X_batch = np.append(X_batch, np.array([state.copy()]), axis=0)
                Y_sample = qValues.copy()
                Y_sample[action] = targetValue
                Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
            self.model.fit(X_batch, Y_batch, batch_size=len(miniBatch), nb_epoch=1, verbose=0)

# --- training script (separate file; imports DeepQ from deepq.py and runs LunarLander) ---
# import the gym stuff
import gym

# import other stuff
import random
import numpy as np
# import own classes
from deepq import DeepQ
print(gym.envs.registry.all())
env = gym.make('LunarLander-v1')
epochs = 10000
steps = 1000
updateTargetNetwork = 10000
explorationRate = 1
minibatch_size = 32
learnStart = 32
learningRate = 0.00025
discountFactor = 0.99
memorySize = 1000000
last100Scores = [0] * 100
last100ScoresIndex = 0
last100Filled = False
renderPerXEpochs = 50
shouldRender = True
deepQ = DeepQ(len(env.observation_space.high), env.action_space.n, memorySize, discountFactor, learningRate, learnStart)
deepQ.initNetworks([30,30,30])
stepCounter = 0
# number of reruns
for epoch in xrange(epochs):
    observation = env.reset()
    print explorationRate

    # number of timesteps
    totalReward = 0
    for t in xrange(steps):
        if epoch % renderPerXEpochs == 0 and shouldRender:
            env.render()
        qValues = deepQ.getQValues(observation)
        action = deepQ.selectAction(qValues, explorationRate)

        newObservation, reward, done, info = env.step(action)
        totalReward += reward
        deepQ.addMemory(observation, action, reward, newObservation, done)

        if stepCounter >= learnStart:
            if stepCounter <= updateTargetNetwork:
                deepQ.learnOnMiniBatch(minibatch_size, False)
            else:
                deepQ.learnOnMiniBatch(minibatch_size, True)

        observation = newObservation

        if done:
            last100Scores[last100ScoresIndex] = totalReward
            last100ScoresIndex += 1
            if last100ScoresIndex >= 100:
                last100Filled = True
                last100ScoresIndex = 0
            if not last100Filled:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1), " with total reward", totalReward
            else:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1), " with total reward", totalReward, " last 100 average: ", (sum(last100Scores) / len(last100Scores))
            break

        stepCounter += 1
        if stepCounter % updateTargetNetwork == 0:
            deepQ.updateTargetNetwork()
            print "updating target network"

    explorationRate *= 0.995
    explorationRate = max(0.05, explorationRate)

# --- memory.py ---
import numpy as np
import random


class Memory:
    def __init__(self, size):
        self.size = size
        self.currentPosition = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.newStates = []
        self.finals = []

    def getMiniBatch(self, size):
        indices = random.sample(np.arange(len(self.states)), min(size, len(self.states)))
        miniBatch = []
        for index in indices:
            miniBatch.append({'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]})
        return miniBatch

    def getCurrentSize(self):
        return len(self.states)

    def getMemory(self, index):
        return {'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]}

    def addMemory(self, state, action, reward, newState, isFinal):
        if self.currentPosition >= self.size - 1:
            self.currentPosition = 0
        if len(self.states) > self.size:
            self.states[self.currentPosition] = state
            self.actions[self.currentPosition] = action
            self.rewards[self.currentPosition] = reward
            self.newStates[self.currentPosition] = newState
            self.finals[self.currentPosition] = isFinal
        else:
            self.states.append(state)
            self.actions.append(action)
            self.rewards.append(reward)
            self.newStates.append(newState)
            self.finals.append(isFinal)
        self.currentPosition += 1
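
A minimal usage sketch of the Memory class above, assuming only the interface defined there; the 8-dimensional vectors, the capacity, and the batch size are illustrative, not taken from the gist:

import numpy as np
from memory import Memory

# store two transitions, then sample a mini batch of dicts back out
memory = Memory(1000)
s0 = np.zeros(8)
s1 = np.ones(8)
memory.addMemory(s0, 0, 1.0, s1, False)
memory.addMemory(s1, 1, -1.0, s0, True)
print(memory.getCurrentSize())   # 2
batch = memory.getMiniBatch(2)   # each entry has 'state', 'action', 'reward', 'newState', 'isFinal'
print(batch[0]['action'])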

@wingedsheep (Author) commented:

I used the same Q-learning algorithm I used to solve the CartPole problem (https://gym.openai.com/evaluations/eval_nI8cryNQaKlFKv592N7hQ), only this time with more iterations and a smaller minibatch size for training.
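
For reference, a minimal standalone sketch of the target that this Q-learning update computes (the same quantity as DeepQ.calculateTarget above); the example numbers are illustrative only:

import numpy as np

# Q-learning target for a single transition: just the reward for terminal
# states, otherwise reward + discountFactor * max_a Q(newState, a).
def q_target(qValuesNewState, reward, isFinal, discountFactor=0.99):
    if isFinal:
        return reward
    return reward + discountFactor * np.max(qValuesNewState)

print(q_target(np.array([0.1, 0.5, -0.2, 0.0]), reward=1.0, isFinal=False))  # 1.0 + 0.99 * 0.5 = 1.495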
