<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          【源碼】RocketMQ如何實(shí)現(xiàn)獲取指定消息

          共 16110字,需瀏覽 33分鐘

           ·

          2020-08-19 06:30

          點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

          優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

          ? 作者?|??貓毛·波拿巴

          來(lái)源 |? ?urlify.cn/YfuQ32

          66套java從入門到精通實(shí)戰(zhàn)課程分享

          概要

          消息查詢是什么?

          消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息

          RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?

          問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上。客戶端怎么知道數(shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?

          猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)

          猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容

          實(shí)際:

          1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。

          2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。

          問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?

          實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。

          源碼閱讀

          0.使用方式

          MessageExt? msg = consumer.viewMessage(msgId);

          1.消息ID解析

          這個(gè)了解下就可以了


          public?class?MessageId?{
          ????private?SocketAddress address;
          ????private?long?offset;

          ????public?MessageId(SocketAddress address, long?offset)?{
          ????????this.address = address;
          ????????this.offset = offset;
          ????}

          ????//get-set
          }

          //from MQAdminImpl.java
          public?MessageExt viewMessage(
          ????String msgId)
          ?throws?RemotingException, MQBrokerException, InterruptedException, MQClientException
          {

          ????MessageId messageId = null;
          ????try?{
          ????????//從msgId字符串中解析出address和offset
          ????????//address = ip:port
          ????????//offset為消息在CommitLog文件中的偏移量
          ????????messageId = MessageDecoder.decodeMessageId(msgId);
          ????} catch?(Exception e) {
          ????????throw?new?MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
          ????}
          ????return?this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
          ????????messageId.getOffset(), timeoutMillis);
          }

          //from MessageDecoder.java
          public?static?MessageId decodeMessageId(final?String msgId)?throws?UnknownHostException {
          ????SocketAddress address;
          ????long?offset;
          ????//ipv4和ipv6的區(qū)別
          ????//如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6
          ????int?ipLength = msgId.length() == 32?? 4?* 2?: 16?* 2;

          ????byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
          ????byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
          ????ByteBuffer bb = ByteBuffer.wrap(port);
          ????int?portInt = bb.getInt(0);
          ????address = new?InetSocketAddress(InetAddress.getByAddress(ip), portInt);

          ????// offset
          ????byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8?+ 16));
          ????bb = ByteBuffer.wrap(data);
          ????offset = bb.getLong(0);

          ????return?new?MessageId(address, offset);
          }


          2.長(zhǎng)連接客戶端RPC實(shí)現(xiàn)

          要發(fā)請(qǐng)求首先得先建立連接,這里方法可以看到創(chuàng)建連接相關(guān)的操作。值得注意的是,第一次訪問(wèn)的時(shí)候可能連接還沒(méi)建立,建立連接需要消耗一段時(shí)間。代碼中對(duì)這個(gè)時(shí)間也做了判斷,如果連接建立完成后,發(fā)現(xiàn)已經(jīng)超時(shí),則不再發(fā)出請(qǐng)求。目的應(yīng)該是盡可能減少請(qǐng)求線程的阻塞時(shí)間。


          //from NettyRemotingClient.java
          @Override
          public?RemotingCommand invokeSync(String addr, final?RemotingCommand request, long?timeoutMillis)
          ????throws?InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException
          {
          ????long?beginStartTime = System.currentTimeMillis();
          ????//這里會(huì)先檢查有無(wú)該地址的通道,有則返回,無(wú)則創(chuàng)建
          ????final?Channel channel = this.getAndCreateChannel(addr);
          ????if?(channel != null?&& channel.isActive()) {
          ????????try?{
          ????????????//前置鉤子
          ????????????doBeforeRpcHooks(addr, request);
          ????????????//判斷通道建立完成時(shí)是否已到達(dá)超時(shí)時(shí)間,如果超時(shí)直接拋出異常。不發(fā)請(qǐng)求
          ????????????long?costTime = System.currentTimeMillis() - beginStartTime;
          ????????????if?(timeoutMillis < costTime) {
          ????????????????throw?new?RemotingTimeoutException("invokeSync call timeout");
          ????????????}
          ????????????//同步調(diào)用
          ????????????RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
          ????????????//后置鉤子
          ????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子
          ????????????return?response;
          ????????} catch?(RemotingSendRequestException e) {
          ????????????log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
          ????????????this.closeChannel(addr, channel);
          ????????????throw?e;
          ????????} catch?(RemotingTimeoutException e) {
          ????????????if?(nettyClientConfig.isClientCloseSocketIfTimeout()) {
          ????????????????this.closeChannel(addr, channel);
          ????????????????log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
          ????????????}
          ????????????log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
          ????????????throw?e;
          ????????}
          ????} else?{
          ????????this.closeChannel(addr, channel);
          ????????throw?new?RemotingConnectException(addr);
          ????}
          }


          下一步看看它的同步調(diào)用做了什么處理。注意到它會(huì)構(gòu)建一個(gè)Future對(duì)象加入待響應(yīng)池,發(fā)出請(qǐng)求報(bào)文后就掛起線程,然后等待喚醒(waitResponse內(nèi)部使用CountDownLatch等待)。


          //from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
          ????final?long?timeoutMillis)
          ????throws?InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
          ????//請(qǐng)求id
          ????final?int?opaque = request.getOpaque();

          ????try?{
          ????????//請(qǐng)求存根
          ????????final?ResponseFuture responseFuture = new?ResponseFuture(channel, opaque, timeoutMillis, null, null);
          ????????//加入待響應(yīng)的請(qǐng)求池
          ????????this.responseTable.put(opaque, responseFuture);
          ????????final?SocketAddress addr = channel.remoteAddress();
          ????????//將請(qǐng)求發(fā)出,成功發(fā)出時(shí)更新?tīng)顟B(tài)
          ????????channel.writeAndFlush(request).addListener(new?ChannelFutureListener() {
          ????????????@Override
          ????????????public?void?operationComplete(ChannelFuture f)?throws?Exception {
          ????????????????if?(f.isSuccess()) { //若成功發(fā)出,更新請(qǐng)求狀態(tài)為“已發(fā)出”
          ????????????????????responseFuture.setSendRequestOK(true);
          ????????????????????return;
          ????????????????} else?{
          ????????????????????responseFuture.setSendRequestOK(false);
          ????????????????}

          ????????????????//若發(fā)出失敗,則從池中移除(沒(méi)用了,釋放資源)
          ????????????????responseTable.remove(opaque);
          ????????????????responseFuture.setCause(f.cause());
          ????????????????//putResponse的時(shí)候會(huì)喚醒等待的線程
          ????????????????responseFuture.putResponse(null);
          ????????????????log.warn("send a request command to channel <"?+ addr + "> failed.");
          ????????????}
          ????????});

          ????????//只等待一段時(shí)間,不會(huì)一直等下去
          ????????//若正常響應(yīng),則收到響應(yīng)后,此線程會(huì)被喚醒,繼續(xù)執(zhí)行下去
          ????????//若超時(shí),則到達(dá)該時(shí)間后線程蘇醒,繼續(xù)執(zhí)行
          ????????RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
          ????????if?(null?== responseCommand) {
          ????????????if?(responseFuture.isSendRequestOK()) {
          ????????????????throw?new?RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
          ????????????????????responseFuture.getCause());
          ????????????} else?{
          ????????????????throw?new?RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
          ????????????}
          ????????}

          ????????return?responseCommand;
          ????} finally?{
          ????????//正常響應(yīng)完成時(shí),將future釋放(正常邏輯)
          ????????//超時(shí)時(shí),將future釋放。這個(gè)請(qǐng)求已經(jīng)作廢了,后面如果再收到響應(yīng),就可以直接丟棄了(由于找不到相關(guān)的響應(yīng)鉤子,就不處理了)
          ????????this.responseTable.remove(opaque);
          ????}
          }


          好,我們?cè)賮?lái)看看收到報(bào)文的時(shí)候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個(gè)任務(wù)提交給其他線程處理,該線程處理完畢后會(huì)將結(jié)果寫入到Future對(duì)象中,寫入時(shí)如果有線程在等待該結(jié)果,則喚醒這些線程。這里也差不多,只不過(guò)執(zhí)行線程在服務(wù)端,服務(wù)執(zhí)行完畢后會(huì)將結(jié)果通過(guò)長(zhǎng)連接發(fā)送給客戶端,客戶端收到后根據(jù)報(bào)文中的ID信息從待響應(yīng)池中找到Future對(duì)象,然后就是類似的處理了。


          class?NettyClientHandler?extends?SimpleChannelInboundHandler<RemotingCommand> {

          ????//底層解碼完畢得到RemotingCommand的報(bào)文
          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
          ????????processMessageReceived(ctx, msg);
          ????}
          }

          public?void?processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
          ????final?RemotingCommand cmd = msg;
          ????if?(cmd != null) {
          ????????//判斷類型
          ????????switch?(cmd.getType()) {
          ????????????case?REQUEST_COMMAND:
          ????????????????processRequestCommand(ctx, cmd);
          ????????????????break;
          ????????????case?RESPONSE_COMMAND:
          ????????????????processResponseCommand(ctx, cmd);
          ????????????????break;
          ????????????default:
          ????????????????break;
          ????????}
          ????}
          }

          public?void?processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)?{
          ????//取得消息id
          ????final?int?opaque = cmd.getOpaque();
          ????//從待響應(yīng)池中取得對(duì)應(yīng)請(qǐng)求
          ????final?ResponseFuture responseFuture = responseTable.get(opaque);
          ????if?(responseFuture != null) {
          ????????//將響應(yīng)值注入到ResponseFuture對(duì)象中,等待線程可從這個(gè)對(duì)象獲取結(jié)果
          ????????responseFuture.setResponseCommand(cmd);
          ????????//請(qǐng)求已處理完畢,釋放該請(qǐng)求
          ????????responseTable.remove(opaque);

          ????????//如果有回調(diào)函數(shù)的話則回調(diào)(由當(dāng)前線程處理)
          ????????if?(responseFuture.getInvokeCallback() != null) {
          ????????????executeInvokeCallback(responseFuture);
          ????????} else?{
          ????????????//沒(méi)有的話,則喚醒等待線程(由等待線程做處理)
          ????????????responseFuture.putResponse(cmd);
          ????????????responseFuture.release();
          ????????}
          ????} else?{
          ????????log.warn("receive response, but not matched any request, "?+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
          ????????log.warn(cmd.toString());
          ????}
          }


          總結(jié)一下,客戶端的處理時(shí)序大概是這樣的:

          結(jié)構(gòu)大概是這樣的:

          3.服務(wù)端的處理

          //todo 服務(wù)端待補(bǔ)充CommitLog文件映射相關(guān)內(nèi)容


          class?NettyServerHandler?extends?SimpleChannelInboundHandler<RemotingCommand> {

          ????@Override
          ????protected?void?channelRead0(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
          ????????processMessageReceived(ctx, msg);
          ????}
          }

          //from NettyRemotingAbscract.java
          public?void?processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
          ????final?RemotingCommand cmd = msg;
          ????if?(cmd != null) {
          ????????switch?(cmd.getType()) {
          ????????????case?REQUEST_COMMAND: //服務(wù)端走這里
          ????????????????processRequestCommand(ctx, cmd);
          ????????????????break;
          ????????????case?RESPONSE_COMMAND:
          ????????????????processResponseCommand(ctx, cmd);
          ????????????????break;
          ????????????default:
          ????????????????break;
          ????????}
          ????}
          }

          //from NettyRemotingAbscract.java
          public?void?processRequestCommand(final?ChannelHandlerContext ctx, final?RemotingCommand cmd)?{
          ????//查看有無(wú)該請(qǐng)求code相關(guān)的處理器
          ????final?Pair matched = this.processorTable.get(cmd.getCode());
          ????//如果沒(méi)有,則使用默認(rèn)處理器(可能沒(méi)有默認(rèn)處理器)
          ????final?Pair pair = null?== matched ? this.defaultRequestProcessor : matched;
          ????final?int?opaque = cmd.getOpaque();

          ????if?(pair != null) {
          ????????Runnable run = new?Runnable() {
          ????????????@Override
          ????????????public?void?run()?{
          ????????????????try?{
          ????????????????????doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
          ????????????????????final?RemotingResponseCallback callback = new?RemotingResponseCallback() {
          ????????????????????????@Override
          ????????????????????????public?void?callback(RemotingCommand response)?{
          ????????????????????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
          ????????????????????????????if?(!cmd.isOnewayRPC()) {
          ????????????????????????????????if?(response != null) { //不為null,則由本類將響應(yīng)值寫會(huì)給請(qǐng)求方
          ????????????????????????????????????response.setOpaque(opaque);
          ????????????????????????????????????response.markResponseType();
          ????????????????????????????????????try?{
          ????????????????????????????????????????ctx.writeAndFlush(response);
          ????????????????????????????????????} catch?(Throwable e) {
          ????????????????????????????????????????log.error("process request over, but response failed", e);
          ????????????????????????????????????????log.error(cmd.toString());
          ????????????????????????????????????????log.error(response.toString());
          ????????????????????????????????????}
          ????????????????????????????????} else?{ //為null,意味著processor內(nèi)部已經(jīng)將響應(yīng)處理了,這里無(wú)需再處理。
          ????????????????????????????????}
          ????????????????????????????}
          ????????????????????????}
          ????????????????????};
          ????????????????????if?(pair.getObject1() instanceof?AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器
          ????????????????????????AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
          ????????????????????????processor.asyncProcessRequest(ctx, cmd, callback);
          ????????????????????} else?{
          ????????????????????????NettyRequestProcessor processor = pair.getObject1();
          ????????????????????????RemotingCommand response = processor.processRequest(ctx, cmd);
          ????????????????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
          ????????????????????????callback.callback(response);
          ????????????????????}
          ????????????????} catch?(Throwable e) {
          ????????????????????log.error("process request exception", e);
          ????????????????????log.error(cmd.toString());

          ????????????????????if?(!cmd.isOnewayRPC()) {
          ????????????????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
          ????????????????????????????RemotingHelper.exceptionSimpleDesc(e));
          ????????????????????????response.setOpaque(opaque);
          ????????????????????????ctx.writeAndFlush(response);
          ????????????????????}
          ????????????????}
          ????????????}
          ????????};

          ????????if?(pair.getObject1().rejectRequest()) {
          ????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
          ????????????????"[REJECTREQUEST]system busy, start flow control for a while");
          ????????????response.setOpaque(opaque);
          ????????????ctx.writeAndFlush(response);
          ????????????return;
          ????????}

          ????????try?{
          ????????????final?RequestTask requestTask = new?RequestTask(run, ctx.channel(), cmd);
          ????????????pair.getObject2().submit(requestTask);
          ????????} catch?(RejectedExecutionException e) {
          ????????????if?((System.currentTimeMillis() % 10000) == 0) {
          ????????????????log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
          ????????????????????+ ", too many requests and system thread pool busy, RejectedExecutionException "
          ????????????????????+ pair.getObject2().toString()
          ????????????????????+ " request code: "?+ cmd.getCode());
          ????????????}

          ????????????if?(!cmd.isOnewayRPC()) {
          ????????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
          ????????????????????"[OVERLOAD]system busy, start flow control for a while");
          ????????????????response.setOpaque(opaque);
          ????????????????ctx.writeAndFlush(response);
          ????????????}
          ????????}
          ????} else?{
          ????????String error = " request type "?+ cmd.getCode() + " not supported";
          ????????final?RemotingCommand response =
          ????????????RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
          ????????response.setOpaque(opaque);
          ????????ctx.writeAndFlush(response);
          ????????log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
          ????}
          }

          //from QueryMessageProcesor.java
          @Override
          public?RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
          ????throws?RemotingCommandException
          {
          ????switch?(request.getCode()) {
          ????????case?RequestCode.QUERY_MESSAGE:
          ????????????return?this.queryMessage(ctx, request);
          ????????case?RequestCode.VIEW_MESSAGE_BY_ID: //通過(guò)msgId查詢消息
          ????????????return?this.viewMessageById(ctx, request);
          ????????default:
          ????????????break;
          ????}

          ????return?null;
          }

          public?RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
          ????throws?RemotingCommandException
          {
          ????final?RemotingCommand response = RemotingCommand.createResponseCommand(null);
          ????final?ViewMessageRequestHeader requestHeader =
          ????????(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);

          ????response.setOpaque(request.getOpaque());

          ????//getMessagetStore得到當(dāng)前映射到內(nèi)存中的CommitLog文件,然后根據(jù)偏移量取得數(shù)據(jù)
          ????final?SelectMappedBufferResult selectMappedBufferResult =
          ????????this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
          ????if?(selectMappedBufferResult != null) {
          ????????response.setCode(ResponseCode.SUCCESS);
          ????????response.setRemark(null);

          ????????//將響應(yīng)通過(guò)socket寫回給客戶端
          ????????try?{
          ????????????//response對(duì)象的數(shù)據(jù)作為header
          ????????????//消息內(nèi)容作為body
          ????????????FileRegion fileRegion =
          ????????????????new?OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
          ????????????????????selectMappedBufferResult);
          ????????????ctx.channel().writeAndFlush(fileRegion).addListener(new?ChannelFutureListener() {
          ????????????????@Override
          ????????????????public?void?operationComplete(ChannelFuture future)?throws?Exception {
          ????????????????????selectMappedBufferResult.release();
          ????????????????????if?(!future.isSuccess()) {
          ????????????????????????log.error("Transfer one message from page cache failed, ", future.cause());
          ????????????????????}
          ????????????????}
          ????????????});
          ????????} catch?(Throwable e) {
          ????????????log.error("", e);
          ????????????selectMappedBufferResult.release();
          ????????}

          ????????return?null; //如果有值,則直接寫回給請(qǐng)求方。這里返回null是不需要由外層處理響應(yīng)。
          ????} else?{
          ????????response.setCode(ResponseCode.SYSTEM_ERROR);
          ????????response.setRemark("can not find message by the offset, "?+ requestHeader.getOffset());
          ????}

          ????return?response;
          }



          粉絲福利:108本java從入門到大神精選電子書(shū)領(lǐng)取

          ???

          ?長(zhǎng)按上方鋒哥微信二維碼?2 秒
          備注「1234」即可獲取資料以及
          可以進(jìn)入java1234官方微信群



          感謝點(diǎn)贊支持下哈?

          瀏覽 62
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  免费中文中文A片 | 18禁一区二区三区 | 天天日天天综合 | 韩国精品一二三区 | 69操逼|