Dynamic programming helps us compute the value function. Today we will actually implement two dynamic programming methods, Policy Iteration and its improved variant Value Iteration, and use them to solve the Taxi problem!
For the details, see the previous post; here we follow the algorithm and implement it directly.
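For reference, the sweep that policy_evaluation performs below is the standard iterative policy evaluation backup, repeated over all states until the largest change in a sweep (delta) falls below theta:

$$V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$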
def policy_evaluation(V, policy, gamma, theta):
    while True:
        delta = 0  # largest value change in this sweep
        for s in range(env.observation_space.n):
            v = 0
            for action, action_prob in enumerate(policy[s]):
                for prob, next_state, reward, done in env.P[s][action]:
                    if done:
                        # terminal state: its value is 0, so only add the reward
                        v += action_prob * prob * reward
                    else:
                        v += action_prob * prob * (reward + gamma * V[next_state])
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        if delta < theta:
            break
A few things to note:
- policy[s][a] is the policy probability $\pi(a \mid s)$.
- When done is True the next state is terminal, so its value is effectively V[next_state] = 0 and only the immediate reward is added.
- The whole backup is accumulated in v first and only then written back as the new V[s], which makes delta easy to compute.
Next is the Policy Improvement step:

def policy_improvement(V, policy, gamma):
    policy_stable = True
    for s in range(env.observation_space.n):
        old_policy = policy[s].copy()
        # action values q(s, a) under the current value function
        q_s = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for prob, next_state, reward, done in env.P[s][a]:
                if done:
                    q_s[a] += prob * reward
                else:
                    q_s[a] += prob * (reward + gamma * V[next_state])
        # act greedily: make the policy one-hot on the best action
        best_action = np.argmax(q_s)
        policy[s] = np.eye(env.action_space.n)[best_action]
        if np.any(old_policy != policy[s]):
            policy_stable = False
    return policy_stable
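For reference, the update this function performs is the usual greedy improvement with respect to the current value function:

$$\pi(s) \leftarrow \arg\max_{a} \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$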
np.eye(env.action_space.n)[best_action] builds a one-hot row: the best action is set to 1 and every other action to 0. The last piece is the iteration loop itself; the initial policy is usually a uniform random policy:
def policy_iteration(gamma = 0.99, max_iteration = 10000, theta = 0.0001):
    V = np.zeros(env.observation_space.n)
    # start from a uniform random policy
    policy = np.ones([env.observation_space.n, env.action_space.n]) / env.action_space.n
    for i in range(max_iteration):
        policy_evaluation(V, policy, gamma, theta)
        stable = policy_improvement(V, policy, gamma)
        if stable:
            break
    return V, policy
If the initial policy is not a random policy and gamma is 1, policy evaluation may never converge, because the policy might never reach the final state (the problem effectively becomes a continuing task).
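One way to see this (a rough sketch with a made-up example, not from the original post): imagine a state whose chosen action leads straight back to itself with reward -1. With gamma = 1 the evaluation backup for that state becomes

$$V_{k+1}(s) = -1 + V_k(s)$$

so its value drops by exactly 1 on every sweep, delta never falls below 1, and the while loop never exits. A uniform random policy avoids this because every state still has some probability of eventually reaching the terminal state.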
Let's actually try it on the Taxi environment. In Taxi, an illegal pickup or dropoff gives a reward of -10, a successful dropoff gives +20, and every other step gives -1.
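All of the functions above rely on env.P, the transition table that gym's toy-text environments expose: env.P[s][a] is a list of (prob, next_state, reward, done) tuples, which is exactly what the inner loops iterate over. A quick standalone way to inspect it (state index 0 is just an arbitrary example):

import gym

env = gym.make('Taxi-v2')
env = env.unwrapped  # gym.make wraps Taxi in a TimeLimit wrapper; unwrap to reach the raw environment

# each entry of env.P[s][a] is a (prob, next_state, reward, done) tuple
for action, transitions in env.P[0].items():
    print(action, transitions)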
import gym
import numpy as np

env = gym.make('Taxi-v2')
env = env.unwrapped  # unwrap the TimeLimit wrapper to get the raw environment

V, policy = policy_iteration(gamma = 0.99, max_iteration = 10000, theta = 0.0001)

state = env.reset()
rewards = 0
steps = 0
while True:
    env.render()
    # the policy is one-hot, so argmax picks its (deterministic) action
    action = np.argmax(policy[state])
    next_state, reward, done, info = env.step(action)
    rewards += reward
    steps += 1
    if done:
        break
    state = next_state
print(f'total reward: {rewards}, steps: {steps}')
The above is the usual gym loop: choose an action from the current state, call step(action) to get the next state, then choose the next action from that new state. If everything is written correctly, you should see the taxi take the optimal action at every step: since every step costs -1, maximizing the value function forces it to take the shortest path.
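A quick sanity check of that claim (illustrative numbers, with gamma taken as 1 for simplicity): with -1 per step and +20 for the final successful dropoff, a 10-step episode returns

$$9 \times (-1) + 20 = 11,$$

while a 15-step episode returns $14 \times (-1) + 20 = 6$, so shorter routes always give higher returns.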
You may have noticed that we set gamma to 0.99. Why not 1? If you set gamma to 1 and run it, you still end up with the same policy, but it takes roughly 5 to 10 minutes. The reason it is so slow is that the initial policy is random: the initial value function averages over every action, so the reward has to keep propagating backwards, and the more steps it takes to reach the goal, the more sweeps the reward needs to travel back, which makes convergence very slow. One fix is to set gamma to a value smaller than 1, which forces the propagated reward to shrink toward 0 within a limited number of steps.
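A tiny standalone illustration of that decay (the step counts are arbitrary): a reward propagated back k steps is scaled by gamma ** k, which with gamma = 0.99 drops below 1% after roughly 500 steps, while with gamma = 1 it never shrinks at all.

# how much of a reward survives after being propagated back k steps
for gamma in (1.0, 0.99):
    for k in (10, 100, 500, 1000):
        print(f'gamma={gamma}, k={k}: {gamma ** k:.4f}')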
Another approach is Value Iteration: we don't have to run Policy Evaluation all the way to convergence before improving the policy. Recall the GPI diagram, and the update we used in Policy Improvement:

$$\pi(s) \leftarrow \arg\max_{a} \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$

We don't need full convergence: change Policy Evaluation to run only a single sweep and then go straight into Policy Improvement, and the slow-convergence problem goes away. This method is called Value Iteration.
Looking back at the Policy Iteration algorithm: in Policy Improvement we take the action with the largest $q(s, a)$ as the new $\pi(s)$, then use that $\pi$ to compute the next $V(s)$. This is equivalent to dropping the policy entirely and taking $\max_a q(s, a)$ directly as the next $V(s)$; in other words, we replace the $\sum_{a} \pi(a \mid s)$ part of the evaluation backup with $\max_{a}$:

$$V(s) \leftarrow \max_{a} \sum_{s', r} p(s', r \mid s, a)\,\bigl[r + \gamma V(s')\bigr]$$
The improved algorithm is below: we only need to extract the policy after the value function has converged, and that policy is the optimal one.
def value_iteration(gamma, max_iteration, theta):
    V = np.zeros(env.observation_space.n)
    for i in range(max_iteration):
        delta = 0
        for s in range(env.observation_space.n):
            q_s = np.zeros(env.action_space.n)
            for a in range(env.action_space.n):
                for prob, next_state, reward, done in env.P[s][a]:
                    if done:
                        q_s[a] += prob * reward
                    else:
                        q_s[a] += prob * (reward + gamma * V[next_state])
            # back up the best action value directly, no explicit policy needed
            best_value = max(q_s)
            delta = max(delta, np.abs(best_value - V[s]))
            V[s] = best_value
        if delta < theta:
            break
    policy = get_policy(V, gamma)
    return V, policy
def get_policy(V, gamma):
    policy = np.zeros((env.observation_space.n, env.action_space.n))
    for s in range(env.observation_space.n):
        q_s = np.zeros(env.action_space.n)
        for a in range(env.action_space.n):
            for prob, next_state, reward, done in env.P[s][a]:
                if done:
                    q_s[a] += prob * reward
                else:
                    q_s[a] += prob * (reward + gamma * V[next_state])
        best_action = np.argmax(q_s)
        policy[s] = np.eye(env.action_space.n)[best_action]
    return policy
get_policy() is essentially policy_improvement() with the policy_stable check removed.
Let's run Taxi with Value Iteration:
import gym
import numpy as np

env = gym.make('Taxi-v2')
env = env.unwrapped

V, policy = value_iteration(gamma = 1.0, max_iteration = 10000, theta = 0.0001)

state = env.reset()
rewards = 0
steps = 0
while True:
    env.render()
    action = np.argmax(policy[state])
    next_state, reward, done, info = env.step(action)
    rewards += reward
    steps += 1
    if done:
        break
    state = next_state
print(f'total reward: {rewards}, steps: {steps}')
Convergence is noticeably faster than policy_iteration! You can also check whether the two methods produce the same policy:
p1 = policy_iteration(gamma = 1.0, max_iteration = 10000, theta = 0.0001)[1]
p2 = value_iteration(gamma = 1.0, max_iteration = 10000, theta = 0.0001)[1]
print(np.all(p1 == p2)) # True
Policy Iteration and Value Iteration help us understand the GPI framework, but both methods require knowing the environment's dynamics. What if we don't know them? The Monte Carlo method introduced tomorrow solves exactly that problem!