@JKCooper2
Last active June 30, 2016 21:55
[Open AI] Acrobot-v0 Simulated Annealing v1
Simulated Annealing applied to Acrobot.
Extends ideas from: https://gym.openai.com/evaluations/eval_GaB43XLAS7mVYcmE6cfoQ
I've added some additional things to the class in different attempts to solve this:
self.changes - adds the difference from the previous state into the calculation
self.n_prev_obs - holds the last n previous observations and uses those in the calculation
However, the best results came without needing either of these (a sketch of the resulting feature vector follows below).
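For illustration only, here is a minimal standalone sketch (not part of the gist, values made up) of the feature vector the linear model ends up scoring when both options are switched on, e.g. n_prev_obs=1 and changes=True:

import numpy as np

# Hypothetical illustration: the feature vector the linear model scores with n_prev_obs=1 and changes=True
obs_prev = np.array([0.9, 0.1, -0.8, 0.2, 1.0, -0.5])  # made-up previous observation
obs_now = np.array([0.8, 0.3, -0.7, 0.4, 0.9, -0.6])   # made-up current observation
features = np.concatenate([obs_prev, obs_now, obs_now - obs_prev])  # previous obs, current obs, then the deltas
weights = np.zeros(features.shape)  # the values the agent anneals over (self.test in the class below)
score = float(np.dot(weights, features))  # this score is what gets thresholded into one of the three actions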
Good Things:
- Solves quickly (can't get much better than 0)
Requirements:
- Action effects must be linear
- Starting value of alpha can't be too small (too small an alpha means no actions are ever selected where pred > 1 or pred < -1; see the sketch below)
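As a hedged sketch of that last requirement (standalone, with made-up observation values): if alpha is tiny, the randomly drawn weights keep the linear score inside [-1, 1], so the neutral action 1 is the only one ever chosen and the torque actions 0 and 2 never get explored:

import numpy as np

np.random.seed(0)
observation = np.array([0.9, 0.4, -0.8, 0.6, 1.5, -2.0])  # made-up observation values

for alpha in (0.01, 3.0):  # a too-small alpha vs. the alpha=3 used in main() below
    weights = np.random.normal(scale=alpha, size=observation.shape)  # first test values when best is all zeros
    score = float(np.dot(weights, observation))
    action = 0 if score < -1 else (2 if score > 1 else 1)  # same thresholds as choose_action below
    print("alpha=%.2f score=%.3f action=%d" % (alpha, score, action))  # tiny alpha keeps the score in [-1, 1], so action stays 1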
import logging

import gym

from SimulatedAnnealing import SimulatedAnnealingAgent


def main():
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    env = gym.make('Acrobot-v0')
    agent = SimulatedAnnealingAgent(env.action_space, decay=0.8, alpha=3)

    upload = True  # Sets whether to upload to OpenAI
    outdir = '/tmp/' + agent.name + '-results'
    env.monitor.start(outdir, force=True)

    episode_count = 1000

    for i in xrange(episode_count):
        ob = env.reset()
        done = False
        reward = 0

        while not done:
            action = agent.act(ob, reward, done)
            ob, reward, done, _ = env.step(action)

        agent.act(ob, reward, done)  # Final push required so the agent is notified the episode is done and can update its model

    # Dump result info to disk
    env.monitor.close()


if __name__ == '__main__':
    main()
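Note that upload is set in main() but never used in the snippet above. With the gym client current in mid-2016, results recorded by the monitor were submitted to the OpenAI Gym scoreboard with gym.upload; below is a hedged sketch only (passing agent.name as the algorithm id and relying on the OPENAI_GYM_API_KEY environment variable are assumptions, not something shown in the gist):

import gym

def upload_results(outdir, algorithm_id):
    # Hypothetical helper (not in the gist): submit the recorded monitor output to the
    # OpenAI Gym scoreboard. gym.upload was the 2016-era API; if api_key is not passed,
    # the client read it from the OPENAI_GYM_API_KEY environment variable (assumption).
    gym.upload(outdir, algorithm_id=algorithm_id)

# e.g. at the end of main():  if upload: upload_results(outdir, agent.name)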
import numpy as np
import copy

np.random.seed(0)  # So scores are (slightly) more consistent. Randomness in the environment counteracts this a bit


class SimulatedAnnealingAgent(object):
    def __init__(self, action_space, repeats=1, alpha=1, decay=0.9, n_prev_obs=0, changes=False):
        self.name = 'SimAnn'  # Name to be submitted to OpenAI
        self.action_space = action_space  # Just for consistency with other agents, not used in this case

        self.alpha = alpha  # Spread when selecting new values to test
        self.decay = decay  # Decay in impact of alpha
        self.repeats = repeats  # Number of times to repeat testing a value

        self.obs_count = 0  # Number of observation values returned (can probably get from the environment somehow)
        self.best = []  # Holds best values (set on first run of action)
        self.test = []  # Holds test values

        self.n_prev_obs = n_prev_obs  # Number of previous observations to hold
        self.prev_obs = []  # Holds previous observations (also used for determining changes)
        self.changes = changes  # Whether to add changes between the current and last state to the model

        self.best_score = 0  # Current max score found
        self.best_count = 0  # Times hit max score (used for bounded problems)
        self.ep_score = 0  # Total score for episode
        self.repeat_count = 0  # Times repeated running test
        self.start_score = None  # Starting score for model

    # Set the new test values at the start of the episode
    def set_test(self):
        # If fewer than the required repeats, just run the same test values again
        if self.repeat_count < self.repeats:
            self.repeat_count += 1
            return self.test

        # Else reset the repeat count and set new values based on the current best, spread and alpha
        self.repeat_count = 0

        # Perturb each best value with normally distributed noise around 0 with std dev alpha
        return [self.best[i] + np.random.normal(scale=self.alpha) for i in range(self.obs_count)]

    # Choose action based on observed values
    def choose_action(self, observation):
        # prev_obs holds the last (n_prev_obs + 1) * len(observation) values, with higher indexes being more recent
        # Discarding the first len(observation) values is equivalent to discarding the oldest stored observation
        self.prev_obs = self.prev_obs[len(observation):]
        # Then add the newest observation to the end
        self.prev_obs.extend(observation)

        # Make a non-referential copy so changes can be added if required
        test = copy.copy(self.prev_obs)

        if self.changes:
            # Calculate the difference between the current and previous stored state and add that to the test sample
            # (note: with n_prev_obs == 0 the only stored state is the current one, so these differences are all zero)
            change_obs = [observation[i] - self.prev_obs[i] for i in range(len(observation))]
            test.extend(change_obs)

        score = sum(test[i] * self.test[i] for i in range(len(test)))

        # Took me ages to realise there were three actions.
        # Linear selection only works because the action effects are linear
        # I can't see it working for more complex problems
        if score < -1:
            return 0
        elif score > 1:
            return 2

        # else -1 <= score <= 1
        return 1

    # If we get the same ep score then update best to the average of all values that have reached the best score
    def update_best(self):
        self.best = [(self.best[i] * self.best_count + self.test[i]) / (self.best_count + 1) for i in range(self.obs_count)]
        self.best_count += 1

    # What gets called
    def act(self, observation, reward, done):
        # Set initial values if this is the first time the agent is seeing observations
        if self.obs_count == 0:
            self.obs_count = len(observation) * (1 + self.n_prev_obs + self.changes)  # Make room for storing additional observations
            self.best = [0] * self.obs_count
            self.test = self.best

            # Fill the previous observations up to the correct length
            self.prev_obs = [observation for _ in range(1 + self.n_prev_obs)]
            self.prev_obs = [value for observation in self.prev_obs for value in observation]

        # Set new test values for a new episode
        if self.ep_score == 0:
            self.test = self.set_test()

        # Select action
        action = self.choose_action(observation)

        # Update episode score
        self.ep_score += reward

        if done:
            # Because the score counts down to a capped value, keep a copy of the first score found (likely a bad one)
            # This is so that alpha isn't reduced when two scores are "equal" only because both hit the lower bound
            if self.start_score is None:
                self.start_score = self.ep_score
                self.best_score = self.ep_score

            # If the score ties the current best then the amount of variance in future choices goes down
            # and the new best is set to the average of all best-scoring values so far (incremental mean)
            if self.ep_score == self.best_score and self.ep_score != self.start_score:
                self.alpha *= self.decay
                self.update_best()

            # If the new score is greater then set everything to that
            elif self.ep_score > self.best_score:
                self.best_score = self.ep_score
                self.best = self.test
                self.best_count = 0
                self.alpha *= self.decay

            self.ep_score = 0

        return action
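As a quick standalone illustration (not part of the gist) of the annealing schedule: alpha is multiplied by decay each time an episode beats the best score, or ties it with something other than the initial score, so with the alpha=3, decay=0.8 settings from main() the perturbation scale after k such episodes is 3 * 0.8**k:

# Standalone sketch of the shrinking search radius (alpha=3, decay=0.8 as in main())
alpha, decay = 3.0, 0.8
for k in range(6):
    print("after %d improving/tying episodes: alpha = %.5f" % (k, alpha * decay ** k))
# prints 3.00000, 2.40000, 1.92000, 1.53600, 1.22880, 0.98304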
@baoblackcoal
Hmm, random seeds are like humans' congenital factors; that's why some people's IQs are higher than others'. :)
