RabbitMQ 的第一個程序
點擊上方藍色字體,選擇“置頂或者星標”?
優(yōu)質文章第一時間送達!
RabbitMQ 的第一個程序
RabbitMQ-生產者|消費者
搭建環(huán)境
java client
生產者和消費者都屬于客戶端, rabbitMQ的java客戶端如下

創(chuàng)建 maven 工程
<dependency>
??<groupId>com.rabbitmqgroupId>
??<artifactId>amqp-clientartifactId>
??<version>5.10.0version>
dependency>
AMQP協議的回顧

RabbitMQ支持的消息模型


第一種模型(直連)

在上圖的模型中,有以下概念:
- P:生產者,也就是要發(fā)送消息的程序
- C:消費者:消息的接受者,會一直等待消息到來。
- queue:消息隊列,圖中紅色部分。類似一個郵箱,可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
開發(fā)生產者
/**
?*?生產者
?*?
?*?直連模式
?*
?*?@author?mxz
?*/
@Component
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?獲取連接中通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定消息隊列
????????//?參數1?隊列的名稱,?如果不存在則自動創(chuàng)建
????????//?參數2?用來定義隊列是否需要持久化,?true?持久化隊列(mq關閉時,?會存到磁盤中)?false?不持久化(關閉即失)
????????//?參數3?exclusive?是否獨占隊列???true?獨占隊列??false?不獨占
????????//?參數4?autoDelete?是否在消費后自動刪除隊列??true?自動刪除???false?不刪除
????????//?參數5?額外的附加參數
????????channel.queueDeclare("hello",?false,?false,?false,?null);
????????//?發(fā)布消息
????????//?參數1?交換機名稱
????????//?參數2?隊列名稱
????????//?參數3?傳遞消息額外設置
????????//?參數4?消息的具體內容
????????channel.basicPublish("",?"hello",?null,?"hello?rabbitMQ".getBytes());
????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);
????}
}
開發(fā)消費者
/**
?*?消費者
?*
?*?@author?mxz
?*/
@Component
public?class?Customer?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對象
????????channel.queueDeclare("hello",?false,?false,?false,?null);
????????//?消費消息
????????//?參數1?消息隊列的消息,?隊列名稱
????????//?參數2?開啟消息的確認機制
????????//?參數3?消息時的回調接口
????????channel.basicConsume("hello",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個參數?消息隊列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("new?String(body)"?+?new?String(body));
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
工具類
/**
?*?@author?mxz
?*/
public?class?RabbitMQUtils?{
????private?static?ConnectionFactory?connectionFactory;
????//?重量級資源??類加載執(zhí)行一次(即可)
????static?{
????????//?創(chuàng)建連接?mq?的連接工廠
????????connectionFactory?=?new?ConnectionFactory();
????????//?設置?rabbitmq?主機
????????connectionFactory.setHost("127.0.0.1");
????????//?設置端口號
????????connectionFactory.setPort(5672);
????????//?設置連接哪個虛擬主機
????????connectionFactory.setVirtualHost("/codingce");
????????//?設置訪問虛擬主機用戶名密碼
????????connectionFactory.setUsername("codingce");
????????connectionFactory.setPassword("123456");
????}
????/**
?????*?定義提供連接對象的方法
?????*
?????*?@return
?????*/
????public?static?Connection?getConnection()?{
????????try?{
????????????return?connectionFactory.newConnection();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????????return?null;
????}
????/**
?????*?關閉通道和關閉連接工具方法
?????*
?????*?@param?connection
?????*?@param?channel
?????*/
????public?static?void?closeConnectionAndChannel(Channel?channel,?Connection?connection)?{
????????try?{
????????????//?先關?channel
????????????if?(channel?!=?null)
????????????????channel.close();
????????????if?(connection?!=?null)
????????????????connection.close();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}
第二種模型(work quene)
Work queues,也被稱為(Task queues),任務模型。當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。此時就可以使用work 模型:讓多個消費者綁定到一個隊列,共同消費隊列中的消息。隊列中的消息一旦消費,就會消失,因此任務是不會被重復執(zhí)行的。

角色:
- P:生產者:任務的發(fā)布者
- C1:消費者-1,領取任務并且完成任務,假設完成速度較慢
- C2:消費者-2:領取任務并完成任務,假設完成速度快
開發(fā)生產者
/**
?*?生產者
?*?
?*?任務模型?work?quenue
?*
?*?@author?mxz
?*/
@Component
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//?通過通道聲明隊列
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?生產消息
????????????channel.basicPublish("",?"work",?null,?("?"?+?i?+?"work?quenue").getBytes());
????????}
????????//?關閉資源
????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);
????}
}
開發(fā)消費者-1
/**
?*?自動確認消費?autoAck?true?12搭配測試
?*?
?*?消費者?1
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerOne?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????//?消費消息
????????//?參數1?消息隊列的消息,?隊列名稱
????????//?參數2?開啟消息的確認機制
????????//?參數3?消息時的回調接口
????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個參數?消息隊列中取出的消息
????????????//?默認分配是平均的
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費者-1"?+?new?String(body));
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
開發(fā)消費者-2
/**
?*?自動確認消費?autoAck?true?12搭配測試
?*?
?*?消費者?2
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerTwo?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個參數?消息隊列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費者-1"?+?new?String(body));
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
測試結果


總結:默認情況下,RabbitMQ將按順序將每個消息發(fā)送給下一個使用者。平均而言,每個消費者都會收到相同數量的消息。這種分發(fā)消息的方式稱為循環(huán)。
消息自動確認機制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
消費者3
/**
?*?能者多勞??34?搭配測試
?*?
?*?消費者?3
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerThree?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?每一次只能消費一個消息
????????channel.basicQos(1);
????????//?通道綁定對象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????//?參數1?隊列名稱?參數2(autoAck)?消息自動確認?true?消費者自動向?rabbitMQ?確認消息消費??false?不會自動確認消息
????????//?若出現消費者宕機情況?消費者三可以進行消費
????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
????????????//?最后一個參數?消息隊列中取出的消息
????????????//?默認分配是平均的
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費者-1"?+?new?String(body));
????????????????//?手動確認?參數1?確認隊列中
????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
消費者4
/**
?*?能者多勞??34?搭配測試
?*?
?*?消費者?4
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerFour?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?獲取連接對象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?每一次只能消費一個消息
????????channel.basicQos(1);
????????//?通道綁定對象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
????????????//?最后一個參數?消息隊列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費者-1"?+?new?String(body));
????????????????//?手動確認?參數1?手動確認
????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
文章已上傳gitee https://gitee.com/codingce/hexo-blog
項目地址: https://github.com/xzMhehe/codingce-java
?
更多推薦內容
↓↓↓
如果你喜歡本文
請長按二維碼,關注公眾號
轉發(fā)朋友圈,是對我最大的支持喲
以上,便是今天的分享,希望大家喜歡,覺得內容不錯的,歡迎「分享」「贊」或者點擊「在看」支持,謝謝各位。
