<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RocketMQ入門到入土(一)新手也能看懂的原理和實(shí)戰(zhàn)!

          共 19377字,需瀏覽 39分鐘

           ·

          2020-12-20 00:36

          點(diǎn)擊上方?好好學(xué)java?,選擇?星標(biāo)?公眾號

          重磅資訊、干貨,第一時間送達(dá)

          今日推薦:硬剛一周,3W字總結(jié),一年的經(jīng)驗(yàn)告訴你如何準(zhǔn)備校招!

          個人原創(chuàng)100W+訪問量博客:點(diǎn)擊前往,查看更多

          學(xué)任何技術(shù)都是兩步驟:

          1. 搭建環(huán)境

          2. helloworld

          我也不例外,直接搞起來。

          一、RocketMQ的安裝

          1、文檔

          官方網(wǎng)站

          http://rocketmq.apache.org

          GitHub

          https://github.com/apache/rocketmq

          2、下載

          wget?https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip

          我們是基于Centos8來的,面向官方文檔學(xué)習(xí),所以下載地址自然也是官方的。

          去官方網(wǎng)站找合適的版本進(jìn)行下載,目前我這里最新的是4.7.0版本。

          http://rocketmq.apache.org/dowloading/releases/


          https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip


          3、準(zhǔn)備工作

          3.1、解壓

          unzip?rocketmq-all-4.7.0-bin-release.zip

          3.2、安裝jdk

          sudo?yum?install?java-1.8.0-openjdk-devel

          4、啟動

          4.1、啟動namesrv

          cd?rocketmq-all-4.7.0-bin-release/bin
          ./mqnamesrv

          4.2、啟動broker

          cd?rocketmq-all-4.7.0-bin-release/bin
          ./mqbroker?-n?localhost:9876

          常見錯誤以及解決方案:

          常見錯誤:啟動broker失敗?Cannot allocate memory

          [root@node-113b?bin]#?./mqbroker?-n?localhost:9876
          Java?HotSpot(TM)?64-Bit?Server?VM?warning:?INFO:?os::commit_memory(0x00000005c0000000,?8589934592,?0)?failed
          ;?error='Cannot?allocate?memory'?(errno=12)#
          #?There?is?insufficient?memory?for?the?Java?Runtime?Environment?to?continue.
          #?Native?memory?allocation?(mmap)?failed?to?map?8589934592?bytes?for?committing?reserved?memory.
          #?An?error?report?file?with?more?information?is?saved?as:
          #?/usr/local/rocketmq/bin/hs_err_pid1997.log

          解決方案:

          是由于默認(rèn)內(nèi)存分配的太大了,超出了本機(jī)內(nèi)存,直接OOM了。

          修改bin/目錄下的如下兩個腳本

          runbroker.sh
          runserver.sh

          在這兩個腳本里都搜索-server -Xms,將其內(nèi)存分配小點(diǎn),自己玩的話512MB就足夠了,夠夠的了!

          4.3、啟動成功標(biāo)識

          namesrv啟動成功標(biāo)識:

          broker啟動成功標(biāo)識:

          二、RocketMQ控制臺的安裝

          控制臺目前獲取方式有如下兩種:

          1. 第三方網(wǎng)站去下載現(xiàn)成的,比如csdn等。

          2. 官方源碼包自己編譯而成,官方?jīng)]有現(xiàn)成的。

          我們這里當(dāng)然采取官方方式。

          1、官方文檔

          github倉庫

          https://github.com/apache/rocketmq-externals

          中文指南

          https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

          2、下載源碼

          https://codeload.github.com/apache/rocketmq-externals/zip/master

          3、修改配置(可選)

          我們下載完解壓后的文件目錄如下:

          修改rocketmq-console\src\main\resources\application.properties文件的server.port就歐了。默認(rèn)8080。

          4、編譯打包

          進(jìn)入rocketmq-console,然后用maven進(jìn)行編譯打包

          mvn?clean?package?-DskipTests

          打包完會在target下生成我們spring boot的jar程序,直接java -jar啟動完事。

          5、啟動控制臺

          將編譯打包好的springboot程序扔到服務(wù)器上,執(zhí)行如下命令進(jìn)行啟動

          java?-jar?rocketmq-console-ng-1.0.1.jar?--rocketmq.config.namesrvAddr=127.0.0.1:9876

          如果想后臺啟動就nohup &

          訪問一下看看效果:

          三、測試

          rocketmq給我們提供了測試工具和測試類,可以在安裝完很方便的進(jìn)行測試。

          0、準(zhǔn)備工作

          rocketmq給我們提供的默認(rèn)測試工具在bin目錄下,叫tools.sh。我們測試前需要配置這個腳本,為他指定namesrv地址才可以,否則測試發(fā)送/消費(fèi)消息的時候會出現(xiàn)如下錯誤?connect to null failed

          22:49:02.470?[main]?DEBUG?i.n.u.i.l.InternalLoggerFactory?-?Using?SLF4J?as?the?default?logging?framework
          RocketMQLog:WARN?No?appenders?could?be?found?for?logger?(io.netty.util.internal.PlatformDependent0).
          RocketMQLog:WARN?Please?initialize?the?logger?system?properly.
          java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?null?failed

          配置如下:

          vim?tools.sh
          #?在export?JAVA_HOME上面添加如下這段代碼
          export?NAMESRV_ADDR=localhost:9876

          1、發(fā)送消息

          ./tools.sh?org.apache.rocketmq.example.quickstart.Producer

          成功的話會看到嘩嘩嘩的日志,因?yàn)檫@個類會發(fā)送1000條消息到TopicTest這個Topic下。

          2、消費(fèi)消息

          ./tools.sh?org.apache.rocketmq.example.quickstart.Consumer

          成功的話會看到嘩嘩嘩的日志,因?yàn)檫@個類會消費(fèi)TopicTest下的全部消息。剛發(fā)送的1000條都會被消費(fèi)掉。

          3、控制臺

          發(fā)送成功后我們自然也能來到管控臺去看消息和消費(fèi)情況等等等信息

          四、架構(gòu)圖以及角色

          1、架構(gòu)圖

          2、角色

          2.1、Broker

          • 理解成RocketMQ本身

          • broker主要用于producer和consumer接收和發(fā)送消息

          • broker會定時向nameserver提交自己的信息

          • 是消息中間件的消息存儲、轉(zhuǎn)發(fā)服務(wù)器

          • 每個Broker節(jié)點(diǎn),在啟動時,都會遍歷NameServer列表,與每個NameServer建立長連接,注冊自己的信息,之后定時上報

          2.2、Nameserver

          • 理解成zookeeper的效果,只是他沒用zk,而是自己寫了個nameserver來替代zk

          • 底層由netty實(shí)現(xiàn),提供了路由管理、服務(wù)注冊、服務(wù)發(fā)現(xiàn)的功能,是一個無狀態(tài)節(jié)點(diǎn)

          • nameserver是服務(wù)發(fā)現(xiàn)者,集群中各個角色(producer、broker、consumer等)都需要定時向nameserver上報自己的狀態(tài),以便互相發(fā)現(xiàn)彼此,超時不上報的話,nameserver會把它從列表中剔除

          • nameserver可以部署多個,當(dāng)多個nameserver存在的時候,其他角色同時向他們上報信息,以保證高可用,

          • NameServer集群間互不通信,沒有主備的概念

          • nameserver內(nèi)存式存儲,nameserver中的broker、topic等信息默認(rèn)不會持久化,所以他是無狀態(tài)節(jié)點(diǎn)

          2.3、Producer

          • 消息的生產(chǎn)者

          • 隨機(jī)選擇其中一個NameServer節(jié)點(diǎn)建立長連接,獲得Topic路由信息(包括topic下的queue,這些queue分布在哪些broker上等等)

          • 接下來向提供topic服務(wù)的master建立長連接(因?yàn)閞ocketmq只有master才能寫消息),且定時向master發(fā)送心跳

          2.4、Consumer

          • 消息的消費(fèi)者

          • 通過NameServer集群獲得Topic的路由信息,連接到對應(yīng)的Broker上消費(fèi)消息

          • 由于Master和Slave都可以讀取消息,因此Consumer會與Master和Slave都建立連接進(jìn)行消費(fèi)消息

          3、核心流程

          • Broker都注冊到Nameserver上

          • Producer發(fā)消息的時候會從Nameserver上獲取發(fā)消息的topic信息

          • Producer向提供服務(wù)的所有master建立長連接,且定時向master發(fā)送心跳

          • Consumer通過NameServer集群獲得Topic的路由信息

          • Consumer會與所有的Master和所有的Slave都建立連接進(jìn)行監(jiān)聽新消息

          五、核心概念

          1、Message

          消息載體。Message發(fā)送或者消費(fèi)的時候必須指定Topic。Message有一個可選的Tag項(xiàng)用于過濾消息,還可以添加額外的鍵值對。

          2、topic

          消息的邏輯分類,發(fā)消息之前必須要指定一個topic才能發(fā),就是將這條消息發(fā)送到這個topic上。消費(fèi)消息的時候指定這個topic進(jìn)行消費(fèi)。就是邏輯分類。

          3、queue

          1個Topic會被分為N個Queue,數(shù)量是可配置的。message本身其實(shí)是存儲到queue上的,消費(fèi)者消費(fèi)的也是queue上的消息。多說一嘴,比如1個topic4個queue,有5個Consumer都在消費(fèi)這個topic,那么會有一個consumer浪費(fèi)掉了,因?yàn)樨?fù)載均衡策略,每個consumer消費(fèi)1個queue,5>4,溢出1個,這個會不工作。

          4、Tag

          Tag 是 Topic 的進(jìn)一步細(xì)分,顧名思義,標(biāo)簽。每個發(fā)送的時候消息都能打tag,消費(fèi)的時候可以根據(jù)tag進(jìn)行過濾,選擇性消費(fèi)。

          5、Message Model

          消息模型:集群(Clustering)和廣播(Broadcasting)

          6、Message Order

          消息順序:順序(Orderly)和并發(fā)(Concurrently)

          7、Producer Group

          消息生產(chǎn)者組

          8、Consumer Group

          消息消費(fèi)者組

          六、ACK

          首先要明確一點(diǎn):ACK機(jī)制是發(fā)生在Consumer端的,不是在Producer端的。也就是說Consumer消費(fèi)完消息后要進(jìn)行ACK確認(rèn),如果未確認(rèn)則代表是消費(fèi)失敗,這時候Broker會進(jìn)行重試策略(僅集群模式會重試)。ACK的意思就是:Consumer說:ok,我消費(fèi)成功了。這條消息給我標(biāo)記成已消費(fèi)吧。

          七、消費(fèi)模式

          1、集群模式(Clustering)

          1.1、圖解


          1.2、特點(diǎn)

          • 每條消息只需要被處理一次,broker只會把消息發(fā)送給消費(fèi)集群中的一個消費(fèi)者

          • 在消息重投時,不能保證路由到同一臺機(jī)器上

          • 消費(fèi)狀態(tài)由broker維護(hù)

          2、廣播模式(Broadcasting)

          2.1、圖解


          2.2、特點(diǎn)

          • 消費(fèi)進(jìn)度由consumer維護(hù)

          • 保證每個消費(fèi)者都消費(fèi)一次消息

          • 消費(fèi)失敗的消息不會重投

          八、Java API

          說明:

          • RocketMQ服務(wù)端版本為目前最新版:4.7.0

          • Java客戶端版本采取的目前最新版:4.7.0

          pom如下

          <dependency>
          ????<groupId>org.apache.rocketmqgroupId>
          ????<artifactId>rocketmq-clientartifactId>
          ????<version>4.7.0version>
          dependency>

          1、Producer

          發(fā)消息肯定要必備如下幾個條件:

          • 指定生產(chǎn)組名(不能用默認(rèn)的,會報錯)

          • 配置namesrv地址(必須)

          • 指定topic name(必須)

          • 指定tag/key(可選)

          驗(yàn)證消息是否發(fā)送成功:消息發(fā)送完后可以啟動消費(fèi)者進(jìn)行消費(fèi),也可以去管控臺上看消息是否存在。

          1.1、send(同步)

          public?class?Producer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();
          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world
          ????????Message?msg?=?new?Message("myTopic001",?"hello?world".getBytes());
          ????????//?發(fā)送消息到mq,同步的
          ????????SendResult?result?=?producer.send(msg);
          ????????System.out.println("發(fā)送消息成功!result is :?"?+?result);
          ????????//?關(guān)閉Producer
          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          發(fā)送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854140F418B4AAC26F7973910000, offsetMsgId=7B39B49D00002A9F00000000000589BE,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=7]
          生產(chǎn)者 shutdown!

          1.2、send(批量)

          public?class?ProducerMultiMsg?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();

          ????????String?topic?=?"myTopic001";
          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world1/2/3
          ????????Message?msg1?=?new?Message(topic,?"hello?world1".getBytes());
          ????????Message?msg2?=?new?Message(topic,?"hello?world2".getBytes());
          ????????Message?msg3?=?new?Message(topic,?"hello?world3".getBytes());
          ????????//?創(chuàng)建消息對象的集合,用于批量發(fā)送
          ????????List?msgs?=?new?ArrayList<>();
          ????????msgs.add(msg1);
          ????????msgs.add(msg2);
          ????????msgs.add(msg3);
          ????????//?批量發(fā)送的api的也是send(),只是他的重載方法支持List,同樣是同步發(fā)送。
          ????????SendResult?result?=?producer.send(msgs);
          ????????System.out.println("發(fā)送消息成功!result is :?"?+?result);
          ????????//?關(guān)閉Producer
          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          發(fā)送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854139C418B4AAC26F7D13770000,A9FE854139C418B4AAC26F7D13770001,A9FE854139C418B4AAC26F7D13770002, offsetMsgId=7B39B49D00002A9F0000000000058A62,7B39B49D00002A9F0000000000058B07,7B39B49D00002A9F0000000000058BAC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=0],?queueOffset=8]
          生產(chǎn)者 shutdown!

          從結(jié)果中可以看到只有一個msgId,所以可以發(fā)現(xiàn)雖然是三條消息對象,但是卻只發(fā)送了一次,大大節(jié)省了client與server的開銷。

          錯誤情況:

          批量發(fā)送的topic必須是同一個,如果message對象指定不同的topic,那么批量發(fā)送的時候會報錯:

          Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?Failed?to?initiate?the?MessageBatch
          For?more?information,?please?visit?the?url,?http://rocketmq.apache.org/docs/faq/
          ????at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:950)
          ????at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:898)
          ????at?com.chentongwei.mq.rocketmq.ProducerMultiMsg.main(ProducerMultiMsg.java:29)
          Caused?by:?java.lang.UnsupportedOperationException:?The?topic?of?the?messages?in?one?batch?should?be?the?same
          ????at?org.apache.rocketmq.common.message.MessageBatch.generateFromList(MessageBatch.java:58)
          ????at?org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:942)
          ????...?2?more

          1.3、sendCallBack(異步)

          public?class?ProducerASync?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ???????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();

          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world async
          ????????Message?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());
          ????????//?進(jìn)行異步發(fā)送,通過SendCallback接口來得知發(fā)送的結(jié)果
          ????????producer.send(msg,?new?SendCallback()?{
          ????????????//?發(fā)送成功的回調(diào)接口
          ????????????@Override
          ????????????public?void?onSuccess(SendResult?sendResult)?{
          ????????????????System.out.println("發(fā)送消息成功!result is :?"?+?sendResult);
          ????????????}
          ????????????//?發(fā)送失敗的回調(diào)接口
          ????????????@Override
          ????????????public?void?onException(Throwable?throwable)?{
          ????????????????throwable.printStackTrace();
          ????????????????System.out.println("發(fā)送消息失敗!result is :?"?+?throwable.getMessage());
          ????????????}
          ????????});

          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          生產(chǎn)者 shutdown!
          java.lang.IllegalStateException:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failed
          ????at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:681)
          ????at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:511)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:692)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:556)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:97)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$4.run(DefaultMQProducerImpl.java:510)
          ????at?java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          ????at?java.util.concurrent.FutureTask.run(FutureTask.java:266)
          ????at?java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          ????at?java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          ????at?java.lang.Thread.run(Thread.java:745)
          Caused?by:?org.apache.rocketmq.remoting.exception.RemotingConnectException:?connect?to?[124.57.180.156:9876]?failed
          ????at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:441)
          ????at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:396)
          ????at?org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:365)
          ????at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1371)
          ????at?org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1361)
          ????at?org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:624)
          ????...?10?more
          發(fā)送消息失敗!result is : org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876]?failed

          為啥報錯了?很簡單,他是異步的,從結(jié)果就能看出來,由于是異步的,我還沒發(fā)送到mq呢,你就先給我shutdown了。肯定不行,所以我們在shutdown前面sleep 1s在看效果

          public?class?ProducerASync?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ???????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();

          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world async
          ????????Message?msg?=?new?Message("myTopic001",?"hello?world?async".getBytes());
          ????????//?進(jìn)行異步發(fā)送,通過SendCallback接口來得知發(fā)送的結(jié)果
          ????????producer.send(msg,?new?SendCallback()?{
          ????????????//?發(fā)送成功的回調(diào)接口
          ????????????@Override
          ????????????public?void?onSuccess(SendResult?sendResult)?{
          ????????????????System.out.println("發(fā)送消息成功!result is :?"?+?sendResult);
          ????????????}
          ????????????//?發(fā)送失敗的回調(diào)接口
          ????????????@Override
          ????????????public?void?onException(Throwable?throwable)?{
          ????????????????throwable.printStackTrace();
          ????????????????System.out.println("發(fā)送消息失敗!result is :?"?+?throwable.getMessage());
          ????????????}
          ????????});

          ????????Thread.sleep(1000);

          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          發(fā)送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854106E418B4AAC26F8719B20000, offsetMsgId=7B39B49D00002A9F0000000000058CFC,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=1],?queueOffset=2]
          生產(chǎn)者 shutdown!

          1.4、sendOneway

          public?class?ProducerOneWay?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();

          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world oneway
          ????????Message?msg?=?new?Message("myTopic001",?"hello?world?oneway".getBytes());
          ????????//?效率最高,因?yàn)閛neway不關(guān)心是否發(fā)送成功,我就投遞一下我就不管了。所以返回是void
          ????????producer.sendOneway(msg);
          ????????System.out.println("投遞消息成功!,注意這里是投遞成功,而不是發(fā)送消息成功哦!因?yàn)槲襰endOneway也不知道到底成沒成功,我沒返回值的。");
          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          投遞消息成功!,注意這里是投遞成功,而不是發(fā)送消息成功哦!因?yàn)槲襰endOneway也不知道到底成沒成功,我沒返回值的。
          生產(chǎn)者 shutdown!

          1.5、效率對比

          sendOneway > sendCallBack > send批量 > send單條

          很容易理解,sendOneway不求結(jié)果,我就負(fù)責(zé)投遞,我不管你失敗還是成功,相當(dāng)于中轉(zhuǎn)站,來了我就扔出去,我不進(jìn)行任何其他處理。所以最快。

          而sendCallBack是異步發(fā)送肯定比同步的效率高。

          send批量和send單條的效率也是分情況的,如果只有1條msg要發(fā),那還搞毛批量,直接send單條完事。

          2、Consumer

          每個consumer只能關(guān)注一個topic。

          發(fā)消息肯定要必備如下幾個條件:

          • 指定消費(fèi)組名(不能用默認(rèn)的,會報錯)

          • 配置namesrv地址(必須)

          • 指定topic name(必須)

          • 指定tag/key(可選)

          2.1、CLUSTERING

          集群模式,默認(rèn)。

          比如啟動五個Consumer,Producer生產(chǎn)一條消息后,Broker會選擇五個Consumer中的其中一個進(jìn)行消費(fèi)這條消息,所以他屬于點(diǎn)對點(diǎn)消費(fèi)模式。

          public?class?Consumer?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?指定消費(fèi)組名為my-consumer
          ????????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("my-consumer");
          ????????//?配置namesrv地址
          ????????consumer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?訂閱topic:myTopic001 下的全部消息(因?yàn)槭?,*指定的是tag標(biāo)簽,代表全部消息,不進(jìn)行任何過濾)
          ????????consumer.subscribe("myTopic001",?"*");
          ????????//?注冊監(jiān)聽器,進(jìn)行消息消息。
          ????????consumer.registerMessageListener(new?MessageListenerConcurrently()?{
          ????????????@Override
          ????????????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?consumeConcurrentlyContext)?{
          ????????????????for?(MessageExt?msg?:?msgs)?{
          ????????????????????String?str?=?new?String(msg.getBody());
          ????????????????????//?輸出消息內(nèi)容
          ????????????????????System.out.println(str);
          ????????????????}
          ????????????????//?默認(rèn)情況下,這條消息只會被一個consumer消費(fèi),這叫點(diǎn)對點(diǎn)消費(fèi)模式。也就是集群模式。
          ????????????????//?ack確認(rèn)
          ????????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          ????????????}
          ????????});
          ????????//?啟動消費(fèi)者
          ????????consumer.start();
          ????????System.out.println("Consumer?start");
          ????}
          }

          2.2、BROADCASTING

          廣播模式。

          比如啟動五個Consumer,Producer生產(chǎn)一條消息后,Broker會把這條消息廣播到五個Consumer中,這五個Consumer分別消費(fèi)一次,每個都消費(fèi)一次。

          //?代碼里只需要添加如下這句話即可:
          consumer.setMessageModel(MessageModel.BROADCASTING);?

          2.3、兩種模式對比

          • 集群默認(rèn)是默認(rèn)的,廣播模式是需要手動配置。

          • 一條消息:集群模式下的多個Consumer只會有一個Consumer消費(fèi)。廣播模式下的每一個Consumer都會消費(fèi)這條消息。

          • 廣播模式下,發(fā)送一條消息后,會被當(dāng)前被廣播的所有Consumer消費(fèi),但是后面新加入的Consumer不會消費(fèi)這條消息,很好理解:村里面大喇叭喊了全村來領(lǐng)雞蛋,第二天你們村新來個人,那個人肯定聽不到昨天大喇叭喊的消息呀。

          3、TAG&&KEY

          發(fā)送/消費(fèi) 消息的時候可以指定tag/key來進(jìn)行過濾消息,支持通配符。*代表消費(fèi)此topic下的全部消息,不進(jìn)行過濾。

          看下org.apache.rocketmq.common.message.Message源碼可以發(fā)現(xiàn)發(fā)消息的時候可以指定tag和keys:

          public?Message(String?topic,?String?tags,?String?keys,?byte[]?body)?{
          ????this(topic,?tags,?keys,?0,?body,?true);
          }

          比如:

          public?class?ProducerTagsKeys?{
          ????public?static?void?main(String[]?args)?throws?Exception?{
          ????????//?指定生產(chǎn)組名為my-producer
          ????????DefaultMQProducer?producer?=?new?DefaultMQProducer("my-producer");
          ????????//?配置namesrv地址
          ????????producer.setNamesrvAddr("124.57.180.156:9876");
          ????????//?啟動Producer
          ????????producer.start();
          ????????//?創(chuàng)建消息對象,topic為:myTopic001,消息內(nèi)容為:hello world,且tags為:test-tags,keys為test-keys
          ????????Message?msg?=?new?Message("myTopic001",?"test-tags",?"test-keys",?"hello?world".getBytes());
          ????????//?發(fā)送消息到mq,同步的
          ????????SendResult?result?=?producer.send(msg);
          ????????System.out.println("發(fā)送消息成功!result is :?"?+?result);
          ????????//?關(guān)閉Producer
          ????????producer.shutdown();
          ????????System.out.println("生產(chǎn)者 shutdown!");
          ????}
          }

          輸出結(jié)果:

          發(fā)送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854149DC18B4AAC26FA4B7200000, offsetMsgId=7B39B49D00002A9F0000000000058DA6,?messageQueue=MessageQueue?[topic=myTopic001,?brokerName=broker-a,?queueId=3],?queueOffset=3]
          生產(chǎn)者 shutdown!

          查看管控臺,可以發(fā)現(xiàn)tags和keys已經(jīng)生效了:


          消費(fèi)的時候如果指定*那就是此topic下的全部消息,我們可以指定前綴通配符,比如:

          //?這樣就只會消費(fèi)myTopic001下的tag為test-*開頭的消息。
          consumer.subscribe("myTopic001",?"test-*");

          //?代表訂閱Topic為myTopic001下的tag為TagA或TagB的所有消息
          consumer.subscribe("myTopic001",?"TagA||TagB");

          還支持SQL表達(dá)式過濾,不是很常用。不BB了。

          4、常見錯誤

          4.1、sendDefaultImpl call timeout

          4.1.1、異常

          Exception?in?thread?"main"?org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:?sendDefaultImpl?call?timeout
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:666)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
          ????at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
          ????at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)

          4.1.2、解決

          1.如果你是云服務(wù)器,首先檢查安全組是否允許9876這個端口訪問,是否開啟了防火墻,如果開啟了的話是否將9876映射了出去。

          2.修改配置文件broker.conf,加上:

          brokerIP1=我用的是阿里云服務(wù)器,這里是我的公網(wǎng)IP

          啟動namesrv和broker的時候加上本機(jī)IP(我用的是阿里云服務(wù)器,這里是我的公網(wǎng)IP):

          ./bin/mqnamesrv?-n?IP:9876
          ./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf

          4.2、No route info of this topic

          4.2.1、異常

          Exception?in?thread?"main"?org.apache.rocketmq.client.exception.MQClientException:?No?route?info?of?this?topic:?myTopic001
          See?http://rocketmq.apache.org/docs/faq/?for?further?details.
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
          ????at?org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
          ????at?org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
          ????at?com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)

          4.2.2、解決

          很明顯發(fā)送成功了,不再是剛才的超時了,但是告訴我們沒有這個topic。那不能每次都手動創(chuàng)建呀,所以啟動broker的時候可以指定參數(shù)讓broker為我們自動創(chuàng)建。如下

          ./bin/mqbroker?-n?IP:9876?-c?conf/broker.conf?autoCreateTopicEnable=true


          推薦文章

          原創(chuàng)電子書

          歷時整整一年總結(jié)的?Java 面試 + Java 后端技術(shù)學(xué)習(xí)指南,這是本人這幾年及校招的總結(jié),各種高頻面試題已經(jīng)全部進(jìn)行總結(jié),按照章節(jié)復(fù)習(xí)即可,已經(jīng)拿到了大廠offer。

          原創(chuàng)思維導(dǎo)圖

          掃碼或者微信搜?程序員的技術(shù)圈子?回復(fù)?面試?領(lǐng)取原創(chuàng)電子書和思維導(dǎo)圖。


          瀏覽 81
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  操逼网在线视频 | 婷婷av网站 | 操逼一级大片 | 国产无遮挡裸体色视频免费观看 | 美女操逼免下载 |