<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          #multiprocessing# Python多進(jìn)程與多線程

          共 13583字,需瀏覽 28分鐘

           ·

          2023-09-18 23:50

           文章所涉及內(nèi)容更多來自網(wǎng)絡(luò),在此聲明,并感謝知識的貢獻(xiàn)者!

          Python的多線程效率問題

          Python的多線程效率問題
          盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè)Python線程執(zhí)行。GIL最大的問題就是Python的多線程程序并不能利用多核CPU的優(yōu)勢 (比如一個(gè)使用了多個(gè)線程的計(jì)算密集型程序只會(huì)在一個(gè)單CPU上面運(yùn)行)。
          在討論普通的GIL之前,有一點(diǎn)要強(qiáng)調(diào)的是GIL只會(huì)影響到那些嚴(yán)重依賴CPU的程序(比如計(jì)算型的)。如果你的程序大部分只會(huì)涉及到I/O,比如網(wǎng)絡(luò)交互,那么使用多線程就很合適, 因?yàn)樗鼈兇蟛糠謺r(shí)間都在等待。實(shí)際上,你完全可以放心的創(chuàng)建幾千個(gè)Python線程, 現(xiàn)代操作系統(tǒng)運(yùn)行這么多線程沒有任何壓力,沒啥可擔(dān)心的。

          Python的多線程與多進(jìn)程


          Python的多線程與多進(jìn)程分析
          -多進(jìn)程是多核運(yùn)算
          -效率:運(yùn)行耗時(shí)最少是:多進(jìn)程 < 普通 < 多線程


          Python的多線程與多進(jìn)程示例


          Python的多線程與多進(jìn)程示例
          import multiprocessing as mp
          import threading as td

          def joba(a, d):
              print('job-a')
              for i in range(20):
                  pass
              print('job-a-finish')

          def jobb(a, d):
              print('job-b')
              while 1:
                  pass
              print('job-b-finish')

          if __name__ == "__main__":
              t1 = td.Thread(target=jobb, args=(1, 2))
              p1 = mp.Process(target=joba, args=(1, 2))
              t1.start()
              p1.start()
              t1.join()
              p1.join()

           Python多線程或多進(jìn)程的輸出存儲(chǔ)
          Queue的功能是將每個(gè)核或線程的運(yùn)算結(jié)果放在隊(duì)里中, 等到每個(gè)線程或核運(yùn)行完畢后再從隊(duì)列中取出結(jié)果, 繼續(xù)加載運(yùn)算。原因很簡單, 多線程調(diào)用的函數(shù)不能有返回值, 所以使用Queue存儲(chǔ)多個(gè)線程運(yùn)算的結(jié)果


          Python的多進(jìn)程池示例


          Python的多進(jìn)程池示例
          import multiprocessing as mp
          def job(x):
              return x * x
          def multicore():
              pool = mp.Pool()
              res = pool.map(job, range(10))
              print(res)
          if __name__ == '__main__':
              multicore()


          import multiprocessing as mp
          def job(x):
              return x * x
          def multicore():
              pool = mp.Pool()
              res = pool.map(job, range(10))
              print(res)
              res = pool.apply_async(job, (2,))
              # 用get獲得結(jié)果
              print(res.get())
              # 迭代器,i=0時(shí)apply一次,i=1時(shí)apply一次等等
              multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
              # 從迭代器中取出
              print([res.get() for res in multi_res])
          if __name__ == '__main__':
              multicore()
          總結(jié)
          Pool默認(rèn)調(diào)用是CPU的核數(shù),傳入processes參數(shù)可自定義CPU核數(shù)
          map() 放入迭代參數(shù),返回多個(gè)結(jié)果
          apply_async()只能放入一組參數(shù),并返回一個(gè)結(jié)果,如果想得到map()的效果需要通過迭代

          Python的共享內(nèi)存


          共享內(nèi)存
          只有用共享內(nèi)存才能讓CPU之間有交流
          import multiprocessing as mp
          value1 = mp.Value('i', 0)
          value2 = mp.Value('d', 3.14)
          array = mp.Array('i', [1, 2, 3, 4])

          其中d和i參數(shù)用來設(shè)置數(shù)據(jù)類型的,d表示一個(gè)雙精浮點(diǎn)類型,i表示一個(gè)帶符號的整型。
          在Python的mutiprocessing中,有還有一個(gè)Array類,可以和共享內(nèi)存交互,來實(shí)現(xiàn)在進(jìn)程之間共享數(shù)據(jù)。
          | Type code | C Type             | Python Type       | Minimum size in bytes |
          | --------- | ------------------ | ----------------- | --------------------- |
          | `'b'`     | signed char        | int               | 1                     |
          | `'B'`     | unsigned char      | int               | 1                     |
          | `'u'`     | Py_UNICODE         | Unicode character | 2                     |
          | `'h'`     | signed short       | int               | 2                     |
          | `'H'`     | unsigned short     | int               | 2                     |
          | `'i'`     | signed int         | int               | 2                     |
          | `'I'`     | unsigned int       | int               | 2                     |
          | `'l'`     | signed long        | int               | 4                     |
          | `'L'`     | unsigned long      | int               | 4                     |
          | `'q'`     | signed long long   | int               | 8                     |
          | `'Q'`     | unsigned long long | int               | 8                     |
          | `'f'`     | float              | float             | 4                     |
          | `'d'`     | double             | float             | 8                     |

          Python共享內(nèi)存變量的鎖
          import multiprocessing as mp
          import time
          def job(v, num, l):
              l.acquire()  # 鎖住
              for _ in range(5):
                  time.sleep(0.1)
                  v.value += num  # 獲取共享內(nèi)存
                  print(v.value)
              l.release()  # 釋放
          def multicore():
              l = mp.Lock()  # 定義一個(gè)進(jìn)程鎖
              v = mp.Value('i', 0)  # 定義共享內(nèi)存
              p1 = mp.Process(target=job, args=(v, 1, l))  # 需要將lock傳入
              p2 = mp.Process(target=job, args=(v, 3, l))
              p1.start()
              p2.start()
              p1.join()
              p2.join()
          if __name__ == '__main__':
              multicore()
          進(jìn)程鎖保證了進(jìn)程p1的完整運(yùn)行,然后才進(jìn)行了進(jìn)程p2的運(yùn)行

          Python的多進(jìn)程的multiprocessing模塊


          Python多進(jìn)程multiprocessing模塊

          1、multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
          target 是函數(shù)名字,需要調(diào)用的函數(shù)
          args 函數(shù)需要的參數(shù),以 tuple 的形式傳入
          將daemon設(shè)置為True時(shí),則主線程不必等待子進(jìn)程,主線程結(jié)束則所有結(jié)束
          2、相關(guān)方法
          star() 方法啟動(dòng)進(jìn)程,
          join() 方法實(shí)現(xiàn)進(jìn)程間的同步,等待所有進(jìn)程退出。
          close() 用來阻止多余的進(jìn)程涌入進(jìn)程池 Pool 造成進(jìn)程阻塞。
          如果要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程
          Pool提供了一種快捷的方法,賦予函數(shù)并行化處理一系列輸入值的能力,可以將輸入數(shù)據(jù)分配給不同進(jìn)程處理(數(shù)據(jù)并行)。
          from multiprocessing import Pool
          def f(x):
              return x*x
          if __name__ == '__main__':
              with Pool(5) as p:
                  print(p.map(f, [1, 2, 3]))
          (1)p.apply(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,kwargs),然后返回結(jié)果。需要強(qiáng)調(diào)的是:此操作并不會(huì)在所有池工作進(jìn)程中并執(zhí)行func函數(shù)。如果要通過不同參數(shù)并發(fā)地執(zhí)行func函數(shù),必須從不同線程調(diào)用p.apply()函數(shù)或者使用p.apply_async()
          (2)p.apply_async(func [, args [, kwargs]]):在一個(gè)池工作進(jìn)程中執(zhí)行func(args,**kwargs),然后返回結(jié)果。此方法的結(jié)果是 AsyncResult類的實(shí)例,callback是可調(diào)用對象,接收輸入?yún)?shù)。當(dāng)func的結(jié)果變?yōu)榭捎脮r(shí),將理解傳遞給callback。callback禁止執(zhí)行任何阻塞操作,否則將接收其他異步操作中的結(jié)果。多進(jìn)程并發(fā)!
          (3)p.close():關(guān)閉進(jìn)程池,防止進(jìn)一步操作。如果所有操作持續(xù)掛起,它們將在工作進(jìn)程終止前完成
          (4)p.jion():等待所有工作進(jìn)程退出。此方法只能在close()或teminate()之后調(diào)用



          import multiprocessing
          def worker(num):
              """該函數(shù)將在子進(jìn)程中執(zhí)行"""
              print('Worker %d' % num)

          if __name__ == '__main__':
              # 創(chuàng)建進(jìn)程池
              pool = multiprocessing.Pool(4)
              # 啟動(dòng)進(jìn)程池中的進(jìn)程
              pool.map(worker, range(10))
              # 關(guān)閉進(jìn)程池
              pool.close()
              # 等待進(jìn)程池中的進(jìn)程結(jié)束
              pool.join()

          Python的多進(jìn)程間通訊


          Python進(jìn)程間的通信
          多進(jìn)程編程中,不同的進(jìn)程之間需要進(jìn)行通信。multiprocessing模塊提供了多種進(jìn)程間通信的方式,例如使用隊(duì)列、管道、共享內(nèi)存等。
            (1)隊(duì)列
            隊(duì)列是一種常用的進(jìn)程間通信方式。multiprocessing模塊中提供了Queue類,可以用來創(chuàng)建隊(duì)列。下面是一個(gè)簡單的示例:
          import multiprocessing

          def producer(q):
              """該函數(shù)將在生產(chǎn)者進(jìn)程中執(zhí)行"""
              for i in range(10):
                  q.put(i)

          def consumer(q):
              """該函數(shù)將在消費(fèi)者進(jìn)程中執(zhí)行"""
              while True:
                  item = q.get()
                  if item is None:
                      break
                  print(item)

          if __name__ == '__main__':
              # 創(chuàng)建隊(duì)列
              q = multiprocessing.Queue()
              # 創(chuàng)建生產(chǎn)者進(jìn)程
              p1 = multiprocessing.Process(target=producer, args=(q,))
              # 創(chuàng)建消費(fèi)者進(jìn)程
              p2 = multiprocessing.Process(target=consumer, args=(q,))
              # 啟動(dòng)進(jìn)程
              p1.start()
              p2.start()
              # 等待進(jìn)程結(jié)束
              p1.join()
              # 發(fā)送結(jié)束信號
              q.put(None)
              p2.join()
            在上面的代碼中,首先創(chuàng)建了一個(gè)Queue對象,然后創(chuàng)建了一個(gè)生產(chǎn)者進(jìn)程和一個(gè)消費(fèi)者進(jìn)程。生產(chǎn)者進(jìn)程通過調(diào)用put方法將0~9的數(shù)字放入隊(duì)列中,消費(fèi)者進(jìn)程通過調(diào)用get方法從隊(duì)列中獲取數(shù)據(jù),并將其打印出來。最后,調(diào)用put方法發(fā)送結(jié)束信號,然后等待兩個(gè)進(jìn)程結(jié)束。
            (2)管道
            管道是另一種常用的進(jìn)程間通信方式。multiprocessing模塊中提供了Pipe類,可以用來創(chuàng)建管道。下面是一個(gè)簡單的示例:
          import multiprocessing

          def producer(conn):
              """該函數(shù)將在生產(chǎn)者進(jìn)程中執(zhí)行"""
              for i in range(10):
                  conn.send(i)
              conn.close()

          def consumer(conn):
              """該函數(shù)將在消費(fèi)者進(jìn)程中執(zhí)行"""
              while True:
                  item = conn.recv()
                  if item is None:
                      break
                  print(item)

          if __name__ == '__main__':
              # 創(chuàng)建管道
              conn1, conn2 = multiprocessing.Pipe()
              # 創(chuàng)建生產(chǎn)者進(jìn)程
              p1 = multiprocessing.Process(target=producer, args=(conn1,))
              # 創(chuàng)建消費(fèi)者進(jìn)程
              p2 = multiprocessing.Process(target=consumer, args=(conn2,))
              # 啟動(dòng)進(jìn)程
              p1.start()
              p2.start()
              # 等待進(jìn)程結(jié)束
              p1.join()
              # 發(fā)送結(jié)束信號
              conn1.send(None)
              p2.join()
            在上面的代碼中,首先創(chuàng)建了一個(gè)管道,然后創(chuàng)建了一個(gè)生產(chǎn)者進(jìn)程和一個(gè)消費(fèi)者進(jìn)程。生產(chǎn)者進(jìn)程通過調(diào)用send方法將0~9的數(shù)字發(fā)送到管道中,消費(fèi)者進(jìn)程通過調(diào)用recv方法從管道中獲取數(shù)據(jù),并將其打印出來。最后,調(diào)用send方法發(fā)送結(jié)束信號,然后等待兩個(gè)進(jìn)程結(jié)束。
            (3)共享內(nèi)存
            共享內(nèi)存是一種高效的進(jìn)程間通信方式,它允許多個(gè)進(jìn)程共享同一塊內(nèi)存區(qū)域。multiprocessing模塊中提供了Value和Array類,可以用來創(chuàng)建共享內(nèi)存。下面是一個(gè)簡單的示例:
          import multiprocessing

          def worker1(n):
              """該函數(shù)將在進(jìn)程1中執(zhí)行"""
              n.value += 1
              print('worker1:', n.value)

          def worker2(n):
              """該函數(shù)將在進(jìn)程2中執(zhí)行"""
              n.value += 1
              print('worker2:', n.value)

          if __name__ == '__main__':
              # 創(chuàng)建共享內(nèi)存
              n = multiprocessing.Value('i', 0)
              # 創(chuàng)建進(jìn)程1
              p1 = multiprocessing.Process(target=worker1, args=(n,))
              # 創(chuàng)建進(jìn)程2
              p2 = multiprocessing.Process(target=worker2, args=(n,))
              # 啟動(dòng)進(jìn)程
              p1.start()
              p2.start()
              # 等待進(jìn)程結(jié)束
              p1.join()
              p2.join()
            在上面的代碼中,首先創(chuàng)建了一個(gè)Value對象,用于存儲(chǔ)一個(gè)整數(shù)值。然后創(chuàng)建了兩個(gè)進(jìn)程,每個(gè)進(jìn)程都會(huì)將共享內(nèi)存中的值加1,并將其打印出來。最后,等待兩個(gè)進(jìn)程結(jié)束。
            除了Value類之外,multiprocessing模塊還提供了Array類,用于創(chuàng)建共享內(nèi)存數(shù)組。

          參考資料


          GIL 不一定有效率
          https://yulizi123.github.io/tutorials/python-basic/threading/5-GIL/
          The "freeze_support()" line can be omitted if the program  is not going to be frozen to produce an executable.    raise RuntimeError
          https://blog.csdn.net/qq_34905587/article/details/112020651
          Python Multiprocessing(多進(jìn)程)
          https://blog.csdn.net/sikh_0529/article/details/126728914
          Python多進(jìn)程multiprocessing模塊介紹
          https://www.jianshu.com/p/3ff7d04a39bf
          Python編程之多進(jìn)程(multiprocessing)詳解
          http://stack.itcast.cn/news/20230404/10534657760.shtml
          Python multiprocessing進(jìn)程池使用詳解
          https://blog.csdn.net/weixin_39253570/article/details/130817783
          multiprocessing --- 基于進(jìn)程的并行
          https://docs.python.org/zh-cn/3/library/multiprocessing.html?highlight=multi
          Python 異步編程 多進(jìn)程
          https://www.cjavapy.com/article/2427/
          python 中的進(jìn)程池 -- multiprocessing.pool.Pool
          http://www.manongjc.com/detail/63-pndlbcnipsucczm.html
          Python 異步 IO(asyncio)、多進(jìn)程(multiprocessing)、多線程(multithreading)性能對比
          https://www.jianshu.com/p/cac56b3d9a18

          瀏覽 165
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费观看黄色a片 | 天天三级片 | 亚洲激情综合色播 | 欧美日韩一级片电影 | 99成人 国产精品视频 |