RocketMQ源碼解析之消息生產(chǎn)者(異步發(fā)送)
1.前言
我們在《RocketMQ源碼解析之消息生產(chǎn)者(同步發(fā)送與單向發(fā)送)》一文中解析了RocketMQ生產(chǎn)者同步發(fā)送消息與單向發(fā)送消息,這個(gè)異步發(fā)送消息其實(shí)很多地方與同步發(fā)送一樣,不過有一點(diǎn)是你在編程的時(shí)候需要提供SendCallback 對象,用來發(fā)送響應(yīng)來的時(shí)候進(jìn)行回調(diào)使用,我們知道同步發(fā)送是等待broker響應(yīng)到來,然后將響應(yīng)往上返回,這個(gè)異步調(diào)用就是響應(yīng)來的時(shí)候,對你提供的回調(diào)對象進(jìn)行調(diào)用,你這個(gè)回調(diào)對象可以寫一些自己的邏輯等等。
2.源碼解析
在源碼解析之前我們要先看一下異步發(fā)送消息是怎樣編程的

這里需要我們提供一個(gè)SendCallback 對象用來響應(yīng)來的時(shí)候回調(diào),其中異常的時(shí)候會調(diào)用這里面的onException方法, 成功的時(shí)候調(diào)用onSuccess方法,執(zhí)行相應(yīng)的邏輯。
好了,現(xiàn)在開始源碼解析:

首先是調(diào)用了defaultMQProducerImpl 的send方法,并且將msg 與callback傳進(jìn)去

這里又調(diào)用了下它的重載方法,然后將發(fā)送超時(shí)時(shí)間傳進(jìn)去,默認(rèn)的一個(gè)發(fā)送超時(shí)時(shí)間是3s。

這里需要注意的是,它將這個(gè)發(fā)送消息的任務(wù)交給了一個(gè)異步發(fā)送線程池,然后在任務(wù)中是調(diào)用了sendDefaultImpl 方法,然后通信方式是異步CommunicationMode.ASYNC,這里我們需要看下這個(gè)線程池的一些參數(shù),因?yàn)殛P(guān)乎我們以后的調(diào)優(yōu)

這里先是判斷asyncSenderExecutor 這個(gè)線程池是不是null,其實(shí)咱們這里它是null的(因?yàn)槲覀儧]有指定線程池,不過你編程的時(shí)候是可以指定的,使用setAsyncSenderExecutor()這個(gè)方法就可以設(shè)置了),就使用
defaultAsyncSenderExecutor線程池,這個(gè)defaultAsyncSenderExecutor 線程池是在defaultMQProducerImpl 類構(gòu)造方法創(chuàng)建的的,我們可以看下

隊(duì)列是5w大小,然后核心線程數(shù) 是cpu的核心數(shù),maxThreads也是cpu核心數(shù)。
好了,我們繼續(xù)往下看,這個(gè)sendDefaultImpl 方法其實(shí)就是選擇MessageQueue然后重試那一套東西,不過,異步發(fā)送雖然走這個(gè)方法,但是它的失敗重試不是這樣子玩的,我們接著往下看接著又調(diào)用了sendKernelImpl 方法

這里我們主要是看下異步處理這一塊,因?yàn)槲覀冊诮榻B同步發(fā)送與單向發(fā)送都有介紹過這個(gè)方法,其實(shí)這個(gè)方法就是封裝請求頭啥的,異步這一塊我們可以看到,先是判斷了一下超時(shí)沒有,然后調(diào)用MQClientAPI的sendMessage方法,注意下它這個(gè)倒數(shù)第三個(gè)參數(shù),這個(gè)參數(shù)是獲取了一下默認(rèn)的重試次數(shù),默認(rèn)是2。

這個(gè)方法我們也是主要看下這個(gè)異步發(fā)送這塊,首先是定義了一個(gè)times,這個(gè)times記錄的發(fā)送次數(shù),然后判斷了是否超時(shí),然后調(diào)用sendMessageAsync這個(gè)異步發(fā)送方法。這個(gè)方法又調(diào)用了RemotingClient的invokeAsync 方法,其中在sendMessageAsync 方法中創(chuàng)建了一個(gè)InvokeCallback 對象,我們先不管這個(gè)InvokeCallback ,后面再解釋,先看下invokeAsync 方法:

其實(shí)套路都一樣,先是根據(jù)broker addr 獲取對應(yīng)的channel,然后判斷一下channel狀態(tài),然后執(zhí)行調(diào)用前的鉤子,判斷有沒有超時(shí),調(diào)用
invokeAsyncImpl方法進(jìn)行發(fā)送。

這個(gè)方法重要的操作就這幾個(gè),首先生成一個(gè)調(diào)用id ,也就是opaque , 接著獲取信號量許可,這個(gè)信號量使用來限流的,默認(rèn)是65535,獲取之后判斷一下有沒有超時(shí),然后封裝ResponseFuture 對象,將ResponseFuture 對象緩存到response表中。
接著將消息寫到channel中,注意有個(gè)listener 是在發(fā)送出去的時(shí)候執(zhí)行,成功的話將ResponseFuture 對象設(shè)置發(fā)送成功,失敗的走了requestFail(opaque)方法,失敗我們先不看。這個(gè)時(shí)候就送成功了,等到收到broker響應(yīng)的時(shí)候,NettyClientHandler 就能收到消息了

這個(gè)是netty的知識點(diǎn),是將NettyClientHandler 對象注冊到netty的pipeline上面,在發(fā)送內(nèi)容,接收內(nèi)容,都會執(zhí)行響應(yīng)的實(shí)現(xiàn)方法。

我們這里是收到的響應(yīng)消息,然后調(diào)用processResponseCommand 處理

這里就是根據(jù)opaque去responseTable這個(gè)緩存中找到對應(yīng)的ResponseFuture 對象,然后設(shè)置響應(yīng)內(nèi)容,最最最重要的點(diǎn)就是看一下它的invokeCallBack有沒有,我們發(fā)送消息的時(shí)候是有設(shè)置進(jìn)去的。它會調(diào)用executeInvokeCallback 方法執(zhí)行

這里就是獲取執(zhí)行回調(diào)的線程池,如果線程池是null的話,就在當(dāng)前線程執(zhí)行。這個(gè)回調(diào)線程池參數(shù)我們也看下

它是拿的這個(gè)線程是,默認(rèn)核心線程數(shù)也是cpu核心數(shù)。
接著就是調(diào)用了ResponseFuture 的executeInvokeCallback 方法

設(shè)置回調(diào)狀態(tài),然后調(diào)用invokeCallback的operationComplete 方法,現(xiàn)在我們再回到MQClientAPI的sendMessageAsync 方法中,因?yàn)楫?dāng)時(shí)是在這個(gè)方法中創(chuàng)建的這個(gè) invokeCallback 對象

這個(gè)方法分為2部分吧,一是有響應(yīng),然后沒有sendCallback ,這個(gè)sendCallback 是你自己寫的那個(gè)回調(diào)對象,這個(gè)時(shí)候沒有的話說明你不準(zhǔn)備回調(diào)了,然后解析了一下結(jié)果,執(zhí)行了一下 調(diào)用后的鉤子,這部分就算完事了,二是有響應(yīng),然后也是有這個(gè)回調(diào)對象sendCallback的,先是解析了下響應(yīng),然后執(zhí)行了你寫的那個(gè)sendCallback 對象,另外就是執(zhí)行了updateFaultItem ,進(jìn)行更新一個(gè)響應(yīng)信息,如果你看我的《RocketMQ源碼解析之消息生產(chǎn)者(容錯(cuò))》這篇文章就能知道這個(gè)方法是干嘛的了。
如果異常的話,執(zhí)行了一個(gè)onExceptionImpl 方法來處理,我們來看下這個(gè)方法的實(shí)現(xiàn)

增加調(diào)用次數(shù),然后判斷是否需要重試&& 重試次數(shù)在范圍內(nèi),然后就是重新選擇一個(gè)MessageQueue,重新設(shè)置請求id,也就是opaque這個(gè),最后就是調(diào)用 sendMessageAsync 進(jìn)行發(fā)送了,這就是異步調(diào)用的一個(gè)重試邏輯,并沒有使用for循環(huán)的形式。
好了,到這我們的異步發(fā)送解析就已經(jīng)ok了
