0. Overview¶
Problem: Train a policy network using REINFORCE with baseline to balance a pole on a cart (simplified CartPole dynamics), achieving >150 average episode length.
Concepts used: RL foundations (ch329), policy gradients (ch330), MLP forward/backward (ch304, ch306), Adam (ch312), advantage estimation.
Expected output: trained policy achieving stable pole balance, learning curve showing episode length growth, and a visualisation of the policy’s action probabilities.
Difficulty: ★★★★☆ | Estimated time: 90 minutes
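In one line, the estimator this chapter implements (standard policy-gradient notation from ch329/ch330): REINFORCE ascends a sampled gradient of expected return, with a learned baseline $V(s_t)$ subtracted to reduce variance without adding bias.

$$\nabla_\theta J(\theta) \;=\; \mathbb{E}_{\pi_\theta}\!\left[\sum_{t=0}^{T-1}\big(G_t - V(s_t)\big)\,\nabla_\theta \log \pi_\theta(a_t \mid s_t)\right], \qquad G_t = \sum_{k=0}^{T-t-1} \gamma^k\, r_{t+k+1}$$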
1. Setup — CartPole Environment¶
import numpy as np
import matplotlib.pyplot as plt
class CartPole:
    """
    Classic (nonlinear) CartPole dynamics, Euler-integrated.
    State: [cart_x, cart_v, pole_angle, pole_angular_v]
    Actions: 0 (push left), 1 (push right)
    """
    GRAVITY = 9.8; MASSCART = 1.0; MASSPOLE = 0.1; LENGTH = 0.5
    FORCE_MAG = 10.0; TAU = 0.02
    THETA_LIMIT = 0.2094   # ~12 degrees
    X_LIMIT = 2.4

    def reset(self, rng):
        self.state = rng.uniform(-0.05, 0.05, 4)
        return self.state.copy()

    def step(self, action):
        x, xd, theta, thetad = self.state
        force = self.FORCE_MAG if action == 1 else -self.FORCE_MAG
        total_mass = self.MASSCART + self.MASSPOLE
        pm_l = self.MASSPOLE * self.LENGTH
        cosT = np.cos(theta); sinT = np.sin(theta)
        temp = (force + pm_l*thetad**2*sinT) / total_mass
        alpha = (self.GRAVITY*sinT - cosT*temp) / (
            self.LENGTH*(4./3. - self.MASSPOLE*cosT**2/total_mass))
        ax = temp - pm_l*alpha*cosT/total_mass
        # Euler integration, same update order as the classic implementation
        x += self.TAU*xd
        xd += self.TAU*ax
        theta += self.TAU*thetad
        thetad += self.TAU*alpha
        self.state = np.array([x, xd, theta, thetad])
        done = (abs(x) > self.X_LIMIT or abs(theta) > self.THETA_LIMIT)
        reward = 0.0 if done else 1.0
        return self.state.copy(), reward, done
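A quick, optional smoke test (not part of the training pipeline; `_env` and `_rng` are throwaway names): a uniformly random policy typically survives only a few dozen steps, which sets the bar the learned policy has to clear.

# Sanity check: roll out a uniformly random policy for a few episodes.
_rng = np.random.default_rng(42)
_env = CartPole()
for _ in range(3):
    s = _env.reset(_rng); steps = 0; done = False
    while not done and steps < 500:
        _, _, done = _env.step(_rng.integers(2)); steps += 1
    print("random policy survived", steps, "steps")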
def relu(z): return np.maximum(0, z)
def relu_grad(z): return (z > 0).astype(float)

def softmax(z):
    z_s = z - z.max()        # subtract max for numerical stability
    e = np.exp(z_s)
    return e / e.sum()
class PolicyNet:
    def __init__(self, state_dim=4, n_actions=2, hidden=64, seed=0):
        rng = np.random.default_rng(seed)
        fi, fo = state_dim, hidden
        # He initialisation for the ReLU hidden layer
        self.W1 = rng.normal(0, np.sqrt(2./fi), (fo, fi)); self.b1 = np.zeros(fo)
        self.W2 = rng.normal(0, np.sqrt(2./fo), (n_actions, fo)); self.b2 = np.zeros(n_actions)

    def forward(self, s):
        h = relu(self.W1 @ s + self.b1)
        logits = self.W2 @ h + self.b2
        return softmax(logits), (h, s, logits)

    def log_prob_grad(self, s, a, cache):
        """Gradient of log π(a|s) w.r.t. parameters."""
        h, s_in, logits = cache
        probs = softmax(logits)
        d_logits = -probs; d_logits[a] += 1.    # d log softmax = one_hot(a) - probs
        dW2 = np.outer(d_logits, h); db2 = d_logits
        d_h = self.W2.T @ d_logits
        d_h *= relu_grad(self.W1 @ s_in + self.b1)   # ReLU mask
        dW1 = np.outer(d_h, s_in); db1 = d_h
        return {'W1': dW1, 'b1': db1, 'W2': dW2, 'b2': db2}
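Since the chapter already leans on finite differences for the value net, the same trick can double-check `log_prob_grad` (an optional verification, not part of the original pipeline; `_p`, `_s`, `_a` are throwaway names): perturb one weight, recompute log π(a|s), and compare.

# Finite-difference check of one entry of the analytic gradient.
_p = PolicyNet(seed=3); _s = np.random.default_rng(7).normal(size=4); _a = 1
probs, cache = _p.forward(_s)
g = _p.log_prob_grad(_s, _a, cache)['W1'][0, 0]
eps = 1e-5
_p.W1[0, 0] += eps;   lp = np.log(_p.forward(_s)[0][_a])
_p.W1[0, 0] -= 2*eps; lm = np.log(_p.forward(_s)[0][_a])
_p.W1[0, 0] += eps    # restore the weight
print("analytic:", g, " numeric:", (lp - lm)/(2*eps))   # should agree to ~1e-6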
class ValueNet:
    """Baseline: V(s) for variance reduction."""
    def __init__(self, state_dim=4, hidden=64, seed=1):
        rng = np.random.default_rng(seed)
        self.W1 = rng.normal(0, np.sqrt(2./state_dim), (hidden, state_dim)); self.b1 = np.zeros(hidden)
        self.W2 = rng.normal(0, np.sqrt(2./hidden), (1, hidden)); self.b2 = np.zeros(1)

    def forward(self, s):
        h = relu(self.W1 @ s + self.b1)
        return float(self.W2 @ h + self.b2)
env=CartPole(); policy=PolicyNet(); value_net=ValueNet()
print("Environment and networks ready.")2. Stage 1 — REINFORCE with Baseline¶
def collect_episode(policy, env, rng, max_steps=500):
    """Roll out one episode, caching the log-prob gradient at every step."""
    s = env.reset(rng); done = False
    states, actions, rewards, log_prob_grads = [], [], [], []
    while not done and len(rewards) < max_steps:
        probs, cache = policy.forward(s)
        a = rng.choice(2, p=probs)        # sample action from π(.|s)
        s_next, r, done = env.step(a)
        grads = policy.log_prob_grad(s, a, cache)
        states.append(s); actions.append(a); rewards.append(r)
        log_prob_grads.append(grads)
        s = s_next
    return states, actions, rewards, log_prob_grads

def discounted_returns(rewards, gamma=0.99):
    """Backward sweep: G_t = r_{t+1} + gamma * G_{t+1}."""
    G = 0; returns = []
    for r in reversed(rewards):
        G = r + gamma*G
        returns.insert(0, G)
    return np.array(returns)
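A tiny worked example makes the backward recursion concrete (toy rewards, not taken from a training run):

# With rewards [1, 1, 1] and gamma = 0.5:
#   G_2 = 1
#   G_1 = 1 + 0.5*1   = 1.5
#   G_0 = 1 + 0.5*1.5 = 1.75
print(discounted_returns([1., 1., 1.], gamma=0.5))   # -> [1.75 1.5 1.]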
# Adam optimiser state for the policy; pm/vm hold live references to the
# parameter arrays, so in-place updates write straight into the networks.
pm = {'W1': policy.W1, 'b1': policy.b1, 'W2': policy.W2, 'b2': policy.b2}
vm = {'W1': value_net.W1, 'b1': value_net.b1, 'W2': value_net.W2, 'b2': value_net.b2}
am = {'t': 0, 'mp': {}, 'vp': {}}   # step count + first/second moments

def adam_update_param(P, g, m, v, t, lr=3e-3, b1=0.9, b2=0.999, eps=1e-8):
    m[:] = b1*m + (1-b1)*g              # first-moment (mean) estimate
    v[:] = b2*v + (1-b2)*g**2           # second-moment estimate
    mh = m/(1-b1**t); vh = v/(1-b2**t)  # bias correction
    P -= lr/(np.sqrt(vh)+eps)*mh        # in-place step (minimises)
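One step on a toy single-parameter example shows the effect of bias correction (illustration only): at t = 1 the corrected step is roughly `lr`, regardless of the raw gradient's scale.

P = np.array([1.0]); m = np.zeros(1); v = np.zeros(1)
adam_update_param(P, np.array([2.0]), m, v, t=1, lr=0.1)
print(P)   # -> [0.9]: the first step moves by ~lr in the gradient's direction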
rng = np.random.default_rng(0)
episode_lengths = []; moving_avg = []
for ep in range(600):
    states, actions, rewards, lpg = collect_episode(policy, env, rng)
    G = discounted_returns(rewards)
    # Baseline: value net predictions
    baselines = np.array([value_net.forward(s) for s in states])
    advantages = G - baselines
    # Normalise advantages
    if advantages.std() > 1e-6:
        advantages = (advantages - advantages.mean())/advantages.std()
    am['t'] += 1; t = am['t']
    # Accumulate policy gradients (sum over timesteps, weighted by advantage)
    acc_grads = {k: np.zeros_like(pm[k]) for k in pm}
    for grad_dict, adv in zip(lpg, advantages):
        for k in acc_grads:
            acc_grads[k] += adv*grad_dict[k]    # gradient ASCENT direction
    for k in pm:
        am['mp'].setdefault(k, np.zeros_like(pm[k])); am['vp'].setdefault(k, np.zeros_like(pm[k]))
        # Negated because Adam minimises; -ascent direction = ascent
        adam_update_param(pm[k], -acc_grads[k]/len(states), am['mp'][k], am['vp'][k], t, lr=3e-3)
    # Update value net to fit returns: MSE loss, crude finite-difference gradient
    # over every 10th state and a strided subset of weights (cheap, not exact)
    for i in range(0, len(states), 10):
        s = states[i]; target = G[i]
        eps_fd = 1e-4
        for pname, P in vm.items():
            flat = P.ravel()    # view, so edits write through to P
            for idx in range(0, len(flat), max(1, len(flat)//10)):
                flat[idx] += eps_fd;   lp = (value_net.forward(s) - target)**2
                flat[idx] -= 2*eps_fd; lm = (value_net.forward(s) - target)**2
                flat[idx] += eps_fd    # restore the weight
                flat[idx] -= 1e-3*(lp - lm)/(2*eps_fd)   # SGD step, lr = 1e-3
    L = len(rewards); episode_lengths.append(L)
    if len(episode_lengths) >= 20:
        moving_avg.append(np.mean(episode_lengths[-20:]))
    if (ep+1) % 100 == 0:
        ma = np.mean(episode_lengths[-20:])
        print(f"Episode {ep+1:4d}: length={L:4d}  20-ep avg={ma:.1f}")

3. Stage 2 — Evaluation and Visualisation¶
fig,axes=plt.subplots(1,3,figsize=(15,4))
axes[0].plot(episode_lengths,alpha=0.3,color='#3498db',lw=1)
if moving_avg:
    axes[0].plot(range(19, len(episode_lengths)), moving_avg, color='#e74c3c', lw=2, label='20-ep avg')
axes[0].axhline(150,color='green',linestyle='--',alpha=0.6,label='Target: 150')
axes[0].set_title('Episode lengths during training'); axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Steps survived'); axes[0].legend()
# Policy action probabilities across pole angles
theta=np.linspace(-0.2,0.2,100)
prob_right=[]
for t in theta:
    s = np.array([0., 0., t, 0.])
    p, _ = policy.forward(s); prob_right.append(float(p[1]))
axes[1].plot(np.degrees(theta),prob_right,color='#e74c3c',lw=2)
axes[1].axhline(0.5,color='gray',linestyle='--',alpha=0.5)
axes[1].axvline(0,color='black',lw=0.8)
axes[1].set_title('Policy: P(push right) vs pole angle')
axes[1].set_xlabel('Pole angle (degrees)'); axes[1].set_ylabel('P(right)')
axes[1].fill_between(np.degrees(theta),prob_right,0.5,alpha=0.2,color='#e74c3c')
# Evaluate final policy
final_lengths=[len(collect_episode(policy,env,np.random.default_rng(i))[0]) for i in range(50)]
axes[2].hist(final_lengths,bins=20,color='#2ecc71',edgecolor='white',alpha=0.8)
axes[2].axvline(np.mean(final_lengths),color='#e74c3c',lw=2,
label=f'Mean={np.mean(final_lengths):.1f}')
axes[2].set_title('Final policy: episode length distribution\n(50 eval episodes)')
axes[2].set_xlabel('Steps survived'); axes[2].legend()
plt.suptitle('ch339: REINFORCE with Baseline — CartPole',fontsize=12)
plt.tight_layout(); plt.savefig('ch339_rl.png',dpi=120); plt.show()
print(f"Final policy — mean episode length: {np.mean(final_lengths):.1f}")
print(f"Max episode length: {max(final_lengths)}")5. Results & Reflection¶
What was built: A policy network trained with REINFORCE + baseline (advantage estimation) to balance a pole on a cart using the true CartPole physics equations.
What math made it possible:

Log-derivative trick (ch330): $\nabla_\theta \log \pi_\theta(a\mid s) = \nabla_\theta \pi_\theta(a\mid s)\,/\,\pi_\theta(a\mid s)$, which turns the gradient of expected return into the sampled estimator $\sum_t A_t\,\nabla_\theta \log \pi_\theta(a_t\mid s_t)$.
Discounted returns (ch329): $G_t = \sum_{k=0}^{T-t-1} \gamma^k r_{t+k+1}$, computed by the backward sweep in discounted_returns.
Advantage function (ch330): $A_t = G_t - V(s_t)$ reduces gradient variance without biasing the estimate.
Adam (ch312): adaptive per-parameter learning rates, essential for policy-gradient stability.
Extension challenges:
Implement actor-critic: update using TD(0) rather than Monte Carlo returns.
Add entropy regularisation to the policy loss to encourage exploration (a minimal sketch follows this list).
Try PPO’s clipped objective (ch330) and compare learning speed.
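For the entropy challenge, a minimal sketch of the extra gradient term (a suggested approach, not the chapter's implementation; `beta` is a hypothetical coefficient). From $H(\pi) = -\sum_i p_i \log p_i$ with $p = \mathrm{softmax}(z)$, the gradient w.r.t. the logits is $\partial H/\partial z_j = -p_j(\log p_j + H)$:

def entropy_grad_logits(probs):
    """dH/d(logits) for pi = softmax(logits): -p * (log p + H)."""
    H = -np.sum(probs * np.log(probs + 1e-12))
    return -probs * (np.log(probs + 1e-12) + H)

# In the accumulation loop, ascend on advantage plus beta * entropy:
#   d_logits_total = adv*d_logits_logprob + beta*entropy_grad_logits(probs)
# then backpropagate d_logits_total through the same layers as log_prob_grad.
# (beta ~ 0.01 is a common starting point; tune per task.)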