RabbitMQ學(xué)習(xí):RabbitMQ的六種工作模式終結(jié)篇(四)
來源:my.oschina.net/u/4115134/blog/3228182
前言,在前面我講到了RabbitMQ的六種工作模式中簡單模式和工作模式,這里呢,我就一次性將剩下的四種--發(fā)布訂閱模式/路由模式/主題模式及Rpc異步調(diào)用模式,給大家進(jìn)行分析,講解一下,同時(shí)也給自己復(fù)習(xí)復(fù)習(xí)?。?!
三、發(fā)布訂閱模式


在前面的例子中,我們?nèi)蝿?wù)消息只交付給一個(gè)工作進(jìn)程。在這部分,我們將做一些完全不同的事情——我們將向多個(gè)消費(fèi)者傳遞同一條消息。這種模式稱為“發(fā)布/訂閱”。
為了說明該模式,我們將構(gòu)建一個(gè)簡單的日志系統(tǒng)。它將由兩個(gè)程序組成——第一個(gè)程序?qū)l(fā)出日志消息,第二個(gè)程序接收它們。
在我們的日志系統(tǒng)中,接收程序的每個(gè)運(yùn)行副本都將獲得消息。這樣,我們就可以運(yùn)行一個(gè)消費(fèi)者并將日志保存到磁盤; 同時(shí)我們可以運(yùn)行另一個(gè)消費(fèi)者在屏幕上打印日志。
最終, 消息會(huì)被廣播到所有消息接受者。
Exchanges 交換機(jī)
RabbitMQ消息傳遞模型的核心思想是,生產(chǎn)者永遠(yuǎn)不會(huì)將任何消息直接發(fā)送到隊(duì)列。實(shí)際上,通常生產(chǎn)者甚至不知道消息是否會(huì)被傳遞到任何隊(duì)列。
相反,生產(chǎn)者只能向交換機(jī)(Exchange)發(fā)送消息。交換機(jī)是一個(gè)非常簡單的東西。一邊接收來自生產(chǎn)者的消息,另一邊將消息推送到隊(duì)列。交換器必須確切地知道如何處理它接收到的消息。它應(yīng)該被添加到一個(gè)特定的隊(duì)列中嗎?它應(yīng)該添加到多個(gè)隊(duì)列中嗎?或者它應(yīng)該被丟棄。這些規(guī)則由exchange的類型定義。
有幾種可用的交換類型:direct、topic、header和fanout。我們將關(guān)注最后一個(gè)——fanout。讓我們創(chuàng)建一個(gè)這種類型的交換機(jī),并稱之為 logs: ch.exchangeDeclare("logs", "fanout");
fanout交換機(jī)非常簡單。它只是將接收到的所有消息廣播給它所知道的所有隊(duì)列。這正是我們的日志系統(tǒng)所需要的。
我們前面使用的隊(duì)列具有特定的名稱(還記得hello和task_queue嗎?)能夠?yàn)殛?duì)列命名對(duì)我們來說至關(guān)重要——我們需要將工作進(jìn)程指向同一個(gè)隊(duì)列,在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列。
但日志記錄案例不是這種情況。我們想要接收所有的日志消息,而不僅僅是其中的一部分。我們還只對(duì)當(dāng)前的最新消息感興趣,而不是舊消息。
要解決這個(gè)問題,我們需要兩件事。首先,每當(dāng)我們連接到Rabbitmq時(shí),我們需要一個(gè)新的空隊(duì)列。為此,我們可以創(chuàng)建一個(gè)具有隨機(jī)名稱的隊(duì)列,或者,更好的方法是讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。其次,一旦斷開與使用者的連接,隊(duì)列就會(huì)自動(dòng)刪除。在Java客戶端中,當(dāng)我們不向queueDeclare()提供任何參數(shù)時(shí),會(huì)創(chuàng)建一個(gè)具有生成名稱的、非持久的、獨(dú)占的、自動(dòng)刪除隊(duì)列
//自動(dòng)生成隊(duì)列名
//非持久,獨(dú)占,自動(dòng)刪除
String queueName = ch.queueDeclare().getQueue();
綁定Bindings

我們已經(jīng)創(chuàng)建了一個(gè)fanout交換機(jī)和一個(gè)隊(duì)列?,F(xiàn)在我們需要告訴exchange向指定隊(duì)列發(fā)送消息。exchange和隊(duì)列之間的關(guān)系稱為綁定。
//指定的隊(duì)列,與指定的交換機(jī)關(guān)聯(lián)起來
//成為綁定 -- binding
//第三個(gè)參數(shù)時(shí) routingKey, 由于是fanout交換機(jī), 這里忽略 routingKey
ch.queueBind(queueName, "logs", "");
現(xiàn)在, logs交換機(jī)將會(huì)向我們指定的隊(duì)列添加消息
列出綁定關(guān)系:
rabbitmqctl list_bindings
完成代碼實(shí)現(xiàn)

生產(chǎn)者
生產(chǎn)者發(fā)出日志消息,看起來與前一教程沒有太大不同。最重要的更改是,我們現(xiàn)在希望將消息發(fā)布到logs交換機(jī),而不是無名的日志交換機(jī)。我們需要在發(fā)送時(shí)提供一個(gè)routingKey,但是對(duì)于fanout交換機(jī)類型,該值會(huì)被忽略。
package rabbitmq.publishsubscribe;
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定義名字為logs的交換機(jī),交換機(jī)類型為fanout
//這一步是必須的,因?yàn)榻拱l(fā)布到不存在的交換。
ch.exchangeDeclare("logs", "fanout");
while (true) {
System.out.print("輸入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {
break;
}
//第一個(gè)參數(shù),向指定的交換機(jī)發(fā)送消息
//第二個(gè)參數(shù),不指定隊(duì)列,由消費(fèi)者向交換機(jī)綁定隊(duì)列
//如果還沒有隊(duì)列綁定到交換器,消息就會(huì)丟失,
//但這對(duì)我們來說沒有問題;即使沒有消費(fèi)者接收,我們也可以安全地丟棄這些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
System.out.println("消息已發(fā)送: "+msg);
}
c.close();
}
}
消費(fèi)者
如果還沒有隊(duì)列綁定到交換器,消息就會(huì)丟失,但這對(duì)我們來說沒有問題;如果還沒有消費(fèi)者在聽,我們可以安全地丟棄這些信息。
package rabbitmq.publishsubscribe;
import java.io.IOException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定義名字為 logs 的交換機(jī), 它的類型是 fanout
ch.exchangeDeclare("logs", "fanout");
//自動(dòng)生成對(duì)列名,
//非持久,獨(dú)占,自動(dòng)刪除
String queueName = ch.queueDeclare().getQueue();
//把該隊(duì)列,綁定到 logs 交換機(jī)
//對(duì)于 fanout 類型的交換機(jī), routingKey會(huì)被忽略,不允許null值
ch.queueBind(queueName, "logs", "");
System.out.println("等待接收數(shù)據(jù)");
//收到消息后用來處理消息的回調(diào)對(duì)象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
}
};
//消費(fèi)者取消時(shí)的回調(diào)對(duì)象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(queueName, true, callback, cancel);
}
}
四、路由模式


在上一小節(jié),我們構(gòu)建了一個(gè)簡單的日志系統(tǒng)。我們能夠向多個(gè)接收者廣播日志消息。
在這一節(jié),我們將向其添加一個(gè)特性—我們將只訂閱所有消息中的一部分。例如,我們只接收關(guān)鍵錯(cuò)誤消息并保存到日志文件(以節(jié)省磁盤空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
綁定 Bindings
在上一節(jié),我們已經(jīng)創(chuàng)建了隊(duì)列與交換機(jī)的綁定。使用下面這樣的代碼:
ch.queueBind(queueName, "logs", "");
綁定是交換機(jī)和隊(duì)列之間的關(guān)系。這可以簡單地理解為:隊(duì)列對(duì)來自此交換的消息感興趣。
綁定可以使用額外的routingKey參數(shù)。為了避免與basic_publish參數(shù)混淆,我們將其稱為bindingKey。這是我們?nèi)绾蝿?chuàng)建一個(gè)鍵綁定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含義取決于交換機(jī)類型。我們前面使用的fanout交換機(jī)完全忽略它。
直連交換機(jī) Direct exchange
上一節(jié)中的日志系統(tǒng)向所有消費(fèi)者廣播所有消息。我們希望擴(kuò)展它,允許根據(jù)消息的嚴(yán)重性過濾消息。例如,我們希望將日志消息寫入磁盤的程序只接收關(guān)鍵error,而不是在warning或info日志消息上浪費(fèi)磁盤空間。
前面我們使用的是fanout交換機(jī),這并沒有給我們太多的靈活性——它只能進(jìn)行簡單的廣播。
我們將用直連交換機(jī)(Direct exchange)代替。它背后的路由算法很簡單——消息傳遞到bindingKey與routingKey完全匹配的隊(duì)列。為了說明這一點(diǎn),請(qǐng)考慮以下設(shè)置

其中我們可以看到直連交換機(jī)X,它綁定了兩個(gè)隊(duì)列。第一個(gè)隊(duì)列用綁定鍵orange綁定,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)綁定black,另一個(gè)綁定鍵green。
這樣設(shè)置,使用路由鍵orange發(fā)布到交換器的消息將被路由到隊(duì)列Q1。帶有black或green路由鍵的消息將轉(zhuǎn)到Q2。而所有其他消息都將被丟棄。
多重綁定 Multiple bindings

使用相同的bindingKey綁定多個(gè)隊(duì)列是完全允許的。如圖所示,可以使用binding key black將X與Q1和Q2綁定。在這種情況下,直連交換機(jī)的行為類似于fanout,并將消息廣播給所有匹配的隊(duì)列。一條路由鍵為black的消息將同時(shí)發(fā)送到Q1和Q2。
發(fā)送日志
我們將在日志系統(tǒng)中使用這個(gè)模型。我們把消息發(fā)送到一個(gè)Direct交換機(jī),而不是fanout。我們將提供日志級(jí)別作為routingKey。這樣,接收程序?qū)⒛軌蜻x擇它希望接收的級(jí)別。讓我們首先來看發(fā)出日志。
和前面一樣,我們首先需要?jiǎng)?chuàng)建一個(gè)exchange:
//參數(shù)1: 交換機(jī)名
//參數(shù)2: 交換機(jī)類型
ch.exchangeDeclare("direct_logs", "direct");
接著來看發(fā)送消息的代碼
//參數(shù)1: 交換機(jī)名
//參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
//參數(shù)3: 其他配置屬性
//參數(shù)4: 發(fā)布的消息數(shù)據(jù)
ch.basicPublish("direct_logs", "error", null, message.getBytes());
訂閱
接收消息的工作原理與前面章節(jié)一樣,但有一個(gè)例外——我們將為感興趣的每個(gè)日志級(jí)別創(chuàng)建一個(gè)新的綁定, 示例代碼如下:
ch.queueBind(queueName, "logs", "info");
ch.queueBind(queueName, "logs", "warning");
最終代碼實(shí)現(xiàn)

生產(chǎn)者
package rabbitmq.routing;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
String[] a = {"warning", "info", "error"};
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//參數(shù)1: 交換機(jī)名
//參數(shù)2: 交換機(jī)類型
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
while (true) {
System.out.print("輸入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg)) {
break;
}
//隨機(jī)產(chǎn)生日志級(jí)別
String level = a[new Random().nextInt(a.length)];
//參數(shù)1: 交換機(jī)名
//參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
//參數(shù)3: 其他配置屬性
//參數(shù)4: 發(fā)布的消息數(shù)據(jù)
ch.basicPublish("direct_logs", level, null, msg.getBytes());
System.out.println("消息已發(fā)送: "+level+" - "+msg);
}
c.close();
}
}
消費(fèi)者
package rabbitmq.routing;
import java.io.IOException;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定義名字為 direct_logs 的交換機(jī), 它的類型是 "direct"
ch.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
//自動(dòng)生成對(duì)列名,
//非持久,獨(dú)占,自動(dòng)刪除
String queueName = ch.queueDeclare().getQueue();
System.out.println("輸入接收的日志級(jí)別,用空格隔開:");
String[] a = new Scanner(System.in).nextLine().split("\\s");
//把該隊(duì)列,綁定到 direct_logs 交換機(jī)
//允許使用多個(gè) bindingKey
for (String level : a) {
ch.queueBind(queueName, "direct_logs", level);
}
System.out.println("等待接收數(shù)據(jù)");
//收到消息后用來處理消息的回調(diào)對(duì)象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
String routingKey = message.getEnvelope().getRoutingKey();
System.out.println("收到: "+routingKey+" - "+msg);
}
};
//消費(fèi)者取消時(shí)的回調(diào)對(duì)象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(queueName, true, callback, cancel);
}
}
五、主題模式

在上一小節(jié),我們改進(jìn)了日志系統(tǒng)。我們沒有使用只能進(jìn)行廣播的fanout交換機(jī),而是使用Direct交換機(jī),從而可以選擇性接收日志。
雖然使用Direct交換機(jī)改進(jìn)了我們的系統(tǒng),但它仍然有局限性——它不能基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。
在我們的日志系統(tǒng)中,我們可能不僅希望根據(jù)級(jí)別訂閱日志,還希望根據(jù)發(fā)出日志的源訂閱日志。
這將給我們帶來很大的靈活性——我們可能只想接收來自“cron”的關(guān)鍵錯(cuò)誤,但也要接收來自“kern”的所有日志。
要在日志系統(tǒng)中實(shí)現(xiàn)這一點(diǎn),我們需要了解更復(fù)雜的Topic交換機(jī)。推薦:Java面試練題寶典
主題交換機(jī) Topic exchange
發(fā)送到Topic交換機(jī)的消息,它的的routingKey,必須是由點(diǎn)分隔的多個(gè)單詞。單詞可以是任何東西,但通常是與消息相關(guān)的一些特性。幾個(gè)有效的routingKey示例:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。routingKey可以有任意多的單詞,最多255個(gè)字節(jié)。
bindingKey也必須采用相同的形式。Topic交換機(jī)的邏輯與直連交換機(jī)類似——使用特定routingKey發(fā)送的消息將被傳遞到所有使用匹配bindingKey綁定的隊(duì)列。bindingKey有兩個(gè)重要的特殊點(diǎn):
*可以通配單個(gè)單詞。#可以通配零個(gè)或多個(gè)單詞。
用一個(gè)例子來解釋這個(gè)問題是最簡單的

在本例中,我們將發(fā)送描述動(dòng)物的消息。這些消息將使用由三個(gè)單詞(兩個(gè)點(diǎn))組成的routingKey發(fā)送。routingKey中的第一個(gè)單詞表示速度,第二個(gè)是顏色,第三個(gè)是物種:“<速度>.<顏色>.<物種>”。
我們創(chuàng)建三個(gè)綁定:Q1與bindingKey “.orange.” 綁定。和Q2是 “*.*.rabbit” 和 “lazy.#” 。
這些綁定可概括為:
Q1對(duì)所有橙色的動(dòng)物感興趣。
Q2想接收關(guān)于兔子和慢速動(dòng)物的所有消息。
將routingKey設(shè)置為"quick.orange.rabbit"的消息將被發(fā)送到兩個(gè)隊(duì)列。消息 "lazy.orange.elephant“也發(fā)送到它們兩個(gè)。另外”quick.orange.fox“只會(huì)發(fā)到第一個(gè)隊(duì)列,”lazy.brown.fox“只發(fā)給第二個(gè)?!?code style="">lazy.pink.rabbit“將只被傳遞到第二個(gè)隊(duì)列一次,即使它匹配兩個(gè)綁定?!?code style="">quick.brown.fox"不匹配任何綁定,因此將被丟棄。
如果我們違反約定,發(fā)送一個(gè)或四個(gè)單詞的信息,比如"orange“或”quick.orange.male.rabbit",會(huì)發(fā)生什么?這些消息將不匹配任何綁定,并將丟失。
另外,"lazy.orange.male.rabbit",即使它有四個(gè)單詞,也將匹配最后一個(gè)綁定,并將被傳遞到第二個(gè)隊(duì)列。
最終代碼實(shí)現(xiàn)
生產(chǎn)者
package rabbitmq.topic;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//參數(shù)1: 交換機(jī)名
//參數(shù)2: 交換機(jī)類型
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
while (true) {
System.out.print("輸入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".contentEquals(msg)) {
break;
}
System.out.print("輸入routingKey: ");
String routingKey = new Scanner(System.in).nextLine();
//參數(shù)1: 交換機(jī)名
//參數(shù)2: routingKey, 路由鍵,這里我們用日志級(jí)別,如"error","info","warning"
//參數(shù)3: 其他配置屬性
//參數(shù)4: 發(fā)布的消息數(shù)據(jù)
ch.basicPublish("topic_logs", routingKey, null, msg.getBytes());
System.out.println("消息已發(fā)送: "+routingKey+" - "+msg);
}
c.close();
}
}
消費(fèi)者
package rabbitmq.topic;
import java.io.IOException;
import java.util.Scanner;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2 {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
ch.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
//自動(dòng)生成對(duì)列名,
//非持久,獨(dú)占,自動(dòng)刪除
String queueName = ch.queueDeclare().getQueue();
System.out.println("輸入bindingKey,用空格隔開:");
String[] a = new Scanner(System.in).nextLine().split("\\s");
//把該隊(duì)列,綁定到 topic_logs 交換機(jī)
//允許使用多個(gè) bindingKey
for (String bindingKey : a) {
ch.queueBind(queueName, "topic_logs", bindingKey);
}
System.out.println("等待接收數(shù)據(jù)");
//收到消息后用來處理消息的回調(diào)對(duì)象
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String msg = new String(message.getBody(), "UTF-8");
String routingKey = message.getEnvelope().getRoutingKey();
System.out.println("收到: "+routingKey+" - "+msg);
}
};
//消費(fèi)者取消時(shí)的回調(diào)對(duì)象
CancelCallback cancel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
ch.basicConsume(queueName, true, callback, cancel);
}
}
六、RPC模式

客戶端
在客戶端定義一個(gè)RPCClient類,并定義一個(gè)call()方法,這個(gè)方法發(fā)送一個(gè)RPC請(qǐng)求,并等待接收響應(yīng)結(jié)果
RPCClient client = new RPCClient();
String result = client.call("4");
System.out.println( "第四個(gè)斐波那契數(shù)是: " + result);
回調(diào)隊(duì)列 Callback Queue
使用RabbitMQ去實(shí)現(xiàn)RPC很容易。一個(gè)客戶端發(fā)送請(qǐng)求信息,并得到一個(gè)服務(wù)器端回復(fù)的響應(yīng)信息。為了得到響應(yīng)信息,我們需要在請(qǐng)求的時(shí)候發(fā)送一個(gè)“回調(diào)”隊(duì)列地址。我們可以使用默認(rèn)隊(duì)列。下面是示例代碼:
//定義回調(diào)隊(duì)列,
//自動(dòng)生成對(duì)列名,非持久,獨(dú)占,自動(dòng)刪除
callbackQueueName = ch.queueDeclare().getQueue();
//用來設(shè)置回調(diào)隊(duì)列的參數(shù)對(duì)象
BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();
//發(fā)送調(diào)用消息
ch.basicPublish("", "rpc_queue", props, message.getBytes());
消息屬性 Message Properties
AMQP 0-9-1協(xié)議定義了消息的14個(gè)屬性。大部分屬性很少使用,下面是比較常用的4個(gè):
deliveryMode:將消息標(biāo)記為持久化(值為2)或非持久化(任何其他值)。
contentType:用于描述mime類型。例如,對(duì)于經(jīng)常使用的JSON格式,將此屬性設(shè)置為:application/json。
replyTo:通常用于指定回調(diào)隊(duì)列。
correlationId:將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)起來非常有用。
關(guān)聯(lián)id (correlationId):
在上面的代碼中,我們會(huì)為每個(gè)RPC請(qǐng)求創(chuàng)建一個(gè)回調(diào)隊(duì)列。這是非常低效的,這里還有一個(gè)更好的方法:讓我們?yōu)槊總€(gè)客戶端創(chuàng)建一個(gè)回調(diào)隊(duì)列。
這就提出了一個(gè)新的問題,在隊(duì)列中得到一個(gè)響應(yīng)時(shí),我們不清楚這個(gè)響應(yīng)所對(duì)應(yīng)的是哪一條請(qǐng)求。這時(shí)候就需要使用關(guān)聯(lián)id(correlationId)。我們將為每一條請(qǐng)求設(shè)置唯一的的id值。推薦:Java面試練題寶典
稍后,當(dāng)我們?cè)诨卣{(diào)隊(duì)列里收到一條消息的時(shí)候,我們將查看它的id屬性,這樣我們就可以匹配對(duì)應(yīng)的請(qǐng)求和響應(yīng)。如果我們發(fā)現(xiàn)了一個(gè)未知的id值,我們可以安全的丟棄這條消息,因?yàn)樗粚儆谖覀兊恼?qǐng)求。
最終實(shí)現(xiàn)代碼

RPC的工作方式是這樣的:
對(duì)于RPC請(qǐng)求,客戶端發(fā)送一條帶有兩個(gè)屬性的消息:replyTo,設(shè)置為僅為請(qǐng)求創(chuàng)建的匿名獨(dú)占隊(duì)列,和correlationId,設(shè)置為每個(gè)請(qǐng)求的惟一id值。
請(qǐng)求被發(fā)送到rpc_queue隊(duì)列。
RPC工作進(jìn)程(即:服務(wù)器)在隊(duì)列上等待請(qǐng)求。當(dāng)一個(gè)請(qǐng)求出現(xiàn)時(shí),它執(zhí)行任務(wù),并使用replyTo字段中的隊(duì)列將結(jié)果發(fā)回客戶機(jī)。
客戶機(jī)在回應(yīng)消息隊(duì)列上等待數(shù)據(jù)。當(dāng)消息出現(xiàn)時(shí),它檢查correlationId屬性。如果匹配請(qǐng)求中的值,則向程序返回該響應(yīng)數(shù)據(jù)。
服務(wù)器端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCServer {
public static void main(String[] args) throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
/*
* 定義隊(duì)列 rpc_queue, 將從它接收請(qǐng)求信息
*
* 參數(shù):
* 1. queue, 對(duì)列名
* 2. durable, 持久化
* 3. exclusive, 排他
* 4. autoDelete, 自動(dòng)刪除
* 5. arguments, 其他參數(shù)屬性
*/
ch.queueDeclare("rpc_queue",false,false,false,null);
ch.queuePurge("rpc_queue");//清除隊(duì)列中的內(nèi)容
ch.basicQos(1);//一次只接收一條消息
//收到請(qǐng)求消息后的回調(diào)對(duì)象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//處理收到的數(shù)據(jù)(要求第幾個(gè)斐波那契數(shù))
String msg = new String(message.getBody(), "UTF-8");
int n = Integer.parseInt(msg);
//求出第n個(gè)斐波那契數(shù)
int r = fbnq(n);
String response = String.valueOf(r);
//設(shè)置發(fā)回響應(yīng)的id, 與請(qǐng)求id一致, 這樣客戶端可以把該響應(yīng)與它的請(qǐng)求進(jìn)行對(duì)應(yīng)
BasicProperties replyProps = new BasicProperties.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();
/*
* 發(fā)送響應(yīng)消息
* 1. 默認(rèn)交換機(jī)
* 2. 由客戶端指定的,用來傳遞響應(yīng)消息的隊(duì)列名
* 3. 參數(shù)(關(guān)聯(lián)id)
* 4. 發(fā)回的響應(yīng)消息
*/
ch.basicPublish("",message.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
//發(fā)送確認(rèn)消息
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
//
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//消費(fèi)者開始接收消息, 等待從 rpc_queue接收請(qǐng)求消息, 不自動(dòng)確認(rèn)
ch.basicConsume("rpc_queue", false, deliverCallback, cancelCallback);
}
protected static int fbnq(int n) {
if(n == 1 || n == 2) return 1;
return fbnq(n-1)+fbnq(n-2);
}
}
客戶端
package rabbitmq.rpc;
import java.io.IOException;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.AMQP.BasicProperties;
public class RPCClient {
Connection con;
Channel ch;
public RPCClient() throws Exception {
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
con = f.newConnection();
ch = con.createChannel();
}
public String call(String msg) throws Exception {
//自動(dòng)生成對(duì)列名,非持久,獨(dú)占,自動(dòng)刪除
String replyQueueName = ch.queueDeclare().getQueue();
//生成關(guān)聯(lián)id
String corrId = UUID.randomUUID().toString();
//設(shè)置兩個(gè)參數(shù):
//1. 請(qǐng)求和響應(yīng)的關(guān)聯(lián)id
//2. 傳遞響應(yīng)數(shù)據(jù)的queue
BasicProperties props = new BasicProperties.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
//向 rpc_queue 隊(duì)列發(fā)送請(qǐng)求數(shù)據(jù), 請(qǐng)求第n個(gè)斐波那契數(shù)
ch.basicPublish("", "rpc_queue", props, msg.getBytes("UTF-8"));
//用來保存結(jié)果的阻塞集合,取數(shù)據(jù)時(shí),沒有數(shù)據(jù)會(huì)暫停等待
BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
//接收響應(yīng)數(shù)據(jù)的回調(diào)對(duì)象
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//如果響應(yīng)消息的關(guān)聯(lián)id,與請(qǐng)求的關(guān)聯(lián)id相同,我們來處理這個(gè)響應(yīng)數(shù)據(jù)
if (message.getProperties().getCorrelationId().contentEquals(corrId)) {
//把收到的響應(yīng)數(shù)據(jù),放入阻塞集合
response.offer(new String(message.getBody(), "UTF-8"));
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
//開始從隊(duì)列接收響應(yīng)數(shù)據(jù)
ch.basicConsume(replyQueueName, true, deliverCallback, cancelCallback);
//返回保存在集合中的響應(yīng)數(shù)據(jù)
return response.take();
}
public static void main(String[] args) throws Exception {
RPCClient client = new RPCClient();
while (true) {
System.out.print("求第幾個(gè)斐波那契數(shù):");
int n = new Scanner(System.in).nextInt();
String r = client.call(""+n);
System.out.println(r);
}
}
}
七、RabbitMQ六種工作模式總結(jié):

最近給大家找了 mysql入門到精通
資源,怎么領(lǐng)?。?/span>
掃二維碼為,加我微信,回復(fù):mysql入門到精通
注意,不要亂回復(fù) 沒錯(cuò),不是機(jī)器人 記得一定要等待,等待才有好東西
