@wingedsheep
Created May 20, 2016 17:45
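
# Main training script: runs CartPole-v0 with the DeepQ agent defined below.
# (The DeepQ and Memory classes are separate files, deepq.py and memory.py,
# as the imports "from deepq import DeepQ" and "from memory import Memory" indicate.)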
# import the gym stuff
import gym
# import other stuff
import random
import numpy as np
# import own classes
from deepq import DeepQ
env = gym.make('CartPole-v0')
epochs = 3000
steps = 100000
updateTargetNetwork = 5000
initialUpdateTargetNetwork = 1000
explorationRate = 1
minibatch_size = 128
learnStart = 128
learningRate = 0.00025
discountFactor = 0.99
memorySize = 1000000
last100Scores = [0] * 100
last100ScoresIndex = 0
last100Filled = False
deepQ = DeepQ(4, 2, memorySize, discountFactor, learningRate, learnStart)
# deepQ.initNetworks([24, 16, 12, 8])
# deepQ.initNetworks([6])
deepQ.initNetworks([30, 30, 30])
stepCounter = 0
# number of reruns
for epoch in xrange(epochs):
    observation = env.reset()
    print explorationRate
    # number of timesteps
    for t in xrange(steps):
        # env.render()
        qValues = deepQ.getQValues(observation)
        action = deepQ.selectAction(qValues, explorationRate)

        newObservation, reward, done, info = env.step(action)
        deepQ.addMemory(observation, action, reward, newObservation, done)

        if stepCounter >= learnStart:
            deepQ.learnOnMiniBatch(minibatch_size)

        observation = newObservation

        if done:
            last100Scores[last100ScoresIndex] = t
            last100ScoresIndex += 1
            if last100ScoresIndex >= 100:
                last100Filled = True
                last100ScoresIndex = 0
            print last100Filled
            if not last100Filled:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1)
            else:
                print "Episode ", epoch, " finished after {} timesteps".format(t + 1), " last 100 average: ", (sum(last100Scores) / len(last100Scores))
            break

        stepCounter += 1
        if stepCounter < updateTargetNetwork:
            currentUpdateTargetNetwork = initialUpdateTargetNetwork
        else:
            currentUpdateTargetNetwork = updateTargetNetwork
        if stepCounter % currentUpdateTargetNetwork == 0:
            deepQ.updateTargetNetwork()
            print "updating target network"

    explorationRate *= 0.995
    # explorationRate -= (2.0/epochs)
    explorationRate = max(0.05, explorationRate)
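
# deepq.py -- the DeepQ agent imported above ("from deepq import DeepQ")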
# import os
# os.environ["THEANO_FLAGS"] = "mode=FAST_RUN,device=gpu,floatX=float32"
# import theano
# import the neural net stuff
from keras.models import Sequential
from keras import optimizers
from keras.layers.core import Dense, Dropout, Activation
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.regularizers import l2
# import other stuff
import random
import numpy as np
from memory import Memory
class DeepQ:
    def __init__(self, inputs, outputs, memorySize, discountFactor, learningRate, learnStart):
        self.input_size = inputs
        self.output_size = outputs
        self.memory = Memory(memorySize)
        self.discountFactor = discountFactor
        self.learnStart = learnStart
        self.learningRate = learningRate

    def initNetworks(self, hiddenLayers):
        model = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.model = model

        targetModel = self.createModel(self.input_size, self.output_size, hiddenLayers, "relu", self.learningRate)
        self.targetModel = targetModel
    def createModel(self, inputs, outputs, hiddenLayers, activationType, learningRate):
        bias = True
        dropout = 0
        regularizationFactor = 0.01
        model = Sequential()
        if len(hiddenLayers) == 0:
            model.add(Dense(self.output_size, input_shape=(self.input_size,), init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        else:
            model.add(Dense(hiddenLayers[0], input_shape=(self.input_size,), init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
            if activationType == "LeakyReLU":
                model.add(LeakyReLU(alpha=0.01))
            else:
                model.add(Activation(activationType))

            # add the remaining hidden layers (the range must run to len(hiddenLayers),
            # otherwise the last entry in hiddenLayers is silently skipped)
            for index in range(1, len(hiddenLayers)):
                layerSize = hiddenLayers[index]
                model.add(Dense(layerSize, init='lecun_uniform', W_regularizer=l2(regularizationFactor), bias=bias))
                if dropout > 0:
                    model.add(Dropout(dropout))
                if activationType == "LeakyReLU":
                    model.add(LeakyReLU(alpha=0.01))
                else:
                    model.add(Activation(activationType))

            model.add(Dense(self.output_size, init='lecun_uniform', bias=bias))
            model.add(Activation("linear"))
        optimizer = optimizers.RMSprop(lr=learningRate, rho=0.9, epsilon=1e-06)
        model.compile(loss="mse", optimizer=optimizer)
        return model
    def printNetwork(self):
        i = 0
        for layer in self.model.layers:
            weights = layer.get_weights()
            print "layer ", i, ": ", weights
            i += 1

    def backupNetwork(self, model, backup):
        weightMatrix = []
        for layer in model.layers:
            weights = layer.get_weights()
            weightMatrix.append(weights)
        i = 0
        for layer in backup.layers:
            weights = weightMatrix[i]
            layer.set_weights(weights)
            i += 1

    def updateTargetNetwork(self):
        self.backupNetwork(self.model, self.targetModel)

    # predict Q values for all the actions
    def getQValues(self, state):
        predicted = self.model.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getTargetQValues(self, state):
        predicted = self.targetModel.predict(state.reshape(1, len(state)))
        return predicted[0]

    def getMaxQ(self, qValues):
        return np.max(qValues)

    def getMaxIndex(self, qValues):
        return np.argmax(qValues)

    # calculate the target function
    def calculateTarget(self, qValuesNewState, reward, isFinal):
        if isFinal:
            return reward
        else:
            return reward + self.discountFactor * self.getMaxQ(qValuesNewState)

    # select the action with the highest Q value
    def selectAction(self, qValues, explorationRate):
        rand = random.random()
        if rand < explorationRate:
            action = np.random.randint(0, self.output_size)
        else:
            action = self.getMaxIndex(qValues)
        return action

    def selectActionByProbability(self, qValues, bias):
        qValueSum = 0
        shiftBy = 0
        for value in qValues:
            if value + shiftBy < 0:
                shiftBy = - (value + shiftBy)
        shiftBy += 1e-06
        for value in qValues:
            qValueSum += (value + shiftBy) ** bias
        probabilitySum = 0
        qValueProbabilities = []
        for value in qValues:
            probability = ((value + shiftBy) ** bias) / float(qValueSum)
            qValueProbabilities.append(probability + probabilitySum)
            probabilitySum += probability
        qValueProbabilities[len(qValueProbabilities) - 1] = 1
        rand = random.random()
        i = 0
        for value in qValueProbabilities:
            if rand <= value:
                return i
            i += 1

    def addMemory(self, state, action, reward, newState, isFinal):
        self.memory.addMemory(state, action, reward, newState, isFinal)

    def learnOnLastState(self):
        if self.memory.getCurrentSize() >= 1:
            return self.memory.getMemory(self.memory.getCurrentSize() - 1)

    def learnOnMiniBatch(self, miniBatchSize):
        if self.memory.getCurrentSize() > self.learnStart:
            miniBatch = self.memory.getMiniBatch(miniBatchSize)
            X_batch = np.empty((0, self.input_size), dtype=np.float64)
            Y_batch = np.empty((0, self.output_size), dtype=np.float64)
            for sample in miniBatch:
                isFinal = sample['isFinal']
                state = sample['state']
                action = sample['action']
                reward = sample['reward']
                newState = sample['newState']
                qValues = self.getQValues(state)
                qValuesNewState = self.getTargetQValues(newState)
                targetValue = self.calculateTarget(qValuesNewState, reward, isFinal)
                X_batch = np.append(X_batch, np.array([state.copy()]), axis=0)
                Y_sample = qValues.copy()
                Y_sample[action] = targetValue
                Y_batch = np.append(Y_batch, np.array([Y_sample]), axis=0)
                if isFinal:
                    X_batch = np.append(X_batch, np.array([newState.copy()]), axis=0)
                    Y_batch = np.append(Y_batch, np.array([[reward] * self.output_size]), axis=0)
            self.model.fit(X_batch, Y_batch, batch_size=len(miniBatch), nb_epoch=1, verbose=0)
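
# memory.py -- the replay memory imported by deepq.py ("from memory import Memory")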
import numpy as np
import random
class Memory:
    def __init__(self, size):
        self.size = size
        self.currentPosition = 0
        self.states = []
        self.actions = []
        self.rewards = []
        self.newStates = []
        self.finals = []

    def getMiniBatch(self, size):
        indices = random.sample(np.arange(len(self.states)), min(size, len(self.states)))
        miniBatch = []
        for index in indices:
            miniBatch.append({'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]})
        return miniBatch

    def getCurrentSize(self):
        return len(self.states)

    def getMemory(self, index):
        return {'state': self.states[index], 'action': self.actions[index], 'reward': self.rewards[index], 'newState': self.newStates[index], 'isFinal': self.finals[index]}

    def addMemory(self, state, action, reward, newState, isFinal):
        if self.currentPosition >= self.size - 1:
            self.currentPosition = 0
        if len(self.states) > self.size:
            self.states[self.currentPosition] = state
            self.actions[self.currentPosition] = action
            self.rewards[self.currentPosition] = reward
            self.newStates[self.currentPosition] = newState
            self.finals[self.currentPosition] = isFinal
        else:
            self.states.append(state)
            self.actions.append(action)
            self.rewards.append(reward)
            self.newStates.append(newState)
            self.finals.append(isFinal)
        self.currentPosition += 1

At some point, when it finds a strategy that works, the algorithm seems to learn fast. But after a while it seems to get bored with its success and starts doing other things again.

This is a deep Q-learning approach, inspired by the DeepMind paper (https://github.com/kuz/DeepMind-Atari-Deep-Q-Learner).
It uses a replay memory to store experiences and learns on minibatches drawn at random from that memory.
Furthermore, I used a target network, which is also described in the DeepMind paper.
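
In the notation of the code above, calculateTarget gives each sampled transition (state, action, reward, newState) the standard one-step Q-learning target, with the bootstrap value taken from the target network rather than the online network:

target = reward                                                     if the transition is terminal
target = reward + discountFactor * max_a' Q_target(newState, a')    otherwise

where Q_target is the targetModel that updateTargetNetwork periodically copies the online weights into.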

The things that differ slightly are that I used a larger batch size (128) and added a regularization factor.
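
Both differences are visible in the code above: minibatch_size = 128 in the training script, and regularizationFactor = 0.01, applied as an l2 weight penalty (W_regularizer) on the hidden layers in DeepQ.createModel.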
