<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ學(xué)習(xí):RabbitMQ的六種工作模式終結(jié)篇(四)

          共 26495字,需瀏覽 53分鐘

           ·

          2021-03-19 13:12


          來源:my.oschina.net/u/4115134/blog/3228182

          前言,在前面我講到了RabbitMQ的六種工作模式中簡單模式和工作模式,這里呢,我就一次性將剩下的四種--發(fā)布訂閱模式/路由模式/主題模式及Rpc異步調(diào)用模式,給大家進(jìn)行分析,講解一下,同時(shí)也給自己復(fù)習(xí)復(fù)習(xí)?。?!

          三、發(fā)布訂閱模式

          在前面的例子中,我們?nèi)蝿?wù)消息只交付給一個(gè)工作進(jìn)程。在這部分,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者傳遞同一條消息。這種模式稱為“發(fā)布/訂閱”。

          為了說明該模式,我們將構(gòu)建一個(gè)簡單的日志系統(tǒng)。它將由兩個(gè)程序組成——第一個(gè)程序?qū)l(fā)出日志消息,第二個(gè)程序接收它們。

          在我們的日志系統(tǒng)中,接收程序的每個(gè)運(yùn)行副本都將獲得消息。這樣,我們就可以運(yùn)行一個(gè)消費(fèi)者并將日志保存到磁盤; 同時(shí)我們可以運(yùn)行另一個(gè)消費(fèi)者在屏幕上打印日志。

          最終, 消息會(huì)被廣播到所有消息接受者。

          Exchanges 交換機(jī)

          RabbitMQ消息傳遞模型的核心思想是,生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至不知道消息是否會(huì)被傳遞到任何隊(duì)列。

          相反,生產(chǎn)者只能向交換機(jī)(Exchange)發(fā)送消息。交換機(jī)是一個(gè)非常簡單的東西。一邊接收來自生產(chǎn)者的消息,另一邊將消息推送到隊(duì)列。交換器必須確切地知道如何處理它接收到的消息。它應(yīng)該被添加到一個(gè)特定的隊(duì)列中嗎?它應(yīng)該添加到多個(gè)隊(duì)列中嗎?或者它應(yīng)該被丟棄。這些規(guī)則由exchange的類型定義。

          有幾種可用的交換類型:direct、topic、header和fanout。我們將關(guān)注最后一個(gè)——fanout。讓我們創(chuàng)建一個(gè)這種類型的交換機(jī),并稱之為 logs: ch.exchangeDeclare("logs", "fanout");

          fanout交換機(jī)非常簡單。它只是將接收到的所有消息廣播給它所知道的所有隊(duì)列。這正是我們的日志系統(tǒng)所需要的。

          我們前面使用的隊(duì)列具有特定的名稱(還記得hello和task_queue嗎?)能夠?yàn)殛?duì)列命名對(duì)我們來說至關(guān)重要——我們需要將工作進(jìn)程指向同一個(gè)隊(duì)列,在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列。

          但日志記錄案例不是這種情況。我們想要接收所有的日志消息,而不僅僅是其中的一部分。我們還只對(duì)當(dāng)前的最新消息感興趣,而不是舊消息。

          要解決這個(gè)問題,我們需要兩件事。首先,每當(dāng)我們連接到Rabbitmq時(shí),我們需要一個(gè)新的空隊(duì)列。為此,我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱的隊(duì)列,或者,更好的方法是讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。其次,一旦斷開與使用者的連接,隊(duì)列就會(huì)自動(dòng)刪除。在Java客戶端中,當(dāng)我們不向queueDeclare()提供任何參數(shù)時(shí),會(huì)創(chuàng)建一個(gè)具有生成名稱的、非持久的、獨(dú)占的、自動(dòng)刪除隊(duì)列

          //自動(dòng)生成隊(duì)列名
          //非持久,獨(dú)占,自動(dòng)刪除
          String queueName = ch.queueDeclare().getQueue();

          綁定Bindings

          我們已經(jīng)創(chuàng)建了一個(gè)fanout交換機(jī)和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴exchange向指定隊(duì)列發(fā)送消息。exchange和隊(duì)列之間的關(guān)系稱為綁定。

          //指定的隊(duì)列,與指定的交換機(jī)關(guān)聯(lián)起來
          //成為綁定 -- binding
          //第三個(gè)參數(shù)時(shí) routingKey, 由于是fanout交換機(jī), 這里忽略 routingKey
          ch.queueBind(queueName, "logs""");

          現(xiàn)在, logs交換機(jī)將會(huì)向我們指定的隊(duì)列添加消息

          列出綁定關(guān)系:

          rabbitmqctl list_bindings

          完成代碼實(shí)現(xiàn)

          生產(chǎn)者

          生產(chǎn)者發(fā)出日志消息,看起來與前一教程沒有太大不同。最重要的更改是,我們現(xiàn)在希望將消息發(fā)布到logs交換機(jī),而不是無名的日志交換機(jī)。我們需要在發(fā)送時(shí)提供一個(gè)routingKey,但是對(duì)于fanout交換機(jī)類型,該值會(huì)被忽略。

          package rabbitmq.publishsubscribe;

          import java.util.Scanner;

          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          public class Test1 {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            //定義名字為logs的交換機(jī),交換機(jī)類型為fanout
            //這一步是必須的,因?yàn)榻拱l(fā)布到不存在的交換。
            ch.exchangeDeclare("logs""fanout");
            
            while (true) {
             System.out.print("輸入消息: ");
             String msg = new Scanner(System.in).nextLine();
             if ("exit".equals(msg)) {
              break;
             }
             
             //第一個(gè)參數(shù),向指定的交換機(jī)發(fā)送消息
             //第二個(gè)參數(shù),不指定隊(duì)列,由消費(fèi)者向交換機(jī)綁定隊(duì)列
             //如果還沒有隊(duì)列綁定到交換器,消息就會(huì)丟失,
             //但這對(duì)我們來說沒有問題;即使沒有消費(fèi)者接收,我們也可以安全地丟棄這些信息。
             ch.basicPublish("logs"""null, msg.getBytes("UTF-8"));
             System.out.println("消息已發(fā)送: "+msg);
            }

            c.close();
           }
          }
          消費(fèi)者

          如果還沒有隊(duì)列綁定到交換器,消息就會(huì)丟失,但這對(duì)我們來說沒有問題;如果還沒有消費(fèi)者在聽,我們可以安全地丟棄這些信息。

          package rabbitmq.publishsubscribe;

          import java.io.IOException;

          import com.rabbitmq.client.CancelCallback;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DeliverCallback;
          import com.rabbitmq.client.Delivery;

          public class Test2 {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setUsername("admin");
            f.setPassword("admin");
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            //定義名字為 logs 的交換機(jī), 它的類型是 fanout
            ch.exchangeDeclare("logs""fanout");
            
            //自動(dòng)生成對(duì)列名,
            //非持久,獨(dú)占,自動(dòng)刪除
            String queueName = ch.queueDeclare().getQueue();
            
            //把該隊(duì)列,綁定到 logs 交換機(jī)
            //對(duì)于 fanout 類型的交換機(jī), routingKey會(huì)被忽略,不允許null值
            ch.queueBind(queueName, "logs""");
            
            System.out.println("等待接收數(shù)據(jù)");
            
            //收到消息后用來處理消息的回調(diào)對(duì)象
            DeliverCallback callback = new DeliverCallback() {
             @Override
             public void handle(String consumerTag, Delivery message) throws IOException {
              String msg = new String(message.getBody(), "UTF-8");
              System.out.println("收到: "+msg);
             }
            };
            
            //消費(fèi)者取消時(shí)的回調(diào)對(duì)象
            CancelCallback cancel = new CancelCallback() {
             @Override
             public void handle(String consumerTag) throws IOException {
             }
            };
            
            ch.basicConsume(queueName, true, callback, cancel);
           }
          }

          四、路由模式

          在上一小節(jié),我們構(gòu)建了一個(gè)簡單的日志系統(tǒng)。我們能夠向多個(gè)接收者廣播日志消息。

          在這一節(jié),我們將向其添加一個(gè)特性—我們將只訂閱所有消息中的一部分。例如,我們只接收關(guān)鍵錯(cuò)誤消息并保存到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。

          綁定 Bindings

          在上一節(jié),我們已經(jīng)創(chuàng)建了隊(duì)列與交換機(jī)的綁定。使用下面這樣的代碼:

          ch.queueBind(queueName, "logs""");

          綁定是交換機(jī)和隊(duì)列之間的關(guān)系。這可以簡單地理解為:隊(duì)列對(duì)來自此交換的消息感興趣。

          綁定可以使用額外的routingKey參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為bindingKey。這是我們?nèi)绾蝿?chuàng)建一個(gè)鍵綁定:

          ch.queueBind(queueName, EXCHANGE_NAME, "black");

          bindingKey的含義取決于交換機(jī)類型。我們前面使用的fanout交換機(jī)完全忽略它。

          直連交換機(jī) Direct exchange

          上一節(jié)中的日志系統(tǒng)向所有消費(fèi)者廣播所有消息。我們希望擴(kuò)展它,允許根據(jù)消息的嚴(yán)重性過濾消息。例如,我們希望將日志消息寫入磁盤的程序只接收關(guān)鍵error,而不是在warning或info日志消息上浪費(fèi)磁盤空間。

          前面我們使用的是fanout交換機(jī),這并沒有給我們太多的靈活性——它只能進(jìn)行簡單的廣播。

          我們將用直連交換機(jī)(Direct exchange)代替。它背后的路由算法很簡單——消息傳遞到bindingKey與routingKey完全匹配的隊(duì)列。為了說明這一點(diǎn),請(qǐng)考慮以下設(shè)置

          其中我們可以看到直連交換機(jī)X,它綁定了兩個(gè)隊(duì)列。第一個(gè)隊(duì)列用綁定鍵orange綁定,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定black,另一個(gè)綁定鍵green

          這樣設(shè)置,使用路由鍵orange發(fā)布到交換器的消息將被路由到隊(duì)列Q1。帶有blackgreen路由鍵的消息將轉(zhuǎn)到Q2。而所有其他消息都將被丟棄。

          多重綁定 Multiple bindings

          使用相同的bindingKey綁定多個(gè)隊(duì)列是完全允許的。如圖所示,可以使用binding key blackXQ1Q2綁定。在這種情況下,直連交換機(jī)的行為類似于fanout,并將消息廣播給所有匹配的隊(duì)列。一條路由鍵為black的消息將同時(shí)發(fā)送到Q1和Q2。

          發(fā)送日志

          我們將在日志系統(tǒng)中使用這個(gè)模型。我們把消息發(fā)送到一個(gè)Direct交換機(jī),而不是fanout。我們將提供日志級(jí)別作為routingKey。這樣,接收程序?qū)⒛軌蜻x擇它希望接收的級(jí)別。讓我們首先來看發(fā)出日志。

          和前面一樣,我們首先需要?jiǎng)?chuàng)建一個(gè)exchange:

          //參數(shù)1: 交換機(jī)名
          //參數(shù)2: 交換機(jī)類型
          ch.exchangeDeclare("direct_logs""direct");

          接著來看發(fā)送消息的代碼

          //參數(shù)1: 交換機(jī)名
          //參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
          //參數(shù)3: 其他配置屬性
          //參數(shù)4: 發(fā)布的消息數(shù)據(jù) 
          ch.basicPublish("direct_logs""error"null, message.getBytes());

          訂閱

          接收消息的工作原理與前面章節(jié)一樣,但有一個(gè)例外——我們將為感興趣的每個(gè)日志級(jí)別創(chuàng)建一個(gè)新的綁定, 示例代碼如下:

          ch.queueBind(queueName, "logs""info");
          ch.queueBind(queueName, "logs""warning");

          最終代碼實(shí)現(xiàn)

          生產(chǎn)者
          package rabbitmq.routing;

          import java.util.Random;
          import java.util.Scanner;

          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          public class Test1 {
           public static void main(String[] args) throws Exception {
            String[] a = {"warning""info""error"};
            
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            //參數(shù)1: 交換機(jī)名
            //參數(shù)2: 交換機(jī)類型
            ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
            
            while (true) {
             System.out.print("輸入消息: ");
             String msg = new Scanner(System.in).nextLine();
             if ("exit".equals(msg)) {
              break;
             }
             
             //隨機(jī)產(chǎn)生日志級(jí)別
             String level = a[new Random().nextInt(a.length)];
             
             //參數(shù)1: 交換機(jī)名
             //參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
             //參數(shù)3: 其他配置屬性
             //參數(shù)4: 發(fā)布的消息數(shù)據(jù) 
             ch.basicPublish("direct_logs", level, null, msg.getBytes());
             System.out.println("消息已發(fā)送: "+level+" - "+msg);
             
            }

            c.close();
           }
          }
          消費(fèi)者
          package rabbitmq.routing;

          import java.io.IOException;
          import java.util.Scanner;

          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.CancelCallback;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DeliverCallback;
          import com.rabbitmq.client.Delivery;

          public class Test2 {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setUsername("admin");
            f.setPassword("admin");
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            //定義名字為 direct_logs 的交換機(jī), 它的類型是 "direct"
            ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
            
            //自動(dòng)生成對(duì)列名,
            //非持久,獨(dú)占,自動(dòng)刪除
            String queueName = ch.queueDeclare().getQueue();
            
            System.out.println("輸入接收的日志級(jí)別,用空格隔開:");
            String[] a = new Scanner(System.in).nextLine().split("\\s");
            
            //把該隊(duì)列,綁定到 direct_logs 交換機(jī)
            //允許使用多個(gè) bindingKey
            for (String level : a) {
             ch.queueBind(queueName, "direct_logs", level);
            }
            
            System.out.println("等待接收數(shù)據(jù)");
            
            //收到消息后用來處理消息的回調(diào)對(duì)象
            DeliverCallback callback = new DeliverCallback() {
             @Override
             public void handle(String consumerTag, Delivery message) throws IOException {
              String msg = new String(message.getBody(), "UTF-8");
              String routingKey = message.getEnvelope().getRoutingKey();
              System.out.println("收到: "+routingKey+" - "+msg);
             }
            };
            
            //消費(fèi)者取消時(shí)的回調(diào)對(duì)象
            CancelCallback cancel = new CancelCallback() {
             @Override
             public void handle(String consumerTag) throws IOException {
             }
            };
            
            ch.basicConsume(queueName, true, callback, cancel);
           }
          }

          五、主題模式

          在上一小節(jié),我們改進(jìn)了日志系統(tǒng)。我們沒有使用只能進(jìn)行廣播的fanout交換機(jī),而是使用Direct交換機(jī),從而可以選擇性接收日志。

          雖然使用Direct交換機(jī)改進(jìn)了我們的系統(tǒng),但它仍然有局限性——它不能基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。

          在我們的日志系統(tǒng)中,我們可能不僅希望根據(jù)級(jí)別訂閱日志,還希望根據(jù)發(fā)出日志的源訂閱日志。

          這將給我們帶來很大的靈活性——我們可能只想接收來自“cron”的關(guān)鍵錯(cuò)誤,但也要接收來自“kern”的所有日志。

          要在日志系統(tǒng)中實(shí)現(xiàn)這一點(diǎn),我們需要了解更復(fù)雜的Topic交換機(jī)。推薦:Java面試練題寶典

          主題交換機(jī) Topic exchange

          發(fā)送到Topic交換機(jī)的消息,它的的routingKey,必須是由點(diǎn)分隔的多個(gè)單詞。單詞可以是任何東西,但通常是與消息相關(guān)的一些特性。幾個(gè)有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的單詞,最多255個(gè)字節(jié)。

          bindingKey也必須采用相同的形式。Topic交換機(jī)的邏輯與直連交換機(jī)類似——使用特定routingKey發(fā)送的消息將被傳遞到所有使用匹配bindingKey綁定的隊(duì)列。bindingKey有兩個(gè)重要的特殊點(diǎn):

          • * 可以通配單個(gè)單詞。

          • # 可以通配零個(gè)或多個(gè)單詞。

          用一個(gè)例子來解釋這個(gè)問題是最簡單的

          在本例中,我們將發(fā)送描述動(dòng)物的消息。這些消息將使用由三個(gè)單詞(兩個(gè)點(diǎn))組成的routingKey發(fā)送。routingKey中的第一個(gè)單詞表示速度,第二個(gè)是顏色,第三個(gè)是物種:“<速度>.<顏色>.<物種>”。

          我們創(chuàng)建三個(gè)綁定:Q1與bindingKey “.orange.” 綁定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。

          這些綁定可概括為:

          • Q1對(duì)所有橙色的動(dòng)物感興趣。

          • Q2想接收關(guān)于兔子和慢速動(dòng)物的所有消息。

          將routingKey設(shè)置為"quick.orange.rabbit"的消息將被發(fā)送到兩個(gè)隊(duì)列。消息 "lazy.orange.elephant“也發(fā)送到它們兩個(gè)。另外”quick.orange.fox“只會(huì)發(fā)到第一個(gè)隊(duì)列,”lazy.brown.fox“只發(fā)給第二個(gè)?!?code style="">lazy.pink.rabbit“將只被傳遞到第二個(gè)隊(duì)列一次,即使它匹配兩個(gè)綁定?!?code style="">quick.brown.fox"不匹配任何綁定,因此將被丟棄。

          如果我們違反約定,發(fā)送一個(gè)或四個(gè)單詞的信息,比如"orange“或”quick.orange.male.rabbit",會(huì)發(fā)生什么?這些消息將不匹配任何綁定,并將丟失。

          另外,"lazy.orange.male.rabbit",即使它有四個(gè)單詞,也將匹配最后一個(gè)綁定,并將被傳遞到第二個(gè)隊(duì)列。

          最終代碼實(shí)現(xiàn)

          生產(chǎn)者
          package rabbitmq.topic;

          import java.util.Random;
          import java.util.Scanner;

          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;

          public class Test1 {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            //參數(shù)1: 交換機(jī)名
            //參數(shù)2: 交換機(jī)類型
            ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
            
            while (true) {
             System.out.print("輸入消息: ");
             String msg = new Scanner(System.in).nextLine();
             if ("exit".contentEquals(msg)) {
              break;
             }
             System.out.print("輸入routingKey: ");
             String routingKey = new Scanner(System.in).nextLine();
             
             //參數(shù)1: 交換機(jī)名
             //參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
             //參數(shù)3: 其他配置屬性
             //參數(shù)4: 發(fā)布的消息數(shù)據(jù) 
             ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
             
             System.out.println("消息已發(fā)送: "+routingKey+" - "+msg);
            }

            c.close();
           }
          }
          消費(fèi)者
          package rabbitmq.topic;

          import java.io.IOException;
          import java.util.Scanner;

          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.CancelCallback;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DeliverCallback;
          import com.rabbitmq.client.Delivery;

          public class Test2 {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setUsername("admin");
            f.setPassword("admin");
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            
            ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
            
            //自動(dòng)生成對(duì)列名,
            //非持久,獨(dú)占,自動(dòng)刪除
            String queueName = ch.queueDeclare().getQueue();
            
            System.out.println("輸入bindingKey,用空格隔開:");
            String[] a = new Scanner(System.in).nextLine().split("\\s");
            
            //把該隊(duì)列,綁定到 topic_logs 交換機(jī)
            //允許使用多個(gè) bindingKey
            for (String bindingKey : a) {
             ch.queueBind(queueName, "topic_logs", bindingKey);
            }
            
            System.out.println("等待接收數(shù)據(jù)");
            
            //收到消息后用來處理消息的回調(diào)對(duì)象
            DeliverCallback callback = new DeliverCallback() {
             @Override
             public void handle(String consumerTag, Delivery message) throws IOException {
              String msg = new String(message.getBody(), "UTF-8");
              String routingKey = message.getEnvelope().getRoutingKey();
              System.out.println("收到: "+routingKey+" - "+msg);
             }
            };
            
            //消費(fèi)者取消時(shí)的回調(diào)對(duì)象
            CancelCallback cancel = new CancelCallback() {
             @Override
             public void handle(String consumerTag) throws IOException {
             }
            };
            
            ch.basicConsume(queueName, true, callback, cancel);
           }
          }

          六、RPC模式

          客戶端

          在客戶端定義一個(gè)RPCClient類,并定義一個(gè)call()方法,這個(gè)方法發(fā)送一個(gè)RPC請(qǐng)求,并等待接收響應(yīng)結(jié)果

          RPCClient client = new RPCClient();
          String result = client.call("4");
          System.out.println( "第四個(gè)斐波那契數(shù)是: " + result);

          回調(diào)隊(duì)列 Callback Queue

          使用RabbitMQ去實(shí)現(xiàn)RPC很容易。一個(gè)客戶端發(fā)送請(qǐng)求信息,并得到一個(gè)服務(wù)器端回復(fù)的響應(yīng)信息。為了得到響應(yīng)信息,我們需要在請(qǐng)求的時(shí)候發(fā)送一個(gè)“回調(diào)”隊(duì)列地址。我們可以使用默認(rèn)隊(duì)列。下面是示例代碼:

          //定義回調(diào)隊(duì)列,
          //自動(dòng)生成對(duì)列名,非持久,獨(dú)占,自動(dòng)刪除
          callbackQueueName = ch.queueDeclare().getQueue();

          //用來設(shè)置回調(diào)隊(duì)列的參數(shù)對(duì)象
          BasicProperties props = new BasicProperties
                                      .Builder()
                                      .replyTo(callbackQueueName)
                                      .build();
          //發(fā)送調(diào)用消息
          ch.basicPublish("""rpc_queue", props, message.getBytes());

          消息屬性 Message Properties

          AMQP 0-9-1協(xié)議定義了消息的14個(gè)屬性。大部分屬性很少使用,下面是比較常用的4個(gè):

          • deliveryMode:將消息標(biāo)記為持久化(值為2)或非持久化(任何其他值)。

          • contentType:用于描述mime類型。例如,對(duì)于經(jīng)常使用的JSON格式,將此屬性設(shè)置為:application/json。

          • replyTo:通常用于指定回調(diào)隊(duì)列。

          • correlationId:將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)起來非常有用。

          關(guān)聯(lián)id (correlationId):

          在上面的代碼中,我們會(huì)為每個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是非常低效的,這里還有一個(gè)更好的方法:讓我們?yōu)槊總€(gè)客戶端創(chuàng)建一個(gè)回調(diào)隊(duì)列。

          這就提出了一個(gè)新的問題,在隊(duì)列中得到一個(gè)響應(yīng)時(shí),我們不清楚這個(gè)響應(yīng)所對(duì)應(yīng)的是哪一條請(qǐng)求。這時(shí)候就需要使用關(guān)聯(lián)id(correlationId)。我們將為每一條請(qǐng)求設(shè)置唯一的的id值。推薦:Java面試練題寶典

          稍后,當(dāng)我們?cè)诨卣{(diào)隊(duì)列里收到一條消息的時(shí)候,我們將查看它的id屬性,這樣我們就可以匹配對(duì)應(yīng)的請(qǐng)求和響應(yīng)。如果我們發(fā)現(xiàn)了一個(gè)未知的id值,我們可以安全的丟棄這條消息,因?yàn)樗粚儆谖覀兊恼?qǐng)求。

          最終實(shí)現(xiàn)代碼

          RPC的工作方式是這樣的:
          • 對(duì)于RPC請(qǐng)求,客戶端發(fā)送一條帶有兩個(gè)屬性的消息:replyTo,設(shè)置為僅為請(qǐng)求創(chuàng)建的匿名獨(dú)占隊(duì)列,和correlationId,設(shè)置為每個(gè)請(qǐng)求的惟一id值。

          • 請(qǐng)求被發(fā)送到rpc_queue隊(duì)列。

          • RPC工作進(jìn)程(即:服務(wù)器)在隊(duì)列上等待請(qǐng)求。當(dāng)一個(gè)請(qǐng)求出現(xiàn)時(shí),它執(zhí)行任務(wù),并使用replyTo字段中的隊(duì)列將結(jié)果發(fā)回客戶機(jī)。

          • 客戶機(jī)在回應(yīng)消息隊(duì)列上等待數(shù)據(jù)。當(dāng)消息出現(xiàn)時(shí),它檢查correlationId屬性。如果匹配請(qǐng)求中的值,則向程序返回該響應(yīng)數(shù)據(jù)。

          服務(wù)器端
          package rabbitmq.rpc;

          import java.io.IOException;
          import java.util.Random;
          import java.util.Scanner;

          import com.rabbitmq.client.AMQP;
          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.CancelCallback;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DeliverCallback;
          import com.rabbitmq.client.Delivery;
          import com.rabbitmq.client.AMQP.BasicProperties;

          public class RPCServer {
           public static void main(String[] args) throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setPort(5672);
            f.setUsername("admin");
            f.setPassword("admin");
            
            Connection c = f.newConnection();
            Channel ch = c.createChannel();
            /*
             * 定義隊(duì)列 rpc_queue, 將從它接收請(qǐng)求信息
             * 
             * 參數(shù):
             * 1. queue, 對(duì)列名
             * 2. durable, 持久化
             * 3. exclusive, 排他
             * 4. autoDelete, 自動(dòng)刪除
             * 5. arguments, 其他參數(shù)屬性
             */

            ch.queueDeclare("rpc_queue",false,false,false,null);
            ch.queuePurge("rpc_queue");//清除隊(duì)列中的內(nèi)容
            
            ch.basicQos(1);//一次只接收一條消息
            
            
            //收到請(qǐng)求消息后的回調(diào)對(duì)象
            DeliverCallback deliverCallback = new DeliverCallback() {
             @Override
             public void handle(String consumerTag, Delivery message) throws IOException {
              //處理收到的數(shù)據(jù)(要求第幾個(gè)斐波那契數(shù))
              String msg = new String(message.getBody(), "UTF-8");
              int n = Integer.parseInt(msg);
              //求出第n個(gè)斐波那契數(shù)
              int r = fbnq(n);
              String response = String.valueOf(r);
              
              //設(shè)置發(fā)回響應(yīng)的id, 與請(qǐng)求id一致, 這樣客戶端可以把該響應(yīng)與它的請(qǐng)求進(jìn)行對(duì)應(yīng)
              BasicProperties replyProps = new BasicProperties.Builder()
                .correlationId(message.getProperties().getCorrelationId())
                .build();
              /*
               * 發(fā)送響應(yīng)消息
               * 1. 默認(rèn)交換機(jī)
               * 2. 由客戶端指定的,用來傳遞響應(yīng)消息的隊(duì)列名
               * 3. 參數(shù)(關(guān)聯(lián)id)
               * 4. 發(fā)回的響應(yīng)消息
               */

              ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
              //發(fā)送確認(rèn)消息
              ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
             }
            };
            
            //
            CancelCallback cancelCallback = new CancelCallback() {
             @Override
             public void handle(String consumerTag) throws IOException {
             }
            };
            
            //消費(fèi)者開始接收消息, 等待從 rpc_queue接收請(qǐng)求消息, 不自動(dòng)確認(rèn)
            ch.basicConsume("rpc_queue"false, deliverCallback, cancelCallback);
           }

           protected static int fbnq(int n) {
            if(n == 1 || n == 2return 1;
            
            return fbnq(n-1)+fbnq(n-2);
           }
          }
          客戶端
          package rabbitmq.rpc;

          import java.io.IOException;
          import java.util.Scanner;
          import java.util.UUID;
          import java.util.concurrent.ArrayBlockingQueue;
          import java.util.concurrent.BlockingQueue;

          import com.rabbitmq.client.BuiltinExchangeType;
          import com.rabbitmq.client.CancelCallback;
          import com.rabbitmq.client.Channel;
          import com.rabbitmq.client.Connection;
          import com.rabbitmq.client.ConnectionFactory;
          import com.rabbitmq.client.DeliverCallback;
          import com.rabbitmq.client.Delivery;
          import com.rabbitmq.client.AMQP.BasicProperties;

          public class RPCClient {
           Connection con;
           Channel ch;
           
           public RPCClient() throws Exception {
            ConnectionFactory f = new ConnectionFactory();
            f.setHost("192.168.64.140");
            f.setUsername("admin");
            f.setPassword("admin");
            con = f.newConnection();
            ch = con.createChannel();
           }
           
           public String call(String msg) throws Exception {
            //自動(dòng)生成對(duì)列名,非持久,獨(dú)占,自動(dòng)刪除
            String replyQueueName = ch.queueDeclare().getQueue();
            //生成關(guān)聯(lián)id
            String corrId = UUID.randomUUID().toString();
            
            //設(shè)置兩個(gè)參數(shù):
            //1. 請(qǐng)求和響應(yīng)的關(guān)聯(lián)id
            //2. 傳遞響應(yīng)數(shù)據(jù)的queue
            BasicProperties props = new BasicProperties.Builder()
              .correlationId(corrId)
              .replyTo(replyQueueName)
              .build();
            //向 rpc_queue 隊(duì)列發(fā)送請(qǐng)求數(shù)據(jù), 請(qǐng)求第n個(gè)斐波那契數(shù)
            ch.basicPublish("""rpc_queue", props, msg.getBytes("UTF-8"));
            
            //用來保存結(jié)果的阻塞集合,取數(shù)據(jù)時(shí),沒有數(shù)據(jù)會(huì)暫停等待
            BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
            
            //接收響應(yīng)數(shù)據(jù)的回調(diào)對(duì)象
            DeliverCallback deliverCallback = new DeliverCallback() {
             @Override
             public void handle(String consumerTag, Delivery message) throws IOException {
              //如果響應(yīng)消息的關(guān)聯(lián)id,與請(qǐng)求的關(guān)聯(lián)id相同,我們來處理這個(gè)響應(yīng)數(shù)據(jù)
              if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
               //把收到的響應(yīng)數(shù)據(jù),放入阻塞集合
               response.offer(new String(message.getBody(), "UTF-8"));
              }
             }
            };

            CancelCallback cancelCallback = new CancelCallback() {
             @Override
             public void handle(String consumerTag) throws IOException {
             }
            };
            
            //開始從隊(duì)列接收響應(yīng)數(shù)據(jù)
            ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
            //返回保存在集合中的響應(yīng)數(shù)據(jù)
            return response.take();
           }
           
           public static void main(String[] args) throws Exception {
            RPCClient client = new RPCClient();
            while (true) {
             System.out.print("求第幾個(gè)斐波那契數(shù):");
             int n = new Scanner(System.in).nextInt();
             String r = client.call(""+n);
             System.out.println(r);
            }
           }
          }

          七、RabbitMQ六種工作模式總結(jié):

          最近給大家找了 mysql入門到精通


          資源,怎么領(lǐng)?。?/span>


          掃二維碼為,加我微信,回復(fù):mysql入門到精通

           注意,不要亂回復(fù) 


          沒錯(cuò),不是機(jī)器人
          記得一定要等待,等待才有好東西
          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  中文最新天堂8√ | 成人91AV网 | 黑人大鷄巴A片 | 日本在线视频精品 | avav手机在线 |