<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深度強(qiáng)化學(xué)習(xí)之:PPO訓(xùn)練紅白機(jī)1942

          共 21796字,需瀏覽 44分鐘

           ·

          2021-03-31 16:10

          本篇是深度強(qiáng)化學(xué)習(xí)動(dòng)手系列文章,自MyEncyclopedia公眾號(hào)文章深度強(qiáng)化學(xué)習(xí)之:DQN訓(xùn)練超級(jí)瑪麗闖關(guān)發(fā)布后收到不少關(guān)注和反饋,這一期,讓我們實(shí)現(xiàn)目前主流深度強(qiáng)化學(xué)習(xí)算法PPO來打另一個(gè)紅白機(jī)經(jīng)典游戲1942。

          相關(guān)文章鏈接如下:

          強(qiáng)化學(xué)習(xí)開源環(huán)境集

          視頻論文解讀:PPO算法

          視頻論文解讀:組合優(yōu)化的強(qiáng)化學(xué)習(xí)方法

          解讀TRPO論文,深度強(qiáng)化學(xué)習(xí)結(jié)合傳統(tǒng)優(yōu)化方法

          解讀深度強(qiáng)化學(xué)習(xí)基石論文:函數(shù)近似的策略梯度方法


          NES 1942 環(huán)境安裝

          紅白機(jī)游戲環(huán)境可以由OpenAI Retro來模擬,OpenAI Retro還在 Gym 集成了其他的經(jīng)典游戲環(huán)境,包括Atari 2600,GBA,SNES等。

          不過,受到版權(quán)原因,除了一些基本的rom,大部分游戲需要自行獲取rom。

          環(huán)境準(zhǔn)備部分相關(guān)代碼如下

          pip install gym-retro
          python -m retro.import /path/to/your/ROMs/directory/

          OpenAI Gym 輸入動(dòng)作類型

          在創(chuàng)建 retro 環(huán)境時(shí),可以在retro.make中通過參數(shù)use_restricted_actions指定 action space,即按鍵的配置。

          env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)

          可選參數(shù)如下,F(xiàn)ILTERED,DISCRETE和MULTI_DISCRETE 都可以指定過濾的動(dòng)作,過濾動(dòng)作需要通過配置文件加載。

          class Actions(Enum):
              """
              Different settings for the action space of the environment
              """

              ALL = 0  #: MultiBinary action space with no filtered actions
              FILTERED = 1  #: MultiBinary action space with invalid or not allowed actions filtered out
              DISCRETE = 2  #: Discrete action space for filtered actions
              MULTI_DISCRETE = 3  #: MultiDiscete action space for filtered actions

          DISCRETE和MULTI_DISCRETE 是 Gym 里的 Action概念,它們的基類都是gym.spaces.Space,可以通過 sample()方法采樣,下面具體一一介紹。

          • Discrete:對(duì)應(yīng)一維離散空間,例如,Discrete(n=4) 表示 [0, 3] 范圍的整數(shù)。
          from gym.spaces import Discrete
          space = Discrete(4)
          print(space.sample())

          輸出是

          3


          • Box:對(duì)應(yīng)多維連續(xù)空間,每一維的范圍可以用 [low,high] 指定。舉例,Box(low=-1.0, high=2, shape=(3, 4,), dtype=np.float32) 表示 shape 是 [3, 4],每個(gè)范圍在 [-1, 2] 的float32型 tensor。
          from gym.spaces import Box
          import numpy as np
          space = Box(low=-1.0, high=2.0, shape=(34), dtype=np.float32)
          print(space.sample())

          輸出是

          [[-0.7538084   0.96901214  0.38641307 -0.05045208]
           [-0.85486996  1.3516271   0.3222616   1.2540635 ]
           [-0.29908678 -0.8970335   1.4869047   0.7007356 ]]


          • MultiBinary: 0或1的多維離散空間。例如,MultiBinary([3,2]) 表示 shape 是3x2的0或1的tensor。
          from gym.spaces import MultiBinary
          space = MultiBinary([3,2])
          print(space.sample())

          輸出是

          [[1 0]
           [1 1]
           [0 0]]


          • MultiDiscrete:多維整型離散空間。例如,MultiDiscrete([5,2,2]) 表示三維Discrete空間,第一維范圍在 [0-4],第二,三維范圍在[0-1]。
          from gym.spaces import MultiDiscrete
          space = MultiDiscrete([5,2,2])
          print(space.sample())

          輸出是

          [2 1 0]


          • Tuple:組合成 tuple 復(fù)合空間。舉例來說,可以將 Box,Discrete,Discrete組成tuple 空間:Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2)))
          from gym.spaces import *
          import numpy as np
          space = Tuple(spaces=(Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), Discrete(n=3), Discrete(n=2)))
          print(space.sample())

          輸出是

          (array([ 0.22640526,  0.75286865, -0.6309239 ], dtype=float32), 0, 1)


          • Dict:組合成有名字的復(fù)合空間。例如,Dict({'position':Discrete(2), 'velocity':Discrete(3)})
          from gym.spaces import *
          space = Dict({'position':Discrete(2), 'velocity':Discrete(3)})
          print(space.sample())

          輸出是

          OrderedDict([('position', 1), ('velocity', 1)])

          NES 1942 動(dòng)作空間配置

          了解了 gym/retro 的動(dòng)作空間,我們來看看1942的默認(rèn)動(dòng)作空間

          env = retro.make(game='1942-Nes')
          print("The size of action is: ", env.action_space.shape)
          The size of action is:  (9,)

          表示有9個(gè) Discrete 動(dòng)作,包括 start, select這些控制鍵。

          從訓(xùn)練1942角度來說,我們希望指定最少的有效動(dòng)作取得最好的成績(jī)。根據(jù)經(jīng)驗(yàn),我們知道這個(gè)游戲最重要的鍵是4個(gè)方向加上 fire 鍵。限定游戲動(dòng)作空間,官方的做法是在創(chuàng)建游戲環(huán)境時(shí),指定預(yù)先生成的動(dòng)作輸入配置文件。但是這個(gè)方式相對(duì)麻煩,我們采用了直接指定按鍵的二進(jìn)制表示來達(dá)到同樣的目的,此時(shí),需要設(shè)置 use_restricted_actions=retro.Actions.FILTERED。

          下面的代碼限制了6種按鍵,并隨機(jī)play。

          action_list = [
              # No Operation
              [000000000000],
              # Left
              [000000100000],
              # Right
              [000000010000],
              # Down
              [000001000000],
              # Up
              [000010000000],
              # B
              [100000000000],
          ]

          def random_play(env, action_list, sleep_seconds=0.01):
              env.viewer = None
              state = env.reset()
              score = 0
              for j in range(10000):
                  env.render()
                  time.sleep(sleep_seconds)
                  action = np.random.randint(len(action_list))

                  next_state, reward, done, _ = env.step(action_list[action])
                  state = next_state
                  score += reward
                  if done:
                      print("Episode Score: ", score)
                      env.reset()
                      break
                      
          env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
          random_play(env, action_list)

          來看看其游戲效果,全隨機(jī)死的還是比較快。

           

          圖像輸入處理

          一般對(duì)于通過屏幕像素作為輸入的RL end-to-end訓(xùn)練來說,對(duì)圖像做預(yù)處理很關(guān)鍵。因?yàn)樵紙D像較大,一方面我們希望能盡量壓縮圖像到比較小的tensor,另一方面又要保證關(guān)鍵信息不丟失,比如子彈的圖像不能因?yàn)閳D片縮小而消失。另外的一個(gè)通用技巧是將多個(gè)連續(xù)的frame合并起來組成立體的frame,這樣可以有效表示連貫動(dòng)作。

          下面的代碼通過 pipeline 將游戲每幀原始圖像從shape (224, 240, 3) 轉(zhuǎn)換成 (4, 84, 84),也就是原始的 width=224,height=240,rgb=3轉(zhuǎn)換成 width=84,height=240,stack_size=4的黑白圖像。具體 pipeline為

          1. MaxAndSkipEnv:每?jī)蓭^濾一幀圖像,減少數(shù)據(jù)量。

          2. FrameDownSample:down sample 圖像到指定小分辨率 84x84,并從彩色降到黑白。

          3. FrameBuffer:合并連續(xù)的4幀,形成 (4, 84, 84) 的圖像輸入

          def build_env():
              env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
              env = MaxAndSkipEnv(env, skip=2)
              env = FrameDownSample(env, (1-1-11))
              env = FrameBuffer(env, 4)
              env.seed(0)
              return env

          觀察圖像維度變換

          env = retro.make(game='1942-Nes', use_restricted_actions=retro.Actions.FILTERED)
          print("Initial shape: ", env.observation_space.shape)

          env = build_env(env)
          print("Processed shape: ", env.observation_space.shape)

          確保shape 從 (224, 240, 3) 轉(zhuǎn)換成 (4, 84, 84)

          Initial shape:  (224, 240, 3)
          Processed shape:  (4, 84, 84)

          FrameDownSample實(shí)現(xiàn)如下,我們使用了 cv2 類庫來完成黑白化和圖像縮放

          class FrameDownSample(ObservationWrapper):
              def __init__(self, env, exclude, width=84, height=84):
                  super(FrameDownSample, self).__init__(env)
                  self.exclude = exclude
                  self.observation_space = Box(low=0,
                                               high=255,
                                               shape=(width, height, 1),
                                               dtype=np.uint8)
                  self._width = width
                  self._height = height

              def observation(self, observation):
                  # convert image to gray scale
                  screen = cv2.cvtColor(observation, cv2.COLOR_RGB2GRAY)

                  # crop screen [up: down, left: right]
                  screen = screen[self.exclude[0]:self.exclude[2], self.exclude[3]:self.exclude[1]]

                  # to float, and normalized
                  screen = np.ascontiguousarray(screen, dtype=np.float32) / 255

                  # resize image
                  screen = cv2.resize(screen, (self._width, self._height), interpolation=cv2.INTER_AREA)
                  return screen

          MaxAndSkipEnv,每?jī)蓭^濾一幀

          class MaxAndSkipEnv(Wrapper):
              def __init__(self, env=None, skip=4):
                  super(MaxAndSkipEnv, self).__init__(env)
                  self._obs_buffer = deque(maxlen=2)
                  self._skip = skip

              def step(self, action):
                  total_reward = 0.0
                  done = None
                  for _ in range(self._skip):
                      obs, reward, done, info = self.env.step(action)
                      self._obs_buffer.append(obs)
                      total_reward += reward
                      if done:
                          break
                  max_frame = np.max(np.stack(self._obs_buffer), axis=0)
                  return max_frame, total_reward, done, info

              def reset(self):
                  self._obs_buffer.clear()
                  obs = self.env.reset()
                  self._obs_buffer.append(obs)
                  return obs

          FrameBuffer,將最近的4幀合并起來

          class FrameBuffer(ObservationWrapper):
              def __init__(self, env, num_steps, dtype=np.float32):
                  super(FrameBuffer, self).__init__(env)
                  obs_space = env.observation_space
                  self._dtype = dtype
                  self.observation_space = Box(low=0, high=255, shape=(num_steps, obs_space.shape[0], obs_space.shape[1]), dtype=self._dtype)

              def reset(self):
                  frame = self.env.reset()
                  self.buffer = np.stack(arrays=[frame, frame, frame, frame])
                  return self.buffer

              def observation(self, observation):
                  self.buffer[:-1] = self.buffer[1:]
                  self.buffer[-1] = observation
                  return self.buffer

          最后,visualize 處理后的圖像,同樣還是在隨機(jī)play中,確保關(guān)鍵信息不丟失

          def random_play_preprocessed(env, action_list, sleep_seconds=0.01):
              import matplotlib.pyplot as plt

              env.viewer = None
              state = env.reset()
              score = 0
              for j in range(10000):
                  time.sleep(sleep_seconds)
                  action = np.random.randint(len(action_list))

                  plt.imshow(state[-1], cmap="gray")
                  plt.title('Pre Processed image')
                  plt.pause(sleep_seconds)

                  next_state, reward, done, _ = env.step(action_list[action])
                  state = next_state
                  score += reward
                  if done:
                      print("Episode Score: ", score)
                      env.reset()
                      break

          matplotlib 動(dòng)畫輸出

           

          CNN Actor & Critic

          Actor 和 Critic 模型相同,輸入是 (4, 84, 84) 的圖像,輸出是 [0, 5] 的action index。

          class Actor(nn.Module):
              def __init__(self, input_shape, num_actions):
                  super(Actor, self).__init__()
                  self.input_shape = input_shape
                  self.num_actions = num_actions

                  self.features = nn.Sequential(
                      nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
                      nn.ReLU(),
                      nn.Conv2d(3264, kernel_size=4, stride=2),
                      nn.ReLU(),
                      nn.Conv2d(6464, kernel_size=3, stride=1),
                      nn.ReLU()
                  )

                  self.fc = nn.Sequential(
                      nn.Linear(self.feature_size(), 512),
                      nn.ReLU(),
                      nn.Linear(512, self.num_actions),
                      nn.Softmax(dim=1)
                  )

              def forward(self, x):
                  x = self.features(x)
                  x = x.view(x.size(0), -1)
                  x = self.fc(x)
                  dist = Categorical(x)
                  return dist

          PPO核心代碼

          先計(jì)算 ,這里采用了一個(gè)技巧,對(duì) 取 log,相減再取 exp,這樣可以增強(qiáng)數(shù)值穩(wěn)定性。

          dist = self.actor_net(state)
          new_log_probs = dist.log_prob(action)
          ratio = (new_log_probs - old_log_probs).exp()
          surr1 = ratio * advantage

          surr1 對(duì)應(yīng)PPO論文中的

           

          然后計(jì)算 surr2,對(duì)應(yīng) 中的 clip 部分,clip可以由 torch.clamp 函數(shù)實(shí)現(xiàn)。 則對(duì)應(yīng) actor_loss。

          surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage
          actor_loss = - torch.min(surr1, surr2).mean()

           

          最后,計(jì)算總的 loss ,包括 actor_loss,critic_loss 和 policy的 entropy。

          entropy = dist.entropy().mean()

          critic_loss = (return_ - value).pow(2).mean()
          loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy 



          上述完整代碼如下

          for _ in range(self.ppo_epoch):
              for state, action, old_log_probs, return_, advantage in sample_batch():
                  dist = self.actor_net(state)
                  value = self.critic_net(state)

                  entropy = dist.entropy().mean()
                  new_log_probs = dist.log_prob(action)

                  ratio = (new_log_probs - old_log_probs).exp()
                  surr1 = ratio * advantage
                  surr2 = torch.clamp(ratio, 1.0 - self.clip_param, 1.0 + self.clip_param) * advantage

                  actor_loss = - torch.min(surr1, surr2).mean()
                  critic_loss = (return_ - value).pow(2).mean()

                  loss = actor_loss + 0.5 * critic_loss - 0.001 * entropy

                  # Minimize the loss
                  self.actor_optimizer.zero_grad()
                  self.critic_optimizer.zero_grad()
                  loss.backward()
                  self.actor_optimizer.step()
                  self.critic_optimizer.step()

          補(bǔ)充一下 GAE 的計(jì)算,advantage 根據(jù)公式

          可以轉(zhuǎn)換成如下代碼

          def compute_gae(self, next_value):
              gae = 0
              returns = []
              values = self.values + [next_value]
              for step in reversed(range(len(self.rewards))):
                  delta = self.rewards[step] + self.gamma * values[step + 1] * self.masks[step] - values[step]
                  gae = delta + self.gamma * self.tau * self.masks[step] * gae
                  returns.insert(0, gae + values[step])
              return returns

          外層 Training 代碼

          外層調(diào)用代碼基于隨機(jī) play 的邏輯,agent.act()封裝了采樣和 forward prop,agent.step() 則封裝了 backprop 和參數(shù)學(xué)習(xí)迭代的邏輯。

          for i_episode in range(start_epoch + 1, n_episodes + 1):
              state = env.reset()
              score = 0
              timestamp = 0

              while timestamp < 10000:
                  action, log_prob, value = agent.act(state)
                  next_state, reward, done, info = env.step(action_list[action])
                  score += reward
                  timestamp += 1

                  agent.step(state, action, value, log_prob, reward, done, next_state)
                  if done:
                      break
                  else:
                      state = next_state

          訓(xùn)練結(jié)果

          讓我們來看看學(xué)習(xí)的效果吧,注意我們的飛機(jī)學(xué)到了一些關(guān)鍵的技巧,躲避子彈;飛到角落盡快擊斃敵機(jī);一定程度預(yù)測(cè)敵機(jī)出現(xiàn)的位置并預(yù)先走到位置。




          往期精彩回顧





          本站qq群851320808,加入微信群請(qǐng)掃碼:

          瀏覽 75
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产道高无码视频91 | 黄片在线看。 | 东北骚妇大战黑人视频 | 亚洲A∨无码无在线观看 | 精品无码人妻一区二区免费蜜桃 |