說說Kafka控制器事件處理全流程
前言
今天來說說 Kafka控制器,即 Kafka Controller。
這篇文章我分為兩部分,第一部分就是直接圖文來說清整個 Kafka 控制器事件處理全流程,然后再通過Controller選舉流程進行一波源碼分析,再來走一遍處理全流程。
正文
在深入源碼之前我們得先搞明白 Controller是什么?它有什么用?這樣在看源碼的時候才能有的放矢。
Controller是核心組件,它的作用是管理和協(xié)調整個Kafka集群。
具體管理和協(xié)調什么呢?
主題的管理,創(chuàng)建和刪除主題; 分區(qū)管理,增加或重分配分區(qū); 分區(qū) Leader選舉;監(jiān)聽 Broker相關變化,即Broker新增、關閉等;元數(shù)據(jù)管理,向其他 Broker提供元數(shù)據(jù)服務;
為什么需要Controller?
我個人理解:凡是管理或者協(xié)調某樣東西,都需要有個Leader,由他來把控全局,管理內部,對接外部,咱們就跟著Leader干就完事了。這其實對外也是好的,外部不需要和我們整體溝通,他只要和一個決策者交流,效率更高。
再來看看朱大是怎么說的,以下內容來自《深入理解Kafka:核心設計與實踐原理》。
在Kafka的早期版本中,并沒有采用 Kafka Controller 這樣一概念來對分區(qū)和副本的狀態(tài)進行管理,而是依賴于 ZooKeeper,每個 broker都會在 ZooKeeper 上為分區(qū)和副本注冊大量的監(jiān)聽器(Watcher)。
當分區(qū)或副本狀態(tài)變化時,會喚醒很多不必要的監(jiān)聽器,這種嚴重依賴 ZooKeeper 的設計會有腦裂、羊群效應,以及造成 ZooKeeper 過載的隱患。
在目前的新版本的設計中,只有 Kafka Controller 在 ZooKeeper 上注冊相應的監(jiān)聽器,其他的 broker 極少需要再監(jiān)聽 ZooKeeper 中的數(shù)據(jù)變化,這樣省去了很多不必要的麻煩。
簡單說下ZooKeeper
了解了 Controller的作用之后我們還需要在簡單的了解下ZooKeeper,因為Controller是極度依賴ZooKeeper的。(不過社區(qū)準備移除ZooKeeper,文末再提一下)。
ZooKeeper是一個開源的分布式協(xié)調服務框架,最常用來作為注冊中心等。ZooKeeper的數(shù)據(jù)模型就像文件系統(tǒng)一樣,以根目錄 "/" 開始,結構上的每個節(jié)點稱為znode,可以存儲一些信息。節(jié)點分為持久節(jié)點和臨時節(jié)點,臨時節(jié)點會隨著會話結束而自動被刪除。
并且有Watcher功能,節(jié)點自身數(shù)據(jù)變更、節(jié)點新增、節(jié)點刪除、子節(jié)點數(shù)量變更都可以通過變更監(jiān)聽器通知客戶端。

Controller是如何依賴ZooKeeper的
每個Broker在啟動時會嘗試向ZooKeeper注冊/controller節(jié)點來競選控制器,第一個創(chuàng)建/controller節(jié)點的Broker會被指定為控制器。這就是是控制器的選舉。
/controller節(jié)點是個臨時節(jié)點,其他Broker會監(jiān)聽著此節(jié)點,當/controller節(jié)點所在的Broker宕機之后,會話就結束了,此節(jié)點就被移除。其他Broker伺機而動,都來爭當控制器,還是第一個創(chuàng)建/controller節(jié)點的Broker被指定為控制器。這就是控制器故障轉移,即Failover。
當然還包括各種節(jié)點的監(jiān)聽,例如主題的增減等,都通過Watcher功能,來實現(xiàn)相關的監(jiān)聽,進行對應的處理。
Controller在初始化的時候會從ZooKeeper拉取集群元數(shù)據(jù)信息,保存在自己的緩存中,然后通過向集群其他Broker發(fā)送請求的方式將數(shù)據(jù)同步給對方。
Controller 底層事件模型
不管是監(jiān)聽Watcher的ZooKeeperWatcher線程,還是定時任務線程亦或是其他線程都需要訪問或更新Controller從集群拉取的元數(shù)據(jù)。多線程 + 數(shù)據(jù)競爭 = 線程不安全。因此需要加鎖來保證線程安全。
一開始Kafka就是用大量的鎖來保證線程間的同步,各種加鎖使得性能下降,并且多線程加鎖的方式使得代碼復雜度急劇上升,一不小心就會出各種問題,bug難修復。
因此在0.11版本之后將多線程并發(fā)訪問改成了單線程事件隊列模式。將涉及到共享數(shù)據(jù)競爭相關方面的訪問抽象成事件,將事件塞入阻塞隊列中,然后單線程處理。
也就是說其它線程還是在的,只是把涉及共享數(shù)據(jù)的操作封裝成事件由專屬線程處理。

先小結一下
到這我們已經清楚了Controller主要用來管理和協(xié)調集群,具體是通過ZooKeeper臨時節(jié)點和Watcher機制來監(jiān)控集群的變化(當然還有來自定時任務或其他線程的事件驅動),更新集群的元數(shù)據(jù),并且通知集群中的其他Broker進行相關的操作(這部分下文會講)。
而由于集群元數(shù)據(jù)會有并發(fā)修改問題,因此將操作抽象成事件,由阻塞隊列和單線程處理來替換之前的多線程處理,降低代碼的復雜度,提升代碼的可維護性和性能。
接下來我們再講講Controller通知集群中的其他Broker的相關操作。
Controller的請求發(fā)送
Controller從ZooKeeper那兒得到變更通知之后,需要告知集群中的Broker(包括它自身)做相應的處理。
Controller只會給集群的Broker發(fā)送三種請求:分別是 LeaderAndIsrRequest、StopReplicaRequest和 UpdateMetadataRequest
LeaderAndIsrRequest
告知Broker主題相關分區(qū)Leader和ISR副本都在哪些 Broker上。
StopReplicaRequest
告知Broker停止相關副本操作,用于刪除主題場景或分區(qū)副本遷移場景。
UpdateMetadataRequest
更新Broker上的元數(shù)據(jù)。
Controller事件處理線程會把事件封裝成對應的請求,然后將請求寫入對應的Broker的請求阻塞隊列,然后RequestSendThread不斷從阻塞隊列中獲取待發(fā)送的請求。

先解釋下controllerBrokerStateInfo,它就是個 POJO類,可以理解為集群每個broker對應一個controllerBrokerStateInfo.

然后再看下ControllerChannelManager,從名字可以看出它管理Controller和集群Broker之間的連接,并為每個Broker創(chuàng)建一個RequestSendThread線程。

再小結一下
接著上個小結,事件處理線程將事件隊列里面的事件處理之后再進行對應的請求封裝,塞入需要通知的集群Broker對應的阻塞隊列中,然后由每個Broker專屬的requestSendThread發(fā)送請求至對應的Broker。
總的步驟如下圖:

現(xiàn)在應該已經清楚Controller大概是如何運作的,整體看起來還是生產者-消費者模型。
接下來就進入源碼環(huán)節(jié)。
Controller選舉流程源碼分析
事件處理的流程都是一樣的,只是具體處理的事件邏輯不同,我們從Controller選舉入手,來走一遍處理流程。
ControllerChangeHandler
選舉會觸發(fā)此handler,可以看到直接往ControllerEventManager的事件隊列里塞。

這個QueueEvent和ControllerEventManager,我們先來看看是啥。不過在此之前先了解下ControllerEvent和ControllerEventProcessor。
ControllerEvent:事件

ControllerEventProcessor :事件處理接口
此接口的唯一實現(xiàn)類是 KafkaController。

ControllerEventManager:事件處理器
此類主要用來管理事件處理線程和事件隊列。

QueuedEvent:封裝了ControllerEvent的類
主要是記錄了下入隊時間,并且提供了事件需要調用的方法。

ControllerEventThread:事件處理線程
整體而言還是很簡單的,從隊列拿事件,然后處理。

KafkaController#process
就是個switch,根據(jù)事件調用對應的processXXXX方法。

來關注下controller 重選事件

然后在onControllerFailover里面會調用sendUpdateMetadataRequest方法
中間省略調用,內容太多了,不是重點,到后來調用ControllerBrokerRequestBatch#sendRequest
最后還是調用了controllerChannelManager#sendRequest.

然后 RequestSendThread#doWork,不斷從請求隊列里拿請求,發(fā)送請求。

一個環(huán)節(jié)完成了!我們來看下整體流程圖

最后我們來看下元數(shù)據(jù)到底有啥和KafkaController的一些字段。
ControllerContext:元數(shù)據(jù)
主要有運行中的Broker、所有主題信息、主題分區(qū)副本信息等。

KafkaController
基本上關鍵的字段都解釋了,關于狀態(tài)機那一塊篇幅有限,之后再說。

最后
整體的流程就是將Controller相關操作都封裝成一個個事件,然后將事件入隊,由一個事件處理線程來處理,保證數(shù)據(jù)的安全(從這也可以看出,不是多線程就是好,有利有弊最終還是看場景)。
最后在通知集群中Broker的過程是每個Broker配備一個發(fā)送線程,因為發(fā)送是同步的,因此每個Broker線程隔離可以防止某個Broker阻塞而導致整體都阻塞的情況。
前面有說到Kafka Controller 強依賴 ZooKeeper。但是現(xiàn)在社區(qū)打算移除 ZooKeeper,因為ZooKeeper不適合頻繁寫,并且是CP的。而且用Kafka還需要維護ZooKeeper集群,提升了系統(tǒng)的復雜度和運維難度,降低了系統(tǒng)的穩(wěn)定性。
像位移信息,已經通過內部主題的方式保存,繞開了ZooKeeper。
社區(qū)打算通過類 Raft 共識算法來選舉Controller,并且把元數(shù)據(jù)存儲在 Log 中的方式來做。
往期推薦:
看完本文有收獲?點贊、分享是最大的支持
明天見(??ω??)??
