<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          時(shí)間輪在Kafka的實(shí)踐

          共 7756字,需瀏覽 16分鐘

           ·

          2021-10-01 14:54


          桔妹導(dǎo)讀:時(shí)間輪是一個(gè)應(yīng)用場(chǎng)景很廣的組件,在很多高性能中間件中都有它的身影,如Netty、Quartz、Akka,當(dāng)然也包括Kafka,本文主要介紹時(shí)間輪在kafka的應(yīng)用和實(shí)戰(zhàn),從核心源碼和設(shè)計(jì)的角度對(duì)時(shí)間輪進(jìn)行深入的講解 。




          1. 

          引子
          從2個(gè)面試題說(shuō)起,第一個(gè)問(wèn)題:如果一臺(tái)機(jī)器上有10w個(gè)定時(shí)任務(wù),如何做到高效觸發(fā)?

          具體場(chǎng)景是:

          有一個(gè)APP實(shí)時(shí)消息通道系統(tǒng),對(duì)每個(gè)用戶會(huì)維護(hù)一個(gè)APP到服務(wù)器的TCP連接,用來(lái)實(shí)時(shí)收發(fā)消息,對(duì)這個(gè)TCP連接,有這樣一個(gè)需求:“如果連續(xù)30s沒(méi)有請(qǐng)求包(例如登錄,消息,keepalive包),服務(wù)端就要將這個(gè)用戶的狀態(tài)置為離線”。
           其中,單機(jī)TCP同時(shí)在線量約在10w級(jí)別,keepalive請(qǐng)求包較分散大概30s一次,吞吐量約在3000qps。

          怎么做?

          常用方案使用time定時(shí)任務(wù),每秒掃描一次所有連接的集合Map<uid, last_packet_time>,把連接時(shí)間(每次有新的請(qǐng)求更新對(duì)應(yīng)連接的連接時(shí)間)比當(dāng)前時(shí)間的差值大30s的連接找出來(lái)處理。

          另一種方案,使用環(huán)形隊(duì)列法:


          三個(gè)重要的數(shù)據(jù)結(jié)構(gòu):

          1. 30s超時(shí),就創(chuàng)建一個(gè)index從0到30的環(huán)形隊(duì)列(本質(zhì)是個(gè)數(shù)組)
          2. 環(huán)上每一個(gè)slot是一個(gè)Set<uid>,任務(wù)集合
          3. 同時(shí)還有一個(gè)Map<uid, index>,記錄uid落在環(huán)上的哪個(gè)slot里

          這樣當(dāng)有某用戶uid有請(qǐng)求包到達(dá)時(shí):

          1. 從Map結(jié)構(gòu)中,查找出這個(gè)uid存儲(chǔ)在哪一個(gè)slot里
          2. 從這個(gè)slot的Set結(jié)構(gòu)中,刪除這個(gè)uid
          3. 將uid重新加入到新的slot中,具體是哪一個(gè)slot呢 => Current Index指針?biāo)赶虻?/span>上一個(gè)slot,因?yàn)檫@個(gè)slot,會(huì)被timer在30s之后掃描到
          4. 更新Map,這個(gè)uid對(duì)應(yīng)slot的index值

          哪些元素會(huì)被超時(shí)掉呢?

          Current Index每秒種移動(dòng)一個(gè)slot,這個(gè)slot對(duì)應(yīng)的Set<uid>中所有uid都應(yīng)該被集體超時(shí)!如果最近30s有請(qǐng)求包來(lái)到,一定被放到Current Index的前一個(gè)slot了,Current Index所在的slot對(duì)應(yīng)Set中所有元素,都是最近30s沒(méi)有請(qǐng)求包來(lái)到的。

          所以,當(dāng)沒(méi)有超時(shí)時(shí),Current Index掃到的每一個(gè)slot的Set中應(yīng)該都沒(méi)有元素。

          兩種方案對(duì)比:

          方案一每次都要輪詢所有數(shù)據(jù),而方案二使用環(huán)形隊(duì)列只需要輪詢這一刻需要過(guò)期的數(shù)據(jù),如果沒(méi)有數(shù)據(jù)過(guò)期則沒(méi)有數(shù)據(jù)要處理,并且是批量超時(shí),并且由于是環(huán)形結(jié)構(gòu)更加節(jié)約空間,這很適合高性能場(chǎng)景。

          第二個(gè)問(wèn)題:在開發(fā)過(guò)程中有延遲一定時(shí)間的任務(wù)要執(zhí)行,怎么做?

          如果不重復(fù)造輪子的話,我們的選擇當(dāng)然是延遲隊(duì)列或者Timer。

          延遲隊(duì)列和在Timer中增 加延時(shí)任務(wù)采用數(shù)組表示的最小堆的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn),每次放入新元素和移除隊(duì)首元素時(shí)間復(fù)雜度為O(nlog(n))。



          2. 

          時(shí)間輪
          方案二所采用的環(huán)形隊(duì)列,就是時(shí)間輪的底層數(shù)據(jù)結(jié)構(gòu),它能夠讓需要處理的數(shù)據(jù)(任務(wù)的抽象)集中,在Kafka中存在大量的延遲操作,比如延遲生產(chǎn)、延遲拉取以及延遲刪除等。Kafka并沒(méi)有使用JDK自帶的Timer或者DelayQueue來(lái)實(shí)現(xiàn)延遲的功能,而是基于時(shí)間輪自定義了一個(gè)用于實(shí)現(xiàn)延遲功能的定時(shí)器(SystemTimer)。JDK的Timer和DelayQueue插入和刪除操作的平均時(shí)間復(fù)雜度為O(nlog(n)),并不能滿足Kafka的高性能要求,而基于時(shí)間輪可以將插入和刪除操作的時(shí)間復(fù)雜度都降為O(1)。時(shí)間輪的應(yīng)用并非Kafka獨(dú)有,其應(yīng)用場(chǎng)景還有很多,在Netty、Akka、Quartz、Zookeeper等組件中都存在時(shí)間輪的蹤影。

          2.1 時(shí)間輪的數(shù)據(jù)結(jié)構(gòu)

          參考下圖,Kafka中的時(shí)間輪(TimingWheel)是一個(gè)存儲(chǔ)定時(shí)任務(wù)的環(huán)形隊(duì)列,底層采用數(shù)組實(shí)現(xiàn),數(shù)組中的每個(gè)元素可以存放一個(gè)定時(shí)任務(wù)列表(TimerTaskList)。TimerTaskList是一個(gè)環(huán)形的雙向鏈表,鏈表中的每一項(xiàng)表示的都是定時(shí)任務(wù)項(xiàng)(TimerTaskEntry),其中封裝了真正的定時(shí)任務(wù)TimerTask。在Kafka源碼中對(duì)這個(gè)TimeTaskList是用一個(gè)名稱為buckets的數(shù)組表示的,所以后面介紹中可能TimerTaskList也會(huì)被稱為bucket。

          圖二

          針對(duì)上圖的幾個(gè)名詞簡(jiǎn)單解釋下:

          • tickMs:時(shí)間輪由多個(gè)時(shí)間格組成,每個(gè)時(shí)間格就是tickMs,它代表當(dāng)前時(shí)間輪的基本時(shí)間跨度。
          • wheelSize:代表每一層時(shí)間輪的格數(shù)
          • interval:當(dāng)前時(shí)間輪的總體時(shí)間跨度,interval=tickMs × wheelSize
          • startMs:構(gòu)造當(dāng)層時(shí)間輪時(shí)候的當(dāng)前時(shí)間,第一層的時(shí)間輪的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上層時(shí)間輪的startMs為下層時(shí)間輪的currentTime。
          • currentTime:表示時(shí)間輪當(dāng)前所處的時(shí)間,currentTime是tickMs的整數(shù)倍(通過(guò)currentTime=startMs - (startMs % tickMs來(lái)保正currentTime一定是tickMs的整數(shù)倍),這個(gè)運(yùn)算類比鐘表中分鐘里65秒分鐘指針指向的還是1分鐘)。currentTime可以將整個(gè)時(shí)間輪劃分為到期部分和未到期部分,currentTime當(dāng)前指向的時(shí)間格也屬于到期部分,表示剛好到期,需要處理此時(shí)間格所對(duì)應(yīng)的TimerTaskList的所有任務(wù)。


          2.2 時(shí)間輪中的任務(wù)存放

          若時(shí)間輪的tickMs=1ms,wheelSize=20,那么可以計(jì)算得出interval為20ms。初始情況下表盤指針currentTime指向時(shí)間格0,此時(shí)有一個(gè)定時(shí)為2ms的任務(wù)插入進(jìn)來(lái)會(huì)存放到時(shí)間格為2的TimerTaskList中。隨著時(shí)間的不斷推移,指針currentTime不斷向前推進(jìn),過(guò)了2ms之后,當(dāng)?shù)竭_(dá)時(shí)間格2時(shí),就需要將時(shí)間格2所對(duì)應(yīng)的TimeTaskList中的任務(wù)做相應(yīng)的到期操作。此時(shí)若又有一個(gè)定時(shí)為8ms的任務(wù)插入進(jìn)來(lái),則會(huì)存放到時(shí)間格10中,currentTime再過(guò)8ms后會(huì)指向時(shí)間格10。如果同時(shí)有一個(gè)定時(shí)為19ms的任務(wù)插入進(jìn)來(lái)怎么辦?新來(lái)的TimerTaskEntry會(huì)復(fù)用原來(lái)的TimerTaskList,所以它會(huì)插入到原本已經(jīng)到期的時(shí)間格1中。總之,整個(gè)時(shí)間輪的總體跨度是不變的,隨著指針currentTime的不斷推進(jìn),當(dāng)前時(shí)間輪所能處理的時(shí)間段也在不斷后移,總體時(shí)間范圍在currentTime和currentTime+interval之間。
           

          2.3 時(shí)間輪的升降級(jí)

          如果此時(shí)有個(gè)定時(shí)為350ms的任務(wù)該如何處理?直接擴(kuò)充wheelSize的大小么?Kafka中不乏幾萬(wàn)甚至幾十萬(wàn)毫秒的定時(shí)任務(wù),這個(gè)wheelSize的擴(kuò)充沒(méi)有底線,就算將所有的定時(shí)任務(wù)的到期時(shí)間都設(shè)定一個(gè)上限,比如100萬(wàn)毫秒,那么這個(gè)wheelSize為100萬(wàn)毫秒的時(shí)間輪不僅占用很大的內(nèi)存空間,而且效率也會(huì)拉低。Kafka為此引入了層級(jí)時(shí)間輪的概念,當(dāng)任務(wù)的到期時(shí)間超過(guò)了當(dāng)前時(shí)間輪所表示的時(shí)間范圍時(shí),就會(huì)嘗試添加到上層時(shí)間輪中。

          圖三

          參考上圖,復(fù)用之前的案例,第一層的時(shí)間輪tickMs=1ms, wheelSize=20, interval=20ms。第二層的時(shí)間輪的tickMs為第一層時(shí)間輪的interval,即為20ms。每一層時(shí)間輪的wheelSize是固定的,都是20,那么第二層的時(shí)間輪的總體時(shí)間跨度interval為400ms。以此類推,這個(gè)400ms也是第三層的tickMs的大小,第三層的時(shí)間輪的總體時(shí)間跨度為8000ms。


          剛才提到的350ms的任務(wù),不會(huì)插入到第一層時(shí)間輪,會(huì)插入到interval=20*20的第二層時(shí)間輪中,具體插入到時(shí)間輪的哪個(gè)bucket呢?先用350/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350會(huì)放在第17個(gè)bucket。如果此時(shí)有一個(gè)450ms后執(zhí)行的任務(wù),那么會(huì)放在第三層時(shí)間輪中,按照剛才的計(jì)算公式,會(huì)放在第0個(gè)bucket。第0個(gè)bucket里會(huì)包含
          [400,800)ms的任務(wù)。隨著時(shí)間流逝,當(dāng)時(shí)間過(guò)去了400ms,那么450ms后就要執(zhí)行的任務(wù)還剩下50ms的時(shí)間才能執(zhí)行,此時(shí)有一個(gè)時(shí)間輪降級(jí)的操作,將50ms任務(wù)重新提交到層級(jí)時(shí)間輪中,那么此時(shí)50ms的任務(wù)根據(jù)公式會(huì)放入第二個(gè)時(shí)間輪的第2個(gè)bucket中,此bucket的時(shí)間范圍為[40,60)ms,然后再經(jīng)過(guò)40ms,這個(gè)50ms的任務(wù)又會(huì)被監(jiān)控到,此時(shí)距離任務(wù)執(zhí)行還有10ms,同樣將10ms的任務(wù)提交到層級(jí)時(shí)間輪,此時(shí)會(huì)加入到第一層時(shí)間輪的第10個(gè)bucket,所以再經(jīng)過(guò)10ms后,此任務(wù)到期,最終執(zhí)行。


          整個(gè)時(shí)間輪的升級(jí)降級(jí)操作是不是很類似于我們的時(shí)鐘? 第一層時(shí)間輪tickMs=1s, wheelSize=60,interval=1min,此為秒鐘;第二層tickMs=1min,wheelSize=60,interval=1hour,此為分鐘;第三層tickMs=1hour,wheelSize為12,interval為12hours,此為時(shí)鐘。而鐘表的指針就對(duì)應(yīng)程序中的currentTime,這個(gè)后面分析代碼時(shí)候會(huì)講到(對(duì)這個(gè)的理解也是時(shí)間輪理解的重點(diǎn)和難點(diǎn))。


          2.4 任務(wù)添加和驅(qū)動(dòng)時(shí)間輪滾動(dòng)核心流程圖

          圖四


          2.5 重點(diǎn)代碼介紹

          這是往SystenTimer中添加一個(gè)任務(wù)。

          //在Systemtimer中添加一個(gè)任務(wù),任務(wù)被包裝為一個(gè)TimerTaskEntryprivate def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {//先判斷是否可以添加進(jìn)時(shí)間輪中,如果不可以添加進(jìn)去代表任務(wù)已經(jīng)過(guò)期或者任務(wù)被取消,注意這里的timingWheel持有上一層時(shí)間輪的引用,所以可能存在遞歸調(diào)用 if (!timingWheel.add(timerTaskEntry)) { // Already expired or cancelled if (!timerTaskEntry.cancelled) //過(guò)期任務(wù)直接線程池異步執(zhí)行掉 taskExecutor.submit(timerTaskEntry.timerTask) }}timingWheel添加任務(wù),遞歸添加直到添加該任務(wù)進(jìn)合適的時(shí)間輪的bucket中def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs //任務(wù)取消 if (timerTaskEntry.cancelled) { // Cancelled false } else if (expiration < currentTime + tickMs) { // 任務(wù)過(guò)期后會(huì)被執(zhí)行 false } else if (expiration < currentTime + interval) {//任務(wù)過(guò)期時(shí)間比當(dāng)前時(shí)間輪時(shí)間加周期小說(shuō)明任務(wù)過(guò)期時(shí)間在本時(shí)間輪周期內(nèi) val virtualId = expiration / tickMs //找到任務(wù)對(duì)應(yīng)本時(shí)間輪的bucket val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) // Set the bucket expiration time //只有本bucket內(nèi)的任務(wù)都過(guò)期后才會(huì)bucket.setExpiration返回true此時(shí)將bucket放入延遲隊(duì)列 if (bucket.setExpiration(virtualId * tickMs)) { //bucket是一個(gè)TimerTaskList,它實(shí)現(xiàn)了java.util.concurrent.Delayed接口,里面是一個(gè)多任務(wù)組成的鏈表,圖2有說(shuō)明 queue.offer(bucket) } true } else { // Out of the interval. Put it into the parent timer //任務(wù)的過(guò)期時(shí)間不在本時(shí)間輪周期內(nèi)說(shuō)明需要升級(jí)時(shí)間輪,如果不存在則構(gòu)造上一層時(shí)間輪,繼續(xù)用上一層時(shí)間輪添加任務(wù) if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) }}

          在本層級(jí)時(shí)間輪里添加上一層時(shí)間輪里的過(guò)程,注意的是在下一層時(shí)間輪的interval為上一層時(shí)間輪的tickMs。

          private[this] def addOverflowWheel(): Unit = { synchronized { if (overflowWheel == null) { overflowWheel = new TimingWheel( tickMs = interval, wheelSize = wheelSize, startMs = currentTime, taskCounter = taskCounter, queue ) } }}

          驅(qū)動(dòng)時(shí)間輪滾動(dòng)過(guò)程:

          注意這里會(huì)存在一個(gè)遞歸,一直驅(qū)動(dòng)時(shí)間輪的指針滾動(dòng)直到時(shí)間不足于驅(qū)動(dòng)上層的時(shí)間輪滾動(dòng)。

          def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { //把當(dāng)前時(shí)間打平為時(shí)間輪tickMs的整數(shù)倍 currentTime = timeMs - (timeMs % tickMs) // Try to advance the clock of the overflow wheel if present //驅(qū)動(dòng)上層時(shí)間輪,這里的傳給上層的currentTime時(shí)間是本層時(shí)間輪打平過(guò)的,但是在上層時(shí)間輪還是會(huì)繼續(xù)打平 if (overflowWheel != null) overflowWheel.advanceClock(currentTime) }}

          驅(qū)動(dòng)源:

          //循環(huán)bucket里面的任務(wù)列表,一個(gè)個(gè)重新添加進(jìn)時(shí)間輪,對(duì)符合條件的時(shí)間輪進(jìn)行升降級(jí)或者執(zhí)行任務(wù)private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry) /* * Advances the clock if there is an expired bucket. If there isn't any expired bucket when called, * waits up to timeoutMs before giving up. */def advanceClock(timeoutMs: Long): Boolean = { var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) { writeLock.lock() try { while (bucket != null) { //驅(qū)動(dòng)時(shí)間輪 timingWheel.advanceClock(bucket.getExpiration()) //循環(huán)buckek也就是任務(wù)列表,任務(wù)列表一個(gè)個(gè)繼續(xù)添加進(jìn)時(shí)間輪以此來(lái)升級(jí)或者降級(jí)時(shí)間輪,把過(guò)期任務(wù)找出來(lái)執(zhí)行 bucket.flush(reinsert) //循環(huán) //這里就是從延遲隊(duì)列取出bucket,bucket是有延遲時(shí)間的,取出代表該bucket過(guò)期,我們通過(guò)bucket能取到bucket包含的任務(wù)列表 bucket = delayQueue.poll() } } finally { writeLock.unlock() } true } else { false }}



          3. 

          總結(jié)

          kafka的延遲隊(duì)列使用時(shí)間輪實(shí)現(xiàn),能夠支持大量任務(wù)的高效觸發(fā),但是在kafka延遲隊(duì)列實(shí)現(xiàn)方案里還是看到了delayQueue的影子,使用delayQueue是對(duì)時(shí)間輪里面的bucket放入延遲隊(duì)列,以此來(lái)推動(dòng)時(shí)間輪滾動(dòng),但是基于將插入和刪除操作則放入時(shí)間輪中,將這些操作的時(shí)間復(fù)雜度都降為O(1),提升效率。Kafka對(duì)性能的極致追求讓它把最合適的組件放在最適合的位置。



          本文作者

          ?

          滴滴車險(xiǎn)團(tuán)隊(duì)架構(gòu)師,負(fù)責(zé)車險(xiǎn)核心系統(tǒng)的架構(gòu)和設(shè)計(jì),十年互聯(lián)網(wǎng)研發(fā)架構(gòu)經(jīng)驗(yàn),其中五年中間件與基礎(chǔ)架構(gòu)經(jīng)驗(yàn),對(duì)高并發(fā),高可用以及分布式應(yīng)用的架構(gòu)設(shè)計(jì)有豐富的實(shí)戰(zhàn)經(jīng)驗(yàn),尤其對(duì)分布式消息隊(duì)列,分布式流程編排引擎、分布式數(shù)據(jù)庫(kù)中間件有較深入的研究,熱愛(ài)技術(shù),崇尚開源,是Kafka、RocketMQ、Conductor等多個(gè)知名開源項(xiàng)目的源碼貢獻(xiàn)者。



          團(tuán)隊(duì)招聘

          ?



          滴滴車險(xiǎn)團(tuán)隊(duì)基于滴滴近百萬(wàn)輛車和海量數(shù)據(jù),通過(guò)線上化、科技化、數(shù)據(jù)化的手段,達(dá)到車險(xiǎn)的降賠付、降發(fā)生,降保費(fèi),為乘客、司機(jī)、以及車隊(duì)、合作伙伴提供方便快捷高效的車險(xiǎn)金融服務(wù)。


          團(tuán)隊(duì)長(zhǎng)期招聘java高級(jí)工程師和技術(shù)專家,歡迎有興趣的小伙伴加入,可投遞簡(jiǎn)歷至 [email protected],郵件請(qǐng)郵件主題請(qǐng)命名為「姓名-應(yīng)聘部門-應(yīng)聘方向」。



          掃碼了解更多崗位




          延伸閱讀

          ?

          內(nèi)容編輯 | Charlotte
          聯(lián)系我們 | [email protected]

          瀏覽 60
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  一区二区成人片18 | 91精品国产综合久久久久久久 | 天天日天天插 | 欧美狂操| 狠狠人妻久久久久久综合99浪潮 |