<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          如何優(yōu)雅地開始研究一個新技術(shù)

          共 17467字,需瀏覽 35分鐘

           ·

          2021-05-14 03:04

          低并發(fā)編程
          戰(zhàn)略上藐視技術(shù),戰(zhàn)術(shù)上重視技術(shù)

          不講過多大道理,本文以我開始研究 rocketmq 為例,就最近的事。

          大家不要嘲笑我,我對 rocketmq 一無所知,所以寫這篇文章剛好合適,正好也記錄下我開始學(xué)習(xí) rocketmq 這個冷啟動階段,大家看看對自己是否有幫助。

          這篇文章是我邊看邊寫的,力求還原一下我最真實的啟動過程,不了解 rocketmq 的不用擔(dān)心,甚至更好,因為本文不會涉及到 rocketmq 的細節(jié),只是突出從一無所知開始研究它的啟動階段。

          開始吧。


          先把源碼跑起來


          第一步,先把源碼搞到手。

          百度出來它的官方網(wǎng)站:

          http://rocketmq.apache.org/

          傻瓜式的首頁,非常友好,直接點擊最大的那個 Getting Started 按鈕。

          然后就進入了 quick-start 文檔頁。

          根據(jù)開頭給的 4.8.0 版本的 rocketmq 源碼地址 

          https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip 

          把它下載了下來。

          是個 Java 項目,還是 maven 構(gòu)建的,那好辦了,直接 idea 打開!

          緊接著人家就教我如何構(gòu)建它。

          > mvn -Prelease-all -DskipTests clean install -U
          cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0

          mvn 構(gòu)建這一步就受挫了,好像是 maven 默認中央倉庫好多 jar 包下載不了,于是我把 maven 配置文件的中央倉庫改成阿里的,就好了。

          <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name
              <url>
          http://maven.aliyun.com/nexus/content/repositories/central/</url>
            <mirrorOf>central</mirrorOf>
          </mirror>

          阿里云牛逼!(此處應(yīng)有廣告位)


          體驗一下最簡操作


          接著往下讀文檔,現(xiàn)在代碼已經(jīng)到手,也構(gòu)建成功,接下來講的是如果體驗一下發(fā)個消息,再收個消息的過程。

          我是 windows 系統(tǒng),就照著下面 windows 的教程來,就是這么任性。

          嗯,配置一下環(huán)境變量而已。

          接下來是四個啟動腳本,很清晰。

          ## Start Name Server
          .\bin\mqnamesrv.cmd
          ## Start Broker
          .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
          ## Send Messages
          .\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Producer
          ## Receive Messages
          .\bin\tools.cmd  org.apache.rocketmq.example.quickstart.Consumer

          我看了一下這些 cmd 文件,意思就是執(zhí)行這些類的 main 方法而已,于是我改在 idea 里分別執(zhí)行他們,大概長這個樣子。

          ???????

          具體看下較為關(guān)心的兩個 main 方法。

          Producer 的 main 方法,很簡單,簡化版如下。

          DefaultMQProducer producer = new DefaultMQProducer("example");
          producer.start();
          for (int i = 0; i < 1000; i++) {
              Message msg = new Message(...);
              SendResult sendResult = producer.send(msg);
              System.out.printf("%s%n", sendResult);
          }

          執(zhí)行后,會看到一堆消息作為生產(chǎn)者被生產(chǎn)出來。

          ...
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03DF, offsetMsgId=0AE84A8400002A9F00000000000F7133, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1244]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84C03E0, offsetMsgId=0AE84A8400002A9F00000000000F71FE, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3], queueOffset=1249]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E1, offsetMsgId=0AE84A8400002A9F00000000000F72C9, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0], queueOffset=1244]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E2, offsetMsgId=0AE84A8400002A9F00000000000F7394, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1], queueOffset=1251]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E3, offsetMsgId=0AE84A8400002A9F00000000000F745F, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1245]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84D03E4, offsetMsgId=0AE84A8400002A9F00000000000F752A, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=3], queueOffset=1250]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E5, offsetMsgId=0AE84A8400002A9F00000000000F75F5, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=0], queueOffset=1245]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E6, offsetMsgId=0AE84A8400002A9F00000000000F76C0, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=1], queueOffset=1252]
          SendResult [sendStatus=SEND_OK, msgId=7F000001162C18B4AAC28D83A84E03E7, offsetMsgId=0AE84A8400002A9F00000000000F778B, messageQueue=MessageQueue [topic=TopicTest, brokerName=DESKTOP-Q6M76VI, queueId=2], queueOffset=1246]
          ...

          同樣,Consumer 的 mian 方法,也非常簡單。

          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example");
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
          consumer.subscribe("TopicTest""*");
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                  ConsumeConcurrentlyContext context)
           
          {
                  System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
              }
          });
          consumer.start();

          執(zhí)行后會看到一堆消息被成功消費

          ...
          ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1239, sysFlag=0, bornTimestamp=1619580615742, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615742, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F5246, commitLogOffset=1004102, bodyCRC=493758879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83E03B8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575350], transactionId='null'}]] 
          ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1238, sysFlag=0, bornTimestamp=1619580615740, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615740, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4F1A, commitLogOffset=1003290, bodyCRC=1688269248, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83C03B4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575256], transactionId='null'}]] 
          ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1237, sysFlag=0, bornTimestamp=1619580615737, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615737, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4BEE, commitLogOffset=1002478, bodyCRC=1830206955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83903B0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575252], transactionId='null'}]] 
          ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1236, sysFlag=0, bornTimestamp=1619580615735, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615735, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F48C2, commitLogOffset=1001666, bodyCRC=1786477042, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83703AC, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575248], transactionId='null'}]] 
          ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1235, sysFlag=0, bornTimestamp=1619580615733, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615733, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4596, commitLogOffset=1000854, bodyCRC=1280920064, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83503A8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575154], transactionId='null'}]] 
          ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1234, sysFlag=0, bornTimestamp=1619580615731, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615731, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F426A, commitLogOffset=1000042, bodyCRC=1261735449, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83303A4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575150], transactionId='null'}]] 
          ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1233, sysFlag=0, bornTimestamp=1619580615729, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615729, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3F3E, commitLogOffset=999230, bodyCRC=855266886, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83103A0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575056], transactionId='null'}]] 
          ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1232, sysFlag=0, bornTimestamp=1619580615727, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615728, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3C12, commitLogOffset=998418, bodyCRC=994843245, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82F039C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575052], transactionId='null'}]] 
          ConsumeMessageThread_11 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1231, sysFlag=0, bornTimestamp=1619580615725, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615726, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F38E6, commitLogOffset=997606, bodyCRC=1008852596, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82D0398, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132575048], transactionId='null'}]] 
          ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1230, sysFlag=0, bornTimestamp=1619580615723, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615724, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F35BA, commitLogOffset=996794, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82B0394, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72101108108111328211199107101116778132574954], transactionId='null'}]] 
          ...

          一個生產(chǎn),一個消費,清晰明了,體驗很好。

          嗯,至此為止,quick-start 就基本通了,也就知道這玩意的最簡單的用法了。


          看看整體架構(gòu)


          此時已經(jīng)上手體驗了一把用法,來總結(jié)下。

          1. 我啟動了一個 namesrv,暴露的端口是 9876。
          2. 我又啟動了一個 broker,啟動時加了個參數(shù) -n localhost:9876,很明顯,就是指向了剛剛那個 namesrv。
          3. 然后我啟動 produer 發(fā)了一堆消息。
          4. 最后我啟動了 consumer,就神奇地收到了這個消息。

          啟動 producer 和 consumer 時都必須使用一個環(huán)境變量叫 NAMESRV_ADDR=localhost:9876

          OK 了,此時我已經(jīng)有了合理的猜測,producer 和 consumer 都是通過環(huán)境變量連接了 9876 這個端口,也就是 namesrv,然后通過這個 namesrv 能找到 broker,那我可以簡單畫出這樣的架構(gòu)圖(這是我猜的,不一定對)。

          嗯,有了這個自己的猜測,我去看了官方文檔中的架構(gòu)部分:

          https://github.com/apache/rocketmq/blob/master/docs/cn/architecture.md

          上來就是一張圖。

          哇塞,大差不差,只是官方文檔的圖是考慮到了集群情況,其實我反倒覺得開始的圖不應(yīng)該整合太多內(nèi)容。

          緊接著,下面就解釋了這四個東西都是啥,我簡寫下。

          Producer:消息發(fā)布的角色,選擇相應(yīng)的 Broker 集群隊列進行消息投遞。
          Consumer:消息消費的角色,支持以 push 推,pull 拉兩種模式對消息進行消費。
          NameServer:一個非常簡單的 Topic 路由注冊中心,支持 Broker 的動態(tài)注冊與發(fā)現(xiàn)。
          BrokerServer:Broker 主要負責(zé)消息的存儲、投遞和查詢,包括以下幾個子模塊:

          Remoting Module:整個 Broker 的實體,負責(zé)處理來自 clients 端的請求。

          Client Manager:負責(zé)管理客戶端(Producer/Consumer)和維護 Consumer 的 Topic 訂閱信息。

          Store Service:提供方便簡單的 API 接口處理消息存儲到物理硬盤和查詢功能。

          HA Service:高可用服務(wù),提供 Master Broker 和 Slave Broker 之間的數(shù)據(jù)同步功能。

          Index Service:根據(jù)特定的 Message key 對投遞到 Broker 的消息進行索引服務(wù),以提供消息的快速查詢。

          嗯,又有了些新概念,BrokerServer 還分成五個子模塊,看著有些蒙,先不管。

          再往下看,到了部署架構(gòu),有了部署步驟。

          嗯,跟我們剛剛 quick-start 部分的部署順序一樣,只不過步驟三的創(chuàng)建 Topic,可以提前創(chuàng)建,也可以在發(fā)送消息時自動創(chuàng)建,我們剛剛用的應(yīng)該就是發(fā)消息時自動創(chuàng)建啦!這里再拿個小本本記下來,如何提前創(chuàng)建 Topic,OK 不去管它。

          再往下,就是最最最最重要的部分了,也就是設(shè)計原理

          地址是這個:

          https://github.com/apache/rocketmq/blob/master/docs/cn/design.md

          這也就是我們研究 rocketmq 的原理,閱讀源碼,最需要看的部分,當(dāng)然,也包括面試,滋滋滋。

          設(shè)計中包含六個部分,分別是消息存儲、通信機制、消息過濾、負載均衡、事務(wù)消息、消息查詢

          比如消息存儲,是整個設(shè)計部分的第一個版塊,上來就是一張勸退圖。

          下面配上了文字講解。

          之前的 quick-start 和整體架構(gòu)的描述,可以被快速地理解,到這就不行了,就真得花時間開始細琢磨了。

          但實際上呀,仔細看消息存儲版塊里面的內(nèi)容,包含三個部分:

          1.1 消息存儲整體架構(gòu)
          1.2 頁緩存與內(nèi)存映射
          1.3 消息刷盤

          這里面包括 IO 模型,內(nèi)存映射,磁盤順序讀寫,PageCache,同步異步刷盤等通用的底層知識,如果這些都統(tǒng)統(tǒng)掌握,整個這一部分就跟拼積木一樣,很順利拼起來了。

          這回知道底層知識有啥用了吧?起碼能讓你更深入和快速理解一個由它們拼起來的上層技術(shù)。

          這塊就沒法完全展開啦,不然就成一篇講 rocketmq 原理的文章了,我拿出這里的一句話。

          另外,RocketMQ 主要通過 MappedByteBuffer 對文件進行讀寫操作。其中,利用了 NIO 中的 FileChannel 模型將磁盤上的物理文件直接映射到用戶態(tài)的內(nèi)存地址中(這種 Mmap 的方式減少了傳統(tǒng) IO 將磁盤文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶應(yīng)用程序地址空間的緩沖區(qū)之間來回進行拷貝的性能開銷),將對文件的操作轉(zhuǎn)化為直接對內(nèi)存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內(nèi)存映射機制,故 RocketMQ 的文件存儲都使用定長結(jié)構(gòu)來存儲,方便一次將整個文件映射至內(nèi)存)。

          這段話如果你對零拷貝這個底層概念了解,其實整段話一秒鐘就看完了,而且會覺得它是廢話,就像是在湊字數(shù)一樣。(當(dāng)然這個我做不到哈,我對底層還沒有了解很透徹,所以用了 10 秒,理解到了一個馬馬虎虎的程度)


          見仁見智


          再接下來就是見仁見智的部分啦,如果是已經(jīng)有了非常多技術(shù)深入學(xué)習(xí)經(jīng)驗的大牛,直接根據(jù)設(shè)計文檔,再對照源碼即可,對大牛來說是最快的方式。
          如果不是的話,可以先找一些市面上比較經(jīng)典的入門書籍,或者質(zhì)量高的視頻教程,過一遍,然后再配合設(shè)計文檔和源碼,基本就能把這個技術(shù)吃透了。
          學(xué)多了之后你會發(fā)現(xiàn),好多底層的技術(shù)或中間件,和我們應(yīng)用層一樣,也是有套路的,也是拼積木拼起來的。所以今后在你想學(xué)一門新技術(shù)時,別想著學(xué)它有沒有用,面試會不會考,起碼你看多了之后會發(fā)現(xiàn),越新技術(shù)越學(xué)越快,學(xué)到最后你會發(fā)現(xiàn)根本就沒有新技術(shù),一切都是在拼積木。
          好了不裝逼了,我是第二類人群,已經(jīng)卡在設(shè)計文檔那里,所以我現(xiàn)在去找視頻看了,有推薦的可以留言區(qū)評論下。
          同時歡迎加我好友看我朋友圈的一驚一乍,就在公眾號菜單欄里可以找到哦。
          瀏覽 60
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天操屄| 国产精品站 | 狠狠2022 | 国产豆花在线视频 | 欧美高清成人在线 |