
RocketMQ (4): How the Broker Writes Producer Messages

2021-02-02 08:28


When a producer calls send, it is really just invoking the corresponding broker API; to understand how a message is actually written, we have to dig into the broker. In this post we look at how the broker stores messages (consumption is a topic for another day).

Once the broker has started, it can accept produce and consume requests from clients.

1. Broker-client communication: exposing the broker's server port

// org.apache.rocketmq.remoting.netty.NettyRemotingServer#start
@Override
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    // create the sharable handlers, e.g. serverHandler
    prepareSharableHandlers();

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // standard netty server setup: register the chain of handlers
                    // codecs, idle management, connection management, business handler
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            // the most important one: serverHandler
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    // periodically scan for timed-out requests
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

As you can see, the most important handler is serverHandler. It is an inner class instantiated directly with new, created before serverBootstrap starts.

private void prepareSharableHandlers() {
    handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
    encoder = new NettyEncoder();
    connectionManageHandler = new NettyConnectManageHandler();
    serverHandler = new NettyServerHandler();
}

// org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler
@ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        // the actual business processing is delegated to processMessageReceived
        processMessageReceived(ctx, msg);
    }
}

/**
 * Entry of incoming command processing.
 *
 * <p>Note: the incoming remoting command may be
 * <ul>
 * <li>An inquiry request from a remote peer component;</li>
 * <li>A response to a previous request issued by this very participant.</li>
 * </ul>
 *
 * @param ctx Channel handler context.
 * @param msg incoming remoting command.
 * @throws Exception if there were any error while processing the incoming command.
 */
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                // client requests arrive as REQUEST_COMMAND
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

// org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
/**
 * Process incoming request command issued by remote peer.
 *
 * @param ctx channel handler context.
 * @param cmd request command.
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    // opaque works as a request id, used to match a response to its request
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    // run the RPC before-hooks
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    // many processors are registered; producer requests are handled by SendMessageProcessor
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        } else {
                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            // submit the task to the processor's thread pool, then return
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
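One detail worth noting above: NettyServerHandler is annotated @ChannelHandler.Sharable, which is what allows the single instance created in prepareSharableHandlers() to be installed into every accepted channel's pipeline. A minimal standalone illustration (not RocketMQ code) of what the annotation implies:

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicLong;

// Netty refuses to add a non-@Sharable handler instance to a second pipeline,
// so a shared handler must be stateless or, as here, keep only thread-safe state.
@ChannelHandler.Sharable
class CountingHandler extends ChannelInboundHandlerAdapter {
    private final AtomicLong messagesSeen = new AtomicLong();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        messagesSeen.incrementAndGet(); // shared across all channels: must be thread-safe
        ctx.fireChannelRead(msg);       // pass the message down the pipeline
    }
}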


          當(dāng)接到外部消息后,經(jīng)過初步判斷進(jìn)行簡單封裝,更多的處理放入到下游的線程池中進(jìn)行處理。

          標(biāo)準(zhǔn)的 netty 服務(wù)處理流程: 編解碼器 -> 空閑管理 -> 連接管理 -> 業(yè)務(wù)處理處理器

2. The framework logic of a message write


Message writes are orchestrated by SendMessageProcessor.

// org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // parse the header
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
                return null;
            }

            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // run the before-send hooks
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                // ordinary single-message write
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

// org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#parseRequestHeader
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
    throws RemotingCommandException {

    SendMessageRequestHeaderV2 requestHeaderV2 = null;
    SendMessageRequestHeader requestHeader = null;
    switch (request.getCode()) {
        case RequestCode.SEND_BATCH_MESSAGE:
        case RequestCode.SEND_MESSAGE_V2:
            requestHeaderV2 = (SendMessageRequestHeaderV2) request
                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
            // intentional fall-through to SEND_MESSAGE
        case RequestCode.SEND_MESSAGE:
            if (null == requestHeaderV2) {
                requestHeader = (SendMessageRequestHeader) request
                    .decodeCommandCustomHeader(SendMessageRequestHeader.class);
            } else {
                requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
            }
        default:
            break;
    }
    return requestHeader;
}

// the send logic itself
// org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

    // the response command
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // time check: reject until the broker starts accepting sends
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
        // hand the message over to the message store
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }

    // handle the write result
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}

As shown above, SendMessageProcessor coordinates the whole write. It breaks down into these steps:

    1. Parse the request header, SendMessageRequestHeader;
    2. Decide from the header which path to take, i.e. a single-message write or a batch write, and dispatch to the corresponding logic;
    3. For a single-message write, first check the accept-time window; if the broker is not yet accepting sends, reject without further processing;
    4. Write the message, which may succeed or fail;
    5. Handle the write result and report success or failure.

The write itself is again delegated to internal logic, whose only purpose is to reach CommitLog, where the message is actually stored.

// org.apache.rocketmq.store.DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // a SLAVE must not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }

    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // hand off to the commitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

3. How does CommitLog write the data?

          CommitLog 會執(zhí)行真正的寫入數(shù)據(jù)邏輯,主要借助 MappedFileQueue 和 MappedFile。

// org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // take the last mappedFile; that is where the data will be appended
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    // write under lock
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        // append the data to the mappedFile
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // notify the flush service
    handleDiskFlush(result, putMessageResult, msg);
    // HA: with synchronous replication, the SLAVE must also persist the data before the write counts as successful
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}

// org.apache.rocketmq.store.MappedFile#appendMessage(MessageExtBrokerInner, AppendMessageCallback)
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    return appendMessagesInner(msg, cb);
}

// org.apache.rocketmq.store.MappedFile#appendMessagesInner
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    // current write position
    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    /**
     * Serialize message
     */
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // write the wire-format fields one by one
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}

// org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // wake up the flush service for an asynchronous flush
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

// handle the result of the write
// org.apache.rocketmq.broker.processor.SendMessageProcessor#handlePutMessageResult
private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
    RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader,
    SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) {

    if (putMessageResult == null) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
        return response;
    }
    boolean sendOK = false;

    switch (putMessageResult.getPutMessageStatus()) {
        // Success
        case PUT_OK:
            sendOK = true;
            response.setCode(ResponseCode.SUCCESS);
            break;
        case FLUSH_DISK_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
            sendOK = true;
            break;
        case FLUSH_SLAVE_TIMEOUT:
            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
            sendOK = true;
            break;
        case SLAVE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            sendOK = true;
            break;

        // Failed
        case CREATE_MAPEDFILE_FAILED:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("create mapped file failed, server is busy or broken.");
            break;
        case MESSAGE_ILLEGAL:
        case PROPERTIES_SIZE_EXCEEDED:
            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
            response.setRemark(
                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
            break;
        case SERVICE_NOT_AVAILABLE:
            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
            response.setRemark(
                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
            break;
        case OS_PAGECACHE_BUSY:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
            break;
        case UNKNOWN_ERROR:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR");
            break;
        default:
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("UNKNOWN_ERROR DEFAULT");
            break;
    }

    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    // write succeeded: respond to the client directly
    if (sendOK) {
        // statistics
        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);

        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(queueIdInt);
        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

        // push the result back to the client
        doResponse(ctx, request, response);

        if (hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
        return null;
    } else {
        if (hasSendMessageHook()) {
            int wroteSize = request.getBody().length;
            int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
            sendMessageContext.setCommercialSendTimes(incValue);
            sendMessageContext.setCommercialSendSize(wroteSize);
            sendMessageContext.setCommercialOwner(owner);
        }
    }
    return response;
}
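As an aside, the record length computed by calMsgLength in doAppend above follows mechanically from the 17 fields it writes. A sketch consistent with that layout, with the field widths inferred from the putInt/putLong/put calls:

// Sketch of CommitLog#calMsgLength, consistent with the 17-field layout above.
protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
    return 4        // 1  TOTALSIZE
        + 4         // 2  MAGICCODE
        + 4         // 3  BODYCRC
        + 4         // 4  QUEUEID
        + 4         // 5  FLAG
        + 8         // 6  QUEUEOFFSET
        + 8         // 7  PHYSICALOFFSET
        + 4         // 8  SYSFLAG
        + 8         // 9  BORNTIMESTAMP
        + 8         // 10 BORNHOST (ip + port)
        + 8         // 11 STORETIMESTAMP
        + 8         // 12 STOREHOSTADDRESS (ip + port)
        + 4         // 13 RECONSUMETIMES
        + 8         // 14 prepared transaction offset
        + 4 + (bodyLength > 0 ? bodyLength : 0)               // 15 BODY: length int + body
        + 1 + topicLength                                     // 16 TOPIC: length byte + topic
        + 2 + (propertiesLength > 0 ? propertiesLength : 0);  // 17 PROPERTIES: length short + properties
}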

4. How flushing to disk is implemented

The put above only writes the data into the mappedByteBuffer; nothing has actually reached the disk yet, so a flush step is required.
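A tiny self-contained demo (not RocketMQ code; the file name and size are arbitrary) of the underlying mechanism: writes into a MappedByteBuffer land in the OS page cache, and only force() asks the OS to persist them to disk:

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapFlushDemo {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("demo.dat", "rw");
             FileChannel ch = raf.getChannel()) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            buf.put("hello".getBytes()); // visible via the page cache, but not yet guaranteed on disk
            buf.force();                 // the flush: ask the OS to write the dirty pages to disk
        }
    }
}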

Flushing comes in two flavors: synchronous and asynchronous. Synchronous flushing guarantees the timeliness and durability of each write, at a considerable performance cost. Asynchronous flushing exploits the OS page cache: as soon as the message is written into the page cache, a success ACK is returned to the producer, and the actual flush is submitted by a background thread. This lowers write latency and raises the MQ's performance and throughput.

Synchronous flushing is handled by GroupCommitService (each write submits a GroupCommitRequest), while asynchronous flushing is handled by CommitLog$CommitRealTimeService and CommitLog$FlushRealTimeService. The annotated source:

// org.apache.rocketmq.store.CommitLog#handleDiskFlush
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            // the offset up to which the data must be flushed
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            // enqueue the flush request
            service.putRequest(request);
            // wait for the flush result
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

4.1 The synchronous flush service

// org.apache.rocketmq.store.CommitLog.GroupCommitRequest#GroupCommitRequest
public GroupCommitRequest(long nextOffset) {
    this.nextOffset = nextOffset;
}

// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest
public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

// org.apache.rocketmq.store.CommitLog.GroupCommitRequest#waitForFlush
public boolean waitForFlush(long timeout) {
    try {
        // after submitting the request, the caller blocks here
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        return false;
    }
}

// a background thread keeps scanning the request queue
// org.apache.rocketmq.store.CommitLog.GroupCommitService#run
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // calls the ServiceThread template method; onWaitEnd() is overridden here,
            // so while waiting it also swaps the queues, moving entries from
            // requestsWrite over to requestsRead
            this.waitForRunning(10);
            // commit the requestsRead queue
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // shutdown path below
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}

// org.apache.rocketmq.common.ServiceThread#waitForRunning
protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        // overridden by GroupCommitService to swap the queues
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd
@Override
protected void onWaitEnd() {
    this.swapRequests();
}

private void swapRequests() {
    // requestsWrite absorbs the incoming traffic, which keeps lock contention
    // on requestsRead low; contention is only possible during the brief swap,
    // so enqueueing is never blocked by the flush work, improving throughput
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

// doCommit performs the actual flush to disk
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
    // synchronized for thread safety; since requestsWrite and requestsRead are
    // constantly swapped, this lock is effectively segmented, never global
    synchronized (this.requestsRead) {
        // if the queue is empty there may still be something to flush, see below
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    // two cases here:
                    // 1. the message has not been flushed yet: !flushOK
                    // 2. the message was written but split across two files, so
                    //    flushedWhere is still behind and a second flush is needed: !flushOK
                    // exactly how much data each flush writes is covered later
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }

                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

// org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // delegate the actual write to the mappedFile
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

// org.apache.rocketmq.store.MappedFile#flush
/**
 * @return The current flushed position
 */
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                // fileChannel is the actual file channel; force() performs the flush
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}
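The requestsWrite/requestsRead swap above is a small double-buffering pattern worth isolating. A minimal standalone sketch (not RocketMQ code, and simplified to a single lock where RocketMQ locks each list separately):

import java.util.ArrayList;
import java.util.List;

// Producers always append to the write list; the single consumer thread swaps
// the two lists and then drains the read list, so producers and the consumer
// never contend on the same list for long.
class DoubleBufferQueue<T> {
    private List<T> writeList = new ArrayList<>();
    private List<T> readList = new ArrayList<>();

    public synchronized void put(T item) {   // called by many producer threads
        writeList.add(item);
    }

    public synchronized void swap() {        // called only by the consumer thread
        List<T> tmp = writeList;
        writeList = readList;
        readList = tmp;
    }

    public List<T> drain() {                 // consumer-only: producers never touch readList
        List<T> batch = new ArrayList<>(readList);
        readList.clear();
        return batch;
    }
}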

4.2 Asynchronous flushing

CommitRealTimeService: this service thread keeps committing in a loop.


// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Override
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }

        try {
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}

The commit flow:

// org.apache.rocketmq.store.MappedFileQueue#commit
public boolean commit(final int commitLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (mappedFile != null) {
        int offset = mappedFile.commit(commitLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.committedWhere;
        this.committedWhere = where;
    }

    return result;
}

// org.apache.rocketmq.store.MappedFile#commit
public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

// org.apache.rocketmq.store.MappedFile#commit0
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

The flush flow:

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            // wait out the flush interval
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            // time is up: flush
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}

// org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // again delegates to mappedFile.flush() for the actual flush
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}


All of the flush paths above go through MappedFileQueue's commit or flush method: commit moves bytes from the transient-pool writeBuffer into the fileChannel (i.e. into the page cache), while flush forces page-cache data onto the disk.

This work is done by three threads:

    GroupCommitService handles synchronous flush requests;
    CommitRealTimeService performs the commit step of asynchronous flushing;
    FlushRealTimeService performs the asynchronous flush itself; which of these are active depends on the broker configuration.

            

The end-to-end flow (originally illustrated with sequence diagrams) can be summarized in three stages:

1. Network access

2. Data storage

3. Client response

The overall flow is fairly straightforward to follow.





Author: 等你歸去來

Source: https://www.cnblogs.com/yougewe/p/12133260.html
