[Open AI] Acrobot-v0 Simulated Annealing v1
Simulated Annealing applied to Acrobot

Extends ideas from: https://gym.openai.com/evaluations/eval_GaB43XLAS7mVYcmE6cfoQ

I've added some additional things to the class in different attempts to solve this:
self.changes - adds the difference from the previous state into the calculation
self.n_prev_obs - holds the last n previous observations and uses those in the calculation
However, the best results came without needing to use either of these.

Good things:
- Solves quickly (can't get much better than 0)

Requirements:
- Action effects must be linear
- The starting value of alpha can't be too small (it would result in no actions ever being selected where pred > 1 or pred < -1; see the sketch below)
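To see why, here is a minimal illustration of the thresholding rule (not part of the gist; the weight and observation values are made up, and the torque meanings of actions 0/1/2 are assumed). With near-zero weights the score never leaves [-1, 1], so the "do nothing" action is chosen every step:

weights = [0.0] * 6  # what the policy looks like if alpha is too small to move it off zero
observation = [0.9, -0.4, 1.0, 0.1, 0.3, -2.1]  # invented Acrobot-style observation

score = sum(w * o for w, o in zip(weights, observation))
if score < -1:
    action = 0  # torque one way
elif score > 1:
    action = 2  # torque the other way
else:
    action = 1  # no torque -- all-zero weights always land here, so the agent never moves

The run script: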
import logging

import gym

from SimulatedAnnealing import SimulatedAnnealingAgent


def main():
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    env = gym.make('Acrobot-v0')
    agent = SimulatedAnnealingAgent(env.action_space, decay=0.8, alpha=3)

    upload = True  # Sets whether to upload to OpenAI

    outdir = '/tmp/' + agent.name + '-results'
    env.monitor.start(outdir, force=True)

    episode_count = 1000

    for i in xrange(episode_count):
        ob = env.reset()
        done = False
        reward = 0

        while not done:
            action = agent.act(ob, reward, done)
            ob, reward, done, _ = env.step(action)

        # Final call so the agent is notified the episode is done and can update its model
        agent.act(ob, reward, done)

    # Dump result info to disk
    env.monitor.close()


if __name__ == '__main__':
    main()
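The script above targets the 2016-era gym API (Python 2's xrange, the built-in env.monitor). A rough, untested sketch of the same loop against a later gym release, assuming gym.wrappers.Monitor and the renamed Acrobot-v1 environment:

import gym
from gym import wrappers
from SimulatedAnnealing import SimulatedAnnealingAgent

env = wrappers.Monitor(gym.make('Acrobot-v1'), '/tmp/SimAnn-results', force=True)
agent = SimulatedAnnealingAgent(env.action_space, decay=0.8, alpha=3)

for _ in range(1000):
    ob, done, reward = env.reset(), False, 0
    while not done:
        ob, reward, done, _ = env.step(agent.act(ob, reward, done))
    agent.act(ob, reward, done)  # final call so the agent sees the episode end

env.close()

SimulatedAnnealing.py itself: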
import numpy as np
import copy

np.random.seed(0)  # So scores are (slightly) more consistent. Randomness in the environment's start state counteracts this a bit


class SimulatedAnnealingAgent(object):
    def __init__(self, action_space, repeats=1, alpha=1, decay=0.9, n_prev_obs=0, changes=False):
        self.name = 'SimAnn'              # Name to be submitted to OpenAI
        self.action_space = action_space  # Just for consistency with other agents, not used in this case
        self.alpha = alpha                # Spread when selecting new values to test
        self.decay = decay                # Decay in impact of alpha
        self.repeats = repeats            # Number of times to repeat testing a value
        self.obs_count = 0                # Number of values in the (extended) observation (can probably get from the environment somehow)

        self.best = []                    # Holds best values (set on first run of act)
        self.test = []                    # Holds test values

        self.n_prev_obs = n_prev_obs      # Number of previous observations to hold
        self.prev_obs = []                # Holds previous observations (also used for determining changes)
        self.changes = changes            # Whether to add changes between the current and last state to the model

        self.best_score = 0               # Current max score found
        self.best_count = 0               # Times the max score has been hit (used for bounded problems)
        self.ep_score = 0                 # Total score for the episode
        self.repeat_count = 0             # Times the current test has been repeated
        self.start_score = None           # Starting score for the model
    # Set the new test values at the start of an episode
    def set_test(self):
        # If fewer than the required repeats, just run the same values again
        if self.repeat_count < self.repeats:
            self.repeat_count += 1
            return self.test

        # Else reset the repeat count and set new values based on the current best and alpha
        self.repeat_count = 0

        # Perturb each best value with Gaussian noise centred on 0 with std dev alpha
        return [self.best[i] + np.random.normal(scale=self.alpha) for i in range(self.obs_count)]
    # Choose action based on observed values
    def choose_action(self, observation):
        # prev_obs holds the last 1 + n_prev_obs observations flattened, with higher indexes being more recent values
        if self.changes:
            # Calculate the difference between the current and previous state.
            # Taken against the most recent stored observation, before prev_obs is updated below,
            # otherwise the difference is always zero when n_prev_obs == 0
            change_obs = [observation[i] - self.prev_obs[i - len(observation)] for i in range(len(observation))]

        # Discarding the first len(observation) values is equivalent to discarding the oldest stored observation
        self.prev_obs = self.prev_obs[len(observation):]
        # Then add the newest observation to the end
        self.prev_obs.extend(observation)

        # Make a non-referential copy so changes can be added if required
        test = copy.copy(self.prev_obs)

        if self.changes:
            test.extend(change_obs)

        score = sum(test[i] * self.test[i] for i in range(len(test)))

        # Took me ages to realise there were three actions.
        # Linear selection only works because the action effects are linear;
        # I can't see it working for more complex problems
        if score < -1:
            return 0
        elif score > 1:
            return 2
        # else -1 <= score <= 1
        return 1
    # If the episode score ties the current best, update best to the average of all
    # test values that have reached that score (incremental mean)
    def update_best(self):
        self.best = [(self.best[i] * self.best_count + self.test[i]) / (self.best_count + 1) for i in range(self.obs_count)]
        self.best_count += 1
    # What gets called each step
    def act(self, observation, reward, done):
        # Set initial values if this is the first time the agent is seeing observations
        if self.obs_count == 0:
            self.obs_count = len(observation) * (1 + self.n_prev_obs + self.changes)  # Make room for storing additional observations
            self.best = [0] * self.obs_count
            self.test = list(self.best)

            # Fill the previous observations up to the correct length
            self.prev_obs = [observation for _ in range(1 + self.n_prev_obs)]
            self.prev_obs = [value for observation in self.prev_obs for value in observation]

        # Set new test values for a new episode
        if self.ep_score == 0:
            self.test = self.set_test()

        # Select action
        action = self.choose_action(observation)

        # Update episode score
        self.ep_score += reward

        if done:
            # Because the score counts down to a capped value, keep a copy of the first score found (likely a bad one).
            # This is so alpha isn't reduced when two "equal" scores are both just the lower bound
            if self.start_score is None:
                self.start_score = self.ep_score
                self.best_score = self.ep_score

            # If the score is the same as the best then the amount of variance in future choices goes down.
            # Set the new best to the average of all best-scoring test values so far (using an incremental mean)
            if self.ep_score == self.best_score and self.ep_score != self.start_score:
                self.alpha *= self.decay
                self.update_best()

            # If the new score is greater then set everything to that
            elif self.ep_score > self.best_score:
                self.best_score = self.ep_score
                self.best = self.test
                self.best_count = 0
                self.alpha *= self.decay

            self.ep_score = 0

        return action
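A quick trace of update_best's incremental mean (one-dimensional values invented for illustration). Note that because best_count is reset to 0 whenever a new best is found, the first tying parameter set replaces the old best outright rather than averaging with it:

best, best_count = [2.0], 0   # state right after a new best score was set
for test in ([4.0], [1.0]):   # two later parameter sets that tie the best score
    best = [(best[0] * best_count + test[0]) / (best_count + 1)]
    best_count += 1
    print(best)               # [4.0] after the first tie, then [2.5] (mean of 4.0 and 1.0)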
Hmm, random seeds are like a human's congenital factors; that's why some people's IQ is higher than others'. :)
I realised that the result is heavily influenced by the random seed used (0 just happened to work very well). Testing 3 trials without setting the random seed resulted in 0, 187, and 266 episodes to solve. It may be better not to set random seeds on uploads, so that the range of scores from reproductions can show the expected performance.
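Following that suggestion, a hypothetical harness for unseeded repeat trials (the trial and episode counts are made up, and there is no official solve check here, just the best episode score each trial reaches):

import numpy as np
import gym
from SimulatedAnnealing import SimulatedAnnealingAgent

for trial in range(3):
    np.random.seed()  # undo the module-level np.random.seed(0) so each trial differs
    env = gym.make('Acrobot-v0')
    agent = SimulatedAnnealingAgent(env.action_space, decay=0.8, alpha=3)
    for _ in range(300):
        ob, done, reward = env.reset(), False, 0
        while not done:
            ob, reward, done, _ = env.step(agent.act(ob, reward, done))
        agent.act(ob, reward, done)  # notify the agent the episode ended
    print('trial %d: best episode score %s' % (trial, agent.best_score))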