Python多線程總結(jié)

多線程常用模板
在實際處理數(shù)據(jù)時,因系統(tǒng)內(nèi)存有限,我們不可能一次把所有數(shù)據(jù)都導(dǎo)出進行操作,所以需要批量導(dǎo)出依次操作。為了加快運行,我們會采用多線程的方法進行數(shù)據(jù)處理,以下為我總結(jié)的多線程批量處理數(shù)據(jù)的模板:
import?threading
#?從數(shù)據(jù)庫提取數(shù)據(jù)的類
class?Scheduler():
????def?__init__(self):
????????self._lock?=?threading.RLock()
????????self.start?=?0
????????#?每次取10000條數(shù)據(jù)
????????self.step?=?10000
????def?getdata(self):
????????#?上鎖,以免多線程同時對數(shù)據(jù)庫進行訪問,取出重復(fù)數(shù)據(jù)
????????self._lock.acquire()
????????#?進行取數(shù)據(jù)操作
????????data?=?'select?*?from?table'?\
???????????????'where?id?between?self.start?and?self.start?+?self.step'
????????#?取完數(shù)據(jù)后,指針后移
????????self.start?+=?self.step
????????self._lock.release()
????????return?data
#?處理數(shù)據(jù)的過程寫在這里
def?processdata():
????#?從該實例中提取數(shù)據(jù)
????data?=?scheduler.getdata()
????while?data:
????????#?進行處理數(shù)據(jù)的具體操作:
????????#?去重、補缺、運算...只要還有數(shù)據(jù),本線程就繼續(xù)取新數(shù)據(jù)
????????#?然后再獲取數(shù)據(jù),進行循環(huán)
????????data?=?scheduler.getdata()
#?創(chuàng)建多線程,threads_num為創(chuàng)建的線程數(shù)
def?threads_scheduler(threads_num):
????threads?=?[]
????for?i?in?range(threads_num):
????????#?創(chuàng)建線程
????????td?=?threading.Thread(target=processdata,?name='th'+str(i+1))
????????threads.append(td)
????for?t?in?threads:
????????#?啟動線程
????????t.start()
????????for?t?in?threads:
????????????#?子線程守護
????????????t.join()
????????????print('數(shù)據(jù)已全部處理成功')
if?__name__=='__main__':
????#?實例化一個調(diào)度器,初始化參數(shù)
????scheduler?=?Scheduler()
????#?創(chuàng)建線程,開始處理數(shù)據(jù)
????threads_scheduler(4)
主要分為三大部分:
Scheduler類,負責(zé)初始化參數(shù),getdata方法負責(zé)提取數(shù)據(jù)processdata方法中寫具體處理數(shù)據(jù)的流程threads_scheduler方法負責(zé)創(chuàng)建線程
多線程重點回顧
共分4部分對多線程的內(nèi)容進行總結(jié)。
多線程threading
先為大家介紹線程的相關(guān)概念:
主線程:當(dāng)一個程序啟動時,就有一個進程被操作系統(tǒng) OS創(chuàng)建,與此同時一個線程也立刻運行,該線程通常叫做程序的主線程Main Thread。因為它是程序開始時就執(zhí)行的,如果你需要再創(chuàng)建線程,那么創(chuàng)建的線程就是這個主線程的子線程。子線程:使用 threading、ThreadPoolExecutor創(chuàng)建的線性均為子線程。主線程的重要性體現(xiàn)在兩方面: 是產(chǎn)生其他子線程的線程 通常它必須最后完成執(zhí)行,比如執(zhí)行各種關(guān)閉動作
在飛車程序中,如果沒有多線程,我們就不能一邊聽歌一邊玩飛車,聽歌與玩游戲不能并行;在使用多線程后,我們就可以在玩游戲的同時聽背景音樂。在這個例子中啟動飛車程序就是一個進程,玩游戲和聽音樂是兩個線程。
Python提供了threading模塊來實現(xiàn)多線程:
threading.Thread可以創(chuàng)建線程setDaemon(True)為守護主線程,默認為False;join()為守護子線程。
from?time?import?sleep
import?threading
def?music(music_name):
????for?i?in?range(2):
????????print('正在聽{}'.format(music_name))
????????sleep(1)
????????print('music?over')
def?game(game_name):
????for?i?in?range(2):
????????print('正在玩{}'.format(game_name))
????????sleep(3)
????????print('game?over')
threads?=?[]
t1?=?threading.Thread(target=music,args=('稻香',))
threads.append(t1)
t2?=?threading.Thread(target=game,args=('飛車',))
threads.append(t2)
if?__name__?==?'__main__':
????for?t?in?threads:
????????#?t.setDaemon(True)
????????t.start()
????????
????for?t?in?threads:
????????t.join()
????print('主線程運行結(jié)束')線程池
因為新建線程系統(tǒng)需要分配資源、終止線程系統(tǒng)需要回收資源,所以如果可以重用線程,則可以減去新建/終止的開銷以提升性能。同時,使用線程池的語法比自己新建線程執(zhí)行線程更加簡潔。
Python為我們提供了ThreadPoolExecutor來實現(xiàn)線程池,此線程池默認子線程守護。它的適應(yīng)場景為突發(fā)性大量請求或需要大量線程完成任務(wù),但實際任務(wù)處理時間較短。
from?time?import?sleep
#?fun為定義的待運行函數(shù)
with?ThreadPoolExecutor(max_workers=5)?as?executor:
????ans?=?executor.map(fun,?[遍歷值])
????for?res?in?ans:
????????print(res)
with?ThreadPoolExecutor(max_workers=5)?as?executor:
????list?=?[遍歷值]
????ans?=?[executor.submit(fun,?i)?for?i?in?list]
????for?res?in?as_completed(ans):
????????print(res.result())
其中max_workers為線程池中的線程個數(shù),常用的遍歷方法有map和submit+as_completed。根據(jù)業(yè)務(wù)場景的不同,若我們需要輸出結(jié)果按遍歷順序返回,我們就用map方法,若想誰先完成就返回誰,我們就用submit+as_complete方法。
線程互斥
我們把一個時間段內(nèi)只允許一個線程使用的資源稱為臨界資源,對臨界資源的訪問,必須互斥的進行。互斥,也稱間接制約關(guān)系。線程互斥指當(dāng)一個線程訪問某臨界資源時,另一個想要訪問該臨界資源的線程必須等待。當(dāng)前訪問臨界資源的線程訪問結(jié)束,釋放該資源之后,另一個線程才能去訪問臨界資源。鎖的功能就是實現(xiàn)線程互斥。
我把線程互斥比作廁所包間上大號的過程,因為包間里只有一個坑,所以只允許一個人進行大號。當(dāng)?shù)谝粋€人要上廁所時,會將門上上鎖,這時如果第二個人也想大號,那就必須等第一個人上完,將鎖解開后才能進行,在這期間第二個人就只能在門外等著。這個過程與代碼中使用鎖的原理如出一轍,這里的坑就是臨界資源。Python的threading模塊引入了鎖。threading模塊提供了Lock類,它有如下方法加鎖和釋放鎖:
acquire():對?Lock加鎖,其中timeout參數(shù)指定加鎖多少秒release():釋放鎖
class?Account:
????def?__init__(self,?card_id,?balance):
????????#?封裝賬戶ID、賬戶余額的兩個變量
????????self.card_id=?card_id
????????self.balance?=?balance
????????
def?withdraw(account,?money):
????#?進行加鎖
????lock.acquire()
????#?賬戶余額大于取錢數(shù)目
????if?account.balance?>=?money:
????????#?吐出鈔票
????????print(threading.current_thread().name?+?"取錢成功!吐出鈔票:"?+?str(money),end='?')
????????#?修改余額
????????account.balance?-=?money
????????print("\t余額為:?"?+?str(account.balance))
????else:
????????print(threading.current_thread().name?+?"取錢失敗!余額不足")
????#?進行解鎖
????lock.release()
#?創(chuàng)建一個賬戶,銀行卡id為8888,存款1000元
acct?=?Account("8888"?,?1000)
#?模擬兩個對同一個賬戶取錢
#?在主線程中創(chuàng)建一把鎖
lock?=?threading.Lock()
threading.Thread(name='窗口A',?target=withdraw?,?args=(acct?,?800)).start()
threading.Thread(name='窗口B',?target=withdraw?,?args=(acct?,?800)).start()Lock與Rlock的區(qū)別
區(qū)別一: Lock被稱為原始鎖,一個線程只能請求一次;RLock被稱為重入鎖,可以被一個線程請求多次,即鎖中可以嵌套鎖。
import?threading
def?main():
????lock.acquire()
????print('第一道鎖')
????lock.acquire()
????print('第二道鎖')
????lock.release()
????lock.release()
????
if?__name__?==?'__main__':
????lock?=?threading.Lock()
????main()
我們會發(fā)現(xiàn)這個程序只會打印“第一道鎖”,而且程序既沒有終止,也沒有繼續(xù)運行。這是因為Lock鎖在同一線程內(nèi)第一次加鎖之后還沒有釋放時,就進行了第二次acquire請求,導(dǎo)致無法執(zhí)行release,所以鎖永遠無法釋放,這就是死鎖。如果我們使用RLock就能正常運行,不會發(fā)生死鎖的狀態(tài)。
區(qū)別二:當(dāng) Lock處于鎖定狀態(tài)時,不屬于特定線程,可在另一個線程中進行解鎖釋放;而RLock只有當(dāng)前線程才能釋放本線程上的鎖,不可由其他線程進行釋放,所以在使用RLock時,acquire與release必須成對出現(xiàn),即解鈴還須系鈴人。
import?threading
def?main():
????lock.release()
????print("在子線程解鎖后打印")
if?__name__?==?'__main__':
????lock?=?threading.Lock()
????lock.acquire()
????t?=?threading.Thread(target=main)
????t.start()
在主線程中定義Lock鎖,然后上鎖,再創(chuàng)建一個子線程t運行main函數(shù)釋放鎖,結(jié)果正常輸出,說明主線程上的鎖,可由子線程解鎖。
如果把上面的鎖改為RLock則報錯。在實際中設(shè)計程序時,我們會將每個功能分別封裝成一個函數(shù),每個函數(shù)中都可能會有臨界區(qū)域,所以就需要用到RLock。
import?threading
import?time
def?fun_1():
????print('開始')
????time.sleep(1)
????lock.acquire()
????print("第一道鎖")
????fun_2()
????lock.release()
????
def?fun_2():
????lock.acquire()
????print("第二道鎖")
????lock.release()
????
if?__name__?==?'__main__':
????lock?=?threading.RLock()
????t1?=?threading.Thread(target=fun_1)
????t2?=?threading.Thread(target=fun_1)
????t1.start()
????t2.start()
一句話總結(jié)就是Lock不能套娃,RLock可以套娃;Lock可以由其他線程中的鎖進行操作,RLock只能由本線程進行操作。
3、不限速了?度盤等8大網(wǎng)盤承諾年底前推出無差別速率產(chǎn)品
4、為什么阿里巴巴/騰訊不去開發(fā)被卡脖子的工業(yè)軟件?

