Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

ch339 — Project: Reinforcement Learning — CartPole

0. Overview

Problem: Train a policy network using REINFORCE with baseline to balance a pole on a cart (simplified CartPole dynamics), achieving >150 average episode length.

Concepts used: RL foundations (ch329), policy gradients (ch330), MLP forward/backward (ch304, ch306), Adam (ch312), advantage estimation.

Expected output: trained policy achieving stable pole balance, learning curve showing episode length growth, and a visualisation of the policy’s action probabilities.

Difficulty: ★★★★☆ | Estimated time: 90 minutes

1. Setup — CartPole Environment

import numpy as np
import matplotlib.pyplot as plt


class CartPole:
    """
    Linearised CartPole dynamics.
    State: [cart_x, cart_v, pole_angle, pole_angular_v]
    Actions: 0 (push left), 1 (push right)
    """
    GRAVITY = 9.8; MASSCART = 1.0; MASSPOLE = 0.1; LENGTH = 0.5
    FORCE_MAG = 10.0; TAU = 0.02
    THETA_LIMIT = 0.2094  # ~12 degrees
    X_LIMIT = 2.4

    def reset(self, rng):
        self.state = rng.uniform(-0.05, 0.05, 4)
        return self.state.copy()

    def step(self, action):
        x,xd,theta,thetad = self.state
        force = self.FORCE_MAG if action==1 else -self.FORCE_MAG
        total_mass = self.MASSCART + self.MASSPOLE
        pm_l = self.MASSPOLE * self.LENGTH
        cosT = np.cos(theta); sinT = np.sin(theta)
        temp = (force + pm_l*thetad**2*sinT)/total_mass
        alpha = (self.GRAVITY*sinT - cosT*temp) / (
            self.LENGTH*(4./3. - self.MASSPOLE*cosT**2/total_mass))
        ax = temp - pm_l*alpha*cosT/total_mass

        x     += self.TAU*xd
        xd    += self.TAU*ax
        theta += self.TAU*thetad
        thetad += self.TAU*alpha
        self.state = np.array([x,xd,theta,thetad])

        done = (abs(x)>self.X_LIMIT or abs(theta)>self.THETA_LIMIT)
        reward = 0.0 if done else 1.0
        return self.state.copy(), reward, done


def relu(z): return np.maximum(0,z)
def relu_grad(z): return (z>0).astype(float)

def softmax(z):
    z_s=z-z.max(); e=np.exp(z_s); return e/e.sum()

class PolicyNet:
    def __init__(self, state_dim=4, n_actions=2, hidden=64, seed=0):
        rng=np.random.default_rng(seed)
        fi=state_dim; fo=hidden
        self.W1=rng.normal(0,np.sqrt(2./fi),(fo,fi)); self.b1=np.zeros(fo)
        self.W2=rng.normal(0,np.sqrt(2./fo),(n_actions,fo)); self.b2=np.zeros(n_actions)

    def forward(self,s):
        h=relu(self.W1@s+self.b1); logits=self.W2@h+self.b2
        return softmax(logits),(h,s,logits)

    def log_prob_grad(self,s,a,cache):
        """Gradient of log π(a|s) w.r.t. parameters."""
        h,s_in,logits=cache
        probs=softmax(logits)
        d_logits=-probs; d_logits[a]+=1.
        dW2=np.outer(d_logits,h); db2=d_logits
        d_h=self.W2.T@d_logits; d_h*=relu_grad(self.W1@s+self.b1)
        dW1=np.outer(d_h,s_in); db1=d_h
        return {'W1':dW1,'b1':db1,'W2':dW2,'b2':db2}

class ValueNet:
    """Baseline: V(s) for variance reduction."""
    def __init__(self, state_dim=4, hidden=64, seed=1):
        rng=np.random.default_rng(seed)
        self.W1=rng.normal(0,np.sqrt(2./state_dim),(hidden,state_dim)); self.b1=np.zeros(hidden)
        self.W2=rng.normal(0,np.sqrt(2./hidden),(1,hidden)); self.b2=np.zeros(1)

    def forward(self,s):
        h=relu(self.W1@s+self.b1); return float(self.W2@h+self.b2)

env=CartPole(); policy=PolicyNet(); value_net=ValueNet()
print("Environment and networks ready.")

2. Stage 1 — REINFORCE with Baseline

def collect_episode(policy, env, rng, max_steps=500):
    s=env.reset(rng); done=False
    states,actions,rewards,log_prob_grads=[],[],[],[]
    while not done and len(rewards)<max_steps:
        probs,cache=policy.forward(s)
        a=rng.choice(2,p=probs)
        s_next,r,done=env.step(a)
        grads=policy.log_prob_grad(s,a,cache)
        states.append(s); actions.append(a); rewards.append(r)
        log_prob_grads.append(grads)
        s=s_next
    return states,actions,rewards,log_prob_grads

def discounted_returns(rewards,gamma=0.99):
    G=0; returns=[]
    for r in reversed(rewards):
        G=r+gamma*G; returns.insert(0,G)
    return np.array(returns)

# Adam for policy
pm={'W1':policy.W1,'b1':policy.b1,'W2':policy.W2,'b2':policy.b2}
vm={'W1':value_net.W1,'b1':value_net.b1,'W2':value_net.W2,'b2':value_net.b2}
am={'t':0,'mp':{},'vp':{},'mv':{},'vv':{}}

def adam_update_param(P,g,m,v,t,lr=3e-3,b1=0.9,b2=0.999,eps=1e-8):
    m[:]=b1*m+(1-b1)*g; v[:]=b2*v+(1-b2)*g**2
    mh=m/(1-b1**t); vh=v/(1-b2**t)
    P-=lr/(np.sqrt(vh)+eps)*mh

rng=np.random.default_rng(0)
episode_lengths=[]; moving_avg=[]

for ep in range(600):
    states,actions,rewards,lpg=collect_episode(policy,env,rng)
    G=discounted_returns(rewards)
    # Baseline: value net predictions
    baselines=np.array([value_net.forward(s) for s in states])
    advantages=G-baselines
    # Normalise advantages
    if advantages.std()>1e-6: advantages=(advantages-advantages.mean())/advantages.std()

    am['t']+=1; t=am['t']
    # Accumulate policy gradients (sum over timesteps, weighted by advantage)
    acc_grads={k:np.zeros_like(pm[k]) for k in pm}
    for grad_dict,adv in zip(lpg,advantages):
        for k in acc_grads:
            acc_grads[k]+=adv*grad_dict[k]  # gradient ASCENT

    for k in pm:
        am['mp'].setdefault(k,np.zeros_like(pm[k])); am['vp'].setdefault(k,np.zeros_like(pm[k]))
        adam_update_param(pm[k],-acc_grads[k]/len(states),am['mp'][k],am['vp'][k],t,lr=3e-3)

    # Update value net to fit returns (MSE, numerical grad)
    for i in range(0,len(states),10):
        s=states[i]; target=G[i]
        pred=value_net.forward(s)
        eps_fd=1e-4
        for pname,P in vm.items():
            flat=P.ravel()
            for idx in range(0,len(flat),max(1,len(flat)//10)):
                flat[idx]+=eps_fd; lp=(value_net.forward(s)-target)**2
                flat[idx]-=2*eps_fd; lm=(value_net.forward(s)-target)**2
                flat[idx]+=eps_fd; flat[idx]-=1e-3*(lp-lm)/(2*eps_fd)

    L=len(rewards); episode_lengths.append(L)
    if len(episode_lengths)>=20:
        moving_avg.append(np.mean(episode_lengths[-20:]))
    if (ep+1)%100==0:
        ma=np.mean(episode_lengths[-20:])
        print(f"Episode {ep+1:4d}: length={L:4d}  20-ep avg={ma:.1f}")

3. Stage 2 — Evaluation and Visualisation

fig,axes=plt.subplots(1,3,figsize=(15,4))
axes[0].plot(episode_lengths,alpha=0.3,color='#3498db',lw=1)
if moving_avg:
    axes[0].plot(range(19,len(episode_lengths)),moving_avg,color='#e74c3c',lw=2,label='20-ep avg')
axes[0].axhline(150,color='green',linestyle='--',alpha=0.6,label='Target: 150')
axes[0].set_title('Episode lengths during training'); axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Steps survived'); axes[0].legend()

# Policy action probabilities across pole angles
theta=np.linspace(-0.2,0.2,100)
prob_right=[]
for t in theta:
    s=np.array([0.,0.,t,0.])
    p,_=policy.forward(s); prob_right.append(float(p[1]))

axes[1].plot(np.degrees(theta),prob_right,color='#e74c3c',lw=2)
axes[1].axhline(0.5,color='gray',linestyle='--',alpha=0.5)
axes[1].axvline(0,color='black',lw=0.8)
axes[1].set_title('Policy: P(push right) vs pole angle')
axes[1].set_xlabel('Pole angle (degrees)'); axes[1].set_ylabel('P(right)')
axes[1].fill_between(np.degrees(theta),prob_right,0.5,alpha=0.2,color='#e74c3c')

# Evaluate final policy
final_lengths=[len(collect_episode(policy,env,np.random.default_rng(i))[0]) for i in range(50)]
axes[2].hist(final_lengths,bins=20,color='#2ecc71',edgecolor='white',alpha=0.8)
axes[2].axvline(np.mean(final_lengths),color='#e74c3c',lw=2,
                label=f'Mean={np.mean(final_lengths):.1f}')
axes[2].set_title('Final policy: episode length distribution\n(50 eval episodes)')
axes[2].set_xlabel('Steps survived'); axes[2].legend()

plt.suptitle('ch339: REINFORCE with Baseline — CartPole',fontsize=12)
plt.tight_layout(); plt.savefig('ch339_rl.png',dpi=120); plt.show()
print(f"Final policy — mean episode length: {np.mean(final_lengths):.1f}")
print(f"Max episode length: {max(final_lengths)}")

5. Results & Reflection

What was built: A policy network trained with REINFORCE + baseline (advantage estimation) to balance a pole on a cart using the true CartPole physics equations.

What math made it possible:

  • Log-derivative trick (ch330): θJ=E[θlogπA]\nabla_\theta J = \mathbb{E}[\nabla_\theta \log \pi \cdot A]

  • Discounted returns (ch329): Gt=kγkrt+kG_t = \sum_k \gamma^k r_{t+k}

  • Advantage function (ch330): A=GV(s)A = G - V(s) reduces gradient variance

  • Adam (ch312): adaptive LR essential for policy gradient stability

Extension challenges:

  1. Implement actor-critic: update V(s)V(s) using TD(0) rather than Monte Carlo returns.

  2. Add entropy regularisation to the policy loss to encourage exploration.

  3. Try PPO’s clipped objective (ch330) and compare learning speed.