# OpenGym MountainCar-v0
# -------------------
#
# This code demonstrates debugging of a basic Q-network (without target network)
# in an OpenGym MountainCar-v0 environment.
#
# Made as part of the blog series Let's make a DQN, available at:
# https://jaromiru.com/2016/10/12/lets-make-a-dqn-debugging/
#
# author: Jaromir Janisch, 2016
#
# edited by Keith to make the env last 1000 steps...
import random, numpy, math, gym, time

#-------------------- BRAIN ---------------------------
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import RMSprop
class Brain:
    def __init__(self, stateCnt, actionCnt):
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt

        self.model = self._createModel()
        # self.model.load_weights("models/MountainCar-256-256-basic.h5")

    def _createModel(self):
        model = Sequential()

        model.add(Dense(units=64, activation='relu', input_dim=self.stateCnt))
        # model.add(Dense(units=256, activation='relu'))
        model.add(Dense(units=self.actionCnt, activation='linear'))

        opt = RMSprop(lr=0.00025)
        model.compile(loss='mse', optimizer=opt)

        return model

    def train(self, x, y, epoch=1, verbose=0):
        self.model.fit(x, y, batch_size=64, epochs=epoch, verbose=verbose)

    def predict(self, s):
        return self.model.predict(s)

    def predictOne(self, s):
        return self.predict(s.reshape(1, self.stateCnt)).flatten()
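# Note on shapes: predict() expects a batch of states shaped (n, stateCnt);
# predictOne() reshapes a single state to (1, stateCnt) and flattens the
# (1, actionCnt) output back into a plain vector of per-action Q-values.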
#-------------------- MEMORY --------------------------
class Memory:   # stored as ( s, a, r, s_ )
    def __init__(self, capacity):
        self.capacity = capacity
        self.samples = []   # instance attribute, so each Memory gets its own buffer

    def add(self, sample):
        self.samples.append(sample)

        if len(self.samples) > self.capacity:
            self.samples.pop(0)

    def sample(self, n):
        n = min(n, len(self.samples))
        return random.sample(self.samples, n)

    def isFull(self):
        return len(self.samples) >= self.capacity
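# Experience replay in a nutshell: transitions are stored as they happen, and
# training draws uniformly random minibatches from this buffer. That breaks
# the strong temporal correlation between consecutive steps that would
# otherwise destabilize the network's updates.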
#-------------------- AGENT ---------------------------
MEMORY_CAPACITY = 100000
BATCH_SIZE = 64

GAMMA = 0.99

MAX_EPSILON = 1
MIN_EPSILON = 0.1
LAMBDA = 0.001      # speed of decay
class Agent:
    def __init__(self, stateCnt, actionCnt):
        self.stateCnt = stateCnt
        self.actionCnt = actionCnt
        self.steps = 0
        self.epsilon = MAX_EPSILON

        self.brain = Brain(stateCnt, actionCnt)
        self.memory = Memory(MEMORY_CAPACITY)

    def act(self, s):
        if random.random() < self.epsilon:
            return random.randint(0, self.actionCnt-1)
        else:
            return numpy.argmax(self.brain.predictOne(s))

    def observe(self, sample):  # in (s, a, r, s_) format
        self.memory.add(sample)

        # slowly decrease epsilon based on our experience
        self.steps += 1
        self.epsilon = MIN_EPSILON + (MAX_EPSILON - MIN_EPSILON) * math.exp(-LAMBDA * self.steps)
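    # The schedule above decays exploration exponentially: epsilon starts at
    # MAX_EPSILON and approaches MIN_EPSILON as steps grow. With LAMBDA = 0.001
    # the decaying part halves every ln(2)/0.001 ~= 693 steps, so epsilon is
    # roughly 0.55 after 700 steps and close to 0.1 after a few thousand.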
    def replay(self):
        batch = self.memory.sample(BATCH_SIZE)
        batchLen = len(batch)

        no_state = numpy.zeros(self.stateCnt)

        states = numpy.array([ o[0] for o in batch ])
        states_ = numpy.array([ (no_state if o[3] is None else o[3]) for o in batch ])

        p = self.brain.predict(states)
        p_ = self.brain.predict(states_)

        x = numpy.zeros((batchLen, self.stateCnt))
        y = numpy.zeros((batchLen, self.actionCnt))

        for i in range(batchLen):
            o = batch[i]
            s = o[0]; a = o[1]; r = o[2]; s_ = o[3]

            t = p[i]
            if s_ is None:
                t[a] = r
            else:
                t[a] = r + GAMMA * numpy.amax(p_[i])

            x[i] = s
            y[i] = t

        self.brain.train(x, y)
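# The loop above builds the standard Q-learning target: for a terminal
# transition the target is just r, otherwise r + GAMMA * max_a' Q(s_, a').
# Since this is the basic variant without a target network, p and p_ are
# both produced by the same model that is being trained.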
class RandomAgent:
    def __init__(self, actionCnt):
        self.actionCnt = actionCnt
        self.memory = Memory(MEMORY_CAPACITY)

    def act(self, s):
        return random.randint(0, self.actionCnt-1)

    def observe(self, sample):  # in (s, a, r, s_) format
        self.memory.add(sample)

    def replay(self):
        pass
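# RandomAgent exists only to pre-fill the replay memory with transitions from
# a uniformly random policy; once its memory is full, the main loop hands that
# memory over to the learning Agent and discards the RandomAgent.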
#-------------------- ENVIRONMENT ---------------------
class Environment:
    def __init__(self, problem):
        self.problem = problem
        self.env = gym.make(problem)

        high = self.env.observation_space.high
        low = self.env.observation_space.low

        self.mean = (high + low) / 2
        self.spread = abs(high - low) / 2

    def normalize(self, s):
        return (s - self.mean) / self.spread
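    # normalize() rescales each state dimension into roughly [-1, 1] using the
    # observation space bounds; for MountainCar that puts position ([-1.2, 0.6])
    # and velocity ([-0.07, 0.07]) on a comparable scale so neither dominates
    # the network input.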
    def run(self, agent, render=False):
        s = self.env.reset()
        s = self.normalize(s)
        R = 0

        while True:
            if render:
                self.env.render()

            # map agent actions onto env actions, skipping "no push" (1):
            # agent 0 -> env 0 (push left), agent 1 -> env 2 (push right)
            a = agent.act(s)
            if a == 0:
                a_ = 0
            else:
                a_ = 2

            s_, r, done, info = self.env.step(a_)
            s_ = self.normalize(s_)

            if done:   # terminal state
                s_ = None

            agent.observe( (s, a, r, s_) )
            agent.replay()

            s = s_
            R += r

            if done:
                break

        return R
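# Each call to run() plays one episode: every step stores the (normalized)
# transition and then trains on one random minibatch via replay(), so learning
# is interleaved with acting. R is the undiscounted episode return.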
#-------------------- MAIN ----------------------------
gym.envs.register(
    id='MountainCarMyEasyVersion-v0',
    entry_point='gym.envs.classic_control:MountainCarEnv',
    max_episode_steps=1000,      # MountainCar-v0 uses 200
    reward_threshold=-110.0,
)
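# Registering a custom id reuses the stock MountainCarEnv but raises the step
# limit from 200 to 1000 per episode, giving the (initially random) car far
# more time to reach the flag and produce a useful learning signal.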
PROBLEM = 'MountainCarMyEasyVersion-v0'
env = Environment(PROBLEM)

stateCnt = env.env.observation_space.shape[0]
actionCnt = 2   # only push-left/push-right are used, not env.env.action_space.n (3)

agent = Agent(stateCnt, actionCnt)
randomAgent = RandomAgent(actionCnt)

try:
    while not randomAgent.memory.isFull():
        env.run(randomAgent, False)

    agent.memory = randomAgent.memory
    randomAgent = None

    run_counter = 0
    while True:
        run_counter += 1
        total_reward = env.run(agent, False)
        print("{} - total reward: {}".format(run_counter, total_reward))
finally:
    timestr = time.strftime("%Y%m%d-%H%M%S")
    agent.brain.model.save("models/MountainCar-64_" + timestr + "_basic.h5")