A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pendulum starts upright, and the goal is to prevent it from falling over. A reward of +1 is provided for every timestep that the pole remains upright. The episode ends when the pole is more than 12 degrees from vertical, or the cart moves more than 2.4 units from the center.
A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The pendulum starts upright, and the goal is to prevent it from falling over by increasing and reducing the cart's velocity.
This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson (1983).
Type: Box(4)
Num Observation Min Max
0 Cart Position -4.8 4.8
1 Cart Velocity -Inf Inf
2 Pole Angle -24 deg 24 deg
3 Pole Velocity At Tip -Inf Inf
Type: Discrete(2)
Num Action
0 Push cart to the left
1 Push cart to the right
Note: The amount by which the velocity is reduced or increased is not fixed; it depends on the angle at which the pole is pointing. This is because the center of gravity of the pole increases the amount of energy needed to move the cart underneath it.
Reward is 1 for every step taken, including the termination step
Starting State:
All observations are assigned a uniform random value in [-0.05..0.05]
Episode Termination:
Pole Angle is more than ±12 degrees from vertical
Cart Position is more than ±2.4 from the center (center of the cart reaches the edge of the display)
Episode length is greater than 200
Solved Requirements
Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials.
!apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install x11-utils > /dev/null 2>&1
!pip install pyglet==1.3.2 > /dev/null 2>&1
import gym
import random
import torch
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
%matplotlib inline
import torch
import torch.nn as nn
import torch.nn.functional as F
class QNetwork(nn.Module):
    """Fully connected network that maps a state to one Q-value per action."""

    def __init__(self, state_size, action_size, seed, fc1_units=64, fc2_units=64):
        """Initialize parameters and build model.

        Args:
            state_size (int): Dimension of each state
            action_size (int): Dimension of each action
            seed (int): Random seed
            fc1_units (int): Number of nodes in first hidden layer
            fc2_units (int): Number of nodes in second hidden layer
        """
        super(QNetwork, self).__init__()
        # Seed before the layers are created so weight initialisation is
        # reproducible for a given seed.
        self.seed = torch.manual_seed(seed)
        # fc1: fully connected layer mapping the state to fc1_units features
        self.fc1 = nn.Linear(state_size, fc1_units)
        # fc2: fully connected layer mapping fc1's output to fc2_units features
        self.fc2 = nn.Linear(fc1_units, fc2_units)
        # fc3: fully connected layer mapping fc2's output to action_size values
        self.fc3 = nn.Linear(fc2_units, action_size)

    def forward(self, state):
        """Build a network that maps state -> action values.

        Args:
            state (torch.Tensor): input whose last dimension is state_size

        Returns:
            torch.Tensor: raw (un-normalised) Q-values, last dimension
            action_size.
        """
        hidden = F.relu(self.fc1(state))
        hidden = F.relu(self.fc2(hidden))
        # No activation on the output: Q-values are unbounded regressions.
        return self.fc3(hidden)
class ReplayBuffer:
    """Fixed-size buffer to store experience tuples."""

    def __init__(self, buffer_size, batch_size, seed):
        """Initialize a ReplayBuffer object.

        Args:
            buffer_size (int): maximum size of buffer
            batch_size (int): size of each training batch
            seed (int): random seed
        """
        # A bounded deque silently drops the oldest experience once full.
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple(
            "Experience",
            field_names=["state", "action", "reward", "next_state", "done"],
        )
        # random.seed() returns None; the call is kept for its side effect of
        # seeding the global `random` generator used by sample().
        self.seed = random.seed(seed)

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns:
            tuple: (states, actions, rewards, next_states, dones) tensors, each
            stacked with one row per sampled experience and moved to the
            module-level `device`. actions are long; all others are float.

        Raises:
            ValueError: if fewer than batch_size experiences are stored
                (raised by random.sample).
        """
        experiences = random.sample(self.memory, k=self.batch_size)

        states = torch.from_numpy(np.vstack([e.state for e in experiences if e is not None])).float().to(device)
        actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(device)
        rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(device)
        next_states = torch.from_numpy(np.vstack([e.next_state for e in experiences if e is not None])).float().to(device)
        # Booleans go through uint8 first because torch.from_numpy is applied
        # to a numeric array before the float cast.
        dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)
import numpy as np
import random
from collections import namedtuple, deque
import torch
import torch.nn.functional as F
import torch.optim as optim
# Capacity of the replay buffer (number of stored experiences).
BUFFER_SIZE = 100_000
# Number of experiences drawn per learning step.
BATCH_SIZE = 64
# Discount factor applied to future rewards.
GAMMA = 0.99
# Learning rate passed to the optimizer.
LR = 0.0005
# The agent learns once every UPDATE_EVERY environment steps.
UPDATE_EVERY = 4

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
class Agent():
    """Interacts with and learns from the environment."""

    def __init__(self, state_size, action_size, seed):
        """Initialize an Agent object.

        Args:
            state_size (int): dimension of each state
            action_size (int): dimension of each action
            seed (int): random seed
        """
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None; the call seeds the global generator used
        # for epsilon-greedy exploration in act().
        self.seed = random.seed(seed)

        # Q-Network
        self.qnetwork_local = QNetwork(state_size, action_size, seed).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        # Replay memory
        self.memory = ReplayBuffer(BUFFER_SIZE, BATCH_SIZE, seed)
        # Initialize time step (for updating every UPDATE_EVERY steps)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and, every UPDATE_EVERY calls, learn from a batch."""
        # Save experience in replay memory
        self.memory.add(state, action, reward, next_state, done)

        # Learn every UPDATE_EVERY time steps.
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > BATCH_SIZE:
                experiences = self.memory.sample()
                self.learn(experiences, GAMMA)

    def act(self, state, eps=0.):
        """Returns actions for given state as per current policy.

        Args:
            state (array_like): current state
            eps (float): epsilon, for epsilon-greedy action selection

        Returns:
            Index of the chosen action: the greedy one with probability
            1 - eps, otherwise one drawn uniformly from all actions.
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        with torch.no_grad():
            action_values = self.qnetwork_local(state)

        # Epsilon-greedy action selection
        if random.random() > eps:
            return np.argmax(action_values.cpu().data.numpy())
        return random.choice(np.arange(self.action_size))

    def learn(self, experiences, gamma):
        """Update value parameters using given batch of experience tuples.

        Args:
            experiences (Tuple[torch.Tensor]): tuple of (s, a, r, s', done)
                tensors as returned by ReplayBuffer.sample()
            gamma (float): discount factor
        """
        states, actions, rewards, next_states, dones = experiences

        # Max predicted Q-value for each next state: max_a Q(s_{t+1}, a).
        # This agent has no separate target network, so the local network is
        # evaluated without gradient tracking to act as a fixed target.
        with torch.no_grad():
            Q_targets_next = self.qnetwork_local(next_states).max(1)[0]
            # Add a trailing dimension so it lines up with rewards and dones.
            Q_targets_next = Q_targets_next.unsqueeze(1)

        # Q targets for current states: r_t + gamma * max_a Q(s_{t+1}, a).
        # Only the bootstrapped term is masked on terminal transitions
        # (dones == 1); the reward received on the final step is kept.
        Q_targets = rewards + gamma * Q_targets_next * (1 - dones)

        # Expected Q-values from the local model for the actions actually
        # taken: Q(s_t, a_t), selected along the action dimension with gather.
        Q_expected = self.qnetwork_local(states).gather(1, actions)

        # Regression loss: mean of (Q_targets - Q_expected)^2
        loss = F.mse_loss(Q_expected, Q_targets)

        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
# Create the CartPole-v0 environment shared by the cells below.
env = gym.make('CartPole-v0')
# Report the observation-space shape and the number of discrete actions.
print('State shape: ', env.observation_space.shape)
print('Number of actions: ', env.action_space.n)
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
import matplotlib.pyplot as plt
# Create and start a virtual X display so that env.render() can run on a
# headless machine; a Display that is never started provides no X server.
display = Display(visible=0, size=(400, 300))
display.start()
def plot_screen(env):
    """Render the environment's current frame and show it inline.

    Args:
        env: environment whose render(mode='rgb_array') returns an image array.
    """
    screen = env.render(mode='rgb_array')
    plt.imshow(screen)
    plt.axis('off')
    # Replace the previous frame instead of stacking a new output per step.
    ipythondisplay.clear_output(wait=True)
    ipythondisplay.display(plt.gcf())
agent = Agent(state_size=4, action_size=2, seed=0)

# Roll out an untrained agent for at most 200 steps (greedy actions, eps=0).
state = env.reset()
for j in range(200):
    action = agent.act(state)
    state, reward, done, _ = env.step(action)
    if done:
        # Stop as soon as the episode ends; stepping a finished environment
        # is undefined until reset() is called again.
        break
def dqn(n_episodes=500, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.995):
    """Deep Q-Learning.

    Trains the module-level `agent` on the module-level `env`.

    Args:
        n_episodes (int): maximum number of training episodes
        max_t (int): maximum number of timesteps per episode
        eps_start (float): starting value of epsilon, for epsilon-greedy action selection
        eps_end (float): minimum value of epsilon
        eps_decay (float): multiplicative factor (per episode) for decreasing epsilon

    Returns:
        list: total reward obtained in each episode, in order.
    """
    scores = []                        # list containing scores from each episode
    scores_window = deque(maxlen=100)  # last 100 scores
    eps = eps_start                    # initialize epsilon
    for i_episode in range(1, n_episodes+1):
        state = env.reset()
        score = 0
        for t in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, done, _ = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                # The episode is over; do not step a finished environment.
                break
        # Per-episode bookkeeping, run once after the timestep loop.
        scores_window.append(score)        # save most recent score
        scores.append(score)               # save most recent score
        eps = max(eps_end, eps_decay*eps)  # decrease epsilon
        # '\r' with end="" rewrites the same console line every episode ...
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)), end="")
        if i_episode % 100 == 0:
            # ... and every 100 episodes the line is kept by ending it with a newline.
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_window)))
    return scores
# Train a fresh agent and save the learned network weights.
agent = Agent(state_size=4, action_size=2, seed=0)
scores = dqn()
torch.save(agent.qnetwork_local.state_dict(), "checkpoint.pth")

# plot the scores
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(len(scores)), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()

# Roll out the trained agent for at most 50 steps (greedy actions, eps=0).
state = env.reset()
for j in range(50):
    action = agent.act(state)
    state, reward, done, _ = env.step(action)
    if done:
        # Stop once the episode has terminated.
        break