Deep Reinforcement Learning: Training PPO to Play the NES Classic 1942
This is another installment in our hands-on deep reinforcement learning series. Since the MyEncyclopedia article on training DQN to play Super Mario Bros. was published, it has received a lot of attention and feedback. This time, we implement PPO, one of today's mainstream deep reinforcement learning algorithms, to play another NES classic: 1942.
Related articles:
A collection of open-source reinforcement learning environments
Video paper walkthrough: reinforcement learning for combinatorial optimization
Reading the TRPO paper: deep reinforcement learning meets classical optimization
Reading a foundational deep RL paper: policy gradient methods with function approximation
NES 1942 Environment Setup
NES games can be emulated with OpenAI Retro, which also integrates other classic platforms into Gym, including the Atari 2600, GBA, and SNES.
However, for copyright reasons, most games beyond the handful of bundled ROMs require you to obtain the ROM yourself.
The environment setup commands are:
pip install gym-retro
python -m retro.import /path/to/your/ROMs/directory/
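To confirm the ROM was imported correctly, a quick sanity check (a minimal sketch, assuming the import command above succeeded) is to look the game up in retro's game list and create the environment once:

import retro

assert '1942-Nes' in retro.data.list_games(), 'ROM not imported'
env = retro.make(game='1942-Nes')
print(env.observation_space.shape)  # raw NES frames, (224, 240, 3)
env.close()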
OpenAI Gym Action Space Types
When creating a retro environment, you can specify the action space, i.e., the button configuration, through the use_restricted_actions parameter of retro.make.
env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
The available options are listed below. FILTERED, DISCRETE, and MULTI_DISCRETE all restrict the agent to the filtered actions, which are loaded from a configuration file.
class Actions(Enum):
    """
    Different settings for the action space of the environment
    """
    ALL = 0  #: MultiBinary action space with no filtered actions
    FILTERED = 1  #: MultiBinary action space with invalid or not allowed actions filtered out
    DISCRETE = 2  #: Discrete action space for filtered actions
    MULTI_DISCRETE = 3  #: MultiDiscrete action space for filtered actions
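To see how this choice affects the environment, the short sketch below (assuming the 1942-Nes ROM has been imported) prints the resulting action space for FILTERED and DISCRETE. Note that gym-retro only allows one emulator instance per process, so the first environment is closed before the second is created.

import retro

env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
print(env.action_space)  # a MultiBinary button vector, e.g. MultiBinary(9)
env.close()

env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.DISCRETE)
print(env.action_space)  # a single integer index over the filtered button combinations
env.close()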
DISCRETE and MULTI_DISCRETE are Gym action space types. They all derive from the base class gym.spaces.Space and can be sampled with sample(). Let's go through the space types one by one.
Discrete: a one-dimensional discrete space. For example, Discrete(n=4) represents the integers in [0, 3].
from gym.spaces import Discrete
space = Discrete(4)
print(space.sample())
The output is
3
Box: a multi-dimensional continuous space; the range of each dimension is given by [low, high]. For example, Box(low=-1.0, high=2, shape=(3, 4,), dtype=np.float32) represents a float32 tensor of shape [3, 4] with every element in [-1, 2].
from gym.spaces import Box
import numpy as np
space = Box(low=-1.0, high=2.0, shape=(3, 4), dtype=np.float32)
print(space.sample())
The output is
[[-0.7538084 0.96901214 0.38641307 -0.05045208]
[-0.85486996 1.3516271 0.3222616 1.2540635 ]
[-0.29908678 -0.8970335 1.4869047 0.7007356 ]]
MultiBinary: a multi-dimensional discrete space of 0/1 values. For example, MultiBinary([3, 2]) represents a 3x2 tensor of 0s and 1s.
from gym.spaces import MultiBinary
space = MultiBinary([3,2])
print(space.sample())
The output is
[[1 0]
[1 1]
[0 0]]
MultiDiscrete: a multi-dimensional integer discrete space. For example, MultiDiscrete([5, 2, 2]) represents a three-dimensional Discrete space whose first dimension ranges over [0, 4] and whose second and third dimensions range over [0, 1].
from gym.spaces import MultiDiscrete
space = MultiDiscrete([5,2,2])
print(space.sample())
The output is
[2 1 0]
Tuple: combines spaces into a tuple-valued composite space. For example, a Box and two Discrete spaces can be combined as Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2))).
from gym.spaces import *
import numpy as np
space = Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2)))
print(space.sample())
The output is
(array([ 0.22640526, 0.75286865, -0.6309239 ], dtype=float32), 0, 1)
Dict: combines spaces into a named composite space. For example, Dict({'position': Discrete(2), 'velocity': Discrete(3)}).
from gym.spaces import *
space = Dict({'position':Discrete(2), 'velocity':Discrete(3)})
print(space.sample())
The output is
OrderedDict([('position', 1), ('velocity', 1)])
NES 1942 Action Space Configuration
Now that we understand gym/retro action spaces, let's look at the default action space of 1942.
env = retro.make(game='1942-Nes')
print("The size of action is: ", env.action_space.shape)
The size of action is: (9,)
This shows that the default action space is a vector of 9 buttons, including control keys such as start and select.
For training 1942, we want the smallest set of effective actions that can still achieve a good score. From experience, the most important keys in this game are the four directions plus the fire button. The official way to restrict the action space is to pass a pre-generated action configuration file when creating the environment. Since that is relatively cumbersome, we instead specify the binary button vectors directly, which achieves the same goal; in this case we need to set use_restricted_actions=retro.Actions.FILTERED.
The code below restricts play to 6 button combinations and plays randomly.
action_list = [
    # No Operation
    [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    # Left
    [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
    # Right
    [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
    # Down
    [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
    # Up
    [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
    # B
    [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
]
import time

import numpy as np
import retro

def random_play(env, action_list, sleep_seconds=0.01):
    env.viewer = None
    state = env.reset()
    score = 0
    for j in range(10000):
        env.render()
        time.sleep(sleep_seconds)
        action = np.random.randint(len(action_list))
        next_state, reward, done, _ = env.step(action_list[action])
        state = next_state
        score += reward
        if done:
            print("Episode Score: ", score)
            env.reset()
            break

env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
random_play(env, action_list)
Let's see how it plays: with fully random actions, the plane dies rather quickly.

Image Input Preprocessing
For end-to-end RL training that takes raw screen pixels as input, image preprocessing is crucial. The original frames are large, so on the one hand we want to compress them into a fairly small tensor, while on the other hand we must not lose key information; for example, bullets must not disappear when the image is shrunk. Another common trick is to stack several consecutive frames into a 3D tensor, which effectively encodes motion across frames.
The pipeline below converts each raw frame from shape (224, 240, 3) to (4, 84, 84): the original 224x240 RGB frame becomes a stack of 4 grayscale 84x84 frames. The pipeline consists of:
MaxAndSkipEnv:每?jī)蓭^濾一幀圖像,減少數(shù)據(jù)量。
FrameDownSample:down sample 圖像到指定小分辨率 84x84,并從彩色降到黑白。
FrameBuffer:合并連續(xù)的4幀,形成 (4, 84, 84) 的圖像輸入
def build_env():
    env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
    env = MaxAndSkipEnv(env, skip=2)
    env = FrameDownSample(env, (1, -1, -1, 1))
    env = FrameBuffer(env, 4)
    env.seed(0)
    return env
To check the observation shape transformation:
env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
print("Initial shape: ", env.observation_space.shape)
env.close()  # close the raw env first: gym-retro allows one emulator instance per process
env = build_env()
print("Processed shape: ", env.observation_space.shape)
This confirms the shape goes from (224, 240, 3) to (4, 84, 84):
Initial shape: (224, 240, 3)
Processed shape: (4, 84, 84)
FrameDownSample is implemented as follows; we use the cv2 library for grayscale conversion and resizing.
from collections import deque

import cv2
import numpy as np
from gym import ObservationWrapper, Wrapper
from gym.spaces import Box

class FrameDownSample(ObservationWrapper):
    def __init__(self, env, exclude, width=84, height=84):
        super(FrameDownSample, self).__init__(env)
        self.exclude = exclude
        # observations are normalized grayscale frames of shape (height, width)
        self.observation_space = Box(low=0.0,
                                     high=1.0,
                                     shape=(height, width),
                                     dtype=np.float32)
        self._width = width
        self._height = height

    def observation(self, observation):
        # convert image to gray scale
        screen = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)
        # crop screen [up: down, left: right]
        screen = screen[self.exclude[0]:self.exclude[2], self.exclude[3]:self.exclude[1]]
        # to float, and normalized
        screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
        # resize image
        screen = cv2.resize(screen, (self._width, self._height), interpolation=cv2.INTER_AREA)
        return screen
MaxAndSkipEnv,每?jī)蓭^濾一幀
class MaxAndSkipEnv(Wrapper):
    def __init__(self, env=None, skip=4):
        super(MaxAndSkipEnv, self).__init__(env)
        self._obs_buffer = deque(maxlen=2)
        self._skip = skip

    def step(self, action):
        total_reward = 0.0
        done = None
        for _ in range(self._skip):
            obs, reward, done, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if done:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, done, info

    def reset(self):
        self._obs_buffer.clear()
        obs = self.env.reset()
        self._obs_buffer.append(obs)
        return obs
FrameBuffer stacks the 4 most recent frames:
class FrameBuffer(ObservationWrapper):
    def __init__(self, env, num_steps, dtype=np.float32):
        super(FrameBuffer, self).__init__(env)
        obs_space = env.observation_space
        self._dtype = dtype
        self.observation_space = Box(low=0, high=255,
                                     shape=(num_steps, obs_space.shape[0], obs_space.shape[1]),
                                     dtype=self._dtype)

    def reset(self):
        frame = self.env.reset()
        self.buffer = np.stack(arrays=[frame, frame, frame, frame])
        return self.buffer

    def observation(self, observation):
        # shift the buffer by one frame and append the newest frame at the end
        self.buffer[:-1] = self.buffer[1:]
        self.buffer[-1] = observation
        return self.buffer
Finally, we visualize the preprocessed frames, again during random play, to make sure no key information is lost.
def random_play_preprocessed(env, action_list, sleep_seconds=0.01):
    import matplotlib.pyplot as plt
    env.viewer = None
    state = env.reset()
    score = 0
    for j in range(10000):
        time.sleep(sleep_seconds)
        action = np.random.randint(len(action_list))
        plt.imshow(state[-1], cmap="gray")
        plt.title('Pre Processed image')
        plt.pause(sleep_seconds)
        next_state, reward, done, _ = env.step(action_list[action])
        state = next_state
        score += reward
        if done:
            print("Episode Score: ", score)
            env.reset()
            break
matplotlib animation output:

CNN Actor & Critic
The Actor and Critic share the same convolutional architecture. The input is a (4, 84, 84) frame stack; the Actor outputs a probability distribution over the 6 actions (indices 0 to 5), while the Critic outputs a single state-value estimate.
import torch
import torch.nn as nn
from torch.distributions import Categorical

class Actor(nn.Module):
    def __init__(self, input_shape, num_actions):
        super(Actor, self).__init__()
        self.input_shape = input_shape
        self.num_actions = num_actions
        self.features = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(self.feature_size(), 512),
            nn.ReLU(),
            nn.Linear(512, self.num_actions),
            nn.Softmax(dim=1)
        )

    def feature_size(self):
        # run one dummy forward pass to infer the flattened CNN output size
        with torch.no_grad():
            return self.features(torch.zeros(1, *self.input_shape)).view(1, -1).size(1)

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        dist = Categorical(x)
        return dist
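The article only lists the Actor. Since the Critic is described as sharing the same architecture, a minimal sketch (reusing the imports above, not the author's exact code) simply replaces the softmax head with a single value output:

class Critic(nn.Module):
    # Sketch: same CNN backbone as the Actor, but the head outputs one scalar state value.
    def __init__(self, input_shape):
        super(Critic, self).__init__()
        self.input_shape = input_shape
        self.features = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(self.feature_size(), 512),
            nn.ReLU(),
            nn.Linear(512, 1)
        )

    def feature_size(self):
        with torch.no_grad():
            return self.features(torch.zeros(1, *self.input_shape)).view(1, -1).size(1)

    def forward(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)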
PPO Core Code
First we compute the probability ratio $r_t(\theta) = \frac{\pi_\theta(a_t \mid s_t)}{\pi_{\theta_\text{old}}(a_t \mid s_t)}$. The trick here is to take the log of each probability, subtract, and then exponentiate, which improves numerical stability.
dist = self.actor_net(state)
new_log_probs = dist.log_prob(action)
ratio = (new_log_probs - old_log_probs).exp()
surr1 = ratio * advantage
surr1 corresponds to the unclipped term $r_t(\theta)\hat{A}_t$ in the PPO clipped surrogate objective

$$L^{CLIP}(\theta) = \hat{\mathbb{E}}_t\left[\min\left(r_t(\theta)\hat{A}_t,\ \mathrm{clip}(r_t(\theta),\, 1-\epsilon,\, 1+\epsilon)\,\hat{A}_t\right)\right]$$

We then compute surr2, which corresponds to the clip part of $L^{CLIP}$; the clipping is implemented with torch.clamp. The negative of $L^{CLIP}$ is actor_loss.
surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage
actor_loss = - torch.min(surr1, surr2).mean()
Finally, we compute the total loss, combining actor_loss, critic_loss, and the entropy of the policy.
entropy = dist.entropy().mean()
critic_loss = (return_ - value).pow(2).mean()
loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy 
The complete update code is:
for _ in range(self.ppo_epoch):
    for state, action, old_log_probs, return_, advantage in sample_batch():
        dist = self.actor_net(state)
        value = self.critic_net(state)
        entropy = dist.entropy().mean()
        new_log_probs = dist.log_prob(action)
        ratio = (new_log_probs - old_log_probs).exp()
        surr1 = ratio * advantage
        surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage
        actor_loss = - torch.min(surr1, surr2).mean()
        critic_loss = (return_ - value).pow(2).mean()
        loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy
        # Minimize the loss
        self.actor_optimizer.zero_grad()
        self.critic_optimizer.zero_grad()
        loss.backward()
        self.actor_optimizer.step()
        self.critic_optimizer.step()
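The attributes used above (self.actor_net, self.actor_optimizer, self.clip_param, and so on) are created when the agent is constructed, which the article does not show. A minimal standalone sketch of the corresponding setup, using the Actor above and the Critic sketch, with assumed rather than the author's actual hyperparameters, could look like:

import torch

actor_net = Actor(input_shape=(4, 84, 84), num_actions=len(action_list))
critic_net = Critic(input_shape=(4, 84, 84))
actor_optimizer = torch.optim.Adam(actor_net.parameters(), lr=1e-4)    # assumed learning rate
critic_optimizer = torch.optim.Adam(critic_net.parameters(), lr=1e-4)  # assumed learning rate
clip_param = 0.2   # PPO clipping epsilon (assumed)
ppo_epoch = 4      # update epochs per rollout (assumed)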
To complete the picture, the GAE advantage is computed from the TD residual

$$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t), \qquad \hat{A}_t = \sum_{l=0}^{\infty} (\gamma\lambda)^{l}\, \delta_{t+l}$$

which translates into the code below; self.tau plays the role of $\lambda$, and the method returns $\hat{A}_t + V(s_t)$, i.e. the returns.
def compute_gae(self, next_value):
    gae = 0
    returns = []
    values = self.values + [next_value]
    for step in reversed(range(len(self.rewards))):
        delta = self.rewards[step] + self.gamma * values[step + 1] * self.masks[step] - values[step]
        gae = delta + self.gamma * self.tau * self.masks[step] * gae
        returns.insert(0, gae + values[step])
    return returns
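How these returns feed into the update is not shown in the article; a common pattern (an assumption here, not necessarily the author's code) is to subtract the critic's values to obtain advantages and normalize them before minibatch sampling:

# Hypothetical glue code: 'returns' is the list produced by compute_gae and
# 'values' the list of critic outputs collected during the rollout, each entry
# assumed to be a tensor of shape (1, 1).
returns = torch.cat(returns).detach()
values = torch.cat(values).detach()
advantages = returns - values
advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)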
Outer Training Loop
The outer loop builds on the random-play logic: agent.act() wraps action sampling and the forward pass, while agent.step() wraps backprop and the parameter-update logic.
for i_episode in range(start_epoch + 1, n_episodes + 1):
    state = env.reset()
    score = 0
    timestamp = 0
    while timestamp < 10000:
        action, log_prob, value = agent.act(state)
        next_state, reward, done, info = env.step(action_list[action])
        score += reward
        timestamp += 1
        agent.step(state, action, value, log_prob, reward, done, next_state)
        if done:
            break
        else:
            state = next_state
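The agent's internals are not listed in full; a minimal sketch of the rollout-time forward pass that agent.act() presumably wraps, written here as a standalone function with assumed arguments, is:

def act(actor_net, critic_net, state, device="cpu"):
    # Sketch only: no gradients are tracked while collecting rollouts.
    state_t = torch.from_numpy(np.asarray(state)).float().unsqueeze(0).to(device)
    with torch.no_grad():
        dist = actor_net(state_t)
        value = critic_net(state_t)
    action = dist.sample()
    log_prob = dist.log_prob(action)
    return action.item(), log_prob, value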
Training Results
Let's look at what the agent has learned. Notice that our plane picks up several key skills: dodging bullets, flying into a corner to shoot down enemies quickly, and to some extent anticipating where enemies will appear and moving there in advance.