@bercikr
Created July 28, 2017 17:41
# adapted from https://gist.github.com/tsdaemon/8a8ac88361b2fb94348e59f95d63cf56
import gym
from gym import configuration
import numpy as np
import logging
import sys

configuration.undo_logger_setup()  # undo gym's automatic logging setup so basicConfig below takes effect
log_format = '%(asctime)s:%(levelname)s %(filename)s(%(lineno)s): %(message)s'
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=log_format)
logger = logging.getLogger(__name__)
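
# Outline of what follows (my summary, not part of the original gist): this is
# a basic evolution-strategies (NES-style) loop for Acrobot-v1 with a linear
# policy. Each iteration samples a batch of Gaussian perturbations around the
# current weights, runs one episode per perturbed weight vector, standardizes
# the resulting episode scores, and nudges the weights toward the perturbations
# that scored above average.
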
def episode_f(w, env, goal_steps, actions):
    """Run one episode with weight vector w; return (shaped reward, steps taken)."""
    logger.debug('run mini batch: {}'.format(w))
    done = False
    observation = env.reset()
    ep_steps_done = 0
    success = False
    while not done:
        action = sigm_policy(w, observation, actions)
        observation, reward, done, info = env.step(action)
        # Acrobot-v1 gives -1 per step and 0.0 on the transition that reaches the
        # goal, so a zero reward within goal_steps counts as a success.
        if reward == 0.0 and ep_steps_done < goal_steps:
            logger.info('steps: {}'.format(ep_steps_done))
            success = True
        ep_steps_done += 1
    return steps_evaluation(ep_steps_done, goal_steps, success), ep_steps_done

def sigm_policy(weights, state, actions):
    # Despite the name there is no sigmoid here: the policy takes the inner
    # product of the observation with the weights and thresholds it at zero.
    inner = np.inner(state, weights)
    if inner < 0.0:
        # negative response: sample a random action from the action space
        action = actions.sample()
    else:
        # non-negative response: take action 0
        action = 0
    return action
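
# For illustration (hypothetical numbers, not from the gist): with
# weights = [0.1, -0.2, 0.0, 0.0, 0.3, 0.0] and an Acrobot observation
# [cos(t1), sin(t1), cos(t2), sin(t2), dt1, dt2] = [1.0, 0.0, 1.0, 0.0, -0.5, 0.0],
# the inner product is 0.1 - 0.15 = -0.05 < 0, so the policy samples a random
# action; a non-negative inner product would always return action 0.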

def steps_evaluation(n_steps, goal_steps, success):
    # Shaped fitness for one episode: a flat -1 for a successful episode,
    # otherwise increasingly negative the longer the episode ran.
    # (goal_steps is accepted but not used here.)
    if success:
        return -1
    else:
        return 1 - n_steps
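
# Worked example of the shaping above (numbers are illustrative): a run that
# reaches the goal within goal_steps scores -1, while one that never reaches it
# in, say, 200 steps scores 1 - 200 = -199, so long failures dominate the batch
# average.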

def main():
    np.random.seed(42)
    env = gym.make('Acrobot-v1')
    env = gym.wrappers.Monitor(env, 'nes', force=True)
    alpha = 0.2  # learning rate for the weight update
    actions = range(env.action_space.n)  # kept from the original; unused, episodes sample from env.action_space
    # could be any length
    n_episodes = 1000
    n_additional_episodes = 100
    goal_steps = 150
    sum_reward_goal = 450
    n_states = env.observation_space.shape[0]
    w_states = np.zeros(n_states)  # linear policy weights, one per observation dimension
    n_episodes_in_batch = 10  # perturbed weight vectors evaluated per update
    reward_h = []  # history of batch-average rewards
    rewards = np.zeros(n_episodes_in_batch)
    sigma = 0.05  # standard deviation of the weight perturbations

    for j in range(n_episodes):
        # env.render()
        logger.info('w_states: {}'.format(w_states))
        # one row of Gaussian noise per perturbed weight vector in the batch
        N_matrix = np.random.normal(scale=sigma, size=(n_episodes_in_batch, w_states.shape[0]))
        logger.info('running episode: {}'.format(j))
        for i in range(n_episodes_in_batch):
            logger.debug('running batch of size: {}'.format(n_episodes_in_batch))
            w_try = w_states + N_matrix[i]
            logger.debug(w_try)
            reward, steps = episode_f(w_try, env, goal_steps, env.action_space)
            logger.debug('reward: {}'.format(reward))
            rewards[i] = reward
        reward_average = np.mean(rewards)
        reward_h.append(reward_average)
        logger.info(rewards)
        reward_std_deviation = np.std(rewards)
        logger.debug('std deviation of rewards: {}'.format(reward_std_deviation))
        rewards_sum = abs(sum(rewards))
        logger.info('sum of rewards: {}'.format(rewards_sum))
        if rewards_sum <= sum_reward_goal:
            # the batch scored close enough to the goal: stop training
            break
        if reward_std_deviation == 0:
            # identical rewards across the batch: standardizing would divide by zero
            break
        A_vector = (rewards - reward_average) / reward_std_deviation
        w_states += alpha / (n_episodes_in_batch * sigma) * np.matmul(N_matrix.T, A_vector)
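        # The update above is the standard evolution-strategies estimate of the
        # gradient of expected reward (my paraphrase, not part of the original gist):
        #     w <- w + alpha / (n * sigma) * sum_i epsilon_i * A_i
        # where epsilon_i is the i-th row of N_matrix and A_i the i-th
        # standardized reward; np.matmul(N_matrix.T, A_vector) computes the sum
        # over the batch in one step.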

    for j in range(n_additional_episodes):
        # w is static at this point; these extra episodes just record the
        # final policy under the Monitor wrapper
        episode_f(w_states, env, goal_steps, env.action_space)
    env.close()
    gym.upload('nes', api_key='')


if __name__ == "__main__":
    main()