Skip to content

Instantly share code, notes, and snippets.

@keithmgould
Last active January 25, 2019 18:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save keithmgould/f99296260b4a739ac6b523ad71e1ec90 to your computer and use it in GitHub Desktop.
Save keithmgould/f99296260b4a739ac6b523ad71e1ec90 to your computer and use it in GitHub Desktop.
# OpenGym MountainCar-v0
# -------------------
#
# This code demonstrates debugging of a basic Q-network (without target network)
# in an OpenGym MountainCar-v0 environment.
#
# Made as part of blog series Let's make a DQN, available at:
# https://jaromiru.com/2016/10/12/lets-make-a-dqn-debugging/
#
# author: Jaromir Janisch, 2016
#
# Edited by Keith: registers a MountainCar variant whose episodes last up to
# 1000 steps (the stock MountainCar-v0 stops at 200).
import random, numpy, math, gym, time
#-------------------- BRAIN ---------------------------
from keras.models import Sequential
from keras.layers import *
from keras.optimizers import *
class Brain:
    """Keras Q-network: maps a state vector to one Q-value per action."""

    def __init__(self, stateCnt, actionCnt):
        """Remember the network dimensions and build the model.

        stateCnt  -- length of the state vector (input layer width)
        actionCnt -- number of actions (output layer width)
        """
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt
        self.model = self._createModel()
        # self.model.load_weights("models/MountainCar-256-256-basic.h5")

    def _createModel(self):
        """Build and compile the network: one 64-unit ReLU layer, linear output.

        Returns the compiled Keras model (MSE loss, RMSprop optimizer).
        """
        model = Sequential()
        # Size the layers from this instance, not from module-level globals,
        # so a Brain can be built with any dimensions and from any module.
        model.add(Dense(units=64, activation='relu', input_dim=self.stateCnt))
        # model.add(Dense(units=256, activation='relu'))
        model.add(Dense(units=self.actionCnt, activation='linear'))
        opt = RMSprop(lr=0.00025)
        model.compile(loss='mse', optimizer=opt)
        return model

    def train(self, x, y, epoch=1, verbose=0):
        """Fit the model on inputs ``x`` and targets ``y`` for ``epoch`` epochs."""
        # `epochs` is the Keras 2 spelling; `nb_epoch` is the Keras 1 name.
        self.model.fit(x, y, batch_size=64, epochs=epoch, verbose=verbose)

    def predict(self, s):
        """Return the model's output for a batch of states ``s``."""
        return self.model.predict(s)

    def predictOne(self, s):
        """Return the flattened model output for a single state ``s``."""
        # The model is fed a batch, so wrap the lone state in a batch of one.
        return self.predict(s.reshape(1, self.stateCnt)).flatten()
#-------------------- MEMORY --------------------------
class Memory:  # stored as ( s, a, r, s_ )
    """Bounded first-in-first-out buffer of experience samples."""

    def __init__(self, capacity):
        """Create an empty buffer holding at most ``capacity`` samples."""
        self.capacity = capacity
        # Per-instance list: a class-level list would be shared by every
        # Memory object, silently mixing their contents.
        self.samples = []

    def add(self, sample):
        """Append ``sample``, evicting the oldest entry once over capacity."""
        self.samples.append(sample)
        if len(self.samples) > self.capacity:
            self.samples.pop(0)

    def sample(self, n):
        """Return up to ``n`` distinct stored samples chosen at random."""
        # Clamp so random.sample never asks for more items than exist.
        n = min(n, len(self.samples))
        return random.sample(self.samples, n)

    def isFull(self):
        """Return True once the buffer holds ``capacity`` samples."""
        return len(self.samples) >= self.capacity
#-------------------- AGENT ---------------------------
MEMORY_CAPACITY = 100000  # capacity passed to the replay Memory
BATCH_SIZE = 64  # number of samples requested per replay step
GAMMA = 0.99  # discount applied to the best next-state Q-value
MAX_EPSILON = 1  # exploration rate before any step is observed
MIN_EPSILON = 0.1  # exploration rate approached as steps accumulate
LAMBDA = 0.001  # speed of decay


class Agent:
    """Epsilon-greedy Q-learning agent with a Brain and a replay Memory."""

    steps = 0  # number of samples observed so far
    epsilon = MAX_EPSILON  # current probability of taking a random action

    def __init__(self, stateCnt, actionCnt):
        """Create the agent's network and replay memory.

        stateCnt  -- length of the state vector
        actionCnt -- number of actions the agent can choose from
        """
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt
        self.brain = Brain(stateCnt, actionCnt)
        self.memory = Memory(MEMORY_CAPACITY)

    def act(self, s):
        """Pick an action for state ``s``.

        With probability ``epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest predicted Q-value.
        """
        if random.random() < self.epsilon:
            return random.randint(0, self.actionCnt - 1)
        return numpy.argmax(self.brain.predictOne(s))

    def observe(self, sample):  # in (s, a, r, s_) format
        """Store ``sample`` in memory and decay epsilon by one step."""
        self.memory.add(sample)
        # slowly decrease epsilon based on our experience
        self.steps += 1
        self.epsilon = MIN_EPSILON + (MAX_EPSILON - MIN_EPSILON) * math.exp(-LAMBDA * self.steps)

    def replay(self):
        """Train the brain on one random batch drawn from memory.

        For each sample (s, a, r, s_) the target for action ``a`` is ``r``
        when ``s_`` is None (terminal), otherwise
        ``r + GAMMA * max(Q(s_))``; targets for the other actions are the
        network's current predictions. Does nothing when memory is empty.
        """
        batch = self.memory.sample(BATCH_SIZE)
        if not batch:
            return  # nothing stored yet, so there is nothing to train on

        batchLen = len(batch)
        # Placeholder fed to the network in place of a missing next state;
        # its prediction is never used because terminal targets are just r.
        no_state = numpy.zeros(self.stateCnt)
        states = numpy.array([o[0] for o in batch])
        states_ = numpy.array([(no_state if o[3] is None else o[3]) for o in batch])

        # Use this agent's own brain, not a module-level `agent` global.
        p = self.brain.predict(states)
        p_ = self.brain.predict(states_)

        x = numpy.zeros((batchLen, self.stateCnt))
        y = numpy.zeros((batchLen, self.actionCnt))
        for i, (s, a, r, s_) in enumerate(batch):
            t = p[i]
            if s_ is None:
                t[a] = r
            else:
                t[a] = r + GAMMA * numpy.amax(p_[i])
            x[i] = s
            y[i] = t

        self.brain.train(x, y)
class RandomAgent:
    """Agent that acts uniformly at random and only records what it sees."""

    def __init__(self, actionCnt):
        """Create the agent with ``actionCnt`` available actions."""
        self.actionCnt = actionCnt
        # Built per instance: a class-level Memory would be constructed at
        # import time and shared by every RandomAgent.
        self.memory = Memory(MEMORY_CAPACITY)

    def act(self, s):
        """Return a uniformly random action index; the state ``s`` is ignored."""
        return random.randint(0, self.actionCnt - 1)

    def observe(self, sample):  # in (s, a, r, s_) format
        """Store ``sample`` in memory."""
        self.memory.add(sample)

    def replay(self):
        """Do nothing; this agent does not learn."""
        pass
#-------------------- ENVIRONMENT ---------------------
class Environment:
    """Wraps a gym environment: normalizes observations and runs episodes."""

    # Agent action index -> action passed to the environment
    # (0 = left, 2 = right).
    _ACTION_MAP = {0: 0, 1: 2}

    def __init__(self, problem):
        """Create the gym environment ``problem`` and its normalization terms."""
        self.problem = problem
        self.env = gym.make(problem)
        high = self.env.observation_space.high
        low = self.env.observation_space.low
        # Midpoint and half-width of the observation bounds.
        self.mean = (high + low) / 2
        self.spread = abs(high - low) / 2

    def normalize(self, s):
        """Rescale ``s`` so the observation bounds map to -1 and +1."""
        return (s - self.mean) / self.spread

    def run(self, agent, render=False):
        """Run one episode with ``agent`` and return the summed reward.

        On every step the agent acts, observes the (s, a, r, s_) sample and
        replays. ``s_`` is None on the step where the environment reports
        ``done``. Raises ValueError if the agent returns an action that has
        no entry in the action map.
        """
        # NOTE(review): the unpacking below expects reset() to return just the
        # observation and step() to return a 4-tuple — confirm the installed
        # gym version matches.
        s = self.normalize(self.env.reset())
        R = 0
        while True:
            if render:
                self.env.render()

            a = agent.act(s)
            # Fail loudly on an unmapped action instead of leaving the
            # environment action unbound or stale from the previous step.
            if a not in self._ACTION_MAP:
                raise ValueError("unsupported agent action: {!r}".format(a))
            a_ = self._ACTION_MAP[a]

            s_, r, done, info = self.env.step(a_)
            s_ = None if done else self.normalize(s_)  # None marks terminal

            # The sample stores the agent's own action index, not a_.
            agent.observe((s, a, r, s_))
            agent.replay()

            s = s_
            R += r
            if done:
                return R
#-------------------- MAIN ----------------------------
import os

# Register a MountainCar variant with a longer episode limit.
gym.envs.register(
    id='MountainCarMyEasyVersion-v0',
    entry_point='gym.envs.classic_control:MountainCarEnv',
    max_episode_steps=1000,  # MountainCar-v0 uses 200
    reward_threshold=-110.0,
)

PROBLEM = 'MountainCarMyEasyVersion-v0'
env = Environment(PROBLEM)

stateCnt = env.env.observation_space.shape[0]
actionCnt = 2  # env.env.action_space.n

agent = Agent(stateCnt, actionCnt)
randomAgent = RandomAgent(actionCnt)

try:
    # Fill the replay memory with random play before any learning starts.
    while not randomAgent.memory.isFull():
        env.run(randomAgent, False)

    agent.memory = randomAgent.memory
    randomAgent = None

    # Train until the process is interrupted.
    run_counter = 0
    while True:
        run_counter += 1
        total_reward = env.run(agent, False)
        print("{} - total reward: {}".format(run_counter, total_reward))
finally:
    timestr = time.strftime("%Y%m%d-%H%M%S")
    # Make sure the output directory exists so the save at exit cannot fail
    # on a missing folder and lose the trained weights.
    os.makedirs("models", exist_ok=True)
    agent.brain.model.save("models/MountainCar-64_" + timestr + "_basic.h5")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment