RabbitMQ實(shí)現(xiàn)分布式事務(wù),保證數(shù)據(jù)一致性
點(diǎn)擊“藍(lán)字”,關(guān)注,置頂公眾號(hào)
每日技術(shù)干貨,第一時(shí)間送達(dá)!
一、實(shí)驗(yàn)環(huán)境
Lunix系統(tǒng):Centos7.5 安裝軟件:rabbitmq 開(kāi)發(fā)工具:IDEA
二、實(shí)驗(yàn)?zāi)康?/span>
Rabbitmq實(shí)現(xiàn)多系統(tǒng)間的分布式事務(wù),保證數(shù)據(jù)一致性
三、實(shí)驗(yàn)方案
rabbitmq作為消息中間件
訂單中心和運(yùn)單中心分別作為消息的生產(chǎn)者和消息的消費(fèi)者,通過(guò)rabbitmq傳遞消息
訂單中心作為生產(chǎn)者,模擬用戶創(chuàng)建訂單,在本地持久化訂單信息,記錄消息的狀態(tài)信息,并將消息發(fā)送到rabbitmq,同時(shí)開(kāi)啟confirm機(jī)制,接收消息中間件rabbitmq的響應(yīng)信息,更新本地消息發(fā)送狀態(tài)(定時(shí)任務(wù)輪訓(xùn)消息狀態(tài)信息表,一定時(shí)間內(nèi)未發(fā)送成功的數(shù)據(jù)將再次發(fā)起推送,保證atlestonce.
運(yùn)單中心作為消費(fèi)者,消費(fèi)rabbitmq中的訂單信息,開(kāi)啟ack確認(rèn)機(jī)制,確保不遺漏訂單。并通過(guò)消息全局唯一ID保證數(shù)據(jù)的唯一性,不重復(fù)處理訂單。
四、實(shí)驗(yàn)步驟
1、消息隊(duì)列
1.1 rabbitmq安裝過(guò)程略過(guò)。。。。
1.2 創(chuàng)建訂單交換器:orderExchange

1.3 創(chuàng)建訂單隊(duì)列:orderQueue

1.4 綁定

2、數(shù)據(jù)庫(kù)準(zhǔn)備
2.1訂單表

2.2 消息發(fā)送狀態(tài)表

2.3 運(yùn)單表

3、訂單中心
3.1 訂單中心分析

利用Rabbitmq發(fā)布確認(rèn)機(jī)制(confirm),確保發(fā)送成功的數(shù)據(jù)能被通知到 做個(gè)定時(shí)任務(wù)輪訓(xùn)發(fā)送失敗以及發(fā)送后未響應(yīng)的訂單信息,重新發(fā)送。推薦:Java進(jìn)階視頻資源
3.2 編寫(xiě)代碼
3.2.1 Springboot整合rabbitmq和mysql數(shù)據(jù)庫(kù)
3.2.1.1依賴如下:
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<scope>runtimescope>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
<optional>trueoptional>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jdbcartifactId>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.17version>
dependency>
3.2.1.2配置文件內(nèi)容:
server:
port: 8080
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root
password: root123
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
#必須配置這個(gè),生產(chǎn)者才會(huì)確認(rèn)回調(diào)
publisher-confirm-type: correlated
publisher-returns: true
#重要,手動(dòng)開(kāi)啟消費(fèi)者ACK,控制消息在MQ中的刪除、重發(fā)
listener:
simple:
acknowledge-mode: MANUAL
3.2.2 訂單中心代碼
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.beans.Transient;
/**
* @Author Lee
* @Description 訂單中心
* @Date 2020/1/30 16:57
* @Version 1.0
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setup(){
//消息發(fā)送完成后,則回調(diào)此方法,ack代表此方法是否發(fā)送成功
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//ack為true,代表MQ已經(jīng)準(zhǔn)確收到消息
if(!ack){
return;
}
try{
String sql = "update tb_msgstatus set status = 1 where msgid = ?";
int count = jdbcTemplate.update(sql,correlationData.getId());
if(count != 1){
log.warn("本地消息表狀態(tài)修改失敗");
}
}catch (Exception e){
log.warn("本息消息表狀態(tài)修改異常",e);
}
}
});
}
/**
* 創(chuàng)建訂單信息
* @param order 訂單信息
* @throws Exception
*/
public void createOrder(JSONObject order) throws Exception {
//保存訂單信息
saveOrder(order);
//發(fā)送MQ消息,直接發(fā)送時(shí)不可靠,可能會(huì)失敗(發(fā)送后根據(jù)回執(zhí)修改狀態(tài)表,定時(shí)任務(wù)掃表讀取失敗數(shù)據(jù)重新發(fā)送)
sendMsg(order);
}
/**
* 發(fā)送訂單信息至MQ
* @param order 訂單信息
*/
private void sendMsg(JSONObject order) {
//發(fā)送消息到MQ,CorrelationData作用:當(dāng)收到消息回執(zhí)時(shí)會(huì)帶上這個(gè)參數(shù)
rabbitTemplate.convertAndSend("orderExchange","",order.toJSONString(),new CorrelationData((String) order.get("orderid")));
}
/**
* 保存訂單信息
* @param order 訂單信息
* @throws Exception
*/
@Transient
private void saveOrder(JSONObject order) throws Exception {
String sql = "insert into tb_order (orderid,userid,goodsid,ordertime) values (? , ? , ? , now())";
//保存訂單信息
int count = jdbcTemplate.update(sql,order.get("orderid"),order.get("userid"),order.get("goodsid"));
if(count != 1){
throw new Exception("訂單創(chuàng)建失敗");
}
//保存消息發(fā)送狀態(tài)
saveLocalMsg(order);
}
/**
* 記錄消息發(fā)送狀態(tài)
* @param order 訂單信息
* @throws Exception
*/
private void saveLocalMsg(JSONObject order) throws Exception {
String sql = "insert into tb_msgstatus (msgid,msg,status,sendtime) values (? , ? , 0 , now())";
//記錄消息發(fā)送狀態(tài)
int count = jdbcTemplate.update(sql,order.get("orderid"),order.toJSONString());
if(count != 1){
throw new Exception("記錄消息發(fā)送狀態(tài)失敗");
}
}
}
3.3 訂單中心測(cè)試
3.3.1 測(cè)試代碼
@Autowired
private OrderService orderService;
@Test
public void orderServiceTest() throws Exception {
//生成訂單信息
JSONObject orderinfo = new JSONObject();
orderinfo.put("orderid",UUID.randomUUID().toString());
orderinfo.put("userid",UUID.randomUUID().toString());
orderinfo.put("goodsid",UUID.randomUUID().toString());
orderService.createOrder(orderinfo);
}
3.3.2 測(cè)試驗(yàn)證結(jié)果
orderQueue消息隊(duì)列中已經(jīng)接收到數(shù)據(jù)


訂單表里的數(shù)據(jù)

狀態(tài)表數(shù)據(jù):

4、運(yùn)單中心
4.1 運(yùn)單中心分析

消費(fèi)者收到消息進(jìn)行處理,處理成功則發(fā)送ACK消息通知MQ清除該條記錄,否則通知MQ重發(fā)或者等待MQ自動(dòng)重發(fā)。本地維護(hù)一個(gè)處理次數(shù),如果多次處理仍然失敗,則將該消息丟棄或者加入到死信隊(duì)列(DLQ)中。死信隊(duì)列中的數(shù)據(jù)可以人工干預(yù)。推薦:Java進(jìn)階視頻資源
4.2 編寫(xiě)代碼
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import java.beans.Transient;
import java.io.IOException;
/**
* @Author Lee
* @Description 運(yùn)單系統(tǒng)
* @Date 2020/1/30 21:58
* @Version 1.0
*/
@Slf4j
@Service
public class DispatchService {
@Autowired
private JdbcTemplate jdbcTemplate;
@RabbitListener(queues = "orderQueue")
public void messageCunsumer(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try{
//MQ里面的數(shù)據(jù)轉(zhuǎn)換成JSON數(shù)據(jù)
JSONObject orderInfo = JSONObject.parseObject(message);
log.warn("收到MQ里面的消息:" + orderInfo.toJSONString());
Thread.sleep(1000L);
//執(zhí)行業(yè)務(wù)操作,同一個(gè)數(shù)據(jù)不能處理兩次,根據(jù)業(yè)務(wù)情況去重,保證冪等性
String orderid = orderInfo.getString("orderid");
//分配快遞員配送
dispatch(orderid);
//ack 通知MQ數(shù)據(jù)已經(jīng)收到
channel.basicAck(tag,false);
}catch (Exception e){
//異常情況,需要根據(jù)需求去重發(fā)或者丟棄
//重發(fā)一定次數(shù)后丟棄,日志告警(rabbitmq沒(méi)有設(shè)置重發(fā)次數(shù)功能,重發(fā)時(shí)需要代碼實(shí)現(xiàn),比如使用redis記錄重發(fā)次數(shù),)
channel.basicNack(tag,false,false);
//系統(tǒng)關(guān)鍵數(shù)據(jù)異常,需要人工干預(yù)
}
//如果不給確認(rèn)回復(fù),就等這個(gè)consumer斷開(kāi)連接后,MQ會(huì)繼續(xù)推送
}
/**
* 分配快遞員
* @param orderid 訂單編號(hào)
*/
@Transient
private void dispatch(String orderid) throws Exception {
String sql = "insert into tb_dispatch (orderid,courier,status) values (?,?,?)";
int count = jdbcTemplate.update(sql,orderid,"東哥","配送中");
if(count != 1){
throw new Exception("調(diào)度數(shù)據(jù)插入失敗,原因[數(shù)據(jù)庫(kù)操作]");
}
}
}
4.3 訂單中心測(cè)試
啟動(dòng)springboot后自動(dòng)監(jiān)聽(tīng)MQ中的消息隊(duì)列,自動(dòng)處理
測(cè)試結(jié)果如下:

感謝閱讀,希望對(duì)你有所幫助 :)
來(lái)源:blog.csdn.net/qq_31463999/article/details/79220061
