時(shí)間輪在Kafka的實(shí)踐

桔妹導(dǎo)讀:時(shí)間輪是一個(gè)應(yīng)用場(chǎng)景很廣的組件,在很多高性能中間件中都有它的身影,如Netty、Quartz、Akka,當(dāng)然也包括Kafka,本文主要介紹時(shí)間輪在kafka的應(yīng)用和實(shí)戰(zhàn),從核心源碼和設(shè)計(jì)的角度對(duì)時(shí)間輪進(jìn)行深入的講解 。
1.


30s超時(shí),就創(chuàng)建一個(gè)index從0到30的環(huán)形隊(duì)列(本質(zhì)是個(gè)數(shù)組) 環(huán)上每一個(gè)slot是一個(gè)Set<uid>,任務(wù)集合 同時(shí)還有一個(gè)Map<uid, index>,記錄uid落在環(huán)上的哪個(gè)slot里
從Map結(jié)構(gòu)中,查找出這個(gè)uid存儲(chǔ)在哪一個(gè)slot里 從這個(gè)slot的Set結(jié)構(gòu)中,刪除這個(gè)uid 將uid重新加入到新的slot中,具體是哪一個(gè)slot呢 => Current Index指針?biāo)赶虻?/span>上一個(gè)slot,因?yàn)檫@個(gè)slot,會(huì)被timer在30s之后掃描到 更新Map,這個(gè)uid對(duì)應(yīng)slot的index值
2.

圖二tickMs:時(shí)間輪由多個(gè)時(shí)間格組成,每個(gè)時(shí)間格就是tickMs,它代表當(dāng)前時(shí)間輪的基本時(shí)間跨度。 wheelSize:代表每一層時(shí)間輪的格數(shù) interval:當(dāng)前時(shí)間輪的總體時(shí)間跨度,interval=tickMs × wheelSize startMs:構(gòu)造當(dāng)層時(shí)間輪時(shí)候的當(dāng)前時(shí)間,第一層的時(shí)間輪的startMs是TimeUnit.NANOSECONDS.toMillis(nanoseconds()),上層時(shí)間輪的startMs為下層時(shí)間輪的currentTime。 currentTime:表示時(shí)間輪當(dāng)前所處的時(shí)間,currentTime是tickMs的整數(shù)倍(通過(guò)currentTime=startMs - (startMs % tickMs來(lái)保正currentTime一定是tickMs的整數(shù)倍),這個(gè)運(yùn)算類比鐘表中分鐘里65秒分鐘指針指向的還是1分鐘)。currentTime可以將整個(gè)時(shí)間輪劃分為到期部分和未到期部分,currentTime當(dāng)前指向的時(shí)間格也屬于到期部分,表示剛好到期,需要處理此時(shí)間格所對(duì)應(yīng)的TimerTaskList的所有任務(wù)。

剛才提到的350ms的任務(wù),不會(huì)插入到第一層時(shí)間輪,會(huì)插入到interval=20*20的第二層時(shí)間輪中,具體插入到時(shí)間輪的哪個(gè)bucket呢?先用350/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350會(huì)放在第17個(gè)bucket。如果此時(shí)有一個(gè)450ms后執(zhí)行的任務(wù),那么會(huì)放在第三層時(shí)間輪中,按照剛才的計(jì)算公式,會(huì)放在第0個(gè)bucket。第0個(gè)bucket里會(huì)包含[400,800)ms的任務(wù)。隨著時(shí)間流逝,當(dāng)時(shí)間過(guò)去了400ms,那么450ms后就要執(zhí)行的任務(wù)還剩下50ms的時(shí)間才能執(zhí)行,此時(shí)有一個(gè)時(shí)間輪降級(jí)的操作,將50ms任務(wù)重新提交到層級(jí)時(shí)間輪中,那么此時(shí)50ms的任務(wù)根據(jù)公式會(huì)放入第二個(gè)時(shí)間輪的第2個(gè)bucket中,此bucket的時(shí)間范圍為[40,60)ms,然后再經(jīng)過(guò)40ms,這個(gè)50ms的任務(wù)又會(huì)被監(jiān)控到,此時(shí)距離任務(wù)執(zhí)行還有10ms,同樣將10ms的任務(wù)提交到層級(jí)時(shí)間輪,此時(shí)會(huì)加入到第一層時(shí)間輪的第10個(gè)bucket,所以再經(jīng)過(guò)10ms后,此任務(wù)到期,最終執(zhí)行。

//在Systemtimer中添加一個(gè)任務(wù),任務(wù)被包裝為一個(gè)TimerTaskEntryprivate def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {//先判斷是否可以添加進(jìn)時(shí)間輪中,如果不可以添加進(jìn)去代表任務(wù)已經(jīng)過(guò)期或者任務(wù)被取消,注意這里的timingWheel持有上一層時(shí)間輪的引用,所以可能存在遞歸調(diào)用if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelledif (!timerTaskEntry.cancelled)//過(guò)期任務(wù)直接線程池異步執(zhí)行掉taskExecutor.submit(timerTaskEntry.timerTask)}}timingWheel添加任務(wù),遞歸添加直到添加該任務(wù)進(jìn)合適的時(shí)間輪的bucket中def add(timerTaskEntry: TimerTaskEntry): Boolean = {val expiration = timerTaskEntry.expirationMs//任務(wù)取消if (timerTaskEntry.cancelled) {// Cancelledfalse} else if (expiration < currentTime + tickMs) {// 任務(wù)過(guò)期后會(huì)被執(zhí)行false} else if (expiration < currentTime + interval) {//任務(wù)過(guò)期時(shí)間比當(dāng)前時(shí)間輪時(shí)間加周期小說(shuō)明任務(wù)過(guò)期時(shí)間在本時(shí)間輪周期內(nèi)val virtualId = expiration / tickMs//找到任務(wù)對(duì)應(yīng)本時(shí)間輪的bucketval bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// Set the bucket expiration time//只有本bucket內(nèi)的任務(wù)都過(guò)期后才會(huì)bucket.setExpiration返回true此時(shí)將bucket放入延遲隊(duì)列if (bucket.setExpiration(virtualId * tickMs)) {//bucket是一個(gè)TimerTaskList,它實(shí)現(xiàn)了java.util.concurrent.Delayed接口,里面是一個(gè)多任務(wù)組成的鏈表,圖2有說(shuō)明queue.offer(bucket)}true} else {// Out of the interval. Put it into the parent timer//任務(wù)的過(guò)期時(shí)間不在本時(shí)間輪周期內(nèi)說(shuō)明需要升級(jí)時(shí)間輪,如果不存在則構(gòu)造上一層時(shí)間輪,繼續(xù)用上一層時(shí)間輪添加任務(wù)if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}}
private[this] def addOverflowWheel(): Unit = {synchronized {if (overflowWheel == null) {overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}
def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {//把當(dāng)前時(shí)間打平為時(shí)間輪tickMs的整數(shù)倍currentTime = timeMs - (timeMs % tickMs)// Try to advance the clock of the overflow wheel if present//驅(qū)動(dòng)上層時(shí)間輪,這里的傳給上層的currentTime時(shí)間是本層時(shí)間輪打平過(guò)的,但是在上層時(shí)間輪還是會(huì)繼續(xù)打平if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}}
//循環(huán)bucket里面的任務(wù)列表,一個(gè)個(gè)重新添加進(jìn)時(shí)間輪,對(duì)符合條件的時(shí)間輪進(jìn)行升降級(jí)或者執(zhí)行任務(wù)private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)/** Advances the clock if there is an expired bucket. If there isn't any expired bucket when called,* waits up to timeoutMs before giving up.*/def advanceClock(timeoutMs: Long): Boolean = {var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//驅(qū)動(dòng)時(shí)間輪timingWheel.advanceClock(bucket.getExpiration())//循環(huán)buckek也就是任務(wù)列表,任務(wù)列表一個(gè)個(gè)繼續(xù)添加進(jìn)時(shí)間輪以此來(lái)升級(jí)或者降級(jí)時(shí)間輪,把過(guò)期任務(wù)找出來(lái)執(zhí)行bucket.flush(reinsert)//循環(huán)//這里就是從延遲隊(duì)列取出bucket,bucket是有延遲時(shí)間的,取出代表該bucket過(guò)期,我們通過(guò)bucket能取到bucket包含的任務(wù)列表bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false}}
3.

kafka的延遲隊(duì)列使用時(shí)間輪實(shí)現(xiàn),能夠支持大量任務(wù)的高效觸發(fā),但是在kafka延遲隊(duì)列實(shí)現(xiàn)方案里還是看到了delayQueue的影子,使用delayQueue是對(duì)時(shí)間輪里面的bucket放入延遲隊(duì)列,以此來(lái)推動(dòng)時(shí)間輪滾動(dòng),但是基于將插入和刪除操作則放入時(shí)間輪中,將這些操作的時(shí)間復(fù)雜度都降為O(1),提升效率。Kafka對(duì)性能的極致追求讓它把最合適的組件放在最適合的位置。
?

滴滴車險(xiǎn)團(tuán)隊(duì)架構(gòu)師,負(fù)責(zé)車險(xiǎn)核心系統(tǒng)的架構(gòu)和設(shè)計(jì),十年互聯(lián)網(wǎng)研發(fā)架構(gòu)經(jīng)驗(yàn),其中五年中間件與基礎(chǔ)架構(gòu)經(jīng)驗(yàn),對(duì)高并發(fā),高可用以及分布式應(yīng)用的架構(gòu)設(shè)計(jì)有豐富的實(shí)戰(zhàn)經(jīng)驗(yàn),尤其對(duì)分布式消息隊(duì)列,分布式流程編排引擎、分布式數(shù)據(jù)庫(kù)中間件有較深入的研究,熱愛(ài)技術(shù),崇尚開源,是Kafka、RocketMQ、Conductor等多個(gè)知名開源項(xiàng)目的源碼貢獻(xiàn)者。

團(tuán)隊(duì)招聘
?
滴滴車險(xiǎn)團(tuán)隊(duì)基于滴滴近百萬(wàn)輛車和海量數(shù)據(jù),通過(guò)線上化、科技化、數(shù)據(jù)化的手段,達(dá)到車險(xiǎn)的降賠付、降發(fā)生,降保費(fèi),為乘客、司機(jī)、以及車隊(duì)、合作伙伴提供方便快捷高效的車險(xiǎn)金融服務(wù)。
團(tuán)隊(duì)長(zhǎng)期招聘java高級(jí)工程師和技術(shù)專家,歡迎有興趣的小伙伴加入,可投遞簡(jiǎn)歷至 [email protected],郵件請(qǐng)郵件主題請(qǐng)命名為「姓名-應(yīng)聘部門-應(yīng)聘方向」。
掃碼了解更多崗位
延伸閱讀




