<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          asyncio 并發(fā)任務(wù),如何限制協(xié)程的并發(fā)數(shù)?

          共 7453字,需瀏覽 15分鐘

           ·

          2021-09-03 22:26

          劇照:反黑風暴

          作者:kingname

          來源:未聞Code

          有同學問,如果使用 asyncio + httpx 實現(xiàn)并發(fā)請求,怎么限制請求的頻率呢?怎么限制最多只能有 x 個請求同時發(fā)出呢?我們今天給出兩種方案。

          提出問題

          假設(shè)如果我們同時發(fā)起12個請求,每個請求的時間不同,那么總共的請求時間大概跟最長耗時的請求差不多。我們先來寫一個用于測試的例子:

          import asyncio
          import httpx
          import time


          async def req(delay):
              print(f'請求一個延遲為{delay}秒的接口')
              async with httpx.AsyncClient(timeout=20as client:
                  resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
                  result = resp.json()
                  print(result)


          async def main():
              start = time.time()
              delay_list = [361824527398]
              task_list = []
              for delay in delay_list:
                  task = asyncio.create_task(req(delay))
                  task_list.append(task)
              await asyncio.gather(*task_list)
              end = time.time()
              print(f'一共耗時:{end - start}')

          asyncio.run(main())

          這段代碼,使用 for 循環(huán)創(chuàng)建了12個協(xié)程任務(wù),這些任務(wù)幾乎同時運行,于是,請求完成所有的接口,總共耗時如下圖所示:

          現(xiàn)在的問題是,由于網(wǎng)站有反爬蟲機制,最多只能同時發(fā)起3個請求。那么我們怎么確保同一時間最多只有3個協(xié)程在請求網(wǎng)絡(luò)呢?

          限制協(xié)程任務(wù)數(shù)

          第一個方案跟以前限制多線程的線程數(shù)的方案相同。我們創(chuàng)建一個列表,確保列表里面最多只有3個任務(wù),然后持續(xù)循環(huán)檢查,發(fā)現(xiàn)有任務(wù)完成了,就移除這個完成的任務(wù),并加入一個新的任務(wù),直到待爬的列表為空,這個任務(wù)列表也為空。代碼如下:

          import asyncio
          import httpx
          import time


          async def req(delay):
              print(f'請求一個延遲為{delay}秒的接口')
              async with httpx.AsyncClient(timeout=20as client:
                  resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
                  result = resp.json()
                  print(result)


          async def main():
              start = time.time()
              delay_list = [361824527398]
              task_list = []
              while True:
                  if not delay_list and not task_list:
                      break
                  while len(task_list) < 3:
                      if delay_list:
                          delay = delay_list.pop()
                          task = asyncio.create_task(req(delay))
                          task_list.append(task)
                      else:
                          break
                  task_list = [task for task in task_list if not task.done()]
                  await asyncio.sleep(1)
              end = time.time()
              print(f'一共耗時:{end - start}')

          asyncio.run(main())

          運行效果如下圖所示:

          總共耗時大概28秒左右。比串行需要的58秒快了一半,但比全部同時并發(fā)多了一倍。

          使用 Semaphore

          asyncio 實際上自帶了一個限制協(xié)程數(shù)量的類,叫做Semaphore。我們只需要初始化它,傳入最大允許的協(xié)程數(shù)量,然后就可以通過上下文管理器來使用。我們看一下代碼:

          import asyncio
          import httpx
          import time


          async def req(delay, sem):
              print(f'請求一個延遲為{delay}秒的接口')
              async with sem:
                  async with httpx.AsyncClient(timeout=20as client:
                      resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
                      result = resp.json()
                      print(result)


          async def main():
              start = time.time()
              delay_list = [361824527398]
              task_list = []
              sem = asyncio.Semaphore(3)
              for delay in delay_list:
                  task = asyncio.create_task(req(delay, sem))
                  task_list.append(task)
              await asyncio.gather(*task_list)

              end = time.time()
              print(f'一共耗時:{end - start}')

          asyncio.run(main())

          運行效果如下圖所示:

          耗時為22秒,比第一個方案更快。

          我們來看看Semaphore的用法,它的格式為:

          sem = asyncio.Semaphore(同時運行的協(xié)程數(shù)量)

          async def func(sem):
              async with sem:
                  這里是并發(fā)執(zhí)行的代碼

          task_list = []
          for _ in range(總共需要執(zhí)行的任務(wù)數(shù)):
              task = asyncio.create_task(func(sem))
              task_list.append(task)
          await asyncio.gather(*task_list)

          當我們要限制一個協(xié)程的并發(fā)數(shù)的時候,可以在調(diào)用協(xié)程之前,先初始化一個Semaphore對象。然后把這個對象傳到需要限制并發(fā)的協(xié)程里面,在協(xié)程里面,使用異步上下文管理器包住你的正式代碼:

          async with sem:
              正式代碼

          這樣一來,如果并發(fā)數(shù)沒有達到限制,那么async with sem會瞬間執(zhí)行完成,進入里面的正式代碼中。如果并發(fā)數(shù)已經(jīng)達到了限制,那么其他的協(xié)程會阻塞在async with sem這個地方,直到正在運行的某個協(xié)程完成了,退出了,才會放行一個新的協(xié)程去替換掉這個已經(jīng)完成的協(xié)程。

          這個寫法其實跟多線程的加鎖很像。只不過鎖是確保同一個時間只有一個線程在運行,而Semaphore可以人為指定能有多少個協(xié)程同時運行。

          如何限制1分鐘內(nèi)能夠運行的協(xié)程數(shù)

          可能同學看了上面的例子以后,只知道如何限制同時運行的協(xié)程數(shù)。但是怎么限制在一段時間里同時運行的協(xié)程數(shù)呢?

          其實非常簡單,在并發(fā)的協(xié)程里面加個 asyncio.sleep 就可以了。例如上面的例子,我想限制每分鐘只能有3個協(xié)程,那么可以把代碼改為:

          async def req(delay, sem):
              print(f'請求一個延遲為{delay}秒的接口')
              async with sem:
                  async with httpx.AsyncClient(timeout=20as client:
                      resp = await client.get(f'http://127.0.0.1:8000/sleep/{delay}')
                      result = resp.json()
                      print(result)
              await asyncio.sleep(60)

          總結(jié)

          如果大家要限制協(xié)程的并發(fā)數(shù),那么最簡單的辦法就是使用asyncio.Semaphore。但需要注意的是,只能在啟動協(xié)程之前初始化它,然后傳給協(xié)程。要確保所有并發(fā)協(xié)程拿到的是同一個Semaphore對象。

          當然,你的程序里面,可能有多個不同的部分,有些部分限制并發(fā)數(shù)為 a,有些部分限制并發(fā)數(shù)為 b。那么你可以初始化多個Semaphore對象,分別傳給不同的協(xié)程。

          Python貓技術(shù)交流群開放啦!群里既有國內(nèi)一二線大廠在職員工,也有國內(nèi)外高校在讀學生,既有十多年碼齡的編程老鳥,也有中小學剛剛?cè)腴T的新人,學習氛圍良好!想入群的同學,請在公號內(nèi)回復『交流群』,獲取貓哥的微信(謝絕廣告黨,非誠勿擾!)~


          還不過癮?試試它們




          堅持原創(chuàng)很難,但我不會放棄!

          Google 內(nèi)部的 Python 代碼風格指南

          在手機上 Python 編程,可以試試它!

          2021年,不容錯過的7個VS Code插件

          Python 中 Redis 庫分布式鎖簡單分析

          如何美觀地打印 Python 對象?這個標準庫可以簡單實現(xiàn)


          如果你覺得本文有幫助
          請慷慨分享點贊,感謝啦
          瀏覽 55
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产嫩苞又嫩又紧AV在线 | 欧美三级理论片 | 亚洲HD色网站 | 自拍偷拍影音先锋 | 国产精品一二三级 |