核桃干貨 | 快速掌握用python寫并行程序,干貨滿滿


一、大數(shù)據(jù)時代的現(xiàn)狀
當前我們正處于大數(shù)據(jù)時代,每天我們會通過手機、電腦等設(shè)備不斷的將自己的數(shù)據(jù)傳到互聯(lián)網(wǎng)上。據(jù)統(tǒng)計,YouTube上每分鐘就會增加500多小時的視頻,面對如此海量的數(shù)據(jù),如何高效的存儲與處理它們就成了當前最大的挑戰(zhàn)。
但在這個對硬件要求越來越高的時代,CPU卻似乎并不這么給力了。自2013年以來,處理器頻率的增長速度逐漸放緩了,目前CPU的頻率主要分布在3~4GHz。
這個也是可以理解的,畢竟摩爾定律都生效了50年了,如果它老人家還如此給力,那我們以后就只要靜等處理器頻率提升,什么計算問題在未來那都不是話下了。
實際上CPU與頻率是于能耗密切相關(guān)的,我們之前可以通過加電壓來提升頻率,但當能耗太大,散熱問題就無法解決了,所以頻率就逐漸穩(wěn)定下來了,而Intel與AMD等大制造商也將目標轉(zhuǎn)向了多核芯片,目前普通桌面PC也達到了4~8核。
二、面對挑戰(zhàn)的方法
咱們有了多核CPU,以及大量計算設(shè)備,那我們怎么來用它們應(yīng)對大數(shù)據(jù)時代的挑戰(zhàn)了。那就要提到下面的方法了。
2.1 并行計算
并行(parallelism)是指程序運行時的狀態(tài),如果在同時刻有多個“工作單位”運行,則所運行的程序處于并行狀態(tài)。圖一是并行程序的示例,開始并行后,程序從主線程分出許多小的線程并同步執(zhí)行,此時每個線程在各個獨立的CPU進行運行,在所有線程都運行完成之后,它們會重新合并為主線程,而運行結(jié)果也會進行合并,并交給主線程繼續(xù)處理。

圖一、多線程并行
圖二是一個多線程的任務(wù)(沿線為線程時間),但它不是并行任務(wù)。這是因為task1與task2總是不在同一時刻執(zhí)行,這個情況下單核CPU完全可以同時執(zhí)行task1與task2。
方法是在task1不執(zhí)行的時候立即將CPU資源給task2用,task2空閑的時候CPU給task1用,這樣通過時間窗調(diào)整任務(wù),即可實現(xiàn)多線程程序,但task1與task2并沒有同時執(zhí)行過,所以不能稱為并行。我們可以稱它為并發(fā)(concurrency)程序,這個程序一定意義上提升了單個CPU的使用率,所以效率也相對較高。

數(shù)據(jù)并行(Data Parallel)模型:將相同的操作同時作用于不同數(shù)據(jù),只需要簡單地指明執(zhí)行什么并行操作以及并行操作對象。該模型反映在圖一中即是,并行同時在主線程中拿取數(shù)據(jù)進行處理,并線程執(zhí)行相同的操作,然后計算完成后合并結(jié)果。各個并行線程在執(zhí)行時互不干擾。 消息傳遞(Message Passing)模型:各個并行執(zhí)行部分之間傳遞消息,相互通訊。消息傳遞模型的并行線程在執(zhí)行時會傳遞數(shù)據(jù),可能一個線程運行到一半的時候,它所占用的數(shù)據(jù)或處理結(jié)果就要交給另一個線程處理,這樣,在設(shè)計并行程序時會給我們帶來一定麻煩。該模型一般是分布式內(nèi)存并行計算機所采用方法,但是也可以適用于共享式內(nèi)存的并行計算機。
多核CPU——計算密集型任務(wù)。盡量使用并行計算,可以提高任務(wù)執(zhí)行效率。計算密集型任務(wù)會持續(xù)地將CPU占滿,此時有越多CPU來分擔任務(wù),計算速度就會越快,這種情況才是并行程序的用武之地。 單核CPU——計算密集型任務(wù)。此時的任務(wù)已經(jīng)把CPU資源100%消耗了,就沒必要使用并行計算,畢竟硬件障礙擺在那里。 單核CPU——I/O密集型任務(wù)。I/O密集型任務(wù)在任務(wù)執(zhí)行時需要經(jīng)常調(diào)用磁盤、屏幕、鍵盤等外設(shè),由于調(diào)用外設(shè)時CPU會空閑,所以CPU的利用率并不高,此時使用多線程程序,只是便于人機交互。計算效率提升不大。 多核CPU——I/O密集型任務(wù)。同單核CPU——I/O密集型任務(wù)。
強大的浮點數(shù)計算速度。 大量的計算核心,可以進行大型并行計算。一個普通的GPU也有數(shù)千個計算核心。 強大的數(shù)據(jù)吞吐量,GPU的吞吐量是CPU的數(shù)十倍,這意味著GPU有適合的處理大數(shù)據(jù)。
GFS(The Google File System) :解決數(shù)據(jù)存儲的問題。采用N多臺廉價的電腦,使用冗余的方式,來取得讀寫速度與數(shù)據(jù)安全并存的結(jié)果。 MapReduce(Simplified Data Processing on Large Clusters) :函數(shù)式編程,把所有的操作都分成兩類,map與reduce,map用來將數(shù)據(jù)分成多份,分開處理,reduce將處理后的結(jié)果進行歸并,得到最終的結(jié)果。 BigTable(Bigtable: A Distributed Storage System for Structured Data) :在分布式系統(tǒng)上存儲結(jié)構(gòu)化數(shù)據(jù)的一個解決方案,解決了巨大的Table的管理、負載均衡的問題.
在面向線程設(shè)計的系統(tǒng)(如當代多數(shù)操作系統(tǒng)、Linux 2.6及更新的版本)中,進程本身不是基本運行單位,而是線程的容器。 進程擁有自己獨立的內(nèi)存空間,所屬線程可以訪問進程的空間。 程序本身只是指令、數(shù)據(jù)及其組織形式的描述,進程才是程序的真正運行實例。例如,Visual Studio開發(fā)環(huán)境就是利用一個進程編輯源文件,并利用另一個進程完成編譯工作的應(yīng)用程序。
線程有自己的一組CPU指令、寄存器與私有數(shù)據(jù)區(qū),線程的數(shù)據(jù)可以與同一進程的線程共享。 當前的操作系統(tǒng)是面向線程的,即以線程為基本運行單位,并按線程分配CPU。

multiprocessing.Process(target=None, args=())
target: 可以被run()調(diào)用的函數(shù),簡單來說就是進程中運行的函數(shù)
args: 是target的參數(shù)
process的方法:
start(): 開始啟動進程,在創(chuàng)建process之后執(zhí)行
join([timeout]):阻塞目前父進程,直到調(diào)用join方法的進程執(zhí)行完或超時(timeout),才繼續(xù)執(zhí)行父進程
terminate():終止進程,不論進程有沒有執(zhí)行完,盡量少用。
示例1
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',)) # p進程執(zhí)行f函數(shù),參數(shù)為'bob',注意后面的“,”
p.start() # 進程開始
p.join() # 阻塞主線程,直至p進程執(zhí)行結(jié)束
class multiprocessing.Pool([processes])
processes是進程池中的進程數(shù),默認是本機的cpu數(shù)量
方法:
apply(func[, args[, kwds]])進程池中的進程進行func函數(shù)操作,操作時會阻塞進程,直至生成結(jié)果。
apply_async(func[, args[, kwds[, callback]]])與apply類似,但是不會阻塞進程
map(func, iterable[, chunksize])進程池中的進程進行映射操作
map_async(func, iterable[, chunksize[, callback]])
imap(func, iterable[, chunksize]):返回有序迭代器
imap_unordered(func, iterable[, chunsize]):返回無序迭代器
close():禁止進程池再接收任務(wù)
terminate():強行終止進程池,不論是否有任務(wù)在執(zhí)行
join():在close()或terminate()之后進行,等待進程退出
示例2
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5) # 創(chuàng)建有5個進程的進程池
print(p.map(f, [1, 2, 3])) # 將f函數(shù)的操作給進程池
3.3.3 Pipes & Queues
multiprocessing.Pipe([duplex])
返回兩個連接對象(conn1, conn2),兩個連接對象分別訪問pipe的頭和尾,進行讀寫操作
Duplex: True(default),創(chuàng)建的pipe是雙向的,也即兩端都可以進行讀寫;若為False,則pipe是單向的,僅可以在一端讀,另一端寫,此時與Queue類似。
multiprocessing.Queue([maxsize])
qsize():返回queue中member數(shù)量
empty():如果queue是空的,則返回true
full():如果queue中member數(shù)量達到maxsize,則返回true
put(obj):將一個object放入到queue中
get():從隊列中取出一個object并將它從queue中移除,F(xiàn)IFO原則
close():關(guān)閉隊列,并將緩存的object寫入pipe
示例
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
3.3.4 進程鎖multiprocessing.Lock
當一個進程獲得(acquire)鎖之后,其它進程在想獲得鎖就會被禁止,可以保護數(shù)據(jù),進行同步處理。
acquire(block=True, timeout=None):嘗試獲取一個鎖,如果block為true,則會在獲得鎖之后阻止其它進程再獲取鎖。
release():釋放鎖
multiprocessing.Value(typecode_or_type, *args[, lock])
返回一個ctype對象,
創(chuàng)建c = Value(‘d’, 3.14),調(diào)用c.value()
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
返回一個ctype數(shù)組,只能是一維的
Array(‘i’, [1, 2, 3, 4])
3.3.6 其它方法
multiprocessing.active_children():返回當前進程的所有子進程
multiprocessing.cpu_count():返回本計算機的cpu數(shù)量
multiprocessing.current_process():返回當前進程
盡量避免共享數(shù)據(jù) 所有對象都盡量是可以pickle的 避免使用terminate強行終止進程,以造成不可預(yù)料的后果 有隊列的進程在終止前隊列中的數(shù)據(jù)需要清空,join操作應(yīng)放到queue清空后 明確給子進程傳遞資源、參數(shù)
注意跨模塊全局變量的使用,可能被各個進程修改造成結(jié)果不統(tǒng)一 主模塊需要加上if name == ' main ':來提高它的安全性,如果有交互界面,需要加上freeze_support()
import multiprocessing as mp
import time
def job(v, num, l):
l.acquire() # 鎖住
for _ in range(5):
time.sleep(0.1)
v.value += num # 獲取共享內(nèi)存
print(v.value)
l.release() # 釋放
def multicore():
l = mp.Lock() # 定義一個進程鎖
#l = 1
v = mp.Value('i', 0) # 定義共享內(nèi)存
p1 = mp.Process(target=job, args=(v,1,l)) # 需要將lock傳入
p2 = mp.Process(target=job, args=(v,3,l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__=='__main__':
multicore()
直接運行上述代碼輸出[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],運行時間為1.037s 在1的基礎(chǔ)上注釋掉鎖(上述注釋了三行),在沒有鎖的情況下,輸出[1, 4, 5, 8, 9, 12, 13, 15, 14, 16],運行時間為0.53s 在2的基礎(chǔ)上將p1.join()調(diào)到p2.start()前面,輸出為[1, 2, 3, 4, 5, 8, 11, 14, 17, 20],運行時間為1.042s.
import multiprocessing as mp
#import pdb
def job(i):
return i*i
def multicore():
pool = mp.Pool()
#pdb.set_trace()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get獲得結(jié)果
print(res.get())
# 迭代器,i=0時apply一次,i=1時apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 從迭代器中取出
print([res.get() for res in multi_res])
multicore()
