@lemonteaa
Last active November 26, 2017 14:37
Jupyter Notebook for developing my PPO integration into keras-rl
# coding: utf-8
# # Getting Started
#
# First we need to figure out the details of Keras's and Gym's input specs, especially the tensors' shapes.
#
# Note that a numpy (**n-d**imensional) array has a `shape` attr which is a Python tuple. For example, `s = (2,6,3,)` represents a $2 \times 6 \times 3$ tensor.
#
# Also note that `np.array` can convert a nested Python list into a numpy array, with the *obvious* interpretation. For example,
#
# `[ [ [1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12] ], [ [-1, -2, -3], [-4, -5, -6], [-7, -8, -9], [-10, -11, -12] ] ]`
#
# represents a $2 \times 4 \times 3$ tensor.
# In[11]:
import numpy as np
import gym
from keras.models import Sequential, Model
from keras.layers import Dense, Activation, Flatten, Input, merge
# In[12]:
ENV_NAME = 'Pendulum-v0'
gym.undo_logger_setup()
env = gym.make(ENV_NAME)
np.random.seed(123)
env.seed(123)
assert len(env.action_space.shape) == 1
nb_actions = env.action_space.shape[0]
env_a = env.action_space
env_o = env.observation_space
print(env_a.shape, env_a.low, env_a.high)
print(env_o.shape, env_o.low, env_o.high)
# In[13]:
np.array([ [ [1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12] ], [ [-1, -2, -3], [-4, -5, -6], [-7, -8, -9], [-10, -11, -12] ] ]).shape
# So OpenAI's gym has the `spaces.Box` class ([Link](https://github.com/openai/gym/blob/master/gym/spaces/box.py)), which is essentially a numpy array with an upper and lower bound for each element, so that it is possible to sample (uniformly) from the space.
# In[15]:
from gym import spaces
space_In = spaces.Box(-1.0, 1.0, (1,))
space_Ob = spaces.Box(-8.0, 8.0, (3,))
i = space_In.sample()
o = space_Ob.sample()
print(i)
print(o)
# In[16]:
type(np.zeros((5,)).shape), type(space_Ob.sample())
# In[17]:
(1,) + env.observation_space.shape
# From the example below we can see that Keras models assume an input tensor of shape $N \times d_1 \times \cdots \times d_k$, where $N$ is the (mini-)batch size (for parallel computation), and each actual input has shape $d_1 \times \cdots \times d_k$.
# In[18]:
critic = Sequential()
critic.add(Flatten(input_shape=(1,) + env.observation_space.shape))
critic.add(Dense(16))
critic.add(Activation('relu'))
critic.add(Dense(16))
critic.add(Activation('relu'))
critic.add(Dense(16))
critic.add(Activation('relu'))
critic.add(Dense(1))
critic.add(Activation('linear'))
print(critic.summary())
# In[19]:
critic.compile(optimizer='rmsprop', loss='mse')
# In[20]:
#np.array([env.observation_space.sample(), env.observation_space.sample()]).shape
def genInput(n, m):
    return np.array([[env.observation_space.sample() for _ in range(m)] for _ in range(n)])
#test_input = np.array([[env.observation_space.sample()], [env.observation_space.sample()]])
test_input = genInput(10,1)
test_output = critic.predict_on_batch(test_input)
print(test_input)
print(test_input.shape)
print(test_output)
# In[21]:
# What's the difference?
# https://stackoverflow.com/questions/44972565/what-is-the-difference-between-the-predict-and-predict-on-batch-methods-of-a-ker
critic.predict(test_input, batch_size=5)
# Now let's test our implementation. First try the easy stuff in util, where we compute the [Generalized Advantage Estimator](https://danieltakeshi.github.io/2017/04/02/notes-on-the-generalized-advantage-estimation-paper/).
#
# Here we use the naive way: Let $\delta^V_t = r_t + \gamma V(s_{t+1}) - V(s_t)$, and
#
# $$A^{\text{GAE}(\gamma, \lambda)}_t = \sum_{l=0}^\infty (\gamma \lambda)^l \delta^V_{t+l}$$
# In[45]:
def GeneralizedAdvantageEstimator(critic, state, reward, gamma, lamb):
    # TODO: irresponsible draft, all details likely wrong
    assert len(state) == len(reward)
    n = len(state)
    value = critic.predict_on_batch(state).flatten()
    delta = reward + gamma * np.roll(value, -1) - value
    # No premature optimization for now...
    result = [0]
    r = gamma * lamb
    for i in range(1, n):
        result.append(result[-1] * r + delta[n-1-i])
    return result[:0:-1]
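# For comparison, here is a minimal sketch of the textbook backward recursion $A_t = \delta^V_t + \gamma \lambda A_{t+1}$. The helper name `gae_reference` and the choice of treating $V(s_{T+1})$ as $0$ at the episode boundary are my own assumptions, not part of keras-rl.
# In[ ]:
def gae_reference(values, rewards, gamma, lamb):
    # values[t] = V(s_t) and rewards[t] = r_t, both of length n;
    # V(s_n) is assumed to be 0 (no bootstrapping past the episode end).
    n = len(rewards)
    adv = np.zeros(n)
    last = 0.0
    for t in reversed(range(n)):
        next_value = values[t + 1] if t + 1 < n else 0.0
        delta = rewards[t] + gamma * next_value - values[t]
        last = delta + gamma * lamb * last
        adv[t] = last
    return adv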
# In[32]:
np.random.sample((10, 1))
# In[44]:
hi = [1, 2, 3, 4]
hi[:0:-1]
# In[46]:
rs = np.random.sample((10, 1)).flatten()
GeneralizedAdvantageEstimator(critic, test_input, rs, 0.5, 1.0)
#rs + 0.5 * np.roll(test_output.flatten(), -1) - test_output.flatten()
#print(rs)
# # Network Architecture for PPO
#
# (WIP)
#
# Reference:
# - [AI Gym Workout](https://learningai.io/projects/2017/07/28/ai-gym-workout.html)
# - [Paper on TRPO: See Section 8 and Appendix D](https://arxiv.org/pdf/1502.05477.pdf)
#
# Now let's try to fix the interface. PPO is an improvement over TRPO, which is itself designed to deal with complex, continuous control problems by controlling the size of policy updates.
#
# Critic network is simple: Just a function from state (vector) to estimated value (scalar).
#
# The actor network is more complex. It contains a network plus an additional function:
# - `actor`: From state to (continuous) action space (vector). The output is the mean parameter $\mu$.
# - `policy_dist`: Function from a tuple $(\mu, \Sigma)$, where $\Sigma$ is a standalone, additional parameter representing exploration, to a probability distribution $\pi_\theta(a)$ over the whole action space. An example is a Gaussian with diagonal covariance $\Sigma$.
#
# Outputs from `actor` should be composed with the function `policy_dist` and fed to a sampler that generates the final action according to the resulting distribution.
#
# Notice that because of the function composition, the whole "network" contains $\Sigma$ as an additional parameter to update in the policy gradient step, i.e. $\theta$ contains all parameters of the `actor` network plus $\Sigma$.
#
# For the categorical case, one can interpret $\mu$ as the probability of each discrete choice and let $\Sigma$ be empty.
#
# (Note: the TRPO paper uses logarithm of standard deviation instead)
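# As a rough sketch of the intended composition (the function name below is mine, not keras-rl API): the actor outputs $\mu$, $\Sigma$ lives in a separate trainable parameter (stored as $\log \sigma$ per the TRPO convention above), and the sampler draws the final action.
# In[ ]:
def sample_diag_gaussian_action(mu, log_sigma):
    # mu: mean vector from the actor network; log_sigma: standalone exploration parameter
    return np.random.normal(mu, np.exp(log_sigma))
sample_diag_gaussian_action(np.array([1.0, -1.0]), np.array([-0.5, 0.3]))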
# In[102]:
#WIP
#action = Input(name='action')
#state = Input(name='state')
#advantage = Input(name='advantage')
#prob_theta = self.policy_dist(action, self.actor(state), sigma)
#prob_thetaold = self.policy_dist(action, self.target_actor(state), target_sigma)
#Let's try to handle sigma using a dense layer
tmp_disp = Input(name='delta', shape=(1,) + (4,))
test_sigma = Dense(4, use_bias=False)
# In[142]:
from keras import backend as K
from keras.layers import Lambda
c = Input(name='dummy', tensor=K.constant(np.array([[1.0]])), shape=(1,))
#out = K.sum(K.exp(K.log(Flatten()(tmp_disp)) - test_sigma(c)), axis=1)
out = Lambda(lambda x: K.sum(K.exp(K.log(K.abs(x[0])) - x[1]), axis=1, keepdims=True))([Flatten()(tmp_disp), test_sigma(c)])
cust_model = Model(inputs=[tmp_disp, c], outputs=out)
print(K.is_keras_tensor(test_sigma(c)))
# In[143]:
print(cust_model.summary())
# In[144]:
cust_model.compile(optimizer='rmsprop', loss='mse')
# In[150]:
cust_model.predict(np.array([[[1.0, 0.3, -1.0, 0.5]], [[1.0, 0.0, -1.0, 0.5]]]))
# In[149]:
K.eval(test_sigma(c)) # current weight
# In[148]:
cust_model.train_on_batch(np.array([[[1.0, 0.3, -1.0, 0.5]], [[1.0, 0.0, -1.0, 0.5]]]), np.array([3.217, 3.609]))
# In[166]:
test_sigma.get_weights()[0].flatten()
# In[165]:
np.random.normal(np.array([1.0, -1.0, 0.3, 2.6]), np.array([0.01, 0.7, 2.1, 4.5]))
# In[167]:
xx = np.random.sample((10, 1))
xx, xx.flatten()
# Also let's get the multivariate Gaussian distribution right (for diagonal covariance matrix only!)
#
# $$ \log p(x_k) = -\frac{\log (2\pi)}{2} - \log \sigma_k - \frac{(x_k-\mu_k)^2}{2\sigma_k^2}$$
#
# So sum them to get
#
# $$ \log p(x) = -\frac{n \log (2\pi)}{2} - \sum_k \log \sigma_k - \sum_k \frac{(x_k-\mu_k)^2}{2\sigma_k^2}$$
#
# Now letting $s_k = \log \sigma_k$, we have
#
# $$ \log p(x) = -\frac{n \log (2\pi)}{2} - \sum_k s_k - \sum_k \exp\left(\log\frac{(x_k-\mu_k)^2}{2\sigma_k^2}\right)
# = -\frac{n \log (2\pi)}{2} - \sum_k s_k - \sum_k \exp\bigl(2\log|x_k-\mu_k| - \log 2 - 2s_k\bigr) $$
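# A quick numerical sanity check of this identity (a throwaway snippet of mine, using scipy's `norm.logpdf` as the reference; scipy is not otherwise used in this notebook):
# In[ ]:
import math
from scipy.stats import norm
mu_chk = np.array([1.0, -1.0])
sigma_chk = np.array([0.5, 2.0])
x_chk = np.array([0.3, 0.7])
s_chk = np.log(sigma_chk)
lhs = norm.logpdf(x_chk, loc=mu_chk, scale=sigma_chk).sum()
rhs = (- len(x_chk) * math.log(2 * math.pi) / 2
       - s_chk.sum()
       - np.exp(2 * np.log(np.abs(x_chk - mu_chk)) - math.log(2.0) - 2 * s_chk).sum())
print(lhs, rhs)  # the two values should agree up to floating point error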
# In[171]:
import math
math.log(4.5), K.eval(K.constant(math.log(4.5)) + test_sigma(c))
# Now let's start fixing interfaces, starting with the network that wraps the user-supplied actor network (which we assume also includes the dummy part).
# In[201]:
state_in = Input(shape=(1,) + (3,), name='state_input')
x = Dense(16)(Flatten()(state_in))
x = Activation('relu')(x)
x = Dense(16)(x)
x = Activation('relu')(x)
x = Dense(16)(x)
x = Activation('relu')(x)
x = Dense(2)(x)
out_mu = Activation('linear', name='out_mu')(x)
dummy_in = Input(name='dummy_input', tensor=K.constant(np.array([[1.0]])), shape=(1,))
out_sigma = Dense(2, use_bias=False, name='out_sigma')(dummy_in)
cts_actor = Model(inputs=[state_in, dummy_in], outputs=[out_mu, out_sigma])
# In[202]:
print(cts_actor.summary())
# In[203]:
cts_actor.compile(optimizer='rmsprop', loss='mse')
# In[204]:
# Note that the Keras 2 API does not support returning a dict even if all outputs are named
sample_outs = cts_actor.predict(np.array([ [[1.0, 0.2, -0.5]], [[2.6, -7.9, -2.4]], [[3.1, 8.6, -4.7]], [[0.2, 9.1, 0.0]] ]))
sample_outs
# In[221]:
cts_actor.inputs
# In[222]:
train_in = Input(shape=(1,) + (3,), name='train_in')
actual_in = Input(shape=(2,), name='actual_in')
def err1(x):
    out_mu, out_sigma, actual_out = x
    return K.sum(K.abs(out_mu - actual_out) * K.exp(out_sigma), axis=1, keepdims=True)
net_out = cts_actor([train_in] + cts_actor.inputs[1:])
tt_1 = Lambda(err1)(net_out + [actual_in])
combined_test = Model(inputs=[train_in, actual_in] + cts_actor.inputs[1:], outputs=tt_1)
# In[223]:
print(combined_test.summary())
# In[224]:
combined_test.compile(optimizer='rmsprop', loss='mse')
# In[225]:
combined_test.predict({'train_in': np.array([ [[1.0, 0.2, -0.5]], [[2.6, -7.9, -2.4]], [[3.1, 8.6, -4.7]], [[0.2, 9.1, 0.0]] ]),
'actual_in': np.array([ [2.3, 0.8], [1.4, -5.3], [0.0, -7.2], [1.1, -0.4] ]) })
# In[226]:
class IndependentGaussianProcess(object):
    def __init__(self, n):
        self.n = n
    def sample(self, x):
        mu, sigma = x
        assert mu.shape == (self.n,)
        assert sigma.shape == (self.n,)
        return np.random.normal(mu, sigma)
    def get_dist(self, x):
        mu, sigma, y = x
        return K.constant(- self.n * math.log(2*math.pi) / 2 ) - K.sum(sigma, axis=1, keepdims=True) - K.sum(K.exp( 2 * K.log(K.abs(y - mu)) - K.constant(math.log(2.0)) - 2 * sigma), axis=1, keepdims=True)
sampler_test = IndependentGaussianProcess(2)
# In[227]:
t2_mu = Input(shape=(2,))
t2_sigma = Input(shape=(2,))
t2_y = Input(shape=(2,))
tt_2 = Lambda(sampler_test.get_dist)([t2_mu, t2_sigma, t2_y])
log_p_test = Model(inputs=[t2_mu, t2_sigma, t2_y], outputs=tt_2)
print(log_p_test.summary())
log_p_test.compile(optimizer='rmsprop', loss='mse')
# In[228]:
np.log( np.array([ [0.0, 1.0], [2.0, 3.0] ]) )
# In[229]:
log_p_test.predict([ np.array([ [0.0, 1.0], [1.0, 0.0], [-1.0, 2.0] ]),
np.log(np.array([ [2.0, 3.0], [4.0, 5.0], [ 6.0, 0.0] ])),
np.array([ [0.0 + 2.0 * 1, 1.0 - 3.0 * 2],
[1.0 + 4.0 * 0, 0.0 - 5.0 * 0.5],
[-1.0 + 6.0 * 1.5, 2.0] ]) ])
# In[231]:
K.eval(K.constant(np.array([[1.0], [2.0], [-1.0]])) * K.constant(np.array([[4.0], [8.0], [3.6]])))
# In[242]:
sampler_in1 = [ x.flatten() for x in cts_actor.predict_on_batch(np.array([ [[1.0, 0.2, -0.5]] ])) ]
sampler_in1, sampler_test.sample(sampler_in1)
# So `observation` passed to the `forward` method is exactly what comes out of the Gym's `env.step` function.
# In[243]:
np.array([[ np.array([4.3, -9.5, -0.4]) ]])
# # Fixing the interfaces
#
# ## Keras-rl agent spec
#
# Given an action, keras-rl calls the gym function to get results:
#
# `observation, r, done, info = env.step(action)`
#
# The observation is then fed to the agent's forward function to determine the action:
#
# `action = self.forward(observation)`
#
# And the reward is fed to the backward function for training:
#
# `metrics = self.backward(reward, terminal=done)`
#
# Notice that because of this split, in the implementation we remember the observation-action pair in the forward call and append it to memory in the backward call (see the skeleton below).
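# A minimal skeleton of that split on the agent side, just to fix ideas. This is a sketch under assumptions: `select_action` and the `memory` attribute are placeholders set up elsewhere, and only the `forward`/`backward` methods of keras-rl's `rl.core.Agent` are shown, so this is not a complete agent.
# In[ ]:
from rl.core import Agent
class SketchAgent(Agent):
    # Illustration only: remember the observation-action pair in forward(),
    # push it into memory (together with the reward) in backward().
    def forward(self, observation):
        action = self.select_action(observation)  # hypothetical helper
        self.recent_observation = observation
        self.recent_action = action
        return action
    def backward(self, reward, terminal):
        self.memory.append(self.recent_observation, self.recent_action,
                           reward, terminal, training=self.training)
        return []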
# In[245]:
env.reset()
observation, r, done, info = env.step(np.array([0.4]))
observation, r, done
# Also notice that a keras-rl agent supports making decisions based not just on the current state but also on a fixed-length window of previous states. This is done as follows:
#
# - In the forward call, `memory.get_recent_state` is called, which returns a Python list of previous states (including the current state), e.g. `state_window = [state_(-3), state_(-2), state_(-1), state_0]`. The window length is stored in the `window_length` attr of the Memory object. (Remember, each state comes straight from the gym API.)
# - We then call `batch = self.process_state_batch([state_window])` to transform it into a batch in the right format for feeding directly into the neural network. This method is generic; in the simplest case it just converts a Python list into a numpy array through the `np.array` call. In other contexts it is used to handle an actual batch such as `[state_window1, state_window2, ... ]` generated by the memory class's `sample` method, which samples uniformly at random from the store and then fills in the history for each sampled (single) state. (A shape sketch follows below.)
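#
# A rough sketch of the shapes involved (the numbers here are my own illustration, using `window_length = 4` and the 3-dimensional Pendulum observation from above):
# In[ ]:
state_window = [env.observation_space.sample() for _ in range(4)]  # window_length = 4, oldest state first
batch = np.array([state_window])  # the simplest process_state_batch: just np.array on a list of windows
print(batch.shape)  # (1, 4, 3): (batch size, window length, observation dimension)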
#
# ## Training Architecture
#
# Given the constraints of the existing keras-rl framework vs. the PPO algorithm, here is a compromise:
# - Assume a single threaded execution
# - Push history into the memory, and optionally at the end of each episode (`done = True` in gym API), compute the GAE for the entire episode.
# - This requires extending the `SequentialMemory` class to support additional info
# - In training phase, do the minibatch sampling as usual (which is *not* sequential). The `state0` data of each sample may contain a window of states - which is for use by the actor network - and should not be confused with the window used for GAE computation.
# - An alternative to computing GAE for the entire episode is to do it on demand (but how?)
#
# And here is the detail (a rough code sketch follows this list):
# - Maintain a fixed-length buffer of episode memory; in each backward call, add to the buffer.
# - Add once more for the call immediately after a `terminal=True` one; that one is the actual terminal state.
# - Each time an episode finishes (the only reliable way to check is again via the `terminal` flag, as `self.step` is not reset across episodes and some episodes may finish before `nb_max_episode_steps` is reached), process the fixed buffer and then start a new one.
#   - First compute the GAE.
#   - Then perform windowing so that the actor network can recover the action distribution.
# - Every N episodes, run the optimizer on all collected data, then discard it.
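#
# A rough code sketch of the bookkeeping above (everything here is a placeholder of mine, not the final keras-rl integration; it reuses the `gae_reference` helper sketched earlier and ignores the windowing step):
# In[ ]:
class EpisodeBuffer(object):
    # Placeholder: accumulate one episode worth of transitions, post-process at terminal.
    def __init__(self, critic, gamma, lamb):
        self.critic = critic
        self.gamma = gamma
        self.lamb = lamb
        self.current = []   # transitions of the episode in progress
        self.finished = []  # processed episodes awaiting the every-N-episodes update
    def record(self, observation, action, reward, terminal):
        self.current.append((observation, action, reward))
        if terminal:
            obs_batch = np.array([[obs] for obs, _, _ in self.current])  # add the window axis
            values = self.critic.predict_on_batch(obs_batch).flatten()
            rewards = np.array([r for _, _, r in self.current])
            adv = gae_reference(values, rewards, self.gamma, self.lamb)
            self.finished.append((self.current, adv))
            self.current = []
    def drain(self):
        # Called every N episodes: hand over everything collected so far, then discard.
        episodes, self.finished = self.finished, []
        return episodes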
# So this is actually the A2C framework. (And I've unintentionally pivoted.) Some links:
#
# - https://rubenfiszel.github.io/posts/rl4j/2016-08-24-Reinforcement-Learning-and-DQN.html
# Great post (though a bit long) with lots of tips/tricks/points not mentioned at the introductory level
# - https://www.reddit.com/r/MachineLearning/comments/6unujm/d_conceptual_differences_a2c_ppo_reinforcement/
# Discussion that clears up some confusion for me
# - https://arxiv.org/pdf/1602.01783.pdf
# Google paper: Asynchronous Methods for Deep Reinforcement Learning
# - https://theberkeleyview.wordpress.com/2016/04/25/asynchronous-methods-for-deep-reinforcement-learning/
# Review of above paper
# - https://blog.openai.com/baselines-acktr-a2c/
# The noise/asynchrony of A3C doesn't help a lot, so aside from performance considerations, plain A2C is still okay?
# - https://medium.com/@awjuliani/simple-reinforcement-learning-with-tensorflow-part-4-deep-q-networks-and-beyond-8438a3e2b8df
# Another long tutorial series (just a random link; honestly there's a ton of material out there)
@lemonteaa

To get back the notebook, do the import manually? (oops)

Code following `# In[11]:` etc. is a code cell, while multi-line comments are markdown cells.

@lemonteaa

This (smallish) project is much harder than I initially thought...

So after lots of struggle I hope I really got all the details of my design right this time. I will be pivoting (i.e. drastically changing direction and/or re-planning everything from scratch) my PPO implementation.

In particular, see the "Training Architecture" section in the gist - I will effectively also be implementing the A2C framework while trying to use only the existing core.py's Agent logic. @ParthaEth This part will probably conflict with your parallel env PR if I try to change the core, so I will leave it at that and leave further integration for the next iteration or something.
