RabbiMQ 的 6 種模式的基本應用
咱們從今天開始進入開源組件的學習,一邊學習一邊總結(jié)一邊分享
文章提綱如下:
RabbitMQ 成員組成
RabbitMQ 的六種工作模式編碼
RabbitMQ 成員組成
生產(chǎn)者 producer
消費者 consumer
交換機 exchange
用于接受、分配消息
消息 message
隊列 queue
用于存儲生產(chǎn)者的消息
信道 channel AMQP
消息推送使用的通道
連接 connections
生成者或者消費者與Rabbit 建立的TCP 連接
路由鍵 routingKey
用于把生成者的數(shù)據(jù)分配到交換器上
綁定鍵 BindingKey
用于把交換器的消息綁定到隊列上
連接管理器 ConnectionFactory
應用程序與 Rabbit 之間建立連接的管理器,程序代碼中使用
RabbitMQ 的六種工作模式編碼
single 模式
消息產(chǎn)生者將消息放入隊列
消息的消費者監(jiān)聽消息隊列,如果隊列中有消息就消費掉
目錄如下:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
復制代碼實際編碼如下:
每種模式的編碼思路如下:
生產(chǎn)者 / 消費者
連接 RabbitMQ 的 server
初始化連接 connection
初始化通道 channel
初始化交換機 exchange
初始化隊列 queue
使用路由key,綁定隊列 bind , key
生產(chǎn)消息 / 消費消息 produce , consume
消息xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// single 模式
// 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 連接
channel *amqp.Channel // 通道
QueueName string // 隊列名
Exchange string // 交換機
Key string // 路由鍵
MQUrl string // MQ的虛擬機地址
}
// New 一個 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ){
if rbt == nil{
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 申請隊列
_, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 隊列名
true, // 是否持久化
false, // 是否自動刪除
false, // 是否排他
false, // 是否阻塞
nil, // 其他參數(shù)
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
}
// 生產(chǎn)消息
func (rbt *RabbitMQ) Produce(data []byte) {
// 向隊列中加入數(shù)據(jù)
err := rbt.channel.Publish(
rbt.Exchange, // 交換機
rbt.QueueName, // 隊列名
false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消費消息
func (rbt *RabbitMQ) Consume() {
// 消費數(shù)據(jù)
msg, err := rbt.channel.Consume(
rbt.QueueName, // 隊列名
"xmt", // 消費者的名字
true, // 是否自動應答
false, // 是否排他
false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
復制代碼main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ single 模式 案例
應用場景:簡單消息隊列的使用,一個生產(chǎn)者一個消費者
生產(chǎn)消息
*/
func main() {
// 設(shè)置日志
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
var index = 0
for {
// 生產(chǎn)消息
rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
log.Println("發(fā)送成功 ", index)
index++
time.Sleep(1 * time.Second)
}
}
復制代碼consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()
}
復制代碼運行的時候,打開2個終端
終端1:go run main.go
終端2:go run consumer.go
work 模式
多個消費端消費同一個隊列中的消息,隊列采用輪詢的方式將消息是平均發(fā)送給消費者,此處的資源是競爭關(guān)系
當生產(chǎn)者生產(chǎn)消息的速度大于消費者消費的速度,就要考慮用 work 工作模式,這樣能提高處理速度提高負載
work 模式與 single 模式類似, 只是work 模式比 single 模式多了一些消費者
基于single 模式,開一個終端3 :go run consumer.go
publish / subscribe 模式
publish / subscribe 發(fā)布訂閱模式 , 相對于Work queues模式多了一個交換機,此處的資源是共享的
用于場景
郵件群發(fā)
群聊天
廣播(廣告等)
目錄和上述編碼保持一致:
xmtmq.go
開始用到交換機 exchange ,fanout 類型
生產(chǎn)端先把消息發(fā)送到交換機,再由交換機把消息發(fā)送到綁定的隊列中,每個綁定的隊列都能收到由生產(chǎn)端發(fā)送的消息
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// publish 模式
// 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 連接
channel *amqp.Channel // 通道
QueueName string // 隊列名
Exchange string // 交換機
Key string // 路由鍵
MQUrl string // MQ的虛擬機地址
}
// New 一個 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
log.Panic("please check Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 1、創(chuàng)建交換機
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交換機
amqp.ExchangeFanout, // 交換機類型
true, // 是否持久化
false, //是否自動刪除
false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
}
// 生產(chǎn)消息 publish
func (rbt *RabbitMQ) PublishMsg(data []byte) {
// 1、向隊列中加入數(shù)據(jù)
err := rbt.channel.Publish(
rbt.Exchange, // 交換機
"", // 隊列名
false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消費消息
func (rbt *RabbitMQ) SubscribeMsg() {
// 1、創(chuàng)建隊列
q, err := rbt.channel.QueueDeclare(
"", // 此處我們傳入的是空,則是隨機產(chǎn)生隊列的名稱
true,
false,
false,
false,
nil,
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 2、綁定隊列
err = rbt.channel.QueueBind(
q.Name, // 隊列名字
"", // 在publish模式下,這里key 為空
rbt.Exchange, // 交換機名稱
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
// 3、消費數(shù)據(jù)
msg, err := rbt.channel.Consume(
q.Name, // 隊列名
"xmt", // 消費者的名字
true, // 是否自動應答
false, // 是否排他
false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
復制代碼main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ publish 模式 案例
應用場景:郵件群發(fā),群聊天,廣播(廣告)
生產(chǎn)消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Init()
var index = 0
for {
rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
log.Println("發(fā)送成功 ", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt)
}
復制代碼consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.SubscribeMsg()
xmtmq.RabbitMQFree(rbt)
}
復制代碼執(zhí)行的操作和上述保持一致
終端1:go run main.go
終端2:go run consumer.go
終端3:go run consumer.go
效果和上述single 模式和 work模式的明顯區(qū)別是:發(fā)布訂閱模式的案例,生產(chǎn)者生產(chǎn)的消息,對應的消費者消費其生產(chǎn)的內(nèi)容
routing 模式
消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串 當前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息
**應用場景:**從系統(tǒng)的代碼邏輯中獲取對應的功能字符串,將消息任務(wù)扔到對應的隊列中業(yè)務(wù)場景,例如處理錯誤,處理特定消息等
生產(chǎn)者處理流程:
聲明隊列并聲明交換機 -> 創(chuàng)建連接 -> 創(chuàng)建通道 -> 通道聲明交換機 -> 通道聲明隊列 -> 通過通道使隊列綁定到交換機并指定該隊列的routingkey(通配符) -> 制定消息 -> 發(fā)送消息并指定routingkey(通配符)
復制代碼消費者處理流程:
聲明隊列并聲明交換機 -> 創(chuàng)建連接 -> 創(chuàng)建通道 -> 通道聲明交換機 -> 通道聲明隊列 -> 通過通道使隊列綁定到交換機并指定routingkey(通配符) -> 重寫消息消費方法 -> 執(zhí)行消息方法
復制代碼目錄結(jié)構(gòu)如下:
.
├── consumer2.go
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
復制代碼xmtmq.go
用到交換機 為
direct類型用到路由鍵
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// routing 模式
// 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 連接
channel *amqp.Channel // 通道
QueueName string // 隊列名
Exchange string // 交換機
Key string // 路由鍵
MQUrl string // MQ的虛擬機地址
}
// New 一個 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
func (rbt *RabbitMQ) Init() {
// 1、創(chuàng)建交換機
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交換機
amqp.ExchangeDirect, // 交換機類型
true, // 是否持久化
false, //是否自動刪除
false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
// 2、創(chuàng)建隊列
_, err = rbt.channel.QueueDeclare(
rbt.QueueName, // 此處我們傳入的是空,則是隨機產(chǎn)生隊列的名稱
true,
false,
false,
false,
nil,
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 3、綁定隊列
err = rbt.channel.QueueBind(
rbt.QueueName, // 隊列名字
rbt.Key, // routing,這里key 需要填
rbt.Exchange, // 交換機名稱
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
}
// 生產(chǎn)消息 publish
func (rbt *RabbitMQ) ProduceRouting(data []byte) {
// 1、向隊列中加入數(shù)據(jù)
err := rbt.channel.Publish(
rbt.Exchange, // 交換機
rbt.Key, // key
false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 消費消息
func (rbt *RabbitMQ) ConsumeRoutingMsg() {
// 4、消費數(shù)據(jù)
msg, err := rbt.channel.Consume(
rbt.QueueName, // 隊列名
"", // 消費者的名字
true, // 是否自動應答
false, // 是否排他
false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
復制代碼main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ routing 模式 案例
應用場景:從系統(tǒng)的代碼邏輯中獲取對應的功能字符串,將消息任務(wù)扔到對應的隊列中業(yè)務(wù)場景,例如處理錯誤,處理特定消息等
生產(chǎn)消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt1 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt1)
rbt1.Init()
rbt2 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt2)
rbt2.Init()
var index = 0
for {
rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index)))
log.Println("發(fā)送成功xmt1 ", index)
rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index)))
log.Println("發(fā)送成功xmt2 ", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt1)
xmtmq.RabbitMQFree(rbt2)
}
復制代碼consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
復制代碼consumer2.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
復制代碼topic 模式
話題模式,一個消息被多個消費者獲取,消息的目標 queue 可用 BindingKey 的通配符
Topics 模式實際上是路由模式的一種
他倆的最大的區(qū)別是 :Topics 模式發(fā)送消息和消費消息的時候是通過通配符去進行匹配的
*號代表可以同通配一個單詞
#號代表可以通配零個或多個單詞
編碼的案例與上述 routing 模式保持一直,只是 exchange 為 topic類型
如下是上述幾種模式涉及到的交換機和隊列

rpc 模式
RPC 遠程過程調(diào)用,客戶端遠程調(diào)用服務(wù)端的方法 ,使用 MQ 可以實現(xiàn) RPC 的異步調(diào)用
目錄結(jié)構(gòu)為:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
復制代碼客戶端即是生產(chǎn)者也是消費者,向
RPC請求隊列發(fā)送RPC調(diào)用消息,同時監(jiān)聽RPC響應隊列服務(wù)端監(jiān)聽
RPC請求隊列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果服務(wù)端將
RPC方法 的結(jié)果發(fā)送到RPC響應隊列。客戶端監(jiān)聽
RPC響應隊列,接收到RPC調(diào)用結(jié)果
xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
"math/rand"
)
// rpc 模式
// 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 連接
channel *amqp.Channel // 通道
QueueName string // 隊列名
Exchange string // 交換機
Key string // 路由鍵
MQUrl string // MQ的虛擬機地址
}
// New 一個 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {
log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {
log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {
log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()
}
// 生產(chǎn)消息
func (rbt *RabbitMQ) Produce(data []byte) {
// 申請隊列
q, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 隊列名
true, // 是否持久化
false, // 是否自動刪除
false, // 是否排他
false, // 是否阻塞
nil, // 其他參數(shù)
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
err = rbt.channel.Qos(1, 0, false)
if err != nil {
log.Printf("rbt.channel.Qos error : %v", err)
return
}
d, err := rbt.channel.Consume(
q.Name,
"",
false,
false,
false,
false,
nil)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
for msg := range d {
log.Println("received msg is ", string(msg.Body))
err := rbt.channel.Publish(
"",
msg.ReplyTo,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: msg.CorrelationId,
Body: data,
})
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
msg.Ack(false)
log.Println("svr response ok ")
}
return
}
func randomString(l int) string {
bytes := make([]byte, l)
for i := 0; i < l; i++ {
bytes[i] = byte(rand.Intn(l))
}
return string(bytes)
}
// 消費消息
func (rbt *RabbitMQ) Consume() {
// 申請隊列
q, err := rbt.channel.QueueDeclare(
"", // 隊列名
true, // 是否持久化
false, // 是否自動刪除
false, // 是否排他
false, // 是否阻塞
nil, // 其他參數(shù)
)
if err != nil {
log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 消費數(shù)據(jù)
msg, err := rbt.channel.Consume(
q.Name, // 隊列名
"xmt", // 消費者的名字
true, // 是否自動應答
false, // 是否排他
false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
false, // 是否阻塞
nil, // 其他屬性
)
if err != nil {
log.Printf("rbt.channel.Consume error : %v", err)
return
}
id := randomString(32)
err = rbt.channel.Publish(
"",
rbt.QueueName,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: id,
ReplyTo: q.Name,
Body: []byte("321"),
})
if err != nil {
log.Printf("rbt.channel.Publish error : %v", err)
return
}
for data := range msg {
log.Printf("received data is %v", string(data.Body))
}
}
復制代碼main.go
package main
import (
"fmt"
"log"
"xmt/xmtmq"
)
/*
RabbimtMQ rpc 模式 案例
應用場景:簡單消息隊列的使用,一個生產(chǎn)者一個消費者
生產(chǎn)消息
*/
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))
}
復制代碼consumer.go
package main
import (
"log"
"math/rand"
"time"
"xmt/xmtmq"
)
func main() {
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rand.Seed(time.Now().UTC().UnixNano())
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()
}
復制代碼咱們先運行消費者,多運行幾個,可以看到咱們的隊列中已經(jīng)有數(shù)據(jù)了,咱們運行的是2個消費者,因此此處是 2

再運行生產(chǎn)者,就能看到生產(chǎn)者將消費者發(fā)送的消息消費掉,并且通過 CorrelationId 找到對應消費者監(jiān)聽的隊列,將數(shù)據(jù)發(fā)送到隊列中
消費者監(jiān)聽的隊列有數(shù)據(jù)了,消費者就取出來進行消費
總結(jié)
RabbitMQ 的六種工作模式:
single 模式
work 模式
publish / subscribe 模式
routing 模式
topic 模式
rpc 模式
參考資料:
RabbitMQ Tutorials
歡迎點贊,關(guān)注,收藏
朋友們,你的支持和鼓勵,是我堅持分享,提高質(zhì)量的動力

好了,本次就到這里
技術(shù)是開放的,我們的心態(tài),更應是開放的。擁抱變化,向陽而生,努力向前行。
歡迎點贊關(guān)注收藏,下次見~
作者:小魔童哪吒
鏈接:https://juejin.cn/post/6985182716358557733
來源:掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。
