<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          任務(wù)的插入時(shí)間復(fù)雜度優(yōu)化到 O(1),Timing Wheel時(shí)間輪是怎么做到的?

          共 3934字,需瀏覽 8分鐘

           ·

          2020-12-06 23:04

          點(diǎn)擊上方藍(lán)色“程序猿DD”,選擇“設(shè)為星標(biāo)”

          回復(fù)“資源”獲取獨(dú)家整理的學(xué)習(xí)資料!

          作者 |?瘋狂的哈丘

          來源 |?https://www.jianshu.com/p/0f0fec47a0ad

          在kafka中,有許多請(qǐng)求并不是立即返回,而且處理完一些異步操作或者等待某些條件達(dá)成后才返回,這些請(qǐng)求一般都會(huì)帶有timeout參數(shù),表示如果timeout時(shí)間后服務(wù)端還不滿足返回的條件,就判定此次請(qǐng)求為超時(shí),這時(shí)候kafka同樣要返回超時(shí)的響應(yīng)給客戶端,這樣客戶端才知道此次請(qǐng)求超時(shí)了。比如ack=-1的producer請(qǐng)求,就需要等待所有的isr備份完成了才可以返回給客戶端,或者到達(dá)timeout時(shí)間了返回超時(shí)響應(yīng)給客戶端。

          上面的場景,可以用延遲任務(wù)來實(shí)現(xiàn)。也就是定義一個(gè)任務(wù),在timeout時(shí)間后執(zhí)行,執(zhí)行的內(nèi)容一般就是先檢查返回條件是否滿足,滿足的話就返回客戶端需要的響應(yīng),如果還是不滿足,就發(fā)送超時(shí)響應(yīng)給客戶端。

          對(duì)于延遲操作,java自帶的實(shí)現(xiàn)有Timer和ScheduledThreadPoolExecutor。這兩個(gè)的底層數(shù)據(jù)結(jié)構(gòu)都是基于一個(gè)延遲隊(duì)列,在準(zhǔn)備執(zhí)行一個(gè)延遲任務(wù)時(shí),將其插入到延遲隊(duì)列中。這些延遲隊(duì)列其實(shí)就是一個(gè)用最小堆實(shí)現(xiàn)的優(yōu)先級(jí)隊(duì)列,因此,插入一個(gè)任務(wù)的時(shí)間復(fù)雜度是O(logN),取出一個(gè)任務(wù)執(zhí)行后調(diào)整堆的時(shí)間也是O(logN)。

          如果要執(zhí)行的延遲任務(wù)不多,O(logN)的速度已經(jīng)夠快了。但是對(duì)于kafka這樣一個(gè)高吞吐量的系統(tǒng)來說,O(logN)的速度還不夠,為了追求更快的速度,kafka的設(shè)計(jì)者使用了Timing Wheel的數(shù)據(jù)結(jié)構(gòu),讓任務(wù)的插入時(shí)間復(fù)雜度達(dá)到了O(1)。

          Timing Wheel

          上面是時(shí)間輪的一個(gè)結(jié)構(gòu)圖,該時(shí)間輪有8個(gè)槽,當(dāng)前時(shí)間指向0號(hào)槽。

          我們?cè)倏匆幌翶afka里面TimingWheel的數(shù)據(jù)結(jié)構(gòu)

          private[timer]?class?TimingWheel(tickMs:?Long,?wheelSize:?Int,?startMs:?Long,?taskCounter:?AtomicInteger,?queue:?DelayQueue[TimerTaskList])?{

          ??private[this]?val?interval?=?tickMs?*?wheelSize
          ??private[this]?val?buckets?=?Array.tabulate[TimerTaskList](wheelSize)?{?_?=>?new?TimerTaskList(taskCounter)?}

          ??private[this]?var?currentTime?=?startMs?-?(startMs?%?tickMs)?//?rounding?down?to?multiple?of?tickMs
          }

          tickMs:?表示一個(gè)槽所代表的時(shí)間范圍,kafka的默認(rèn)值的1ms

          wheelSize:?表示該時(shí)間輪有多少個(gè)槽,kafka的默認(rèn)值是20

          startMs:?表示該時(shí)間輪的開始時(shí)間

          taskCounter:?表示該時(shí)間輪的任務(wù)總數(shù)

          queue:?是一個(gè)TimerTaskList的延遲隊(duì)列。每個(gè)槽都有它一個(gè)對(duì)應(yīng)的TimerTaskList,TimerTaskList是一個(gè)雙向鏈表,有一個(gè)expireTime的值,這些TimerTaskList都被加到這個(gè)延遲隊(duì)列中,expireTime最小的槽會(huì)排在隊(duì)列的最前面。

          interval:?時(shí)間輪所能表示的時(shí)間跨度,也就是tickMs*wheelSize

          buckets:?表示TimerTaskList的數(shù)組,即各個(gè)槽。

          currentTime:?表示當(dāng)前時(shí)間,也就是時(shí)間輪指針指向的時(shí)間

          運(yùn)行原理

          當(dāng)新增一個(gè)延遲任務(wù)時(shí),通過buckets[expiration / tickMs % wheelSize]先計(jì)算出它應(yīng)該屬于哪個(gè)槽。比如延遲任務(wù)的delayMs=2ms,當(dāng)前時(shí)間currentTime是0ms,則expiration=delayMs+startMs=2ms,通過前面的公式算出它應(yīng)該落于2號(hào)槽。并把任務(wù)封裝成TimerTaskEntry然后加入到TimerTaskList鏈表中。

          之后,kafka會(huì)啟動(dòng)一個(gè)線程,去推動(dòng)時(shí)間輪的指針轉(zhuǎn)動(dòng)。其實(shí)現(xiàn)原理其實(shí)就是通過queue.poll()取出放在最前面的槽的TimerTaskList。由于queue是一個(gè)延遲隊(duì)列,如果隊(duì)列中的expireTime沒有到達(dá),該操作會(huì)阻塞住,直到expireTime到達(dá)。如果通過queue.poll()取到了TimerTaskList,說明該槽里面的任務(wù)時(shí)間都已經(jīng)到達(dá)。這時(shí)候就可以遍歷該TimerTaskList中的任務(wù),然后執(zhí)行對(duì)應(yīng)的操作了。

          針對(duì)上面的例子,就2號(hào)槽有任務(wù),所以當(dāng)取出2號(hào)槽的TimerTaskList后,會(huì)先將currentTime = timeMs - (timeMs % tickMs),其中timeMs也就是該TimerTaskList的expireTime,也就是2Ms。所以,這時(shí)currentTime=2ms,也就是時(shí)間輪指針指向2Ms。

          時(shí)間溢出處理

          在kafka的默認(rèn)實(shí)現(xiàn)中,tickMs=1Ms,wheelSize=20,這就表示該時(shí)間輪所能表示的延遲時(shí)間范圍是0~20Ms,那如果延遲時(shí)間超過20Ms要如何處理呢?Kafka對(duì)時(shí)間輪做了一層改進(jìn),使時(shí)間輪變成層級(jí)的時(shí)間輪。

          一開始,第一層的時(shí)間輪所能表示時(shí)間范圍是0~20Ms之間,假設(shè)現(xiàn)在出現(xiàn)一個(gè)任務(wù)的延遲時(shí)間是200Ms,那么kafka會(huì)再創(chuàng)建一層時(shí)間輪,我們稱之為第二層時(shí)間輪。

          第二層時(shí)間輪的創(chuàng)建代碼如下

          overflowWheel?=?new?TimingWheel(
          ??????????tickMs?=?interval,
          ??????????wheelSize?=?wheelSize,
          ??????????startMs?=?currentTime,
          ??????????taskCounter?=?taskCounter,
          ??????????queue
          )

          也就是第二層時(shí)間輪每一個(gè)槽所能表示的時(shí)間是第一層時(shí)間輪所能表示的時(shí)間范圍,也就是20Ms。槽的數(shù)量還是一樣,其他的屬性也是繼承自第一層時(shí)間輪。這時(shí)第二層時(shí)間輪所能表示的時(shí)間范圍就是0~400Ms了。

          之后通過buckets[expiration / tickMs % wheelSize]算出延遲時(shí)間為200Ms的任務(wù)應(yīng)該位于第二層時(shí)間輪的10號(hào)槽位。

          同理,如果第二層時(shí)間輪的時(shí)間范圍還容納不了新的延遲任務(wù),就會(huì)創(chuàng)建第三層、第四層...

          值得注意的是,只有當(dāng)前時(shí)間輪無法容納目標(biāo)延遲任務(wù)所能表示的時(shí)間時(shí),才需要?jiǎng)?chuàng)建更高一級(jí)的時(shí)間輪,或者說把該任務(wù)加到更高一級(jí)的時(shí)間輪中(如果該時(shí)間輪已創(chuàng)建)。

          一些細(xì)節(jié)

          1. 當(dāng)時(shí)間輪的指針指向1號(hào)槽時(shí),即currentTime=1Ms,說明0號(hào)槽的任務(wù)都已經(jīng)到期了,這時(shí)0號(hào)槽就會(huì)被拿出來復(fù)用,可以容納20~21Ms延遲時(shí)間的任務(wù)。也就是說,如果currentTime=0Ms時(shí)進(jìn)來一個(gè)21Ms的延遲任務(wù),就需要?jiǎng)?chuàng)建更高一級(jí)的時(shí)間輪,但是如果currentTime=1Ms時(shí)進(jìn)來一個(gè)21Ms的延遲任務(wù),就可以直接把它放到0號(hào)槽中,當(dāng)currentTime=21時(shí),指針又指向0號(hào)槽
          2. 細(xì)心的同學(xué)可能發(fā)現(xiàn),第一層的0號(hào)槽所能表示的任務(wù)延遲時(shí)間范圍是01Ms,對(duì)應(yīng)的TimerTaskList的expireTime是0Ms。第二層的0號(hào)槽鎖能表示的任務(wù)延遲時(shí)間范圍是020Ms,對(duì)應(yīng)的TimerTaskList的expireTime也是0Ms。他們的TimerTaskList又都是放在一個(gè)延遲隊(duì)列中。這時(shí)候執(zhí)行queue.poll()會(huì)把這兩個(gè)TimerTaskList都取出來,然后遍歷鏈表的時(shí)候還會(huì)判斷該任務(wù)是否達(dá)到執(zhí)行時(shí)間了,如果沒有的話,這些任務(wù)還會(huì)被塞回時(shí)間輪中。這時(shí)由于第一層指針的轉(zhuǎn)動(dòng),原先處于第二層時(shí)間輪中的任務(wù)可能會(huì)重新落到第一層時(shí)間輪上面。

          源碼解析

          添加新的延遲任務(wù)

          //SystemTimer.scala??
          private?def?addTimerTaskEntry(timerTaskEntry:?TimerTaskEntry):?Unit?=?{
          ????if?(!timingWheel.add(timerTaskEntry))?{
          ??????//?Already?expired?or?cancelled
          ??????if?(!timerTaskEntry.cancelled)
          ????????taskExecutor.submit(timerTaskEntry.timerTask)
          ????}
          ??}

          往時(shí)間輪添加新的任務(wù)

          //TimingWheel
          def?add(timerTaskEntry:?TimerTaskEntry):?Boolean?=?{
          ????//獲取任務(wù)的延遲時(shí)間
          ????val?expiration?=?timerTaskEntry.expirationMs
          ????//先判斷任務(wù)是否已經(jīng)完成
          ????if?(timerTaskEntry.cancelled)?{
          ??????false
          ??????//如果任務(wù)已經(jīng)到期
          ????}?else?if?(expiration???????false
          ??????//判斷當(dāng)前時(shí)間輪所能表示的時(shí)間范圍是否可以容納該任務(wù)
          ????}?else?if?(expiration???????//?根據(jù)任務(wù)的延遲時(shí)間算出應(yīng)該位于哪個(gè)槽
          ??????val?virtualId?=?expiration?/?tickMs
          ??????val?bucket?=?buckets((virtualId?%?wheelSize.toLong).toInt)
          ??????bucket.add(timerTaskEntry)

          ??????//?設(shè)置TimerTaskList的expireTime
          ??????if?(bucket.setExpiration(virtualId?*?tickMs))?{
          ????????//把TimerTaskList加入到延遲隊(duì)列
          ????????queue.offer(bucket)
          ??????}
          ??????true
          ????}?else?{
          ??????//如果時(shí)間超出當(dāng)前所能表示的最大范圍,則創(chuàng)建新的時(shí)間輪,并把任務(wù)添加到那個(gè)時(shí)間輪上面
          ??????if?(overflowWheel?==?null)?addOverflowWheel()
          ??????overflowWheel.add(timerTaskEntry)
          ????}
          ??}
          ??private[this]?def?addOverflowWheel():?Unit?=?{
          ????synchronized?{
          ??????if?(overflowWheel?==?null)?{
          ????????overflowWheel?=?new?TimingWheel(
          ??????????tickMs?=?interval,
          ??????????wheelSize?=?wheelSize,
          ??????????startMs?=?currentTime,
          ??????????taskCounter?=?taskCounter,
          ??????????queue
          ????????)
          ??????}
          ????}
          ??}

          從上面的代碼可以看出,對(duì)于當(dāng)前時(shí)間輪是否可以容納目標(biāo)任務(wù),是通過expiration < currentTime + interval來計(jì)算的,也就是根據(jù)時(shí)間輪的指針往后推interval時(shí)間就是時(shí)間輪所能表示的時(shí)間范圍。

          時(shí)間輪指針的推進(jìn)

          ?//SystemTimer.scala?
          def?advanceClock(timeoutMs:?Long):?Boolean?=?{
          ??????//從延遲隊(duì)列中取出最近的一個(gè)槽,如果槽的expireTime沒到,此操作會(huì)阻塞timeoutMs
          ????var?bucket?=?delayQueue.poll(timeoutMs,?TimeUnit.MILLISECONDS)
          ????if?(bucket?!=?null)?{
          ??????writeLock.lock()
          ??????try?{
          ????????while?(bucket?!=?null)?{
          ????????????//推進(jìn)時(shí)間輪的指針
          ??????????timingWheel.advanceClock(bucket.getExpiration())
          ????????????//把TimerTaskList的任務(wù)都取出來重新add一遍,add的時(shí)候會(huì)檢查任務(wù)是否已經(jīng)到期
          ??????????bucket.flush(reinsert)
          ??????????bucket?=?delayQueue.poll()
          ????????}
          ??????}?finally?{
          ????????writeLock.unlock()
          ??????}
          ??????true
          ????}?else?{
          ??????false
          ????}
          ??}
          //TimingWheel
          def?advanceClock(timeMs:?Long):?Unit?=?{
          ????if?(timeMs?>=?currentTime?+?tickMs)?{
          ????????//推進(jìn)時(shí)間輪的指針
          ??????currentTime?=?timeMs?-?(timeMs?%?tickMs)

          ??????//?推進(jìn)上層時(shí)間輪的指針
          ??????if?(overflowWheel?!=?null)?overflowWheel.advanceClock(currentTime)
          ????}
          ??}

          總結(jié)

          相比于常用的DelayQueue的時(shí)間復(fù)雜度O(logN),TimingWheel的數(shù)據(jù)結(jié)構(gòu)在插入任務(wù)時(shí)只要O(1),獲取到達(dá)任務(wù)的時(shí)間復(fù)雜度也遠(yuǎn)低于O(logN)。另外,kafka的TimingWheel在插入任務(wù)之前還會(huì)先檢查任務(wù)是否完成,對(duì)于那些在任務(wù)超時(shí)直接就完成指定操作的場景,TimingWheel的表現(xiàn)更加優(yōu)秀。

          DD自研的滬牌代拍業(yè)務(wù),點(diǎn)擊直達(dá)


          【往期推薦】

          Kubernetes 最佳安全實(shí)踐指南

          2020-12-01

          關(guān)于零拷貝的一點(diǎn)認(rèn)識(shí)

          2020-12-01

          高可用解決方案:同城雙活?異地雙活?異地多活?怎么實(shí)現(xiàn)?

          2020-11-30

          知乎熱議:計(jì)算機(jī)專業(yè)錢景究竟如何?

          2020-11-29

          VS Code有哪些奇技淫巧?

          2020-11-29

          API網(wǎng)關(guān)是否真的起到了它該有的作用?

          2020-11-28



          掃一掃,關(guān)注我

          一起學(xué)習(xí),一起進(jìn)步

          每周贈(zèng)書,福利不斷

          深度內(nèi)容

          推薦加入





          瀏覽 42
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  殴美日韩中文在线中 | 爱啪啪导航| 網站日逼 | 青青草免费AV | 狠狠操在线 |