<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbiMQ 的 6 種模式的基本應用

          共 18658字,需瀏覽 38分鐘

           ·

          2021-07-22 12:53

          咱們從今天開始進入開源組件的學習,一邊學習一邊總結(jié)一邊分享

          文章提綱如下:

          • RabbitMQ 成員組成

          • RabbitMQ 的六種工作模式編碼

          RabbitMQ 成員組成

          • 生產(chǎn)者 producer

          • 消費者 consumer

          • 交換機 exchange

          用于接受、分配消息

          • 消息 message

          • 隊列 queue

          用于存儲生產(chǎn)者的消息

          • 信道 channel AMQP

          消息推送使用的通道

          • 連接 connections

          生成者或者消費者與Rabbit 建立的TCP 連接

          • 路由鍵 routingKey

          用于把生成者的數(shù)據(jù)分配到交換器上

          • 綁定鍵 BindingKey

          用于把交換器的消息綁定到隊列上

          • 連接管理器 ConnectionFactory

          應用程序與 Rabbit 之間建立連接的管理器,程序代碼中使用

          RabbitMQ 的六種工作模式編碼

          single 模式

          • 消息產(chǎn)生者將消息放入隊列

          • 消息的消費者監(jiān)聽消息隊列,如果隊列中有消息就消費掉

          目錄如下:

          .
          ├── consumer.go
          ├── go.mod
          ├── go.sum
          ├── main.go
          └── xmtmq
          └── xmtmq.go
          復制代碼

          實際編碼如下:

          每種模式的編碼思路如下:

          生產(chǎn)者 / 消費者

          • 連接 RabbitMQ 的 server

          • 初始化連接 connection

          • 初始化通道 channel

          • 初始化交換機 exchange

          • 初始化隊列 queue

          • 使用路由key,綁定隊列  bind , key

          • 生產(chǎn)消息 / 消費消息  produce , consume

          消息xmtmq.go

          package xmtmq

          import (
          "github.com/streadway/amqp"
          "log"
          )
          // single 模式
          // 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
          // go get github.com/streadway/amqp

          type RabbitMQ struct {
          conn *amqp.Connection // 連接
          channel *amqp.Channel // 通道
          QueueName string // 隊列名
          Exchange string // 交換機
          Key string // 路由鍵
          MQUrl string // MQ的虛擬機地址
          }

          // New 一個 RabbitMQ
          func NewRabbitMQ(rbt *RabbitMQ) {
          if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
          log.Panic("please check QueueName,Exchange,MQUrl ...")
          }

          conn, err := amqp.Dial(rbt.MQUrl)
          if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
          }
          rbt.conn = conn

          channel, err := rbt.conn.Channel()
          if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
          }
          rbt.channel = channel
          }


          func RabbitMQFree(rbt *RabbitMQ){
          if rbt == nil{
          log.Printf("rbt is nil,free failed")
          return
          }
          rbt.channel.Close()
          rbt.conn.Close()
          }
          func (rbt *RabbitMQ) Init() {
          // 申請隊列
          _, err := rbt.channel.QueueDeclare(
          rbt.QueueName, // 隊列名
          true, // 是否持久化
          false, // 是否自動刪除
          false, // 是否排他
          false, // 是否阻塞
          nil, // 其他參數(shù)
          )
          if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
          }
          }


          // 生產(chǎn)消息

          func (rbt *RabbitMQ) Produce(data []byte) {

          // 向隊列中加入數(shù)據(jù)
          err := rbt.channel.Publish(
          rbt.Exchange, // 交換機
          rbt.QueueName, // 隊列名
          false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
          false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
          amqp.Publishing{
          ContentType: "text/plain",
          Body: data,
          },
          )
          if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
          }
          return
          }

          // 消費消息
          func (rbt *RabbitMQ) Consume() {

          // 消費數(shù)據(jù)
          msg, err := rbt.channel.Consume(
          rbt.QueueName, // 隊列名
          "xmt", // 消費者的名字
          true, // 是否自動應答
          false, // 是否排他
          false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
          false, // 是否阻塞
          nil, // 其他屬性
          )

          if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
          }

          for data := range msg {
          log.Printf("received data is %v", string(data.Body))
          }

          }
          復制代碼

          main.go

          package main

          import (
          "fmt"
          "log"
          "time"
          "xmt/xmtmq"
          )

          /*
          RabbimtMQ single 模式 案例
          應用場景:簡單消息隊列的使用,一個生產(chǎn)者一個消費者
          生產(chǎn)消息
          */


          func main() {
          // 設(shè)置日志
          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)

          var index = 0

          for {
          // 生產(chǎn)消息
          rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))
          log.Println("發(fā)送成功 ", index)
          index++
          time.Sleep(1 * time.Second)
          }

          }
          復制代碼

          consumer.go

          package main

          import (
          "log"
          "xmt/xmtmq"
          )

          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.Consume()
          }
          復制代碼

          運行的時候,打開2個終端

          終端1:go run main.go

          終端2:go run consumer.go

          work 模式

          多個消費端消費同一個隊列中的消息,隊列采用輪詢的方式將消息是平均發(fā)送給消費者,此處的資源是競爭關(guān)系

          當生產(chǎn)者生產(chǎn)消息的速度大于消費者消費的速度,就要考慮用 work 工作模式,這樣能提高處理速度提高負載

          work 模式與 single 模式類似, 只是work 模式比 single 模式多了一些消費者

          基于single 模式,開一個終端3 :go run consumer.go

          publish / subscribe 模式

          publish / subscribe 發(fā)布訂閱模式 , 相對于Work queues模式多了一個交換機,此處的資源是共享的

          用于場景

          • 郵件群發(fā)

          • 群聊天

          • 廣播(廣告等)

          目錄和上述編碼保持一致:

          xmtmq.go

          開始用到交換機 exchange ,fanout 類型

          生產(chǎn)端先把消息發(fā)送到交換機,再由交換機把消息發(fā)送到綁定的隊列中,每個綁定的隊列都能收到由生產(chǎn)端發(fā)送的消息

          package xmtmq

          import (
          "github.com/streadway/amqp"
          "log"
          )

          // publish 模式
          // 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
          // go get github.com/streadway/amqp

          type RabbitMQ struct {
          conn *amqp.Connection // 連接
          channel *amqp.Channel // 通道
          QueueName string // 隊列名
          Exchange string // 交換機
          Key string // 路由鍵
          MQUrl string // MQ的虛擬機地址
          }

          // New 一個 RabbitMQ
          func NewRabbitMQ(rbt *RabbitMQ) {
          if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {
          log.Panic("please check Exchange,MQUrl ...")
          }

          conn, err := amqp.Dial(rbt.MQUrl)
          if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
          }
          rbt.conn = conn

          channel, err := rbt.conn.Channel()
          if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
          }
          rbt.channel = channel
          }

          func RabbitMQFree(rbt *RabbitMQ) {
          if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
          }

          rbt.channel.Close()
          rbt.conn.Close()
          }

          func (rbt *RabbitMQ) Init() {
          // 1、創(chuàng)建交換機
          err := rbt.channel.ExchangeDeclare(
          rbt.Exchange, // 交換機
          amqp.ExchangeFanout, // 交換機類型
          true, // 是否持久化
          false, //是否自動刪除
          false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
          false, // 是否阻塞
          nil, // 其他屬性
          )
          if err != nil {
          log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
          return
          }

          }

          // 生產(chǎn)消息 publish

          func (rbt *RabbitMQ) PublishMsg(data []byte) {

          // 1、向隊列中加入數(shù)據(jù)
          err := rbt.channel.Publish(
          rbt.Exchange, // 交換機
          "", // 隊列名
          false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
          false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
          amqp.Publishing{
          ContentType: "text/plain",
          Body: data,
          },
          )
          if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
          }
          return

          }

          // 消費消息
          func (rbt *RabbitMQ) SubscribeMsg() {

          // 1、創(chuàng)建隊列
          q, err := rbt.channel.QueueDeclare(
          "", // 此處我們傳入的是空,則是隨機產(chǎn)生隊列的名稱
          true,
          false,
          false,
          false,
          nil,
          )
          if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
          }

          // 2、綁定隊列
          err = rbt.channel.QueueBind(
          q.Name, // 隊列名字
          "", // 在publish模式下,這里key 為空
          rbt.Exchange, // 交換機名稱
          false, // 是否阻塞
          nil, // 其他屬性
          )
          if err != nil {
          log.Printf("rbt.channel.QueueBind error : %v", err)
          return
          }

          // 3、消費數(shù)據(jù)
          msg, err := rbt.channel.Consume(
          q.Name, // 隊列名
          "xmt", // 消費者的名字
          true, // 是否自動應答
          false, // 是否排他
          false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
          false, // 是否阻塞
          nil, // 其他屬性
          )

          if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
          }

          for data := range msg {
          log.Printf("received data is %v", string(data.Body))
          }

          }
          復制代碼

          main.go

          package main

          import (
          "fmt"
          "log"
          "time"
          "xmt/xmtmq"
          )

          /*
          RabbimtMQ publish 模式 案例
          應用場景:郵件群發(fā),群聊天,廣播(廣告)
          生產(chǎn)消息
          */


          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.Init()

          var index = 0

          for {
          rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))
          log.Println("發(fā)送成功 ", index)
          index++
          time.Sleep(1 * time.Second)
          }

          xmtmq.RabbitMQFree(rbt)

          }
          復制代碼

          consumer.go

          package main

          import (
          "log"
          "xmt/xmtmq"
          )

          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.SubscribeMsg()
          xmtmq.RabbitMQFree(rbt)
          }
          復制代碼

          執(zhí)行的操作和上述保持一致

          終端1:go run main.go

          終端2:go run consumer.go

          終端3:go run consumer.go

          效果和上述single 模式和 work模式的明顯區(qū)別是:發(fā)布訂閱模式的案例,生產(chǎn)者生產(chǎn)的消息,對應的消費者消費其生產(chǎn)的內(nèi)容

          routing 模式

          消息生產(chǎn)者將消息發(fā)送給交換機按照路由判斷,路由是字符串 當前產(chǎn)生的消息攜帶路由字符(對象的方法),交換機根據(jù)路由的key,只能匹配上路由key對應的消息隊列,對應的消費者才能消費消息

          **應用場景:**從系統(tǒng)的代碼邏輯中獲取對應的功能字符串,將消息任務(wù)扔到對應的隊列中業(yè)務(wù)場景,例如處理錯誤,處理特定消息等

          生產(chǎn)者處理流程:

          聲明隊列并聲明交換機 -> 創(chuàng)建連接 -> 創(chuàng)建通道 -> 通道聲明交換機 -> 通道聲明隊列 -> 通過通道使隊列綁定到交換機并指定該隊列的routingkey(通配符) -> 制定消息 -> 發(fā)送消息并指定routingkey(通配符)
          復制代碼

          消費者處理流程:

          聲明隊列并聲明交換機 -> 創(chuàng)建連接 -> 創(chuàng)建通道 -> 通道聲明交換機 -> 通道聲明隊列 -> 通過通道使隊列綁定到交換機并指定routingkey(通配符) -> 重寫消息消費方法 -> 執(zhí)行消息方法
          復制代碼

          目錄結(jié)構(gòu)如下:

          .
          ├── consumer2.go
          ├── consumer.go
          ├── go.mod
          ├── go.sum
          ├── main.go
          └── xmtmq
          └── xmtmq.go
          復制代碼

          xmtmq.go

          • 用到交換機 為 direct 類型

          • 用到路由鍵

          package xmtmq

          import (
          "github.com/streadway/amqp"
          "log"
          )

          // routing 模式
          // 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
          // go get github.com/streadway/amqp

          type RabbitMQ struct {
          conn *amqp.Connection // 連接
          channel *amqp.Channel // 通道
          QueueName string // 隊列名
          Exchange string // 交換機
          Key string // 路由鍵
          MQUrl string // MQ的虛擬機地址
          }

          // New 一個 RabbitMQ
          func NewRabbitMQ(rbt *RabbitMQ) {
          if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {
          log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
          }

          conn, err := amqp.Dial(rbt.MQUrl)
          if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
          }
          rbt.conn = conn

          channel, err := rbt.conn.Channel()
          if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
          }
          rbt.channel = channel
          }

          func RabbitMQFree(rbt *RabbitMQ) {
          if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
          }

          rbt.channel.Close()
          rbt.conn.Close()
          }

          func (rbt *RabbitMQ) Init() {
          // 1、創(chuàng)建交換機
          err := rbt.channel.ExchangeDeclare(
          rbt.Exchange, // 交換機
          amqp.ExchangeDirect, // 交換機類型
          true, // 是否持久化
          false, //是否自動刪除
          false, //true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定
          false, // 是否阻塞
          nil, // 其他屬性
          )
          if err != nil {
          log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
          return
          }

          // 2、創(chuàng)建隊列
          _, err = rbt.channel.QueueDeclare(
          rbt.QueueName, // 此處我們傳入的是空,則是隨機產(chǎn)生隊列的名稱
          true,
          false,
          false,
          false,
          nil,
          )
          if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
          }

          // 3、綁定隊列
          err = rbt.channel.QueueBind(
          rbt.QueueName, // 隊列名字
          rbt.Key, // routing,這里key 需要填
          rbt.Exchange, // 交換機名稱
          false, // 是否阻塞
          nil, // 其他屬性
          )
          if err != nil {
          log.Printf("rbt.channel.QueueBind error : %v", err)
          return
          }

          }

          // 生產(chǎn)消息 publish

          func (rbt *RabbitMQ) ProduceRouting(data []byte) {

          // 1、向隊列中加入數(shù)據(jù)
          err := rbt.channel.Publish(
          rbt.Exchange, // 交換機
          rbt.Key, // key
          false, // 若為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者
          false, // 若為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者
          amqp.Publishing{
          ContentType: "text/plain",
          Body: data,
          },
          )
          if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
          }

          return
          }

          // 消費消息
          func (rbt *RabbitMQ) ConsumeRoutingMsg() {

          // 4、消費數(shù)據(jù)
          msg, err := rbt.channel.Consume(
          rbt.QueueName, // 隊列名
          "", // 消費者的名字
          true, // 是否自動應答
          false, // 是否排他
          false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
          false, // 是否阻塞
          nil, // 其他屬性
          )

          if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
          }


          for data := range msg {
          log.Printf("received data is %v", string(data.Body))
          }

          }
          復制代碼

          main.go

          package main

          import (
          "fmt"
          "log"
          "time"
          "xmt/xmtmq"
          )

          /*
          RabbimtMQ routing 模式 案例
          應用場景:從系統(tǒng)的代碼邏輯中獲取對應的功能字符串,將消息任務(wù)扔到對應的隊列中業(yè)務(wù)場景,例如處理錯誤,處理特定消息等
          生產(chǎn)消息
          */


          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt1 := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt1",
          QueueName: "Routingqueuexmt1",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt1)
          rbt1.Init()


          rbt2 := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt2",
          QueueName: "Routingqueuexmt2",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt2)
          rbt2.Init()


          var index = 0

          for {
          rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index)))
          log.Println("發(fā)送成功xmt1 ", index)

          rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index)))
          log.Println("發(fā)送成功xmt2 ", index)


          index++
          time.Sleep(1 * time.Second)
          }


          xmtmq.RabbitMQFree(rbt1)
          xmtmq.RabbitMQFree(rbt2)

          }
          復制代碼

          consumer.go

          package main

          import (
          "log"
          "xmt/xmtmq"
          )

          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt1",
          QueueName: "Routingqueuexmt1",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.ConsumeRoutingMsg()
          xmtmq.RabbitMQFree(rbt)
          }
          復制代碼

          consumer2.go

          package main

          import (
          "log"
          "xmt/xmtmq"
          )

          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          Exchange: "xmtPubEx2",
          Key: "xmt2",
          QueueName: "Routingqueuexmt2",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.ConsumeRoutingMsg()
          xmtmq.RabbitMQFree(rbt)
          }
          復制代碼

          topic 模式

          話題模式,一個消息被多個消費者獲取,消息的目標 queue 可用 BindingKey  的通配符

          Topics 模式實際上是路由模式的一種

          他倆的最大的區(qū)別是 :Topics 模式發(fā)送消息和消費消息的時候是通過通配符去進行匹配的

          • *號代表可以同通配一個單詞

          • #號代表可以通配零個或多個單詞

          編碼的案例與上述 routing 模式保持一直,只是 exchange 為 topic類型

          如下是上述幾種模式涉及到的交換機隊列

          rpc 模式

          RPC 遠程過程調(diào)用,客戶端遠程調(diào)用服務(wù)端的方法 ,使用 MQ 可以實現(xiàn) RPC 的異步調(diào)用

          目錄結(jié)構(gòu)為:

          .
          ├── consumer.go
          ├── go.mod
          ├── go.sum
          ├── main.go
          └── xmtmq
          └── xmtmq.go
          復制代碼
          • 客戶端即是生產(chǎn)者也是消費者,向 RPC 請求隊列發(fā)送 RPC 調(diào)用消息,同時監(jiān)聽RPC響應隊列

          • 服務(wù)端監(jiān)聽RPC請求隊列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果

          • 服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應隊列。

          • 客戶端監(jiān)聽RPC響應隊列,接收到RPC調(diào)用結(jié)果

          xmtmq.go

          package xmtmq

          import (
          "github.com/streadway/amqp"
          "log"
          "math/rand"
          )

          // rpc 模式
          // 定義 RabbitMQ 的數(shù)據(jù)結(jié)構(gòu)
          // go get github.com/streadway/amqp

          type RabbitMQ struct {
          conn *amqp.Connection // 連接
          channel *amqp.Channel // 通道
          QueueName string // 隊列名
          Exchange string // 交換機
          Key string // 路由鍵
          MQUrl string // MQ的虛擬機地址
          }

          // New 一個 RabbitMQ
          func NewRabbitMQ(rbt *RabbitMQ) {
          if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {
          log.Panic("please check QueueName,Exchange,MQUrl ...")
          }

          conn, err := amqp.Dial(rbt.MQUrl)
          if err != nil {
          log.Panicf("amqp.Dial error : %v", err)
          }
          rbt.conn = conn

          channel, err := rbt.conn.Channel()
          if err != nil {
          log.Panicf("rbt.conn.Channel error : %v", err)
          }
          rbt.channel = channel
          }

          func RabbitMQFree(rbt *RabbitMQ) {
          if rbt == nil {
          log.Printf("rbt is nil,free failed")
          return
          }
          rbt.channel.Close()
          rbt.conn.Close()
          }

          // 生產(chǎn)消息

          func (rbt *RabbitMQ) Produce(data []byte) {

          // 申請隊列
          q, err := rbt.channel.QueueDeclare(
          rbt.QueueName, // 隊列名
          true, // 是否持久化
          false, // 是否自動刪除
          false, // 是否排他
          false, // 是否阻塞
          nil, // 其他參數(shù)
          )
          if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
          }

          err = rbt.channel.Qos(1, 0, false)
          if err != nil {
          log.Printf("rbt.channel.Qos error : %v", err)
          return
          }

          d, err := rbt.channel.Consume(
          q.Name,
          "",
          false,
          false,
          false,
          false,
          nil)
          if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
          }

          for msg := range d {
          log.Println("received msg is ", string(msg.Body))
          err := rbt.channel.Publish(
          "",
          msg.ReplyTo,
          false,
          false,
          amqp.Publishing{
          ContentType: "test/plain",
          CorrelationId: msg.CorrelationId,
          Body: data,
          })
          if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
          }
          msg.Ack(false)
          log.Println("svr response ok ")
          }

          return
          }
          func randomString(l int) string {
          bytes := make([]byte, l)
          for i := 0; i < l; i++ {
          bytes[i] = byte(rand.Intn(l))
          }
          return string(bytes)
          }

          // 消費消息
          func (rbt *RabbitMQ) Consume() {

          // 申請隊列
          q, err := rbt.channel.QueueDeclare(
          "", // 隊列名
          true, // 是否持久化
          false, // 是否自動刪除
          false, // 是否排他
          false, // 是否阻塞
          nil, // 其他參數(shù)
          )
          if err != nil {
          log.Printf("rbt.channel.QueueDeclare error : %v", err)
          return
          }

          // 消費數(shù)據(jù)
          msg, err := rbt.channel.Consume(
          q.Name, // 隊列名
          "xmt", // 消費者的名字
          true, // 是否自動應答
          false, // 是否排他
          false, // 若為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者
          false, // 是否阻塞
          nil, // 其他屬性
          )
          if err != nil {
          log.Printf("rbt.channel.Consume error : %v", err)
          return
          }
          id := randomString(32)
          err = rbt.channel.Publish(
          "",
          rbt.QueueName,
          false,
          false,
          amqp.Publishing{
          ContentType: "test/plain",
          CorrelationId: id,
          ReplyTo: q.Name,
          Body: []byte("321"),
          })
          if err != nil {
          log.Printf("rbt.channel.Publish error : %v", err)
          return
          }

          for data := range msg {
          log.Printf("received data is %v", string(data.Body))
          }
          }
          復制代碼

          main.go

          package main

          import (
          "fmt"
          "log"
          "xmt/xmtmq"
          )

          /*
          RabbimtMQ rpc 模式 案例
          應用場景:簡單消息隊列的使用,一個生產(chǎn)者一個消費者
          生產(chǎn)消息
          */


          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)

          rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)

          rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))

          }
          復制代碼

          consumer.go

          package main

          import (
          "log"
          "math/rand"
          "time"
          "xmt/xmtmq"
          )

          func main() {

          log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
          rand.Seed(time.Now().UTC().UnixNano())
          rbt := &xmtmq.RabbitMQ{
          QueueName: "xmtqueue",
          MQUrl: "amqp://guest:[email protected]:5672/xmtmq",
          }

          xmtmq.NewRabbitMQ(rbt)
          rbt.Consume()
          }
          復制代碼

          咱們先運行消費者,多運行幾個,可以看到咱們的隊列中已經(jīng)有數(shù)據(jù)了,咱們運行的是2個消費者,因此此處是 2

          再運行生產(chǎn)者,就能看到生產(chǎn)者將消費者發(fā)送的消息消費掉,并且通過 CorrelationId 找到對應消費者監(jiān)聽的隊列,將數(shù)據(jù)發(fā)送到隊列中

          消費者監(jiān)聽的隊列有數(shù)據(jù)了,消費者就取出來進行消費

          總結(jié)

          RabbitMQ 的六種工作模式:

          • single 模式

          • work 模式

          • publish / subscribe 模式

          • routing 模式

          • topic 模式

          • rpc 模式

          參考資料:

          RabbitMQ Tutorials

          歡迎點贊,關(guān)注,收藏

          朋友們,你的支持和鼓勵,是我堅持分享,提高質(zhì)量的動力

          好了,本次就到這里

          技術(shù)是開放的,我們的心態(tài),更應是開放的。擁抱變化,向陽而生,努力向前行。

          歡迎點贊關(guān)注收藏,下次見~


          作者:小魔童哪吒
          鏈接:https://juejin.cn/post/6985182716358557733
          來源:掘金
          著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。



          瀏覽 36
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产一区乱伦 | 999一区二区三区 | 久久R5| 含羞草一区二区三区 | 日韩在线大香蕉 |