RocketMQ這樣做,壓測(cè)后性能提高30%
從官方這邊獲悉,RocketMQ在4.9.1版本中對(duì)消息發(fā)送進(jìn)行了大量的優(yōu)化,性能提升十分顯著,接下來(lái)請(qǐng)跟著我一起來(lái)欣賞大神們的杰作。

對(duì)WaitNotifyObject的鎖進(jìn)行優(yōu)化(item2) 移除HAService中的鎖(item3) 移除GroupCommitService中的鎖(item4) 消除HA中不必要的數(shù)組拷貝(item5) 調(diào)整消息發(fā)送幾個(gè)參數(shù)的默認(rèn)值(item7) sendMessageThreadPoolNums useReentrantLockWhenPutMessage flushCommitLogTimed endTransactionThreadPoolNums 減少瑣的作用范圍(item8-12)
移除不必要的鎖 降低鎖粒度(范圍) 修改消息發(fā)送相關(guān)參數(shù)
?
1、移除不必要的鎖

溫馨提示:在RocketMQ4.7版本開始對(duì)消息發(fā)送進(jìn)行了優(yōu)化,同步消息發(fā)送模型引入了jdk的CompletableFuture實(shí)現(xiàn)消息的異步發(fā)送。
消息發(fā)送線程調(diào)用Commitlog的aysncPutMessage方法寫入消息。 Commitlog調(diào)用submitReplicaRequest方法,將任務(wù)提交到GroupTransferService中,并獲取一個(gè)Future,實(shí)現(xiàn)異步編程。值得注意的是這里需要等待,待數(shù)據(jù)成功寫入從節(jié)點(diǎn)(內(nèi)部基于CompletableFuture機(jī)制的內(nèi)部線程池ForkJoin)。 GroupTransferService中對(duì)提交的任務(wù)依次進(jìn)行判斷,判斷對(duì)應(yīng)的請(qǐng)求是否已同步到從節(jié)點(diǎn)。 如果已經(jīng)復(fù)制到從節(jié)點(diǎn),則通過(guò)Future喚醒,并將結(jié)果返回給消息發(fā)送端。

為了更加方便大家理解接下來(lái)的優(yōu)化點(diǎn),首先再總結(jié)提煉一下GroupTransferService的設(shè)計(jì)理念:
首先引入兩個(gè)List結(jié)合,分別命名為讀、寫鏈表。 外部調(diào)用GroupTransferService的putRequest請(qǐng)求,將存儲(chǔ)在寫鏈表中(requestWrite)。 GroupTransferService的run方法從requestRead鏈表中獲取任務(wù),判斷這些任務(wù)對(duì)應(yīng)的請(qǐng)求的數(shù)據(jù)是否成功寫入到從節(jié)點(diǎn)。 每當(dāng)requestRead中沒(méi)有數(shù)據(jù)可讀時(shí),兩個(gè)隊(duì)列進(jìn)行交互,從而實(shí)現(xiàn)讀寫分離,降低鎖競(jìng)爭(zhēng)。
更改putRequest的鎖類型,用自旋鎖替換synchronized 去除doWaitTransfer方法中多余的鎖
1.1 使用自旋鎖替換synchronized


1.2 去除多余的鎖

從這個(gè)角度來(lái)看,其實(shí)主要是將鎖的類型由synchronized替換為更加輕量的自旋鎖。?
2、降低鎖的范圍

新版本采用函數(shù)式編程的思路,只是定義來(lái)獲取msgId的方法,在進(jìn)行消息寫入時(shí)并不會(huì)執(zhí)行,降低鎖的粒度,使得offsetMsgId的生成并行化,其編程手段之巧妙,值得我們學(xué)習(xí)。?
3、調(diào)整消息發(fā)送相關(guān)的參數(shù)
sendMessageThreadPoolNums Broker端消息發(fā)送端線程池?cái)?shù)量,該值在4.9.0版本之前默認(rèn)為1,新版本調(diào)整為操作系統(tǒng)的CPU核數(shù),并且不小于4。該參數(shù)的調(diào)整有利有弊。提高了消息發(fā)送的并發(fā)度,但同時(shí)會(huì)導(dǎo)致消息順序的亂序,其示例圖如下同步發(fā)送下不會(huì)有順序問(wèn)題,可放心修改 
在順序消費(fèi)場(chǎng)景,該參數(shù)不建議修改。在實(shí)際過(guò)程中應(yīng)該對(duì)RocketMQ集群進(jìn)行治理,順序消費(fèi)的場(chǎng)景使用專門集群。 useReentrantLockWhenPutMessage MQ消息寫入時(shí)對(duì)內(nèi)存加鎖使用的鎖類型,低版本之前默認(rèn)為false,表示默認(rèn)使用自旋鎖;新版本使用ReentrantLock。自旋主要的優(yōu)勢(shì)是沒(méi)有線程切換成本,但自旋容易造成CPU的浪費(fèi),內(nèi)存寫入大部分情況下是很快,但RocketMQ比較依賴頁(yè)緩存,如果出現(xiàn)也緩存抖動(dòng),帶來(lái)的CPU浪費(fèi)是非常不值得,在sendMessageThreadPoolNums設(shè)置超過(guò)1之后,鎖的類型使用ReentrantLock更加穩(wěn)定。 flushCommitLogTimed 首先我們通過(guò)觀察源碼了解一下該參數(shù)的含義: 
其主要作用是控制刷盤線程阻塞等待的方式,低版本flushCommitLogTimed為false,默認(rèn)使用CountDownLatch,而高版本則直接使用Thread.sleep。猜想的原因是刷盤線程比較獨(dú)立,無(wú)需與其他線程進(jìn)行直接的交互協(xié)作,故無(wú)需使用CountDownLatch這種專門用來(lái)線程協(xié)作的“外來(lái)和尚”。 endTransactionThreadPoolNums 主要用于設(shè)置事務(wù)消息線程池的大小。 
新版本主要是可通過(guò)調(diào)整發(fā)送線程池來(lái)動(dòng)態(tài)調(diào)節(jié)事務(wù)消息的值,這個(gè)大家可以根據(jù)壓測(cè)結(jié)果動(dòng)態(tài)調(diào)整。
有道無(wú)術(shù),術(shù)可成;有術(shù)無(wú)道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
評(píng)論
圖片
表情
