RabbitMQ 最常用的 3 大模式!

Java技術棧
www.javastack.cn
關注閱讀更多優質文章
作者:海向
出處:www.cnblogs.com/haixiang/p/10864339.html
Direct 模式
所有發送到 Direct Exchange 的消息被轉發到 RouteKey 中指定的 Queue。
Direct 模式可以使用 RabbitMQ 自帶的 Exchange: default Exchange,所以不需要將 Exchange 進行任何綁定(binding)操作。
消息傳遞時,RouteKey 必須完全匹配才會被隊列接收,否則該消息會被拋棄。

import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
public?class?DirectProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_direct_exchange";
????????String?routingKey?=?"item.direct";
????????//5.?發(fā)送
????????String?msg?=?"this?is?direct?msg";
????????channel.basicPublish(exchangeName,?routingKey,?null,?msg.getBytes());
????????System.out.println("Send?message?:?"?+?msg);
????????//6.?關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?DirectConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
??????
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_direct_exchange";
????????String?queueName?=?"test_direct_queue";
????????String?routingKey?=?"item.direct";
????????channel.exchangeDeclare(exchangeName,?"direct",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
Send message : this is direct msg

 [x] Received 'this is direct msg'
Topic 模式
可以使用通配符進行模糊匹配
符號 "#" 匹配零個或多個詞
符號 "*" 匹配不多不少一個詞
例如:
"log.#" 能夠匹配到 "log.info.oa"
"log.*" 只會匹配到 "log.error"

import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
public?class?TopicProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_topic_exchange";
????????String?routingKey1?=?"item.update";
????????String?routingKey2?=?"item.delete";
????????String?routingKey3?=?"user.add";
????????//5.?發(fā)送
????????String?msg?=?"this?is?topic?msg";
????????channel.basicPublish(exchangeName,?routingKey1,?null,?msg.getBytes());
????????channel.basicPublish(exchangeName,?routingKey2,?null,?msg.getBytes());
????????channel.basicPublish(exchangeName,?routingKey3,?null,?msg.getBytes());
????????System.out.println("Send?message?:?"?+?msg);
????????//6.?關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?TopicConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_topic_exchange";
????????String?queueName?=?"test_topic_queue";
????????String?routingKey?=?"item.#";
????????channel.exchangeDeclare(exchangeName,?"topic",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
Send message : this is topic msg
 [x] Received 'this is topic msg'
 [x] Received 'this is topic msg'
Fanout 模式
不處理路由鍵,只需要簡單的將隊列綁定到交換機上,發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。系列RabbitMQ教程請關注公眾號Java技術棧獲取閱讀。
Fanout交換機轉發消息是最快的。

import?com.rabbitmq.client.*;
import?java.io.IOException;
public?class?FanoutConsumer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????factory.setAutomaticRecoveryEnabled(true);
????????factory.setNetworkRecoveryInterval(3000);
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_fanout_exchange";
????????String?queueName?=?"test_fanout_queue";
????????String?routingKey?=?"item.#";
????????channel.exchangeDeclare(exchangeName,?"fanout",?true,?false,?null);
????????channel.queueDeclare(queueName,?false,?false,?false,?null);
????????//一般不用代碼綁定,在管理界面手動(dòng)綁定
????????channel.queueBind(queueName,?exchangeName,?routingKey);
????????//5.?創(chuàng)建消費(fèi)者并接收消息
????????Consumer?consumer?=?new?DefaultConsumer(channel)?{
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,
???????????????????????????????????????AMQP.BasicProperties?properties,?byte[]?body)
????????????????????throws?IOException?{
????????????????String?message?=?new?String(body,?"UTF-8");
????????????????System.out.println("?[x]?Received?'"?+?message?+?"'");
????????????}
????????};
????????//6.?設(shè)置?Channel?消費(fèi)者綁定隊(duì)列
????????channel.basicConsume(queueName,?true,?consumer);
????}
}
import?com.rabbitmq.client.Channel;
import?com.rabbitmq.client.Connection;
import?com.rabbitmq.client.ConnectionFactory;
public?class?FanoutProducer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//1.?創(chuàng)建一個(gè)?ConnectionFactory?并進(jìn)行設(shè)置
????????ConnectionFactory?factory?=?new?ConnectionFactory();
????????factory.setHost("localhost");
????????factory.setVirtualHost("/");
????????factory.setUsername("guest");
????????factory.setPassword("guest");
????????//2.?通過(guò)連接工廠來(lái)創(chuàng)建連接
????????Connection?connection?=?factory.newConnection();
????????//3.?通過(guò)?Connection?來(lái)創(chuàng)建?Channel
????????Channel?channel?=?connection.createChannel();
????????//4.?聲明
????????String?exchangeName?=?"test_fanout_exchange";
????????String?routingKey1?=?"item.update";
????????String?routingKey2?=?"";
????????String?routingKey3?=?"ookjkjjkhjhk";//任意routingkey
????????//5.?發(fā)送
????????String?msg?=?"this?is?fanout?msg";
????????channel.basicPublish(exchangeName,?routingKey1,?null,?msg.getBytes());
????????channel.basicPublish(exchangeName,?routingKey2,?null,?msg.getBytes());
????????channel.basicPublish(exchangeName,?routingKey3,?null,?msg.getBytes());
????????System.out.println("Send?message?:?"?+?msg);
????????//6.?關(guān)閉連接
????????channel.close();
????????connection.close();
????}
}
Send message : this is fanout msg
 [x] Received 'this is fanout msg'
 [x] Received 'this is fanout msg'
 [x] Received 'this is fanout msg'





關注Java技術棧看更多乾貨

評論
圖片
表情

