Policy gradient descent is one of the backbones of AI. At the time of writing (2020, yes that year (This joke will become more fun with the years)) a lot of research went into this, and there are some good results using this method about which we will talk later on.
The goal of this tutorial is really to give you an idea or intuition into what it is, and how to program it. But do not be alarmed if you do not understand this directly. This understanding will mostly come with time, for now implementing this code and having a feeling for what is happening is more than fine.
The first chapters were showing off how to beat CartPole
and Taxi
using a Keras model from Tensorflow (and one numpy implementation). Keras is fitting a model and updating the parameters with certain correction values. In this lesson we are going to show you how to make a Keras model without using tensorflow, but only numpy. We are going to implement a basic policy gradient descent algorithm. What does this mean?
It means that we are going to use an array of values to represent the weights of our model and manually calculate the forward pass, which determines the actions to take, and the backward pass, which tells us how to update the weights. During this lesson we will focus a lot more on the intuition behind it than the mathematical proof and background.
Now that, that is said, if you are interested to find out more about Policy Gradient Descent and how it works exactly there are some blogs trying to explain this in much more painful details, than will be done here. Let me repeat the warning, there is a lot of mathematics involved for the proof, so it is a though read and will definitely take some time before it becomes any clearer.
Side note: As one of my professors used to say, ''Math is a dragon, but do not be afraid of it. Together we can slay the beast and show that there is really some beauty hidden inside.'' To help make you more sense about his phrase. His last lesson was always on the Beauty of Rainbows, seen from an Electro Magnetic perspective (light).
Just the old file, besides that nothing, no I am serious, all we need is an empty programming file and some power for training.
The theory of Neural Networks and how it works is explained much better by the 3BLUE1BROWN
series by means of visual cues, than can be done by text. So before continuing, please take a look at the whole series (~1 hour as a whole, or ~15 minutes per video). The serie will explain what a neuron is and how backpropagation works.
One of the extra libraries that will be used throughout this tutorial is numpy. We installed the package in the earlier tutorials using pip install numpy
. Often when importing numpy you will see the following import numpy as np
, this means that np
is now an alias for numpy
. There are a few more common aliases and one other you have already seen in the imitation network namely import matplotlib.pyplot as plt
. This was then used to plot the loss and accuracy.
If you have absolutely no idea about how array indexing works, take a look at the turoial from scipy-lectures. There they explain what it is, why it is useful and how you can use it.
In this tutorial we will use some advanced calculus terms such as dot product
and cross products
, if you are unsure about these just view them for now as mathematical operations on arrays. What is important is how to use array slicing and indexing to locate elements in an array, see indexing and slicing for a quick overview.
The np
methods that we will use in this tutorial are the following:
Method | inputs | Description |
---|---|---|
shape | None (property) | Returns the dimension of the input (tuple) |
random.rand() | Returns random values between 0 and 1, and creates a new array with the input dimensions as shape. | |
sqrt | A numpy array | Returns a new numpy array with the square root of every element. |
vstack | A list of numpy arrays | Returns the vertical stack of all the images (concatenates based on the first axis) |
hstack | A list of numpy arrays | Returns the horizontal stack of all the images (concatenates based on the first axis) |
mean | A numpy array | Return the mean value (sum of the array / lenght of array) |
std | A numpy array | Return the standard deviation (sqrt(mean(abs(x - x.mean) ** 2) |
dot | Two numpy arrays | Return the dot product of the two arrays (Geometrically, it is the product of the Euclidean magnitudes of the two vectors and the cosine of the angle between them.) |
cross | Two numpy arrays | Return the cross product of the two arrays (The cross product a × b is defined as a vector c that is perpendicular (orthogonal) to both a and b, with a direction given by the right-hand rule and a magnitude equal to the area of the parallelogram that the vectors span.) |
outer | Two numpy arrays | Return the outer product of the two arrays (elementwise multiplication) |
As we have learned from the video above, a model is a collection of weights and biases. For simplicity we will ignore the biases for now and concentrate on making two layers of random weights.
For this model we have as inputs the 4 observations from CartPole, as a hidden layer we will have 200 nodes and as output a single value (left or right).
The first layer (W1) will have to go from the input layer to the hidden layer, while the second layer (W2) goes from the hidden layer to the output layer, which is a single value (left or right).
There are many different initializations, but for this one we will use the Xavier initalization. This means that we will divide the first layer by the square root of the input size and the second layer by the square root of the hidden layer size.
Note that the dimensions are in capitals, this means that they are constanst and shouldn't change during the running of the program.
def create_model(self):
model = dict()
model['W1'] = np.random.randn(self.HIDDEN_LAYER, self.INPUT_DIMENSION) / np.sqrt(self.INPUT_DIMENSION)
model['W2'] = np.random.randn(self.HIDDEN_LAYER) / np.sqrt(self.HIDDEN_LAYER)
return model
For the backpropagation we have to keep track of several values. All observations (states), the hidden layer values, the final layer value (action probability) and the reward. The reward will be used as an advantage or value function. This is an indicator of how well the action was and will be calculated using a discounted reward, which we will show later on.
The memory itself will contain all of the above for an entire episode. Because we do not know how long an episode runs we will use a list and append the values. A useful addition is to create a namedtuple for all the values we will have to store.
from collections import namedtuple
transition_ = namedtuple('transition', ('state', 'hidden', 'probability', 'reward'))
Note: The trailing underscore is used to prevent name collision with the actual transitions.
For this tutorial we will train on a per episode base, this is because we are going to use discounted reward, which are calculated on a per episode base. This means that in contrast with training on every step we are training using whole episodes. This will become more clear the following sections.
The feedforward will define our forward policy or the probability distribution of our actions. The feedbackward will define our gradients or update values per layer (W1, W2). We will treat both of these methods separately.
The feedforward is calculated by multiplying the input with the first layer, applying any activations. Then those (hidden) values are multiplied with the next layer, appyling any activations etc., until the end is reached.
The current model structure consists only out fully connected layers. Which means that every input is connected to every neuron in the hidden layer and each neuron in the hidden layer is connected to every neuron the output layer. A common name for this construction of multiple fully connected layers behind each other is a Multi Layer Perceptron
or MLP in short.
For the feed backward we are going to need the values of the intermediate states, or hidden layers. This will be the hidden
value. The final prediction is going to be about moving left or right. For this we are going to calculate the chance of the model moving right (1). For this we are going to use a sigmoid function, which maps values in the range 0 to 1. This means the closer we are to 1, to more confident the model is about moving to the right.
def sigmoid(x):
return 1.0 / (1.0 + np.exp(-x))
def policy_forward(self, obs):
""" Return probability of taking action 1 (right), and the hidden state """
hidden = np.dot(self.model['W1'], obs)
hidden[hidden < 0] = 0 # ReLU nonlinearity
log_probability = np.dot(self.model['W2'], hidden)
probability = self.sigmoid(log_probability)
return probability, hidden
In order to help you visualize the steps, the following dimensions are used:
Line | variable | dimension | Remark |
---|---|---|---|
3 | self.model['W1'] obs hidden |
(200, 4) (4,) (200,) |
|
4 | hidden |
(200,) | These are the activation values of W1 |
5 | self.model['W2'] log_probability |
(200,) (1,) |
|
6 | probability |
(1,) | This is the probability of taking action 1 (or 0). These are the activation values of W2 |
For the backward propagation we have to take into account the number of observations we had in the episodes, this value will be called N
. Remember that we want to update the gradients with respect to the weights (and biases) in the previous layers. For this we also stored the hidden values and action probabilities.
def policy_backward(self, episode_obs, episode_hidden, episode_probability):
""" backward pass. (eph is array of intermediate hidden states) """
dW2 = np.dot(episode_hidden.T, episode_probability)
dh = np.outer(episode_probability, self.model['W2'])
dh[episode_hidden < 0] = 0 # backprop relu
dW1 = np.dot(dh.T, episode_obs)
return {'W1': dW1, 'W2': dW2}
Line | variable | dimension | Remark |
---|---|---|---|
3 | episode_hidden episode_hidden.T episode_probability dW2 |
(N , 200) (200, N ) ( N ,) (200,) |
The update values for W2. |
4 | episode_probability self.model['W2'] dh |
(N ,) (200,) ( N , 200) |
The outer product multiplies every input with every output |
5 | dh |
(N , 200) |
|
6 | dh.T episode_states dW1 |
(200, N ) ( N , 4) (200, 4) |
The update values for W1. |
If you are unfamiliar with the term gradients it might seems kinda weird that this would work. But what will happen is that we are going to multiply the probability of the model with a reward value. This will mean that the model will try to optimize those values. The gradient is defined as the direction of greatest change of a function. For this we can view our feed forward model as a function and we are calculating the direction for which we can change the direction the fastest.
Now that we have an idea of how everything that we need, we can start building an Agent. Try and build a loop that only takes into account the forward pass and ignores the backpropagation, but does store everything that is required for the backpropagation.
Tip: always include a running mean, so you have an idea if there is actual learning behaviour.
There are quite a few steps that we have to do, but you can try to implement everything in this order:
PGCartPole
), and define the constant values HIDDEN_LAYER
, INPUT_DIMENSION
and namedtuple transition_
.create_model
, that will create the model, with the code above.sigmoid
and policy_forward
, with the code from above.policy_backward
, with the code from above.run
, and the default gym game loop that you are used to, with random input actions.
import gym
import numpy as np
from collections import namedtuple
class PGCartPole:
HIDDEN_LAYER = 200 # number of hidden layer neurons
INPUT_DIMENSION = 4 # input dimension for the model
transition_ = namedtuple('transition', ('state', 'hidden', 'probability', 'reward'))
def __init__(self):
self.model = self.create_model()
@staticmethod
def sigmoid(x):
return 1.0 / (1.0 + np.exp(-x))
def create_model(self):
pass # Provided above
def policy_forward(self, obs):
pass # Provided above
def policy_backward(self, episode_states, episode_hidden, episode_probability):
pass # Provided above
def run(self, nr_games=100):
env = gym.make('CartPole-v1')
running_mean = 0
for episode in range(nr_games):
obs = env.reset()
memory = list()
score = 0
done = False
while not done:
probability, hidden = self.policy_forward(obs)
action = 1 if probability > 0.5 else 0
next_obs, reward, done, info = env.step(action)
score += reward
memory.append(self.transition_(obs, hidden, probability, reward))
obs = next_obs
running_mean = running_mean * 0.99 + score * 0.01
print(f"Episode {episode:6d}, score: {score: 4.0f}, running mean: {running_mean: 6.2f}")
if __name__ == '__main__':
agent = PGCartPole()
agent.run()
Now that we have the agent, we can add the backpropagation to the loop in the run
method:
First we need to prepare the data in the right format, since now we only have transitions in our memory, but we want to get arrays from the columns. Such that all the observations
, hiddens
, probabilities
and rewards
are stacked together.
Then we have to store the gradients for every episode until we have a good batch to update the model weights.
And finally we have to determine a good batch size to update (this can be done empirically).
Tip: It is often advantage to perform the final update in even smaller steps by applying a learning rate, when updating the weights (lr= ~1e-3).
For the backpropagation we need to get an array of the observations
, hiddens
, probabilities
and rewards
. For this we have to merge all the steps in memory per input. This can be done by using the inverse of a zip value, which is itself with a star, in any case zip(*memory)
.
# Convert memory to a stack
transition = self.transition_(*zip(*memory))
observations = np.vstack(transition.state)
hiddens = np.vstack(transition.hidden)
probabilities = np.hstack(transition.probability)
rewards = np.hstack(transition.reward)
To get the gradients, we can calculate the backward pass and add these values to the a gradient buffer, until we have a sufficiently large enough batch. For this game about 50 games is a good batch, especially since the games are relatively short.
grad = self.policy_backward(observations, hiddens, probabilities)
# accumulate grad over batch
for weight in self.model:
gradient_buffer[weight] += np.array(grad[weight], dtype=np.float)
if episodes % self.batch_size == 0:
for layer, weights in self.model.items():
self.model[layer] += self.learning_rate * gradient_buffer[layer]
gradient_buffer[layer] = np.zeros_like(weights)
In order to explain how well an action is, we are going to use a discounted reward. What does this mean? It means that we will calculate an expected reward, similar to Q-tables, but that we are using the actual rewards received over the episodes. By going over the received rewards in a reversed order, and adding it to a running sum (with a discount factor ).
In order to further stabilize the training we are going to normalize the discounted rewards. This means substracting the mean and dividing by the variance, in any case
These discounted values are indicators, or critics on the action probabilities. By multiplying the probabilities with the discounted rewards we get a proper idea of how well the probabilities are.
probabilities *= discounter_reward
The @staticmethod
is optional, but it indicates that the method doesn't require the self
argument. This means that this method cannot access any other methods from the class and is therefore a static
part of the class.
@staticmethod
def discount_rewards(r, gamma=0.99):
""" take 1D float array of rewards and compute discounted reward """
running_add = 0
discounted_r = np.zeros_like(r)
for idx in reversed(range(0, r.size)):
running_add = running_add * gamma + r[idx]
discounted_r[idx] = running_add
return discounted_r
# Calculate discounted rewards
discounter_reward = self.discount_rewards(rewards, self.gamma)
# standardize the rewards to be unit normal (helps control the gradient estimator variance)
discounter_reward -= np.mean(discounter_reward)
discounter_reward /= np.std(discounter_reward)
The normalization step together with the knowledge that the probabilities are between 0 and 1, assurs that we always get values that are relatively close to zero. By updating the weights with small variants around 0, we ensure that we do not make the weights to big, which result in relative small gradients. If we wouldn't perform this step this could lead to exploding gradients.
The model that we have right now, will work in most cases, but there are still some small tweaks that we can make to increase the learning speed of the model, and make it possible to show of your model to others.
Encouraging gradients: We want to encourage the action that is take, so if there is a big difference between the taken action and the probability of taking that action we want to update the weights more, than when the probability is close to the prefered action (probability 0 for action 0 and 1 for action 1). Also we can add a little bit of randomness when our model is very indecisive.
Saving and loading: Now if you want to show off your model, you can save and load a model using pickle
. This package allows to save object such as dictionaries and lists.
Evaluation: Evaluate your model final results.
# Calculate forward policy
action_probability, hidden = self.policy_forward(obs)
action = 1 if np.random.uniform() < action_probability else 0
# grad that encourages the action that was taken to be taken
# (see http://cs231n.github.io/neural-networks-2/#losses if confused)
probability = action - action_probability
When our action probablity is around 0.5 in np.random.uniform() < action_probability
, we have around 50% change of picking up or down. The better our model gets the closer the clearer the action probabilities will be.
For example:
Now if you want to show off your model, you can save and load a model using pickle
. This package allows to save object such as dictionaries and lists.
def load(self, save_path):
try:
self.model = pickle.load(open(save_path, 'rb'))
print("The following model is loaded:", save_path)
except FileNotFoundError:
print("The following model could not be found:", save_path)
def save(self, save_path):
with open(save_path, 'wb') as file:
pickle.dump(self.model, file)
As a final part add an evaluate to test how well the trained model is. A good run will take about 1500 episodes to reach a running mean of ~180, and an evaluation score of >195.
import gym
import numpy as np
import pickle
from collections import namedtuple
class PGCartPole:
# hyperparameters
HIDDEN_LAYER = 200 # number of hidden layer neurons
INPUT_DIMENSION = 4 # input dimension for the model
batch_size = 50 # every how many episodes to do a param update?
learning_rate = 1e-3
gamma = 0.99 # discount factor for reward
render = False
save_model = False
save_interval = 100
# resume from previous checkpoint?
resume = False
save_path = 'CartPole.pkl'
transition_ = namedtuple('transition', ('state', 'hidden', 'probability', 'reward'))
def __init__(self, game_name):
self.game_name = game_name
self.model = self.create_model()
if self.resume:
self.load(self.save_path)
@staticmethod
def sigmoid(x):
return 1.0 / (1.0 + np.exp(-x))
@staticmethod
def discount_rewards(r, gamma):
""" take 1D float array of rewards and compute discounted reward """
running_add = 0
discounted_r = np.zeros_like(r)
for idx in reversed(range(0, r.size)):
running_add = running_add * gamma + r[idx]
discounted_r[idx] = running_add
return discounted_r
def action(self, obs):
action_probability, hidden = self.policy_forward(obs)
action = 1 if action_probability >= 0.5 else 0
return action
def load(self, save_path):
try:
self.model = pickle.load(open(save_path, 'rb'))
print("The following model is loaded:", save_path)
except FileNotFoundError:
print("The following model could not be found:", save_path)
def save(self, save_path):
with open(save_path, 'wb') as file:
pickle.dump(self.model, file)
def create_model(self):
model = dict()
model['W1'] = np.random.randn(self.HIDDEN_LAYER, self.INPUT_DIMENSION) / np.sqrt(self.INPUT_DIMENSION)
model['W2'] = np.random.randn(self.HIDDEN_LAYER) / np.sqrt(self.HIDDEN_LAYER)
return model
def policy_forward(self, obs):
""" Return probability of taking action 1 (right), and the hidden state """
hidden = np.dot(self.model['W1'], obs)
hidden[hidden < 0] = 0 # ReLU nonlinearity
log_probability = np.dot(self.model['W2'], hidden)
probability = self.sigmoid(log_probability)
return probability, hidden
def policy_backward(self, episode_observations, episode_hidden, episode_probability):
""" backward pass. (eph is array of intermediate hidden states) """
dW2 = np.dot(episode_hidden.T, episode_probability)
dh = np.outer(episode_probability, self.model['W2'])
dh[episode_hidden <= 0] = 0 # backprop relu
dW1 = np.dot(dh.T, episode_observations)
return {'W1': dW1, 'W2': dW2}
def run(self, nr_games=100):
env = gym.make(self.game_name)
running_reward = self.evaluate(nr_games=200)
# update buffers that add up gradients over a batch and rmsprop memory
gradient_buffer = {k: np.zeros_like(v, dtype=np.float) for k, v in self.model.items()}
for episode in range(nr_games):
score = 0
memory = list()
obs = env.reset()
done = False
while not done:
# Render environment
if self.render:
env.render()
# Calculate forward policy
action_probability, hidden = self.policy_forward(obs)
action = 1 if np.random.uniform() < action_probability else 0
next_obs, reward, done, info = env.step(action)
score += reward
# grad that encourages the action that was taken to be taken
# (see http://cs231n.github.io/neural-networks-2/#losses if confused)
probability = action - action_probability
memory.append(self.transition_(obs, hidden, probability, reward))
obs = next_obs
# Convert memory to a stack
observations, hiddens, probabilities, rewards = np.array(list(zip(*memory)), dtype=np.object)
observations = np.array(list(observations), dtype=np.float)
hiddens = np.array(list(hiddens), dtype=np.float)
# Calculate discounted rewards
discounter_reward = self.discount_rewards(rewards, self.gamma)
# standardize the rewards to be unit normal (helps control the gradient estimator variance)
discounter_reward -= np.mean(discounter_reward)
discounter_reward /= np.std(discounter_reward)
# modulate the gradient with advantage (PG magic happens right here.)
probabilities *= discounter_reward
grad = self.policy_backward(observations, hiddens, probabilities)
# accumulate grad over batch
for weight in self.model:
gradient_buffer[weight] += np.array(grad[weight], dtype=np.float)
if episode % self.batch_size == 0:
for layer, weights in self.model.items():
self.model[layer] += self.learning_rate * gradient_buffer[layer]
gradient_buffer[layer] = np.zeros_like(weights)
running_reward = running_reward * 0.99 + score * 0.01
print(f"Episode {episode:6d}, score: {score: 4.0f}, running mean: {running_reward: 6.2f}")
if self.save_model and episode % self.save_interval == 0:
self.save(self.save_path)
env.close()
def evaluate(self, nr_games=100):
""" Evaluate the model results. """
env = gym.make(self.game_name)
collected_scores = []
for episode in range(1, nr_games + 1):
obs = env.reset()
done = False
score = 0
while not done:
# Get action from model
action = self.action(obs)
# update everything
obs, reward, done, info = env.step(action)
score += reward
print(f"\r\tGame {episode:3d}/{nr_games:3d} score: {score}", end='')
collected_scores.append(score)
average = sum(collected_scores) / nr_games
print(f"\n\nThe model played: {nr_games} games, with an average score of: {average: 5.2f}")
return average
if __name__ == '__main__':
agent = PGCartPole(game_name='CartPole-v1')
agent.run(nr_games=2_000)
agent.save(agent.save_path)
agent.evaluate(nr_games=100)
In this tutorial we tried to give you a glimpse on the background of Keras and Tensorflow. It is a great indactor of how happy we are with prebuilt libraries. There are many, and we mean many variation on top of the above concept.
If you would to get a better understanding, there is a beginners friendly explanation of the whole process by spinning up from openai. The applied technique is called Vanilla policy gradient descent, but we use a slight variant by changing the value function by the discounted rewards, which is better explained in this quora question.
The main difference between a Keras Q-table and Gradient Descent is that the Q-table is using Gradient Descent to determine the Q-values, which are the expected reward. While the Gradient Descent is using the rewards to determine the right actions directly, not taking into account all different states.
This is the real start in machine learning. Understanding all of the above and its consequences are quite extensive and will probabily take a while. We really encourage you to talk about this with other people, inside Serpentine or outside Serpentine. Explaining this to other people is not a trivial task, but a required one for people getting into AI.
For better understanding it is adviced to try out the following changes
And as a final question, what would be different if this was implemented using Keras?
So the final question was how would we implement the same thing with Keras?
The answer is rather easy, we take the exact same code, but fit the model on the observations
and probabilities
. This would mean that we can throw away the gradient part and replace the feed forward part with model.predict()
.
There is an example of how to do this using a model in this blog post: Policy Gradient Reinforcement Learning with Keras. Some small variants, instead of using the action he is using a onehot encoding
. But for two actions this can be replaced by a sigmoidal function (as we did). The secondary difference is in the model, but there are many variants that will work. To get a better idea of the different optimizers there are take a look at the Keras optimizers docs.