<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          關(guān)于Go并發(fā)編程,你不得不知的“左膀右臂”——并發(fā)與通道!

          共 16973字,需瀏覽 34分鐘

           ·

          2022-08-04 09:49

          導語 | 并發(fā)編程,可以說一直都是開發(fā)者們關(guān)注最多的主題之一。而Golang作為一個出道就自帶“高并發(fā)”光環(huán)的編程語言,其并發(fā)編程的實現(xiàn)原理肯定是值得我們深入探究的。本文主要介紹Goroutine和channel的實現(xiàn)。


          Go并發(fā)編程模型在底層是由操作系統(tǒng)所提供的線程庫支撐的,這里先簡要介紹一下線程實現(xiàn)模型的相關(guān)概念。


          一、線程的實現(xiàn)模型


          線程的實現(xiàn)模型主要有3個,分別是:用戶級線程模型內(nèi)核級線程模型和兩級線程模型。它們之間最大的差異在于用戶線程與內(nèi)核調(diào)度實體(KSE)之間的對應(yīng)關(guān)系上。內(nèi)核調(diào)度實體就是可以被操作系統(tǒng)內(nèi)核調(diào)度器調(diào)度的對象,也稱為內(nèi)核級線程,是操作系統(tǒng)內(nèi)核的最小調(diào)度單元。


          (一)用戶級線程模型



          用戶線程與KSE為多對一(N:1)的映射關(guān)系。此模型下的線程由用戶級別的線程庫全權(quán)管理,線程庫存儲在進程的用戶空間之中,這些線程的存在對于內(nèi)核來說是無法感知的,所以這些線程也不是內(nèi)核調(diào)度器調(diào)度的對象。一個進程中所有創(chuàng)建的線程都只和同一個KSE在運行時動態(tài)綁定,內(nèi)核的所有調(diào)度都是基于用戶進程的。對于線程的調(diào)度則是在用戶層面完成的,相較于內(nèi)核調(diào)度不需要讓CPU在用戶態(tài)和內(nèi)核態(tài)之間切換,這種實現(xiàn)方式相比內(nèi)核級線程模型可以做的很輕量級,對系統(tǒng)資源的消耗會小很多,上下文切換所花費的代價也會小得多。許多語言實現(xiàn)的協(xié)程庫基本上都屬于這種方式。但是,此模型下的多線程并不能真正的并發(fā)運行。例如,如果某個線程在I/O操作過程中被阻塞,那么其所屬進程內(nèi)的所有線程都被阻塞,整個進程將被掛起。



          (二)內(nèi)核級線程模型



          用戶線程與KSE為一對一(1:1)的映射關(guān)系。此模型下的線程由內(nèi)核負責管理,應(yīng)用程序?qū)€程的創(chuàng)建、終止和同步都必須通過內(nèi)核提供的系統(tǒng)調(diào)用來完成,內(nèi)核可以分別對每一個線程進行調(diào)度。所以,一對一線程模型可以真正的實現(xiàn)線程的并發(fā)運行,大部分語言實現(xiàn)的線程庫基本上都屬于這種方式。但是,此模型下線程的創(chuàng)建、切換和同步都需要花費更多的內(nèi)核資源和時間,如果一個進程包含了大量的線程,那么它會給內(nèi)核的調(diào)度器造成非常大的負擔,甚至會影響到操作系統(tǒng)的整體性能。



          (三)兩級線程模型



          用戶線程與KSE為多對多(N:M)的映射關(guān)系。兩級線程模型吸收前兩種線程模型的優(yōu)點并且盡量規(guī)避了它們的缺點,區(qū)別于用戶級線程模型,兩級線程模型中的進程可以與多個內(nèi)核線程KSE關(guān)聯(lián),也就是說一個進程內(nèi)的多個線程可以分別綁定一個自己的KSE,這點和內(nèi)核級線程模型相似;其次,又區(qū)別于內(nèi)核級線程模型,它的進程里的線程并不與KSE唯一綁定,而是可以多個用戶線程映射到同一個KSE,當某個KSE因為其綁定的線程的阻塞操作被內(nèi)核調(diào)度出CPU時,其關(guān)聯(lián)的進程中其余用戶線程可以重新與其他KSE綁定運行。所以,兩級線程模型既不是用戶級線程模型那種完全靠自己調(diào)度的也不是內(nèi)核級線程模型完全靠操作系統(tǒng)調(diào)度的,而是一種自身調(diào)度與系統(tǒng)調(diào)度協(xié)同工作的中間態(tài),即用戶調(diào)度器實現(xiàn)用戶線程到KSE的調(diào)度,內(nèi)核調(diào)度器實現(xiàn)KSE到CPU上的調(diào)度



          二、Go的并發(fā)機制


          在Go的并發(fā)編程模型中,不受操作系統(tǒng)內(nèi)核管理的獨立控制流不叫用戶線程或線程,而稱為Goroutine。Goroutine通常被認為是協(xié)程的Go實現(xiàn),實際上Goroutine并不是傳統(tǒng)意義上的協(xié)程,傳統(tǒng)的協(xié)程庫屬于用戶級線程模型,而Goroutine結(jié)合Go調(diào)度器的底層實現(xiàn)上屬于兩級線程模型。


          Go搭建了一個特有的兩級線程模型。由Go調(diào)度器實現(xiàn)Goroutine到KSE的調(diào)度,由內(nèi)核調(diào)度器實現(xiàn)KSE到CPU上的調(diào)度。Go的調(diào)度器使用G、M、P三個結(jié)構(gòu)體來實現(xiàn)Goroutine的調(diào)度,也稱之為GMP模型


          (一)GMP模型


          G:表示Goroutine。每個Goroutine對應(yīng)一個G結(jié)構(gòu)體,G存儲Goroutine的運行堆棧、狀態(tài)以及任務(wù)函數(shù),可重用。當Goroutine被調(diào)離CPU時,調(diào)度器代碼負責把CPU寄存器的值保存在G對象的成員變量之中,當Goroutine被調(diào)度起來運行時,調(diào)度器代碼又負責把G對象的成員變量所保存的寄存器的值恢復(fù)到CPU的寄存器。


          M:OS底層線程的抽象,它本身就與一個內(nèi)核線程進行綁定,每個工作線程都有唯一的一個M結(jié)構(gòu)體的實例對象與之對應(yīng),它代表著真正執(zhí)行計算的資源,由操作系統(tǒng)的調(diào)度器調(diào)度和管理。M結(jié)構(gòu)體對象除了記錄著工作線程的諸如棧的起止位置、當前正在執(zhí)行的Goroutine以及是否空閑等等狀態(tài)信息之外,還通過指針維持著與P結(jié)構(gòu)體的實例對象之間的綁定關(guān)系。


          P:表示邏輯處理器。對G來說,P相當于CPU核,G只有綁定到P(在P的local runq中)才能被調(diào)度。對M來說,P提供了相關(guān)的執(zhí)行環(huán)境(Context),如內(nèi)存分配狀態(tài)(mcache),任務(wù)隊列(G)等。它維護一個局部Goroutine可運行G隊列,工作線程優(yōu)先使用自己的局部運行隊列,只有必要時才會去訪問全局運行隊列,這可以大大減少鎖沖突,提高工作線程的并發(fā)性,并且可以良好的運用程序的局部性原理。


          一個G的執(zhí)行需要P和M的支持。一個M在與一個P關(guān)聯(lián)之后,就形成了一個有效的G運行環(huán)境(內(nèi)核線程+上下文)。每個P都包含一個可運行的G的隊列(runq)。該隊列中的G會被依次傳遞給與本地P關(guān)聯(lián)的M,并獲得運行時機。


          M與KSE之間總是一一對應(yīng)的關(guān)系,一個M僅能代表一個內(nèi)核線程。M與KSE之間的關(guān)聯(lián)非常穩(wěn)固,一個M在其生命周期內(nèi),會且僅會與一個KSE產(chǎn)生關(guān)聯(lián),而M與P、P與G之間的關(guān)聯(lián)都是可變的,M與P也是一對一的關(guān)系,P與G則是一對多的關(guān)系。


          • G


          運行時,G在調(diào)度器中的地位與線程在操作系統(tǒng)中差不多,但是它占用了更小的內(nèi)存空間,也降低了上下文切換的開銷。它是Go語言在用戶態(tài)提供的線程,作為一種粒度更細的資源調(diào)度單元,使用得當,能夠在高并發(fā)的場景下更高效地利用機器的CPU。



          g結(jié)構(gòu)體部分源碼(src/runtime/runtime2.go):


          type g struct {    stack      stack    // Goroutine的棧內(nèi)存范圍[stack.lo, stack.hi)    stackguard0    uintptr  // 用于調(diào)度器搶占式調(diào)度    m        *m    // Goroutine占用的線程    sched      gobuf    // Goroutine的調(diào)度相關(guān)數(shù)據(jù)    atomicstatus  uint32  // Goroutine的狀態(tài)    ...}
          type gobuf struct { sp uintptr // 棧指針 pc uintptr // 程序計數(shù)器 g guintptr // gobuf對應(yīng)的Goroutine ret sys.Uintewg // 系統(tǒng)調(diào)用的返回值 ...}


          gobuf中保存的內(nèi)容會在調(diào)度器保存或恢復(fù)上下文時使用,其中棧指針和程序計數(shù)器會用來存儲或恢復(fù)寄存器中的值,改變程序即將執(zhí)行的代碼。


          atomicstatus字段存儲了當前Goroutine的狀態(tài),Goroutine主要可能處于以下幾種狀態(tài):



          Goroutine的狀態(tài)遷移是一個十分復(fù)雜的過程,觸發(fā)狀態(tài)遷移的方法也很多。這里主要介紹一下比較常見的五種狀態(tài)_Grunnable、_Grunning、_Gsyscall、_Gwaiting和_Gpreempted


          可以將這些不同的狀態(tài)聚合成三種:等待中、可運行、運行中,運行期間會在這三種狀態(tài)來回切換:


          • 等待中:Goroutine正在等待某些條件滿足,例如:系統(tǒng)調(diào)用結(jié)束等,包括_Gwaiting、_Gsyscall和_Gpreempted幾個狀態(tài);


          • 可運行:Goroutine已經(jīng)準備就緒,可以在線程運行,如果當前程序中有非常多的Goroutine,每個Goroutine就可能會等待更多的時間,即_Grunnable;


          • 運行中:Goroutine正在某個線程上運行,即_Grunning。


          G常見的狀態(tài)轉(zhuǎn)換圖:



          進入死亡狀態(tài)的G可以重新初始化并使用。


          • M


          Go語言并發(fā)模型中的M是操作系統(tǒng)線程。調(diào)度器最多可以創(chuàng)建10000個線程,但是最多只會有GOMAXPROCS(P的數(shù)量)個活躍線程能夠正常運行。在默認情況下,運行時會將 GOMAXPROCS設(shè)置成當前機器的核數(shù),我們也可以在程序中使用runtime.GOMAXPROCS來改變最大的活躍線程數(shù)。


          例如,對于一個四核的機器,runtime會創(chuàng)建四個活躍的操作系統(tǒng)線程,每一個線程都對應(yīng)一個運行時中的runtime.m結(jié)構(gòu)體。在大多數(shù)情況下,我們都會使用Go的默認設(shè)置,也就是線程數(shù)等于CPU數(shù),默認的設(shè)置不會頻繁觸發(fā)操作系統(tǒng)的線程調(diào)度和上下文切換,所有的調(diào)度都會發(fā)生在用戶態(tài),由Go語言調(diào)度器觸發(fā),能夠減少很多額外開銷。


          m結(jié)構(gòu)體源碼(部分):


          type m struct {    g0      *g      // 一個特殊的goroutine,執(zhí)行一些運行時任務(wù)    gsignal    *g      // 處理signal的G    curg    *g      // 當前M正在運行的G的指針    p      puintptr  // 正在與當前M關(guān)聯(lián)的P    nextp    puintptr  // 與當前M潛在關(guān)聯(lián)的P    oldp    puintptr  // 執(zhí)行系統(tǒng)調(diào)用之前使用線程的P    spinning  bool    // 當前M是否正在尋找可運行的G    lockedg    *g      // 與當前M鎖定的G}


          g0表示一個特殊的Goroutine,由Go運行時系統(tǒng)在啟動之處創(chuàng)建,它會深度參與運行時的調(diào)度過程,包括Goroutine的創(chuàng)建、大內(nèi)存分配和CGO函數(shù)的執(zhí)行。curg是在當前線程上運行的用戶Goroutine。


          • P


          調(diào)度器中的處理器P是線程和Goroutine的中間層,它能提供線程需要的上下文環(huán)境,也會負責調(diào)度線程上的等待隊列,通過處理器P的調(diào)度,每一個內(nèi)核線程都能夠執(zhí)行多個Goroutine,它能在Goroutine進行一些I/O操作時及時讓出計算資源,提高線程的利用率。


          P的數(shù)量等于GOMAXPROCS,設(shè)置GOMAXPROCS的值只能限制P的最大數(shù)量,對M和G的數(shù)量沒有任何約束。當M上運行的G進入系統(tǒng)調(diào)用導致M被阻塞時,運行時系統(tǒng)會把該M和與之關(guān)聯(lián)的P分離開來,這時,如果該P的可運行G隊列上還有未被運行的G,那么運行時系統(tǒng)就會找一個空閑的M,或者新建一個M與該P關(guān)聯(lián),滿足這些G的運行需要。因此,M的數(shù)量很多時候都會比P多。


          p結(jié)構(gòu)體源碼(部分):


          type p struct {  // p 的狀態(tài)  status   uint32    // 對應(yīng)關(guān)聯(lián)的 M  m        muintptr      // 可運行的Goroutine隊列,可無鎖訪問  runqhead uint32  runqtail uint32  runq     [256]guintptr  // 緩存可立即執(zhí)行的G  runnext    guintptr   // 可用的G列表,G狀態(tài)等于Gdead   gFree struct {    gList    n int32  }  ...}


          P可能處于的狀態(tài)如下:




          三、調(diào)度器


          兩級線程模型中的一部分調(diào)度任務(wù)會由操作系統(tǒng)之外的程序承擔。在Go語言中,調(diào)度器就負責這一部分調(diào)度任務(wù)。調(diào)度的主要對象就是G、M和P的實例。每個M(即每個內(nèi)核線程)在運行過程中都會執(zhí)行一些調(diào)度任務(wù),他們共同實現(xiàn)了Go調(diào)度器的調(diào)度功能。


          (一)g0和m0


          運行時系統(tǒng)中的每個M都會擁有一個特殊的G,一般稱為M的g0。M的g0不是由Go程序中的代碼間接生成的,而是由Go運行時系統(tǒng)在初始化M時創(chuàng)建并分配給該M的。M的g0一般用于執(zhí)行調(diào)度、垃圾回收、棧管理等方面的任務(wù)。M還會擁有一個專用于處理信號的G,稱為gsignal。


          除了g0和gsignal之外,其他由M運行的G都可以視為用戶級別的G,簡稱用戶G,g0和gsignal可稱為系統(tǒng)G。Go運行時系統(tǒng)會進行切換,以使每個M都可以交替運行用戶G和它的g0。這就是前面所說的“每個M都會運行調(diào)度程序”的原因。


          除了每個M都擁有屬于它自己的g0外,還存在一個runtime.g0。runtime.g0用于執(zhí)行引導程序,它運行在Go程序擁有的第一個內(nèi)核線程之中,這個線程也稱為runtime.m0,runtime.m0的g0就是runtime.g0。



          (二)核心元素的容器


          上面講了Go的線程實現(xiàn)模型中的3個核心元素——G、M和P,下面看看承載這些元素實例的容器:



          和G相關(guān)的四個容器值得我們特別注意,任何G都會存在于全局G列表中,其余四個容器只會存放當前作用域內(nèi)的、具有某個狀態(tài)的G。兩個可運行的G列表中的G都擁有幾乎平等的運行機會,只不過不同時機的調(diào)度會把G放在不同的地方,例如,從Gsyscall狀態(tài)轉(zhuǎn)移出來的G都會被放入調(diào)度器的可運行G隊列,而剛剛被初始化的G都會被放入本地P的可運行G隊列。此外,這兩個可運行G隊列之間也會互相轉(zhuǎn)移G,例如,本地P的可運行G隊列已滿時,其中一半的G會被轉(zhuǎn)移到調(diào)度器的可運行G隊列中。


          調(diào)度器的空閑M列表和空閑P列表用于存放暫時不被使用的元素實例。運行時系統(tǒng)需要時,會從中獲取相應(yīng)元素的實例并重新啟用它。



          (三)調(diào)度循環(huán)


          調(diào)用runtime.schedule進入調(diào)度循環(huán):


          func schedule() {  _g_ := getg()
          top: var gp *g var inheritTime bool
          if gp == nil { // 為了公平,每調(diào)用schedule函數(shù)61次就要從全局可運行G隊列中獲取 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } // 從P本地獲取G任務(wù) if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) } // 運行到這里表示從本地運行隊列和全局運行隊列都沒有找到需要運行的G if gp == nil { // 阻塞地查找可用G gp, inheritTime = findrunnable() } // 執(zhí)行G任務(wù)函數(shù) execute(gp, inheritTime)}


          runtime.schedule函數(shù)會從下面幾個地方查找待執(zhí)行的Goroutine:


          • 為了保證公平,當全局運行隊列中有待執(zhí)行的Goroutine時,通過schedtick保證有一定幾率會從全局的運行隊列中查找對應(yīng)的Goroutine;


          • 從處理器本地的運行隊列中查找待執(zhí)行的Goroutine;


          • 如果前兩種方法都沒有找到G,會通過findrunnable函數(shù)去其他P里面去“偷”一些G來執(zhí)行,如果“偷”不到,就阻塞查找直到有可運行的G。


          接下來由runtime.execute執(zhí)行獲取的Goroutine:


          func execute(gp *g, inheritTime bool) {  _g_ := getg()
          // 將G綁定到當前M上 _g_.m.curg = gp gp.m = _g_.m // 將g正式切換為_Grunning狀態(tài) casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 // 搶占信號 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { // 調(diào)度器調(diào)度次數(shù)增加1 _g_.m.p.ptr().schedtick++ } ... // gogo完成從g0到gp的切換 gogo(&gp.sched)}


          當開始執(zhí)行execute后,G會被切換到_Grunning狀態(tài),并將M和G進行綁定,最終調(diào)用runtime.gogo將Goroutine調(diào)度到當前線程上。runtime.gogo會從runtime.gobuf中取出runtime.goexit的程序計數(shù)器和待執(zhí)行函數(shù)的程序計數(shù)器,并將:


          • runtime.goexit的程序計數(shù)器被放到棧SP上;


          • 待執(zhí)行函數(shù)的程序計數(shù)器被放到了寄存器BX上。


          MOVL gobuf_sp(BX), SP  // 將runtime.goexit函數(shù)的PC恢復(fù)到SP中MOVL gobuf_pc(BX), BX  // 獲取待執(zhí)行函數(shù)的程序計數(shù)器JMP  BX                // 開始執(zhí)行


          當Goroutine中運行的函數(shù)返回時,程序會跳轉(zhuǎn)到runtime.goexit所在位置,最終在當前線程的g0的棧上調(diào)用runtime.goexit0函數(shù),該函數(shù)會將Goroutine轉(zhuǎn)換為_Gdead狀態(tài)、清理其中的字段、移除Goroutine和線程的關(guān)聯(lián)并調(diào)用runtime.gfput將G重新加入處理器的Goroutine空閑列表gFree中:


          func goexit0(gp *g) {  _g_ := getg()  // 設(shè)置當前G狀態(tài)為_Gdead  casgstatus(gp, _Grunning, _Gdead)   // 清理G  gp.m = nil  ...  gp.writebuf = nil  gp.waitreason = 0  gp.param = nil  gp.labels = nil  gp.timer = nil   // 解綁M和G  dropg()   ...  // 將G扔進gfree鏈表中等待復(fù)用  gfput(_g_.m.p.ptr(), gp)  // 再次進行調(diào)度  schedule()}


          最后runtime.goexit0會重新調(diào)用runtime.schedule觸發(fā)新一輪的Goroutine調(diào)度,調(diào)度器從runtime.schedule開始,最終又回到runtime.schedule,這就是Go語言的調(diào)度循環(huán)。



          四、Channel


          Go中經(jīng)常被人提及的一個設(shè)計模式:不要通過共享內(nèi)存的方式進行通信,而是應(yīng)該通過通信的方式共享內(nèi)存。Goroutine之間會通過 channel傳遞數(shù)據(jù),作為Go語言的核心數(shù)據(jù)結(jié)構(gòu)和Goroutine之間的通信方式,channel是支撐Go語言高性能并發(fā)編程模型的重要結(jié)構(gòu)。



          channel在運行時的內(nèi)部表示是runtime.hchan,該結(jié)構(gòu)體中包含了用于保護成員變量的互斥鎖,從某種程度上說,channel是一個用于同步和通信的有鎖隊列。hchan結(jié)構(gòu)體源碼:


          type hchan struct {    qcount    uint        // 循環(huán)列表元素個數(shù)    dataqsiz  uint        // 循環(huán)隊列的大小    buf      unsafe.Pointer  // 循環(huán)隊列的指針    elemsize  uint16      // chan中元素的大小    closed    uint32      // 是否已close    elemtype  *_type      // chan中元素類型    sendx    uint        // chan的發(fā)送操作處理到的位置    recvx    uint        // chan的接收操作處理到的位置    recvq    waitq        // 等待接收數(shù)據(jù)的Goroutine列表    sendq    waitq        // 等待發(fā)送數(shù)據(jù)的Goroutine列表        lock    mutex        // 互斥鎖}
          type waitq struct { // 雙向鏈表 first *sudog last *sudog}


          waitq中連接的是一個sudog雙向鏈表,保存的是等待中的Goroutine。




          (一)創(chuàng)建chan


          使用make關(guān)鍵字來創(chuàng)建管道,make(chan int,3)會調(diào)用到runtime.makechan函數(shù)中:


          const (  maxAlign  = 8  hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)))
          func makechan(t *chantype, size int) *hchan { elem := t.elem // 計算需要分配的buf空間大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
          var c *hchan switch { case mem == 0: // chan的大小或者elem的大小為0,不需要創(chuàng)建buf c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // elem不含指針,分配一塊連續(xù)的內(nèi)存給hchan數(shù)據(jù)結(jié)構(gòu)和buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // elem包含指針,單獨分配buf c = new(hchan) c.buf = mallocgc(mem, elem, true) }
          // 更新hchan的elemsize、elemtype、dataqsiz字段 c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c}


          上述代碼根據(jù)channel中收發(fā)元素的類型和緩沖區(qū)的大小初始化runtime.hchan和緩沖區(qū):


          • 若緩沖區(qū)所需大小為0,就只會為hchan分配一段內(nèi)存;


          • 若緩沖區(qū)所需大小不為0且elem不包含指針,會為hchan和buf分配一塊連續(xù)的內(nèi)存;


          • 若緩沖區(qū)所需大小不為0且elem包含指針,會單獨為hchan和buf分配內(nèi)存。



          (二)發(fā)送數(shù)據(jù)到chan


          發(fā)送數(shù)據(jù)到channel,ch<-i會調(diào)用到runtime.chansend函數(shù)中,該函數(shù)包含了發(fā)送數(shù)據(jù)的全部邏輯:


          func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    if c == nil {    // 對于非阻塞的發(fā)送,直接返回    if !block {      return false    }    // 對于阻塞的通道,將goroutine掛起    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)    throw("unreachable")  }    // 加鎖  lock(&c.lock)    // channel已關(guān)閉,panic  if c.closed != 0 {    unlock(&c.lock)    panic(plainError("send on closed channel"))  }    ...}


          block表示當前的發(fā)送操作是否是阻塞調(diào)用。如果channel為空,對于非阻塞的發(fā)送,直接返回false,對于阻塞的發(fā)送,將goroutine掛起,并且永遠不會返回。對channel加鎖,防止多個線程并發(fā)修改數(shù)據(jù),如果channel已關(guān)閉,報錯并中止程序。


          runtime.chansend函數(shù)的執(zhí)行過程可以分為以下三個部分:


          • 當存在等待的接收者時,通過runtime.send直接將數(shù)據(jù)發(fā)送給阻塞的接收者;


          • 當緩沖區(qū)存在空余空間時,將發(fā)送的數(shù)據(jù)寫入緩沖區(qū);


          • 當不存在緩沖區(qū)或緩沖區(qū)已滿時,等待其他Goroutine從channel接收數(shù)據(jù)。


          • 直接發(fā)送


          如果目標channel沒有被關(guān)閉且recvq隊列中已經(jīng)有處于讀等待的Goroutine,那么runtime.chansend會從接收隊列 recvq中取出最先陷入等待的Goroutine并直接向它發(fā)送數(shù)據(jù),注意,由于有接收者在等待,所以如果有緩沖區(qū),那么緩沖區(qū)一定是空的:


          func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {    ...    // 從recvq中取出一個接收者  if sg := c.recvq.dequeue(); sg != nil {     // 如果接收者存在,直接向該接收者發(fā)送數(shù)據(jù),繞過buf    send(c, sg, ep, func() { unlock(&c.lock) }, 3)    return true  }    ...}


          直接發(fā)送會調(diào)用runtime.send函數(shù):


          func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {  ...  if sg.elem != nil {    // 直接把要發(fā)送的數(shù)據(jù)copy到接收者的棧空間    sendDirect(c.elemtype, sg, ep)    sg.elem = nil  }  gp := sg.g  unlockf()  gp.param = unsafe.Pointer(sg)  if sg.releasetime != 0 {    sg.releasetime = cputicks()  }  // 設(shè)置對應(yīng)的goroutine為可運行狀態(tài)  goready(gp, skip+1)}


          sendDirect方法調(diào)用memmove進行數(shù)據(jù)的內(nèi)存拷貝。goready方法將等待接收數(shù)據(jù)的Goroutine標記成可運行狀態(tài)(Grunnable)并把該Goroutine發(fā)到發(fā)送方所在的處理器的runnext上等待執(zhí)行,該處理器在下一次調(diào)度時會立刻喚醒數(shù)據(jù)的接收方。注意,只是放到了runnext中,并沒有立刻執(zhí)行該Goroutine。


          • 發(fā)送到緩沖區(qū)


          如果緩沖區(qū)未滿,則將數(shù)據(jù)寫入緩沖區(qū):


          func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  ...  // 如果緩沖區(qū)沒有滿,直接將要發(fā)送的數(shù)據(jù)復(fù)制到緩沖區(qū)  if c.qcount < c.dataqsiz {    // 找到buf要填充數(shù)據(jù)的索引位置    qp := chanbuf(c, c.sendx)    ...    // 將數(shù)據(jù)拷貝到buf中    typedmemmove(c.elemtype, qp, ep)    // 數(shù)據(jù)索引前移,如果到了末尾,又從0開始    c.sendx++    if c.sendx == c.dataqsiz {      c.sendx = 0    }    // 元素個數(shù)加1,釋放鎖并返回    c.qcount++    unlock(&c.lock)    return true  }  ...}


          找到緩沖區(qū)要填充數(shù)據(jù)的索引位置,調(diào)用typedmemmove方法將數(shù)據(jù)拷貝到緩沖區(qū)中,然后重新設(shè)值sendx偏移量。


          • 阻塞發(fā)送


          當channel沒有接收者能夠處理數(shù)據(jù)時,向channel發(fā)送數(shù)據(jù)會被下游阻塞,使用select關(guān)鍵字可以向channel非阻塞地發(fā)送消息:


          func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {  ...  // 緩沖區(qū)沒有空間了,對于非阻塞調(diào)用直接返回  if !block {    unlock(&c.lock)    return false  }  // 創(chuàng)建sudog對象  gp := getg()  mysg := acquireSudog()  mysg.releasetime = 0  if t0 != 0 {    mysg.releasetime = -1  }  mysg.elem = ep  mysg.waitlink = nil  mysg.g = gp  mysg.isSelect = false  mysg.c = c  gp.waiting = mysg  gp.param = nil  // 將sudog對象入隊  c.sendq.enqueue(mysg)  // 進入等待狀態(tài)  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)  ...}


          對于非阻塞的調(diào)用會直接返回,對于阻塞的調(diào)用會創(chuàng)建sudog對象并將sudog對象加入發(fā)送等待隊列。調(diào)用gopark將當前Goroutine轉(zhuǎn)入waiting狀態(tài)。調(diào)用gopark之后,在使用者看來向該channel發(fā)送數(shù)據(jù)的代碼語句會被阻塞。


          發(fā)送數(shù)據(jù)整個流程大致如下:



          注意,發(fā)送數(shù)據(jù)的過程中包含幾個會觸發(fā)Goroutine調(diào)度的時機:


          • 發(fā)送數(shù)據(jù)時發(fā)現(xiàn)從channel上存在等待接收數(shù)據(jù)的Goroutine,立刻設(shè)置處理器的runnext屬性,但是并不會立刻觸發(fā)調(diào)度;


          • 發(fā)送數(shù)據(jù)時并沒有找到接收方并且緩沖區(qū)已經(jīng)滿了,這時會將自己加入channel的sendq隊列并調(diào)用gopark觸發(fā)Goroutine的調(diào)度讓出處理器的使用權(quán)。



          (三)從chan接收數(shù)據(jù)


          從channel獲取數(shù)據(jù)最終調(diào)用到runtime.chanrecv函數(shù):


          func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  if c == nil {        // 如果c為空且是非阻塞調(diào)用,直接返回    if !block {      return    }        // 阻塞調(diào)用直接等待    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)    throw("unreachable")  }  ···  lock(&c.lock)    // 如果c已經(jīng)關(guān)閉,并且c中沒有數(shù)據(jù),返回  if c.closed != 0 && c.qcount == 0 {    unlock(&c.lock)    if ep != nil {      typedmemclr(c.elemtype, ep)    }    return true, false  }    ···}


          當從一個空channel接收數(shù)據(jù)時,直接調(diào)用gopark讓出處理器使用權(quán)。如果當前channel已被關(guān)閉且緩沖區(qū)中沒有數(shù)據(jù),直接返回。


          runtime.chanrecv函數(shù)的具體執(zhí)行過程可以分為以下三個部分:


          • 當存在等待的發(fā)送者時,通過runtime.recv從阻塞的發(fā)送者或者緩沖區(qū)中獲取數(shù)據(jù);


          • 當緩沖區(qū)存在數(shù)據(jù)時,從channel的緩沖區(qū)中接收數(shù)據(jù);


          • 當緩沖區(qū)中不存在數(shù)據(jù)時,等待其他Goroutine向channel發(fā)送數(shù)據(jù)。


          • 直接接收


          當channel的sendq隊列中包含處于發(fā)送等待狀態(tài)的Goroutine時,調(diào)用runtime.recv直接從這個發(fā)送者那里提取數(shù)據(jù)。注意,由于有發(fā)送者在等待,所以如果有緩沖區(qū),那么緩沖區(qū)一定是滿的。


          func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  ...  // 從發(fā)送者隊列獲取數(shù)據(jù)  if sg := c.sendq.dequeue(); sg != nil {     // 發(fā)送者隊列不為空,直接從發(fā)送者那里提取數(shù)據(jù)    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)    return true, true  }   ...}


          主要看一下runtime.recv的實現(xiàn):


          func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {    // 如果是無緩沖區(qū)chan  if c.dataqsiz == 0 {    if ep != nil {            // 直接從發(fā)送者拷貝數(shù)據(jù)      recvDirect(c.elemtype, sg, ep)    }    // 有緩沖區(qū)chan  } else {        // 獲取buf的存放數(shù)據(jù)指針    qp := chanbuf(c, c.recvx)        // 直接從緩沖區(qū)拷貝數(shù)據(jù)給接收者    if ep != nil {      typedmemmove(c.elemtype, ep, qp)    }        // 從發(fā)送者拷貝數(shù)據(jù)到緩沖區(qū)    typedmemmove(c.elemtype, qp, sg.elem)    c.recvx++    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz  }  gp := sg.g  gp.param = unsafe.Pointer(sg)    // 設(shè)置對應(yīng)的goroutine為可運行狀態(tài)  goready(gp, skip+1)}


          該函數(shù)會根據(jù)緩沖區(qū)的大小分別處理不同的情況:


          • 如果channel不存在緩沖區(qū):直接從發(fā)送者那里提取數(shù)據(jù)。


          • 如果channel存在緩沖區(qū):


          1. 將緩沖區(qū)中的數(shù)據(jù)拷貝到接收方的內(nèi)存地址;


          2. 將發(fā)送者數(shù)據(jù)拷貝到緩沖區(qū),并喚醒發(fā)送者。


          無論發(fā)生哪種情況,運行時都會調(diào)用goready將等待發(fā)送數(shù)據(jù)的Goroutine標記成可運行狀態(tài)(Grunnable)并將當前處理器的runnext設(shè)置成發(fā)送數(shù)據(jù)的Goroutine,在調(diào)度器下一次調(diào)度時將阻塞的發(fā)送方喚醒。


          • 從緩沖區(qū)接收


          如果channel緩沖區(qū)中有數(shù)據(jù)且發(fā)送者隊列中沒有等待發(fā)送的Goroutine時,直接從緩沖區(qū)中recvx的索引位置取出數(shù)據(jù):


          func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  ...    // 如果緩沖區(qū)中有數(shù)據(jù)  if c.qcount > 0 {    qp := chanbuf(c, c.recvx)        // 從緩沖區(qū)復(fù)制數(shù)據(jù)到ep    if ep != nil {      typedmemmove(c.elemtype, ep, qp)    }    typedmemclr(c.elemtype, qp)        // 接收數(shù)據(jù)的指針前移    c.recvx++        // 環(huán)形隊列,如果到了末尾,再從0開始    if c.recvx == c.dataqsiz {      c.recvx = 0    }        // 緩沖區(qū)中現(xiàn)存數(shù)據(jù)減一    c.qcount--        unlock(&c.lock)    return true, true  }  ...}


          • 阻塞接收


          當channel的發(fā)送隊列中不存在等待的Goroutine并且緩沖區(qū)中也不存在任何數(shù)據(jù)時,從管道中接收數(shù)據(jù)的操作會被阻塞,使用 select 關(guān)鍵字可以非阻塞地接收消息:


          func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {  ...  // 非阻塞,直接返回  if !block {    unlock(&c.lock)    return false, false  }   // 創(chuàng)建sudog  gp := getg()  mysg := acquireSudog()  ···  gp.waiting = mysg  mysg.g = gp  mysg.isSelect = false  mysg.c = c  gp.param = nil  // 將sudog添加到等待接收隊列中  c.recvq.enqueue(mysg)  // 阻塞Goroutine,等待被喚醒  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)  ...}


          如果是非阻塞調(diào)用,直接返回。阻塞調(diào)用會將當前Goroutine封裝成sudog,然后將sudog添加到等待接收隊列中,調(diào)用gopark讓出處理器的使用權(quán)并等待調(diào)度器的調(diào)度。


          注意,接收數(shù)據(jù)的過程中包含幾個會觸發(fā)Goroutine調(diào)度的時機:


          • 當channel為空時


          • 當channel的緩沖區(qū)中不存在數(shù)據(jù)并且sendq中也不存在等待的發(fā)送者時



          (四)關(guān)閉chan


          關(guān)閉通道會調(diào)用到runtime.closechan方法:


          func closechan(c *hchan) {    // 校驗邏輯    ...    lock(&c.lock)    // 設(shè)置chan已關(guān)閉  c.closed = 1  var glist gList    // 獲取所有接收者  for {    sg := c.recvq.dequeue()    if sg == nil {      break    }    if sg.elem != nil {      typedmemclr(c.elemtype, sg.elem)      sg.elem = nil    }    gp := sg.g    gp.param = nil    glist.push(gp)  }  // 獲取所有發(fā)送者  for {    sg := c.sendq.dequeue()    ...  }    unlock(&c.lock)    // 喚醒所有g(shù)list中的goroutine  for !glist.empty() {    gp := glist.pop()    gp.schedlink = 0    goready(gp, 3)  }}


          將recvq和sendq兩個隊列中的Goroutine加入到gList中,并清除所有sudog上未被處理的元素。最后將所有g(shù)list中的Goroutine加入調(diào)度隊列,等待被喚醒。注意,發(fā)送者在被喚醒之后會panic。


          總結(jié)一下發(fā)送/接收/關(guān)閉操作可能引發(fā)的結(jié)果:



          Goroutine+channel的組合非常強壯,兩者的實現(xiàn)共同支撐起了Go語言的并發(fā)機制。


          參考資料:

          1.Go并發(fā)編程實戰(zhàn)

          2.Go語言設(shè)計與實現(xiàn)



           作者簡介


          郭劍池

          騰訊游戲后臺開發(fā)工程師

          騰訊游戲后臺開發(fā)工程師,畢業(yè)于北京郵電大學。目前負責魂斗羅歸來手游服務(wù)器端的相關(guān)開發(fā)工作。在學習和鉆研Go語言的過程中,希望能和大家分享更多的心得體會,共同進步。



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復(fù) ebook 獲取;還可以回復(fù)「進群」,和數(shù)萬 Gopher 交流學習。

          瀏覽 15
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天干天天射天天射 | 丰滿人妻一区二区三区 | 三级视频在线播放 | 日本亚洲色a | 青青草无码在线观看 |