<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          RabbitMQ實(shí)現(xiàn)分布式事務(wù),保證數(shù)據(jù)一致性

          共 14228字,需瀏覽 29分鐘

           ·

          2021-09-12 21:28

          點(diǎn)擊“藍(lán)字”,關(guān)注,置頂公眾號(hào)

          每日技術(shù)干貨,第一時(shí)間送達(dá)!


          一、實(shí)驗(yàn)環(huán)境

          • Lunix系統(tǒng):Centos7.5
          • 安裝軟件:rabbitmq
          • 開(kāi)發(fā)工具:IDEA

          二、實(shí)驗(yàn)?zāi)康?/span>

          Rabbitmq實(shí)現(xiàn)多系統(tǒng)間的分布式事務(wù),保證數(shù)據(jù)一致性

          三、實(shí)驗(yàn)方案

          rabbitmq作為消息中間件

          訂單中心和運(yùn)單中心分別作為消息的生產(chǎn)者和消息的消費(fèi)者,通過(guò)rabbitmq傳遞消息

          訂單中心作為生產(chǎn)者,模擬用戶創(chuàng)建訂單,在本地持久化訂單信息,記錄消息的狀態(tài)信息,并將消息發(fā)送到rabbitmq,同時(shí)開(kāi)啟confirm機(jī)制,接收消息中間件rabbitmq的響應(yīng)信息,更新本地消息發(fā)送狀態(tài)(定時(shí)任務(wù)輪訓(xùn)消息狀態(tài)信息表,一定時(shí)間內(nèi)未發(fā)送成功的數(shù)據(jù)將再次發(fā)起推送,保證atlestonce.

          運(yùn)單中心作為消費(fèi)者,消費(fèi)rabbitmq中的訂單信息,開(kāi)啟ack確認(rèn)機(jī)制,確保不遺漏訂單。并通過(guò)消息全局唯一ID保證數(shù)據(jù)的唯一性,不重復(fù)處理訂單。

          四、實(shí)驗(yàn)步驟

          1、消息隊(duì)列

          1.1 rabbitmq安裝過(guò)程略過(guò)。。。。
          1.2 創(chuàng)建訂單交換器:orderExchange
          1.3 創(chuàng)建訂單隊(duì)列:orderQueue
          1.4 綁定

          2、數(shù)據(jù)庫(kù)準(zhǔn)備

          2.1訂單表
          2.2 消息發(fā)送狀態(tài)表
          2.3 運(yùn)單表

          3、訂單中心

          3.1 訂單中心分析

          利用Rabbitmq發(fā)布確認(rèn)機(jī)制(confirm),確保發(fā)送成功的數(shù)據(jù)能被通知到 做個(gè)定時(shí)任務(wù)輪訓(xùn)發(fā)送失敗以及發(fā)送后未響應(yīng)的訂單信息,重新發(fā)送。推薦:Java進(jìn)階視頻資源

          3.2 編寫(xiě)代碼

          3.2.1 Springboot整合rabbitmq和mysql數(shù)據(jù)庫(kù)

          3.2.1.1依賴如下:

          <dependency>
           <groupId>org.springframework.bootgroupId>
           <artifactId>spring-boot-starter-webartifactId>
          dependency>

          <dependency>
           <groupId>org.springframework.bootgroupId>
           <artifactId>spring-boot-starter-amqpartifactId>
          dependency>

          <dependency>
           <groupId>mysqlgroupId>
           <artifactId>mysql-connector-javaartifactId>
           <scope>runtimescope>
          dependency>

          <dependency>
           <groupId>org.projectlombokgroupId>
           <artifactId>lombokartifactId>
           <optional>trueoptional>
          dependency>

          <dependency>
           <groupId>org.springframework.bootgroupId>
           <artifactId>spring-boot-starter-jdbcartifactId>
          dependency>

          <dependency>
           <groupId>com.alibabagroupId>
           <artifactId>fastjsonartifactId>
           <version>1.2.17version>
          dependency>

          3.2.1.2配置文件內(nèi)容:

          server:
            port: 8080

          spring:
            datasource:
              driver-class-name: com.mysql.cj.jdbc.Driver
              url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
              username: root
              password: root123
            rabbitmq:
              host: localhost
              port: 5672
              username: admin
              password: admin123
              virtual-host: /
              #必須配置這個(gè),生產(chǎn)者才會(huì)確認(rèn)回調(diào)
              publisher-confirm-type: correlated
              publisher-returns: true
              #重要,手動(dòng)開(kāi)啟消費(fèi)者ACK,控制消息在MQ中的刪除、重發(fā)
              listener:
                simple:
                  acknowledge-mode: MANUAL
          3.2.2 訂單中心代碼
          import com.alibaba.fastjson.JSONObject;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.rabbit.connection.CorrelationData;
          import org.springframework.amqp.rabbit.core.RabbitTemplate;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.jdbc.core.JdbcTemplate;
          import org.springframework.stereotype.Service;

          import javax.annotation.PostConstruct;
          import java.beans.Transient;

          /**
           * @Author Lee
           * @Description 訂單中心
           * @Date 2020/1/30 16:57
           * @Version 1.0
           */

          @Slf4j
          @Service
          public class OrderService {
              @Autowired
              private JdbcTemplate jdbcTemplate;

              @Autowired
              private RabbitTemplate rabbitTemplate;

              @PostConstruct
              public void setup(){
                  //消息發(fā)送完成后,則回調(diào)此方法,ack代表此方法是否發(fā)送成功
                  rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){

                      @Override
                      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                          //ack為true,代表MQ已經(jīng)準(zhǔn)確收到消息
                          if(!ack){
                              return;
                          }

                          try{
                              String sql = "update tb_msgstatus set status = 1 where msgid = ?";
                              int count = jdbcTemplate.update(sql,correlationData.getId());
                              if(count != 1){
                                  log.warn("本地消息表狀態(tài)修改失敗");
                              }
                          }catch (Exception e){
                              log.warn("本息消息表狀態(tài)修改異常",e);
                          }
                      }
                  });
              }

              /**
               * 創(chuàng)建訂單信息
               * @param order 訂單信息
               * @throws Exception
               */

              public void createOrder(JSONObject order) throws Exception {
                  //保存訂單信息
                  saveOrder(order);

                  //發(fā)送MQ消息,直接發(fā)送時(shí)不可靠,可能會(huì)失敗(發(fā)送后根據(jù)回執(zhí)修改狀態(tài)表,定時(shí)任務(wù)掃表讀取失敗數(shù)據(jù)重新發(fā)送)
                  sendMsg(order);
              }

              /**
               * 發(fā)送訂單信息至MQ
               * @param order 訂單信息
               */

              private void sendMsg(JSONObject order) {
                  //發(fā)送消息到MQ,CorrelationData作用:當(dāng)收到消息回執(zhí)時(shí)會(huì)帶上這個(gè)參數(shù)
                  rabbitTemplate.convertAndSend("orderExchange","",order.toJSONString(),new CorrelationData((String) order.get("orderid")));
              }

              /**
               * 保存訂單信息
               * @param order 訂單信息
               * @throws Exception
               */

              @Transient
              private void saveOrder(JSONObject order) throws Exception {
                  String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (? , ? , ? , now())";

                  //保存訂單信息
                  int count = jdbcTemplate.update(sql,order.get("orderid"),order.get("userid"),order.get("goodsid"));
                  if(count != 1){
                      throw new Exception("訂單創(chuàng)建失敗");
                  }

                  //保存消息發(fā)送狀態(tài)
                  saveLocalMsg(order);
              }

              /**
               * 記錄消息發(fā)送狀態(tài)
               * @param order 訂單信息
               * @throws Exception
               */

              private void saveLocalMsg(JSONObject order) throws Exception {
                  String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (? , ? , 0 , now())";

                  //記錄消息發(fā)送狀態(tài)
                  int count = jdbcTemplate.update(sql,order.get("orderid"),order.toJSONString());
                  if(count != 1){
                      throw new Exception("記錄消息發(fā)送狀態(tài)失敗");
                  }
              }
          }
          3.3 訂單中心測(cè)試

          3.3.1 測(cè)試代碼

          @Autowired
          private OrderService orderService;

          @Test
          public void orderServiceTest() throws Exception {
              //生成訂單信息
              JSONObject orderinfo = new JSONObject();
              orderinfo.put("orderid",UUID.randomUUID().toString());
              orderinfo.put("userid",UUID.randomUUID().toString());
              orderinfo.put("goodsid",UUID.randomUUID().toString());
              orderService.createOrder(orderinfo);
          }

          3.3.2 測(cè)試驗(yàn)證結(jié)果

          orderQueue消息隊(duì)列中已經(jīng)接收到數(shù)據(jù)

          訂單表里的數(shù)據(jù)

          狀態(tài)表數(shù)據(jù):

          4、運(yùn)單中心

          4.1 運(yùn)單中心分析

          消費(fèi)者收到消息進(jìn)行處理,處理成功則發(fā)送ACK消息通知MQ清除該條記錄,否則通知MQ重發(fā)或者等待MQ自動(dòng)重發(fā)。本地維護(hù)一個(gè)處理次數(shù),如果多次處理仍然失敗,則將該消息丟棄或者加入到死信隊(duì)列(DLQ)中。死信隊(duì)列中的數(shù)據(jù)可以人工干預(yù)。推薦:Java進(jìn)階視頻資源

          4.2 編寫(xiě)代碼
          import com.alibaba.fastjson.JSONObject;
          import com.rabbitmq.client.Channel;
          import lombok.extern.slf4j.Slf4j;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.amqp.support.AmqpHeaders;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.jdbc.core.JdbcTemplate;
          import org.springframework.messaging.handler.annotation.Header;
          import org.springframework.stereotype.Service;

          import java.beans.Transient;
          import java.io.IOException;


          /**
           * @Author Lee
           * @Description 運(yùn)單系統(tǒng)
           * @Date 2020/1/30 21:58
           * @Version 1.0
           */

          @Slf4j
          @Service
          public class DispatchService {
              @Autowired
              private JdbcTemplate jdbcTemplate;

              @RabbitListener(queues = "orderQueue")
              public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
                  try{
                      //MQ里面的數(shù)據(jù)轉(zhuǎn)換成JSON數(shù)據(jù)
                      JSONObject orderInfo = JSONObject.parseObject(message);
                      log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());
                      Thread.sleep(1000L);

                      //執(zhí)行業(yè)務(wù)操作,同一個(gè)數(shù)據(jù)不能處理兩次,根據(jù)業(yè)務(wù)情況去重,保證冪等性
                      String orderid = orderInfo.getString("orderid");
                      //分配快遞員配送
                      dispatch(orderid);

                      //ack 通知MQ數(shù)據(jù)已經(jīng)收到
                      channel.basicAck(tag,false);
                  }catch (Exception e){
                      //異常情況,需要根據(jù)需求去重發(fā)或者丟棄
                      //重發(fā)一定次數(shù)后丟棄,日志告警(rabbitmq沒(méi)有設(shè)置重發(fā)次數(shù)功能,重發(fā)時(shí)需要代碼實(shí)現(xiàn),比如使用redis記錄重發(fā)次數(shù),)
                      channel.basicNack(tag,false,false);
                      //系統(tǒng)關(guān)鍵數(shù)據(jù)異常,需要人工干預(yù)
                  }
                  //如果不給確認(rèn)回復(fù),就等這個(gè)consumer斷開(kāi)連接后,MQ會(huì)繼續(xù)推送
              }

              /**
               * 分配快遞員
               * @param orderid 訂單編號(hào)
               */

              @Transient
              private void dispatch(String orderid) throws Exception {
                  String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
                  int count = jdbcTemplate.update(sql,orderid,"東哥","配送中");
                  if(count != 1){
                      throw new Exception("調(diào)度數(shù)據(jù)插入失敗,原因[數(shù)據(jù)庫(kù)操作]");
                  }
              }
          }
          4.3 訂單中心測(cè)試

          啟動(dòng)springboot后自動(dòng)監(jiān)聽(tīng)MQ中的消息隊(duì)列,自動(dòng)處理

          測(cè)試結(jié)果如下:

          感謝閱讀,希望對(duì)你有所幫助 :) 

          來(lái)源:blog.csdn.net/qq_31463999/article/details/79220061



          瀏覽 88
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  污污成人免费网站 | 夜夜撸夜夜操 | 人人看人人摸人人操天天看天天摸天天操 | 日本在线中文不卡 | 成人性生交大片免费卡看 |