Last active November 26, 2017 14:37
Jupyter Notebook for developing my PPO integration into keras-rl
# coding: utf-8
# # Getting Started
# First we need to figure out the details of Keras's and Gym's input specs, especially the tensor shapes.
# Note that a NumPy (**n-d**imensional) array has a `shape` attribute, which is a Python tuple. For example, `s = (2, 6, 3)` represents a $2 \times 6 \times 3$ tensor.
# Also note that `np.array` converts a (nested) Python list into a NumPy array, with the *obvious* interpretation. For example,
# `[ [ [1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12] ], [ [-1, -2, -3], [-4, -5, -6], [-7, -8, -9], [-10, -11, -12] ] ]`
# represents a $2 \times 4 \times 3$ tensor.
# In[11]:
import numpy as np
import gym
from keras.models import Sequential, Model
from keras.layers import Dense, Activation, Flatten, Input, merge
# In[12]:
ENV_NAME = 'Pendulum-v0'
env = gym.make(ENV_NAME)
assert len(env.action_space.shape) == 1
nb_actions = env.action_space.shape[0]
env_a = env.action_space
env_o = env.observation_space
print(env_a.shape, env_a.low, env_a.high)
print(env_o.shape, env_o.low, env_o.high)
# In[13]:
np.array([ [ [1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12] ], [ [-1, -2, -3], [-4, -5, -6], [-7, -8, -9], [-10, -11, -12] ] ]).shape
# So OpenAI's Gym has the `spaces.Box` class ([Link]( which describes an array-valued space: a shape together with a lower and an upper bound for each element, so that it is possible to sample from the space (uniformly, when the bounds are finite).
# In[15]:
from gym import spaces
space_In = spaces.Box(-1.0, 1.0, (1,))
space_Ob = spaces.Box(-8.0, 8.0, (3,))
i = space_In.sample()
o = space_Ob.sample()
# In[16]:
type(np.zeros((5,)).shape), type(space_Ob.sample())
# In[17]:
(1,) + env.observation_space.shape
# From the example below we can see that Keras models assume an input tensor of the form $N \times d_1 \times \cdots \times d_k$, where $N$ is the (mini-)batch size (for parallel computation) and the actual input has shape $d_1 \times \cdots \times d_k$. The `input_shape` argument only specifies $d_1 \times \cdots \times d_k$.
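# A minimal NumPy-only sketch of this shape bookkeeping. It assumes a single observation of shape `(3,)` as in `Pendulum-v0` and a window of length 1, which is where the leading `1` in `input_shape=(1,) + env.observation_space.shape` comes from. The observation values are made up for illustration.

```python
import numpy as np

obs_shape = (3,)                           # Pendulum-v0 observation shape
single_obs = np.array([0.5, -0.2, 1.3])    # illustrative observation, as returned by env.step

# Keras expects (N, d_1, ..., d_k); here (d_1, d_2) = (1,) + obs_shape, i.e. a window of length 1.
one_sample = single_obs[np.newaxis, np.newaxis, :]     # N = 1  -> shape (1, 1, 3)
batch = np.stack([single_obs] * 10)[:, np.newaxis, :]  # N = 10 -> shape (10, 1, 3), like genInput(10, 1)

print(one_sample.shape, batch.shape)  # (1, 1, 3) (10, 1, 3)
```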
# In[18]:
critic = Sequential()
critic.add(Flatten(input_shape=(1,) + env.observation_space.shape))
# In[19]:
critic.compile(optimizer='rmsprop', loss='mse')
# In[20]:
#np.array([env.observation_space.sample(), env.observation_space.sample()]).shape
def genInput(n, m):
    return np.array([[env.observation_space.sample() for _ in range(m)] for _ in range(n)])
#test_input = np.array([[env.observation_space.sample()], [env.observation_space.sample()]])
test_input = genInput(10,1)
test_output = critic.predict_on_batch(test_input)
# In[21]:
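# What's the difference? `predict` iterates over the input in batches of `batch_size` and concatenates the results, while `predict_on_batch` feeds the whole input as a single batch; the outputs should match.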
critic.predict(test_input, batch_size=5)
# Now let's test our implementation. First try the easy part in util, where we compute the [Generalized Advantage Estimator](
# Here we use the naive approach: let $\delta^V_t = r_t + \gamma V(s_{t+1}) - V(s_t)$, and
# $$A^{\text{GAE}(\gamma, \lambda)}_t = \sum_{l=0}^\infty (\gamma \lambda)^l \delta^V_{t+l}$$
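# Splitting off the $l = 0$ term of the sum gives a backward recursion, which a single reverse pass over an episode can compute. Here $A_t$ is shorthand for $A^{\text{GAE}(\gamma, \lambda)}_t$. For a finite episode of length $n$, this derivation assumes the episode ends in a terminal state, so $V(s_n) = 0$ and $\delta^V_t = 0$ for $t \ge n$:

```latex
A_t = \delta^V_t + \sum_{l=1}^{\infty} (\gamma\lambda)^l \delta^V_{t+l}
    = \delta^V_t + \gamma\lambda \sum_{l'=0}^{\infty} (\gamma\lambda)^{l'} \delta^V_{t+1+l'}
    = \delta^V_t + \gamma\lambda A_{t+1},
\qquad
A_{n-1} = \delta^V_{n-1} = r_{n-1} - V(s_{n-1}).
```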
# In[45]:
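def GeneralizedAdvantageEstimator(critic, state, reward, gamma, lamb):
    # Draft: assumes `state`/`reward` cover one whole episode that ends in a
    # terminal state, so V(s_n) is taken to be 0.
    assert len(state) == len(reward)
    n = len(state)
    value = critic.predict_on_batch(state).flatten()
    # delta_t = r_t + gamma * V(s_{t+1}) - V(s_t); np.roll(value, -1) would wrongly wrap V(s_0) to the end
    next_value = np.append(value[1:], 0.0)
    delta = reward + gamma * next_value - value
    # No premature optimization for now: backward recursion A_t = delta_t + gamma * lamb * A_{t+1}
    result = np.zeros(n)
    running = 0.0
    r = gamma * lamb
    for i in reversed(range(n)):
        running = delta[i] + r * running
        result[i] = running
    return result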
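# To sanity-check a GAE implementation without a trained critic, here is a minimal NumPy sketch. It takes the value estimates as a plain array; in the notebook they come from `critic.predict_on_batch`. It compares the backward recursion against the direct (truncated) sum. Like the derivation, it assumes one complete episode ending in a terminal state, so $V(s_n) = 0$. The function names are hypothetical.

```python
import numpy as np

def td_residuals(rewards, values, gamma):
    """delta_t = r_t + gamma * V(s_{t+1}) - V(s_t), with V(s_n) = 0 at the terminal state."""
    rewards = np.asarray(rewards, dtype=np.float64)
    values = np.asarray(values, dtype=np.float64)
    next_values = np.append(values[1:], 0.0)
    return rewards + gamma * next_values - values

def gae_recursive(rewards, values, gamma, lamb):
    """GAE via the backward recursion A_t = delta_t + gamma * lamb * A_{t+1}."""
    delta = td_residuals(rewards, values, gamma)
    adv = np.zeros_like(delta)
    running = 0.0
    for t in reversed(range(len(delta))):
        running = delta[t] + gamma * lamb * running
        adv[t] = running
    return adv

def gae_direct(rewards, values, gamma, lamb):
    """Same quantity from the truncated sum over l; O(n^2), for testing only."""
    delta = td_residuals(rewards, values, gamma)
    n = len(delta)
    return np.array([sum((gamma * lamb) ** l * delta[t + l] for l in range(n - t))
                     for t in range(n)])

rng = np.random.RandomState(0)
rs, vs = rng.rand(10), rng.rand(10)
print(np.allclose(gae_recursive(rs, vs, 0.5, 1.0), gae_direct(rs, vs, 0.5, 1.0)))  # True
```

# With `lamb = 1.0` the sum telescopes to the discounted return minus $V(s_t)$. With `lamb = 0.0` it reduces to the one-step residual $\delta^V_t$. Both are convenient extra checks.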
# In[32]:
np.random.sample((10, 1))
# In[44]:
hi = [1, 2, 3, 4]
# In[46]:
rs = np.random.sample((10, 1)).flatten()
GeneralizedAdvantageEstimator(critic, test_input, rs, 0.5, 1.0)
#rs + 0.5 * np.roll(test_output.flatten(), -1) - test_output.flatten()
# # Network Architecture for PPO
# (WIP)
# Reference:
# - [AI Gym Workout](
# - [Paper on TRPO: See Section 8 and Appendix D](
# Now let's try to fix the interface. PPO is an improvement over TRPO, which is itself designed to deal with complex, continuous control problems by controlling the size of policy updates.
# The critic network is simple: just a function from state (vector) to estimated value (scalar).
# The actor network is more complex. It consists of a network plus an additional function:
# - `actor`: From state to (continuous) action space (vector). The output is the mean parameter $\mu$.
# - `policy_dist`: Function from a tuple $(\mu, \Sigma)$, where $\Sigma$ is a standalone, additional parameter representing exploration, to a probability distribution $\pi_\theta(a)$ over the whole action space. Example uses a Gaussian with diagonal covariances $\Sigma$.
# Outputs from `actor` should be composed with the function `policy_dist` and fed to a sampler that generates the final action according to the resulting distribution.
# Notice that because of the function composition, the whole "network" contains $\Sigma$ as additional parameter to update in the policy gradient step, i.e. $\theta$ contains all parameters of the `actor` network plus $\Sigma$.
# For the categorical case, one can interpret $\mu$ as the probability of each discrete choice and let $\Sigma$ be empty.
# (Note: the TRPO paper uses logarithm of standard deviation instead)
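# A minimal NumPy sketch of the `policy_dist` + sampler composition described above. It assumes, following the TRPO note, that the exploration parameter is stored as a per-dimension log standard deviation $s_k = \log\sigma_k$ that does not depend on the state. For the categorical case it samples an index from the probability vector $\mu$ and uses no $\Sigma$. The function names are hypothetical, not keras-rl API.

```python
import numpy as np

def sample_gaussian_action(mu, log_sigma, rng=np.random):
    """Draw a ~ N(mu, diag(exp(2 * log_sigma))): one independent normal per action dimension."""
    mu = np.asarray(mu, dtype=np.float64)
    sigma = np.exp(np.asarray(log_sigma, dtype=np.float64))  # exp keeps the std strictly positive
    return rng.normal(mu, sigma)

def sample_categorical_action(probs, rng=np.random):
    """Draw an action index i with probability probs[i] (mu is the probability vector, Sigma is empty)."""
    probs = np.asarray(probs, dtype=np.float64)
    return rng.choice(len(probs), p=probs)

rng = np.random.RandomState(0)
a = sample_gaussian_action([0.0, 1.0], [-1.0, 0.5], rng)  # continuous action, shape (2,)
i = sample_categorical_action([0.1, 0.7, 0.2], rng)       # discrete action in {0, 1, 2}
```

# Storing $\log\sigma_k$ rather than $\sigma_k$ lets the optimizer update it without any positivity constraint.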
# In[102]:
#action = Input(name='action')
#state = Input(name='state')
#advantage = Input(name='advantage')
#prob_theta = self.policy_dist(action,, sigma)
#prob_thetaold = self.policy_dist(action, self.target_actor(state), target_sigma)
#Let's try to handle sigma using a dense layer
tmp_disp = Input(name='delta', shape=(1,) + (4,))
test_sigma = Dense(4, use_bias=False)
# In[142]:
from keras import backend as K
from keras.layers import Lambda
c = Input(name='dummy', tensor=K.constant(np.array([[1.0]])), shape=(1,))
#out = K.sum(K.exp(K.log(Flatten()(tmp_disp)) - test_sigma(c)), axis=1)
out = Lambda(lambda x: K.sum(K.exp(K.log(K.abs(x[0])) - x[1]), axis=1, keepdims=True))([Flatten()(tmp_disp), test_sigma(c)])
cust_model = Model(inputs=[tmp_disp, c], outputs=out)
# In[143]:
# In[144]:
cust_model.compile(optimizer='rmsprop', loss='mse')
# In[150]:
cust_model.predict(np.array([[[1.0, 0.3, -1.0, 0.5]], [[1.0, 0.0, -1.0, 0.5]]]))
# In[149]:
K.eval(test_sigma(c)) # current weight
# In[148]:
cust_model.train_on_batch(np.array([[[1.0, 0.3, -1.0, 0.5]], [[1.0, 0.0, -1.0, 0.5]]]), np.array([3.217, 3.609]))
# In[166]:
# In[165]:
np.random.normal(np.array([1.0, -1.0, 0.3, 2.6]), np.array([0.01, 0.7, 2.1, 4.5]))
# In[167]:
xx = np.random.sample((10, 1))
xx, xx.flatten()
# Also let's get the multivariate Gaussian distribution right (for a diagonal covariance matrix only!)
# $$ \log p(x_k) = -\frac{\log (2\pi)}{2} -\log \sigma_k - \frac{(x_k-\mu_k)^2}{2\sigma_k^2}$$
# Summing over $k = 1, \dots, n$ gives
# $$ \log p(x) = -\frac{n \log (2\pi)}{2} -\sum_k \log \sigma_k - \sum_k \frac{(x_k-\mu_k)^2}{2\sigma_k^2}$$
# Now let $s_k = \log \sigma_k$; then
# $$ \log p(x) = -\frac{n \log (2\pi)}{2} -\sum_k s_k - \sum_k \exp\left(\log\frac{(x_k-\mu_k)^2}{2\sigma_k^2}\right)
# = -\frac{n \log (2\pi)}{2} -\sum_k s_k - \sum_k \exp\left(2\log|x_k-\mu_k| - \log 2 - 2s_k\right) $$
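# A NumPy version of this formula is handy as a reference when testing the Keras `Lambda`. This sketch computes the last term directly as $(x_k-\mu_k)^2 e^{-2 s_k}/2$ instead of going through $\exp(2\log|x_k-\mu_k| - \dots)$. The two are algebraically identical, but the direct form stays finite when $x_k = \mu_k$. The function name is hypothetical.

```python
import math
import numpy as np

def diag_gaussian_log_prob(x, mu, log_sigma):
    """log N(x; mu, diag(exp(2 * log_sigma))), summed over the last axis."""
    x, mu, s = (np.asarray(v, dtype=np.float64) for v in (x, mu, log_sigma))
    n = x.shape[-1]
    return (-n * math.log(2 * math.pi) / 2
            - np.sum(s, axis=-1)                                   # -sum_k s_k
            - np.sum((x - mu) ** 2 * np.exp(-2 * s) / 2, axis=-1))  # -sum_k (x_k - mu_k)^2 / (2 sigma_k^2)
```

# For a batch of shape `(N, n)` this returns shape `(N,)`. The Keras version uses `keepdims=True` and returns `(N, 1)`.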
# In[171]:
import math
math.log(4.5), K.eval(K.constant(math.log(4.5)) + test_sigma(c))
# Now let's start fixing the interfaces, starting with the network that wraps the user-supplied actor network (which we assume also includes the dummy part).
# In[201]:
state_in = Input(shape=(1,) + (3,), name='state_input')
x = Dense(16)(Flatten()(state_in))
x = Activation('relu')(x)
x = Dense(16)(x)
x = Activation('relu')(x)
x = Dense(16)(x)
x = Activation('relu')(x)
x = Dense(2)(x)
out_mu = Activation('linear', name='out_mu')(x)
dummy_in = Input(name='dummy_input', tensor=K.constant(np.array([[1.0]])), shape=(1,))
out_sigma = Dense(2, use_bias=False, name='out_sigma')(dummy_in)
cts_actor = Model(inputs=[state_in, dummy_in], outputs=[out_mu, out_sigma])
# In[202]:
# In[203]:
cts_actor.compile(optimizer='rmsprop', loss='mse')
# In[204]:
# Note that in the Keras 2 API, `predict` returns a list of arrays (in output order), not a dict, even if all outputs are named.
sample_outs = cts_actor.predict(np.array([ [[1.0, 0.2, -0.5]], [[2.6, -7.9, -2.4]], [[3.1, 8.6, -4.7]], [[0.2, 9.1, 0.0]] ]))
# In[221]:
# In[222]:
train_in = Input(shape=(1,) + (3,), name='train_in')
actual_in = Input(shape=(2,), name='actual_in')
def err1(x):
    out_mu, out_sigma, actual_out = x
    return K.sum(K.abs(out_mu - actual_out) * K.exp(out_sigma), axis=1, keepdims=True)
net_out = cts_actor([train_in] + cts_actor.inputs[1:])
tt_1 = Lambda(err1)(net_out + [actual_in])
combined_test = Model(inputs=[train_in, actual_in] + cts_actor.inputs[1:], outputs=tt_1)
# In[223]:
# In[224]:
combined_test.compile(optimizer='rmsprop', loss='mse')
# In[225]:
combined_test.predict({'train_in': np.array([ [[1.0, 0.2, -0.5]], [[2.6, -7.9, -2.4]], [[3.1, 8.6, -4.7]], [[0.2, 9.1, 0.0]] ]),
'actual_in': np.array([ [2.3, 0.8], [1.4, -5.3], [0.0, -7.2], [1.1, -0.4] ]) })
# In[226]:
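class IndependentGaussianProcess(object):
    # Diagonal Gaussian over n action dimensions; `sigma` holds the log standard deviations s_k.
    def __init__(self, n):
        self.n = n
    def sample(self, x):
        mu, sigma = x
        assert mu.shape == (self.n,)
        assert sigma.shape == (self.n,)
        # sigma is log(std), consistent with get_dist below, so exponentiate before sampling
        return np.random.normal(mu, np.exp(sigma))
    def get_dist(self, x):
        # Log-density of y under N(mu, diag(exp(2 * sigma))), as derived above; shape (batch, 1)
        mu, sigma, y = x
        return K.constant(- self.n * math.log(2*math.pi) / 2 ) - K.sum(sigma, axis=1, keepdims=True) - K.sum(K.exp( 2 * K.log(K.abs(y - mu)) - K.constant(math.log(2.0)) - 2 * sigma), axis=1, keepdims=True)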
sampler_test = IndependentGaussianProcess(2)
# In[227]:
t2_mu = Input(shape=(2,))
t2_sigma = Input(shape=(2,))
t2_y = Input(shape=(2,))
tt_2 = Lambda(sampler_test.get_dist)([t2_mu, t2_sigma, t2_y])
log_p_test = Model(inputs=[t2_mu, t2_sigma, t2_y], outputs=tt_2)
log_p_test.compile(optimizer='rmsprop', loss='mse')
# In[228]:
np.log( np.array([ [0.0, 1.0], [2.0, 3.0] ]) )
# In[229]:
log_p_test.predict([ np.array([ [0.0, 1.0], [1.0, 0.0], [-1.0, 2.0] ]),
np.log(np.array([ [2.0, 3.0], [4.0, 5.0], [ 6.0, 0.0] ])),
np.array([ [0.0 + 2.0 * 1, 1.0 - 3.0 * 2],
[1.0 + 4.0 * 0, 0.0 - 5.0 * 0.5],
[-1.0 + 6.0 * 1.5, 2.0] ]) ])
# In[231]:
K.eval(K.constant(np.array([[1.0], [2.0], [-1.0]])) * K.constant(np.array([[4.0], [8.0], [3.6]])))
# In[242]:
sampler_in1 = [ x.flatten() for x in cts_actor.predict_on_batch(np.array([ [[1.0, 0.2, -0.5]] ])) ]
sampler_in1, sampler_test.sample(sampler_in1)
# So the `observation` passed to the `forward` method is exactly what comes out of Gym's `env.step` function.
# In[243]:
np.array([[ np.array([4.3, -9.5, -0.4]) ]])
# # Fixing the interfaces
# ## Keras-rl agent spec
# Given an action, keras-rl calls the Gym function to get the results:
# `observation, r, done, info = env.step(action)`
# The observation is then fed to the agent's `forward` method to determine the next action:
# `action = self.forward(observation)`
# And the reward is fed to the `backward` method for training:
# `metrics = self.backward(reward, terminal=done)`
# Notice that because of this split, our implementation has to remember the observation-action pair in `forward` and append it to memory in the `backward` call.
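# The `forward`/`backward` split can be sketched as follows. This is a plain-Python stand-in, not keras-rl code. The class name `PendingTransitionAgent`, the `policy` callable and the list-based `memory` are all hypothetical. Only the `forward(observation)` / `backward(reward, terminal)` call pattern mirrors the description above.

```python
class PendingTransitionAgent(object):
    """Remembers (observation, action) in forward() and stores the completed step in backward()."""

    def __init__(self, policy):
        self.policy = policy              # hypothetical: maps an observation to an action
        self.memory = []                  # list of (observation, action, reward, terminal)
        self.recent_observation = None
        self.recent_action = None

    def forward(self, observation):
        action = self.policy(observation)
        # The reward for this action is not known yet, so keep the pair until backward().
        self.recent_observation = observation
        self.recent_action = action
        return action

    def backward(self, reward, terminal=False):
        self.memory.append((self.recent_observation, self.recent_action, reward, terminal))
        return []                         # placeholder: the caller stores this as `metrics`

# Usage: simulate three steps of an episode with a toy policy.
agent = PendingTransitionAgent(policy=lambda obs: -obs[0])
for t, obs in enumerate([[1.0], [2.0], [3.0]]):
    act = agent.forward(obs)
    agent.backward(reward=float(t), terminal=(t == 2))
```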
# In[245]:
observation, r, done, info = env.step(np.array([0.4]))
observation, r, done
# Also notice that keras-rl agents support making decisions based not just on the current state but also on a fixed-length window of previous states. This is done as follows:
# - In the forward call, `memory.get_recent_state` is called, which returns a Python list of previous states (including the current state), for example `state_window = [state_(-3), state_(-2), state_(-1), state_0]`. The window length is stored in the `window_length` attribute of the Memory object. (Remember, each state comes straight from the Gym API.)
# - We then call `batch = self.process_state_batch([state_window])` to transform the window into a batch in the right format for feeding directly into the neural network. This method is generic, and in the simplest case it just converts a Python list into a NumPy array with `np.array`. In other contexts it handles an actual batch such as `[state_window1, state_window2, ... ]`, generated by the memory class's `sample` method, which samples uniformly at random from the store and then fills in the history for each sampled (single) state.
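# A minimal sketch of the shapes these two steps produce. It assumes `window_length = 4`, 3-dimensional observations, and zero padding when fewer than `window_length` observations exist at the start of an episode. The helpers `recent_state` and `process_state_batch` below are hypothetical plain-NumPy stand-ins for the keras-rl methods, not their actual code.

```python
import numpy as np

def recent_state(episode_observations, window_length):
    """Return the last `window_length` observations (oldest first), zero-padded on the left."""
    window = list(episode_observations[-window_length:])
    while len(window) < window_length:
        window.insert(0, np.zeros_like(window[0]))  # assumed padding at episode start
    return window

def process_state_batch(batch):
    """Simplest case: a list of windows -> array of shape (N, window_length, *obs_shape)."""
    return np.array(batch)

obs = [np.full(3, float(t)) for t in (1, 2)]          # only two steps into the episode
single = process_state_batch([recent_state(obs, 4)])  # forward(): a batch holding one window
print(single.shape)                                   # (1, 4, 3)
```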
# ## Training Architecture
# Given the constraints of the existing keras-rl framework versus the PPO algorithm, here is a compromise:
# - Assume single-threaded execution
# - Push history into the memory and, optionally, at the end of each episode (`done = True` in the Gym API) compute the GAE for the entire episode.
# - This requires extending the `SequentialMemory` class to support additional info
# - In the training phase, do the minibatch sampling as usual (which is *not* sequential). The `state0` data of each sample may contain a window of states, which is for use by the actor network and should not be confused with the window used for GAE computation.
# - An alternative to computing GAE for the entire episode is to do it on demand (but how?)
# And here are the details:
# - Maintain a fixed-length buffer of episode memory; add to the buffer in each backward call.
# - Add once more for the call immediately after a `terminal=True` one, since that one is the actual terminal state.
# - Each time an episode finishes, process the fixed buffer and then start a new one. The only reliable way to detect this is again the `terminal` flag, because `self.step` is not reset across episodes and some episodes may finish before `nb_max_episode_steps` is reached.
# - First compute the GAE
# - Then perform windowing so that actor network can recover the action distribution.
# - Every N episodes, run the optimizer on all collected data, then discard it.
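# The per-episode scheme above can be sketched in plain NumPy. Everything here is hypothetical: the class `EpisodeBuffer`, the `value_fn` callable standing in for the critic, and `episodes_per_update` standing in for N. The sketch assumes the terminal flag arrives on the last transition, which glosses over the keras-rl "add once more after `terminal=True`" detail. It also skips the windowing step and takes $V(s_n) = 0$ at the terminal state.

```python
import numpy as np

class EpisodeBuffer(object):
    def __init__(self, value_fn, gamma, lamb, episodes_per_update):
        self.value_fn = value_fn                  # hypothetical critic: (T, obs_dim) -> (T,) or (T, 1)
        self.gamma, self.lamb = gamma, lamb
        self.episodes_per_update = episodes_per_update
        self.current = []                         # (observation, action, reward) of the running episode
        self.finished = []                        # (observations, actions, advantages) per finished episode

    def add(self, observation, action, reward, terminal):
        """Record one step; return (obs, actions, advantages) every N episodes, else None."""
        self.current.append((observation, action, reward))
        if not terminal:
            return None
        # Episode finished: compute GAE for the whole episode in one backward pass.
        obs = np.array([o for o, _, _ in self.current])
        acts = np.array([a for _, a, _ in self.current])
        rews = np.array([r for _, _, r in self.current], dtype=np.float64)
        values = np.asarray(self.value_fn(obs), dtype=np.float64).reshape(-1)
        next_values = np.append(values[1:], 0.0)  # terminal state has value 0
        delta = rews + self.gamma * next_values - values
        adv, running = np.zeros_like(delta), 0.0
        for t in reversed(range(len(delta))):
            running = delta[t] + self.gamma * self.lamb * running
            adv[t] = running
        self.finished.append((obs, acts, adv))
        self.current = []
        if len(self.finished) < self.episodes_per_update:
            return None
        # Every N episodes: hand over all collected data, then discard it (on-policy).
        batch = tuple(np.concatenate(parts) for parts in zip(*self.finished))
        self.finished = []
        return batch
```

# In a keras-rl agent, `add` would be called from `backward`, and the returned batch (when not `None`) would go to the optimizer.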
# So this is actually the A2C framework (and I've unintentionally pivoted). Some links:
# - Great post (though a bit long) with lots of tips, tricks and points not mentioned at the introductory level
# - Discussion that cleared up some confusion for me
# - Google paper: Asynchronous Methods for Deep Reinforcement Learning
# - Review of the above paper
# - The noise/asynchrony of A3C doesn't help a lot, so apart from performance considerations, plain A2C is still okay?
# - Another long tutorial series (just a random link; honestly, there are tons of materials out there)
To get back the notebook, do the import manually? (oops)

Code following `# In[11]:` etc. is a code cell, while multi-line comments are markdown cells.


This (smallish) project is much harder than I initially thought...

So after lots of struggle, I hope I really got all the details of my design right this time. I will be pivoting my PPO implementation (i.e. drastically changing direction and/or re-planning everything from scratch).

In particular, see the "Training Architecture" section in the gist: I will effectively also be implementing the A2C framework while trying to use only the existing 's Agent logic. @ParthaEth This part will inevitably conflict with your parallel env PR if I try to change the core, so I will leave it at that and leave further integration for the next iteration or something.

