【源碼】RocketMQ如何實(shí)現(xiàn)獲取指定消息
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??貓毛·波拿巴
來(lái)源 |? ?urlify.cn/YfuQ32
66套java從入門到精通實(shí)戰(zhàn)課程分享
概要
消息查詢是什么?
消息查詢就是根據(jù)用戶提供的msgId從MQ中取出該消息
RocketMQ如果有多個(gè)節(jié)點(diǎn)如何查詢?
問(wèn)題:RocketMQ分布式結(jié)構(gòu)中,數(shù)據(jù)分散在各個(gè)節(jié)點(diǎn),即便是同一Topic的數(shù)據(jù),也未必都在一個(gè)broker上。客戶端怎么知道數(shù)據(jù)該去哪個(gè)節(jié)點(diǎn)上查?
猜想1:逐個(gè)訪問(wèn)broker節(jié)點(diǎn)查詢數(shù)據(jù)
猜想2:有某種數(shù)據(jù)中心存在,該中心知道所有消息存儲(chǔ)的位置,只要向該中心查詢即可得到消息具體位置,進(jìn)而取得消息內(nèi)容
實(shí)際:
1.消息Id中含有消息所在的broker的地址信息(IP\Port)以及該消息在CommitLog中的偏移量。
2.客戶端實(shí)現(xiàn)會(huì)從msgId字符串中解析出broker地址,向指定broker節(jié)查詢消息。
問(wèn)題:CommitLog文件有多個(gè),只有偏移量估計(jì)不能確定在哪個(gè)文件吧?
實(shí)際:?jiǎn)蝹€(gè)Broker節(jié)點(diǎn)內(nèi)offset是全局唯一的,不是每個(gè)CommitLog文件的偏移量都是從0開(kāi)始的。單個(gè)節(jié)點(diǎn)內(nèi)所有CommitLog文件共用一套偏移量,每個(gè)文件的文件名為其第一個(gè)消息的偏移量。所以可以根據(jù)偏移量和文件名確定CommitLog文件。
源碼閱讀
0.使用方式
MessageExt? msg = consumer.viewMessage(msgId);
1.消息ID解析
這個(gè)了解下就可以了
public?class?MessageId?{
????private?SocketAddress address;
????private?long?offset;
????public?MessageId(SocketAddress address, long?offset)?{
????????this.address = address;
????????this.offset = offset;
????}
????//get-set
}
//from MQAdminImpl.java
public?MessageExt viewMessage(
????String msgId)?throws?RemotingException, MQBrokerException, InterruptedException, MQClientException {
????MessageId messageId = null;
????try?{
????????//從msgId字符串中解析出address和offset
????????//address = ip:port
????????//offset為消息在CommitLog文件中的偏移量
????????messageId = MessageDecoder.decodeMessageId(msgId);
????} catch?(Exception e) {
????????throw?new?MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
????}
????return?this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
????????messageId.getOffset(), timeoutMillis);
}
//from MessageDecoder.java
public?static?MessageId decodeMessageId(final?String msgId)?throws?UnknownHostException {
????SocketAddress address;
????long?offset;
????//ipv4和ipv6的區(qū)別
????//如果msgId總長(zhǎng)度超過(guò)32字符,則為ipv6
????int?ipLength = msgId.length() == 32?? 4?* 2?: 16?* 2;
????byte[] ip = UtilAll.string2bytes(msgId.substring(0, ipLength));
????byte[] port = UtilAll.string2bytes(msgId.substring(ipLength, ipLength + 8));
????ByteBuffer bb = ByteBuffer.wrap(port);
????int?portInt = bb.getInt(0);
????address = new?InetSocketAddress(InetAddress.getByAddress(ip), portInt);
????// offset
????byte[] data = UtilAll.string2bytes(msgId.substring(ipLength + 8, ipLength + 8?+ 16));
????bb = ByteBuffer.wrap(data);
????offset = bb.getLong(0);
????return?new?MessageId(address, offset);
}2.長(zhǎng)連接客戶端RPC實(shí)現(xiàn)
要發(fā)請(qǐng)求首先得先建立連接,這里方法可以看到創(chuàng)建連接相關(guān)的操作。值得注意的是,第一次訪問(wèn)的時(shí)候可能連接還沒(méi)建立,建立連接需要消耗一段時(shí)間。代碼中對(duì)這個(gè)時(shí)間也做了判斷,如果連接建立完成后,發(fā)現(xiàn)已經(jīng)超時(shí),則不再發(fā)出請(qǐng)求。目的應(yīng)該是盡可能減少請(qǐng)求線程的阻塞時(shí)間。
//from NettyRemotingClient.java
@Override
public?RemotingCommand invokeSync(String addr, final?RemotingCommand request, long?timeoutMillis)
????throws?InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
????long?beginStartTime = System.currentTimeMillis();
????//這里會(huì)先檢查有無(wú)該地址的通道,有則返回,無(wú)則創(chuàng)建
????final?Channel channel = this.getAndCreateChannel(addr);
????if?(channel != null?&& channel.isActive()) {
????????try?{
????????????//前置鉤子
????????????doBeforeRpcHooks(addr, request);
????????????//判斷通道建立完成時(shí)是否已到達(dá)超時(shí)時(shí)間,如果超時(shí)直接拋出異常。不發(fā)請(qǐng)求
????????????long?costTime = System.currentTimeMillis() - beginStartTime;
????????????if?(timeoutMillis < costTime) {
????????????????throw?new?RemotingTimeoutException("invokeSync call timeout");
????????????}
????????????//同步調(diào)用
????????????RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
????????????//后置鉤子
????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); //后置鉤子
????????????return?response;
????????} catch?(RemotingSendRequestException e) {
????????????log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
????????????this.closeChannel(addr, channel);
????????????throw?e;
????????} catch?(RemotingTimeoutException e) {
????????????if?(nettyClientConfig.isClientCloseSocketIfTimeout()) {
????????????????this.closeChannel(addr, channel);
????????????????log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
????????????}
????????????log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
????????????throw?e;
????????}
????} else?{
????????this.closeChannel(addr, channel);
????????throw?new?RemotingConnectException(addr);
????}
}下一步看看它的同步調(diào)用做了什么處理。注意到它會(huì)構(gòu)建一個(gè)Future對(duì)象加入待響應(yīng)池,發(fā)出請(qǐng)求報(bào)文后就掛起線程,然后等待喚醒(waitResponse內(nèi)部使用CountDownLatch等待)。
//from NettyRemotingAbstract.javapublic RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
????final?long?timeoutMillis)
????throws?InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
????//請(qǐng)求id
????final?int?opaque = request.getOpaque();
????try?{
????????//請(qǐng)求存根
????????final?ResponseFuture responseFuture = new?ResponseFuture(channel, opaque, timeoutMillis, null, null);
????????//加入待響應(yīng)的請(qǐng)求池
????????this.responseTable.put(opaque, responseFuture);
????????final?SocketAddress addr = channel.remoteAddress();
????????//將請(qǐng)求發(fā)出,成功發(fā)出時(shí)更新?tīng)顟B(tài)
????????channel.writeAndFlush(request).addListener(new?ChannelFutureListener() {
????????????@Override
????????????public?void?operationComplete(ChannelFuture f)?throws?Exception {
????????????????if?(f.isSuccess()) { //若成功發(fā)出,更新請(qǐng)求狀態(tài)為“已發(fā)出”
????????????????????responseFuture.setSendRequestOK(true);
????????????????????return;
????????????????} else?{
????????????????????responseFuture.setSendRequestOK(false);
????????????????}
????????????????//若發(fā)出失敗,則從池中移除(沒(méi)用了,釋放資源)
????????????????responseTable.remove(opaque);
????????????????responseFuture.setCause(f.cause());
????????????????//putResponse的時(shí)候會(huì)喚醒等待的線程
????????????????responseFuture.putResponse(null);
????????????????log.warn("send a request command to channel <"?+ addr + "> failed.");
????????????}
????????});
????????//只等待一段時(shí)間,不會(huì)一直等下去
????????//若正常響應(yīng),則收到響應(yīng)后,此線程會(huì)被喚醒,繼續(xù)執(zhí)行下去
????????//若超時(shí),則到達(dá)該時(shí)間后線程蘇醒,繼續(xù)執(zhí)行
????????RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
????????if?(null?== responseCommand) {
????????????if?(responseFuture.isSendRequestOK()) {
????????????????throw?new?RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
????????????????????responseFuture.getCause());
????????????} else?{
????????????????throw?new?RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
????????????}
????????}
????????return?responseCommand;
????} finally?{
????????//正常響應(yīng)完成時(shí),將future釋放(正常邏輯)
????????//超時(shí)時(shí),將future釋放。這個(gè)請(qǐng)求已經(jīng)作廢了,后面如果再收到響應(yīng),就可以直接丟棄了(由于找不到相關(guān)的響應(yīng)鉤子,就不處理了)
????????this.responseTable.remove(opaque);
????}
}好,我們?cè)賮?lái)看看收到報(bào)文的時(shí)候是怎么處理的。我們都了解JDK中的Future的原理,大概就是將這個(gè)任務(wù)提交給其他線程處理,該線程處理完畢后會(huì)將結(jié)果寫入到Future對(duì)象中,寫入時(shí)如果有線程在等待該結(jié)果,則喚醒這些線程。這里也差不多,只不過(guò)執(zhí)行線程在服務(wù)端,服務(wù)執(zhí)行完畢后會(huì)將結(jié)果通過(guò)長(zhǎng)連接發(fā)送給客戶端,客戶端收到后根據(jù)報(bào)文中的ID信息從待響應(yīng)池中找到Future對(duì)象,然后就是類似的處理了。
class?NettyClientHandler?extends?SimpleChannelInboundHandler<RemotingCommand> {
????//底層解碼完畢得到RemotingCommand的報(bào)文
????@Override
????protected?void?channelRead0(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
????????processMessageReceived(ctx, msg);
????}
}
public?void?processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
????final?RemotingCommand cmd = msg;
????if?(cmd != null) {
????????//判斷類型
????????switch?(cmd.getType()) {
????????????case?REQUEST_COMMAND:
????????????????processRequestCommand(ctx, cmd);
????????????????break;
????????????case?RESPONSE_COMMAND:
????????????????processResponseCommand(ctx, cmd);
????????????????break;
????????????default:
????????????????break;
????????}
????}
}
public?void?processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd)?{
????//取得消息id
????final?int?opaque = cmd.getOpaque();
????//從待響應(yīng)池中取得對(duì)應(yīng)請(qǐng)求
????final?ResponseFuture responseFuture = responseTable.get(opaque);
????if?(responseFuture != null) {
????????//將響應(yīng)值注入到ResponseFuture對(duì)象中,等待線程可從這個(gè)對(duì)象獲取結(jié)果
????????responseFuture.setResponseCommand(cmd);
????????//請(qǐng)求已處理完畢,釋放該請(qǐng)求
????????responseTable.remove(opaque);
????????//如果有回調(diào)函數(shù)的話則回調(diào)(由當(dāng)前線程處理)
????????if?(responseFuture.getInvokeCallback() != null) {
????????????executeInvokeCallback(responseFuture);
????????} else?{
????????????//沒(méi)有的話,則喚醒等待線程(由等待線程做處理)
????????????responseFuture.putResponse(cmd);
????????????responseFuture.release();
????????}
????} else?{
????????log.warn("receive response, but not matched any request, "?+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
????????log.warn(cmd.toString());
????}
}總結(jié)一下,客戶端的處理時(shí)序大概是這樣的:

結(jié)構(gòu)大概是這樣的:

3.服務(wù)端的處理
//todo 服務(wù)端待補(bǔ)充CommitLog文件映射相關(guān)內(nèi)容
class?NettyServerHandler?extends?SimpleChannelInboundHandler<RemotingCommand> {
????@Override
????protected?void?channelRead0(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
????????processMessageReceived(ctx, msg);
????}
}
//from NettyRemotingAbscract.java
public?void?processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg)?throws?Exception {
????final?RemotingCommand cmd = msg;
????if?(cmd != null) {
????????switch?(cmd.getType()) {
????????????case?REQUEST_COMMAND: //服務(wù)端走這里
????????????????processRequestCommand(ctx, cmd);
????????????????break;
????????????case?RESPONSE_COMMAND:
????????????????processResponseCommand(ctx, cmd);
????????????????break;
????????????default:
????????????????break;
????????}
????}
}
//from NettyRemotingAbscract.java
public?void?processRequestCommand(final?ChannelHandlerContext ctx, final?RemotingCommand cmd)?{
????//查看有無(wú)該請(qǐng)求code相關(guān)的處理器
????final?Pair matched = this.processorTable.get(cmd.getCode());
????//如果沒(méi)有,則使用默認(rèn)處理器(可能沒(méi)有默認(rèn)處理器)
????final?Pair pair = null?== matched ? this.defaultRequestProcessor : matched;
????final?int?opaque = cmd.getOpaque();
????if?(pair != null) {
????????Runnable run = new?Runnable() {
????????????@Override
????????????public?void?run()?{
????????????????try?{
????????????????????doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
????????????????????final?RemotingResponseCallback callback = new?RemotingResponseCallback() {
????????????????????????@Override
????????????????????????public?void?callback(RemotingCommand response)?{
????????????????????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
????????????????????????????if?(!cmd.isOnewayRPC()) {
????????????????????????????????if?(response != null) { //不為null,則由本類將響應(yīng)值寫會(huì)給請(qǐng)求方
????????????????????????????????????response.setOpaque(opaque);
????????????????????????????????????response.markResponseType();
????????????????????????????????????try?{
????????????????????????????????????????ctx.writeAndFlush(response);
????????????????????????????????????} catch?(Throwable e) {
????????????????????????????????????????log.error("process request over, but response failed", e);
????????????????????????????????????????log.error(cmd.toString());
????????????????????????????????????????log.error(response.toString());
????????????????????????????????????}
????????????????????????????????} else?{ //為null,意味著processor內(nèi)部已經(jīng)將響應(yīng)處理了,這里無(wú)需再處理。
????????????????????????????????}
????????????????????????????}
????????????????????????}
????????????????????};
????????????????????if?(pair.getObject1() instanceof?AsyncNettyRequestProcessor) {//QueryMessageProcessor為異步處理器
????????????????????????AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
????????????????????????processor.asyncProcessRequest(ctx, cmd, callback);
????????????????????} else?{
????????????????????????NettyRequestProcessor processor = pair.getObject1();
????????????????????????RemotingCommand response = processor.processRequest(ctx, cmd);
????????????????????????doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
????????????????????????callback.callback(response);
????????????????????}
????????????????} catch?(Throwable e) {
????????????????????log.error("process request exception", e);
????????????????????log.error(cmd.toString());
????????????????????if?(!cmd.isOnewayRPC()) {
????????????????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
????????????????????????????RemotingHelper.exceptionSimpleDesc(e));
????????????????????????response.setOpaque(opaque);
????????????????????????ctx.writeAndFlush(response);
????????????????????}
????????????????}
????????????}
????????};
????????if?(pair.getObject1().rejectRequest()) {
????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
????????????????"[REJECTREQUEST]system busy, start flow control for a while");
????????????response.setOpaque(opaque);
????????????ctx.writeAndFlush(response);
????????????return;
????????}
????????try?{
????????????final?RequestTask requestTask = new?RequestTask(run, ctx.channel(), cmd);
????????????pair.getObject2().submit(requestTask);
????????} catch?(RejectedExecutionException e) {
????????????if?((System.currentTimeMillis() % 10000) == 0) {
????????????????log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
????????????????????+ ", too many requests and system thread pool busy, RejectedExecutionException "
????????????????????+ pair.getObject2().toString()
????????????????????+ " request code: "?+ cmd.getCode());
????????????}
????????????if?(!cmd.isOnewayRPC()) {
????????????????final?RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
????????????????????"[OVERLOAD]system busy, start flow control for a while");
????????????????response.setOpaque(opaque);
????????????????ctx.writeAndFlush(response);
????????????}
????????}
????} else?{
????????String error = " request type "?+ cmd.getCode() + " not supported";
????????final?RemotingCommand response =
????????????RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
????????response.setOpaque(opaque);
????????ctx.writeAndFlush(response);
????????log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
????}
}
//from QueryMessageProcesor.java
@Override
public?RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
????throws?RemotingCommandException {
????switch?(request.getCode()) {
????????case?RequestCode.QUERY_MESSAGE:
????????????return?this.queryMessage(ctx, request);
????????case?RequestCode.VIEW_MESSAGE_BY_ID: //通過(guò)msgId查詢消息
????????????return?this.viewMessageById(ctx, request);
????????default:
????????????break;
????}
????return?null;
}
public?RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
????throws?RemotingCommandException {
????final?RemotingCommand response = RemotingCommand.createResponseCommand(null);
????final?ViewMessageRequestHeader requestHeader =
????????(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
????response.setOpaque(request.getOpaque());
????//getMessagetStore得到當(dāng)前映射到內(nèi)存中的CommitLog文件,然后根據(jù)偏移量取得數(shù)據(jù)
????final?SelectMappedBufferResult selectMappedBufferResult =
????????this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
????if?(selectMappedBufferResult != null) {
????????response.setCode(ResponseCode.SUCCESS);
????????response.setRemark(null);
????????//將響應(yīng)通過(guò)socket寫回給客戶端
????????try?{
????????????//response對(duì)象的數(shù)據(jù)作為header
????????????//消息內(nèi)容作為body
????????????FileRegion fileRegion =
????????????????new?OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
????????????????????selectMappedBufferResult);
????????????ctx.channel().writeAndFlush(fileRegion).addListener(new?ChannelFutureListener() {
????????????????@Override
????????????????public?void?operationComplete(ChannelFuture future)?throws?Exception {
????????????????????selectMappedBufferResult.release();
????????????????????if?(!future.isSuccess()) {
????????????????????????log.error("Transfer one message from page cache failed, ", future.cause());
????????????????????}
????????????????}
????????????});
????????} catch?(Throwable e) {
????????????log.error("", e);
????????????selectMappedBufferResult.release();
????????}
????????return?null; //如果有值,則直接寫回給請(qǐng)求方。這里返回null是不需要由外層處理響應(yīng)。
????} else?{
????????response.setCode(ResponseCode.SYSTEM_ERROR);
????????response.setRemark("can not find message by the offset, "?+ requestHeader.getOffset());
????}
????return?response;
} 粉絲福利:108本java從入門到大神精選電子書(shū)領(lǐng)取
???
?長(zhǎng)按上方鋒哥微信二維碼?2 秒 備注「1234」即可獲取資料以及 可以進(jìn)入java1234官方微信群
感謝點(diǎn)贊支持下哈?
