SpringCloud微服務(wù)架構(gòu)中分布式事務(wù)解決方案,一次性給你說到爛
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
76套java從入門到精通實(shí)戰(zhàn)課程分享
說明
微服務(wù)的發(fā)展
微服務(wù)落地存在的問題
單體應(yīng)用拆分為分布式系統(tǒng)后,進(jìn)程間的通訊機(jī)制和故障處理措施變的更加復(fù)雜。
系統(tǒng)微服務(wù)化后,一個(gè)看似簡單的功能,內(nèi)部可能需要調(diào)用多個(gè)服務(wù)并操作多個(gè)數(shù)據(jù)庫實(shí)現(xiàn),服務(wù)調(diào)用的分布式事務(wù)問題變的非常突出。
微服務(wù)數(shù)量眾多,其測試、部署、監(jiān)控等都變的更加困難。
ACID
原子性(Atomicity):?一個(gè)事務(wù)的所有系列操作步驟被看成是一個(gè)動作,所有的步驟要么全部完成要么一個(gè)也不會完成,如果事務(wù)過程中任何一點(diǎn)失敗,將要被改變的數(shù)據(jù)庫記錄就不會被真正被改變。
一致性(Consistency):?數(shù)據(jù)庫的約束 級聯(lián)和觸發(fā)機(jī)制Trigger都必須滿足事務(wù)的一致性。也就是說,通過各種途徑包括外鍵約束等任何寫入數(shù)據(jù)庫的數(shù)據(jù)都是有效的,不能發(fā)生表與表之間存在外鍵約束,但是有數(shù)據(jù)卻違背這種約束性。所有改變數(shù)據(jù)庫數(shù)據(jù)的動作事務(wù)必須完成,沒有事務(wù)會創(chuàng)建一個(gè)無效數(shù)據(jù)狀態(tài),這是不同于CAP理論的一致性"consistency".
隔離性(Isolation):?主要用于實(shí)現(xiàn)并發(fā)控制, 隔離能夠確保并發(fā)執(zhí)行的事務(wù)能夠順序一個(gè)接一個(gè)執(zhí)行,通過隔離,一個(gè)未完成事務(wù)不會影響另外一個(gè)未完成事務(wù)。
持久性(Durability):?一旦一個(gè)事務(wù)被提交,它應(yīng)該持久保存,不會因?yàn)楹推渌僮鳑_突而取消這個(gè)事務(wù)。很多人認(rèn)為這意味著事務(wù)是持久在磁盤上,但是規(guī)范沒有特別定義這點(diǎn)。
一致性理論
CAP 理論
一致性:分布式環(huán)境下,多個(gè)節(jié)點(diǎn)的數(shù)據(jù)是否強(qiáng)一致。
可用性:分布式服務(wù)能一直保證可用狀態(tài)。當(dāng)用戶發(fā)出一個(gè)請求后,服務(wù)能在有限時(shí)間內(nèi)返回結(jié)果。
分區(qū)容忍性:特指對網(wǎng)絡(luò)分區(qū)的容忍性。
BASE 理論
基本可用(?Basically?Available):指分布式系統(tǒng)在出現(xiàn)故障時(shí),允許損失部分的可用性來保證核心可用;
軟狀態(tài)(?Soft state):指允許分布式系統(tǒng)存在中間狀態(tài),該中間狀態(tài)不會影響到系統(tǒng)的整體可用性;
最終一致性(?Eventual consistency):指分布式系統(tǒng)中的所有副本數(shù)據(jù)經(jīng)過一定時(shí)間后,最終能夠達(dá)到一致的狀態(tài);
原子性(A)與持久性(D)必須根本保障;
為了可用性、性能與降級服務(wù)的需要,只有降低一致性( C ) 與 隔離性( I ) 的要求;
酸堿平衡(ACID-BASE Balance);
一致性模型
強(qiáng)一致性:數(shù)據(jù)更新成功后,任意時(shí)刻所有副本中的數(shù)據(jù)都是一致的,一般采用同步的方式實(shí)現(xiàn)。
弱一致性:數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以讀到最新寫入的值,也不承諾具體多久之后可以讀到。
最終一致性:弱一致性的一種形式,數(shù)據(jù)更新成功后,系統(tǒng)不承諾立即可以返回最新寫入的值,但是保證最終會返回上一次更新操作的值。
本地事務(wù)
在單個(gè)數(shù)據(jù)庫的本地并且限制在單個(gè)進(jìn)程內(nèi)的事務(wù)
本地事務(wù)不涉及多個(gè)數(shù)據(jù)來源
分布式事務(wù)典型方案
兩階段提交(2PC, Two Phase Commit)方案;
本地消息表 (eBay 事件隊(duì)列方案);
TCC 補(bǔ)償模式;
兩
階段型 補(bǔ)償型
異步確保型
最大努力通知型
可查詢操作
冪等操作
TCC操作
可補(bǔ)償操作
兩階段提交2PC(強(qiáng)一致性)
第一階段是表決階段,所有參與者都將本事務(wù)能否成功的信息反饋發(fā)給協(xié)調(diào)者;
第二階段是執(zhí)行階段,協(xié)調(diào)者根據(jù)所有參與者的反饋,通知所有參與者,步調(diào)一致地在所有分支上提交或者回滾;
單點(diǎn)問題:事務(wù)管理器在整個(gè)流程中扮演的角色很關(guān)鍵,如果其宕機(jī),比如在第一階段已經(jīng)完成,在第二階段正準(zhǔn)備提交的時(shí)候事務(wù)管理器宕機(jī),資源管理器就會一直阻塞,導(dǎo)致數(shù)據(jù)庫無法使用。
同步阻塞:在準(zhǔn)備就緒之后,資源管理器中的資源一直處于阻塞,直到提交完成,釋放資源。
數(shù)據(jù)不一致:兩階段提交協(xié)議雖然為分布式數(shù)據(jù)強(qiáng)一致性所設(shè)計(jì),但仍然存在數(shù)據(jù)不一致性的可能。比如:在第二階段中,假設(shè)協(xié)調(diào)者發(fā)出了事務(wù) Commit 的通知,但是因?yàn)榫W(wǎng)絡(luò)問題該通知僅被一部分參與者所收到并執(zhí)行了 Commit 操作,其余的參與者則因?yàn)闆]有收到通知一直處于阻塞狀態(tài),這時(shí)候就產(chǎn)生了數(shù)據(jù)的不一致性。
本地消息表(最終一致性)
在分布式事務(wù)操作的一方完成寫業(yè)務(wù)數(shù)據(jù)的操作之后向本地消息表發(fā)送一個(gè)消息,本地事務(wù)能保證這個(gè)消息一定會被寫入本地消息表中;
之后將本地消息表中的消息轉(zhuǎn)發(fā)到 Kafka 等消息隊(duì)列中,如果轉(zhuǎn)發(fā)成功則將消息從本地消息表中刪除,否則繼續(xù)重新轉(zhuǎn)發(fā);
消息消費(fèi)方處理這個(gè)消息,并完成自己的業(yè)務(wù)邏輯。此時(shí)如果本地事務(wù)處理成功,表明已經(jīng)處理成功了,如果處理失敗,那么就會重試執(zhí)行。如果是業(yè)務(wù)上面的失敗,可以給生產(chǎn)方發(fā)送一個(gè)業(yè)務(wù)補(bǔ)償消息,通知生產(chǎn)方進(jìn)行回滾等操作;
可靠消息的最終一致性代碼示例
DROP?TABLE?IF?EXISTS?`rp_transaction_message`;
?
CREATE?TABLE?`rp_transaction_message`?(
????`id`?VARCHAR?(50)?NOT?NULL?DEFAULT?''?COMMENT?'主鍵ID',
????`version`?INT?(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'版本號',
????`editor`?VARCHAR?(100)?DEFAULT?NULL?COMMENT?'修改者',
????`creater`?VARCHAR?(100)?DEFAULT?NULL?COMMENT?'創(chuàng)建者',
????`edit_time`?datetime?DEFAULT?NULL?COMMENT?'最后修改時(shí)間',
????`create_time`?datetime?NOT?NULL?DEFAULT?'0000-00-00?00:00:00'?COMMENT?'創(chuàng)建時(shí)間',
????`message_id`?VARCHAR?(50)?NOT?NULL?DEFAULT?''?COMMENT?'消息ID',
????`message_body`?LONGTEXT?NOT?NULL?COMMENT?'消息內(nèi)容',
????`message_data_type`?VARCHAR?(50)?DEFAULT?NULL?COMMENT?'消息數(shù)據(jù)類型',
????`consumer_queue`?VARCHAR?(100)?NOT?NULL?DEFAULT?''?COMMENT?'消費(fèi)隊(duì)列',
????`message_send_times`?SMALLINT?(6)?NOT?NULL?DEFAULT?'0'?COMMENT?'消息重發(fā)次數(shù)',
????`areadly_dead`?VARCHAR?(20)?NOT?NULL?DEFAULT?''?COMMENT?'是否死亡',
????`status`?VARCHAR?(20)?NOT?NULL?DEFAULT?''?COMMENT?'狀態(tài)',
????`remark`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'備注',
????`field1`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段1',
????`field2`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段2',
????`field3`?VARCHAR?(200)?DEFAULT?NULL?COMMENT?'擴(kuò)展字段3',
????PRIMARY?KEY?(`id`),
????KEY?`AK_Key_2`?(`message_id`)
)?ENGINE?=?INNODB?DEFAULT?CHARSET?=?utf8;
?
public?interface?RpTransactionMessageService?{
?
????/**
?????*?預(yù)存儲消息.
?????*/
????public?int?saveMessageWaitingConfirm(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
?
????/**
?????*?確認(rèn)并發(fā)送消息.
?????*/
????public?void?confirmAndSendMessage(String?messageId)?throws?MessageBizException;
?
????/**
?????*?存儲并發(fā)送消息.
?????*/
????public?int?saveAndSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
?
????/**
?????*?直接發(fā)送消息.
?????*/
????public?void?directSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
?
????/**
?????*?重發(fā)消息.
?????*/
????public?void?reSendMessage(RpTransactionMessage?rpTransactionMessage)?throws?MessageBizException;
?
????/**
?????*?根據(jù)messageId重發(fā)某條消息.
?????*/
????public?void?reSendMessageByMessageId(String?messageId)?throws?MessageBizException;
?
????/**
?????*?將消息標(biāo)記為死亡消息.
?????*/
????public?void?setMessageToAreadlyDead(String?messageId)?throws?MessageBizException;
?
????/**
?????*?根據(jù)消息ID獲取消息
?????*/
????public?RpTransactionMessage?getMessageByMessageId(String?messageId)?throws?MessageBizException;
?
????/**
?????*?根據(jù)消息ID刪除消息
?????*/
????public?void?deleteMessageByMessageId(String?messageId)?throws?MessageBizException;
?
????/**
?????*?重發(fā)某個(gè)消息隊(duì)列中的全部已死亡的消息.
?????*/
????public?void?reSendAllDeadMessageByQueueName(String?queueName,?int?batchSize)?throws?MessageBizException;
?
????/**
?????*?獲取分頁數(shù)據(jù)
?????*/
????PageBean?listPage(PageParam?pageParam,?Map?paramMap)?throws?MessageBizException;
?
}
@Service("rpTransactionMessageService")
public?class?RpTransactionMessageServiceImpl?implements?RpTransactionMessageService?{
?
????private?static?final?Log?log?=?LogFactory.getLog(RpTransactionMessageServiceImpl.class);
?
????@Autowired
????private?RpTransactionMessageDao?rpTransactionMessageDao;
?
????@Autowired
????private?JmsTemplate?notifyJmsTemplate;
?
????public?int?saveMessageWaitingConfirm(RpTransactionMessage?message)?{
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
????????}
????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
????????}
????????message.setEditTime(new?Date());
????????message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
????????message.setAreadlyDead(PublicEnum.NO.name());
????????message.setMessageSendTimes(0);
????????return?rpTransactionMessageDao.insert(message);
????}
?
????public?void?confirmAndSendMessage(String?messageId)?{
????????final?RpTransactionMessage?message?=?getMessageByMessageId(messageId);
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
????????}
????????message.setStatus(MessageStatusEnum.SENDING.name());
????????message.setEditTime(new?Date());
????????rpTransactionMessageDao.update(message);
????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
????????notifyJmsTemplate.send(new?MessageCreator()?{
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????return?session.createTextMessage(message.getMessageBody());
????????????}
????????});
????}
?
????public?int?saveAndSendMessage(final?RpTransactionMessage?message)?{
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
????????}
????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
????????}
????????message.setStatus(MessageStatusEnum.SENDING.name());
????????message.setAreadlyDead(PublicEnum.NO.name());
????????message.setMessageSendTimes(0);
????????message.setEditTime(new?Date());
????????int?result?=?rpTransactionMessageDao.insert(message);
????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
????????notifyJmsTemplate.send(new?MessageCreator()?{
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????return?session.createTextMessage(message.getMessageBody());
????????????}
????????});
????????return?result;
????}
?
????public?void?directSendMessage(final?RpTransactionMessage?message)?{
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
????????}
????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
????????}
????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
????????notifyJmsTemplate.send(new?MessageCreator()?{
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????return?session.createTextMessage(message.getMessageBody());
????????????}
????????});
????}
?
????public?void?reSendMessage(final?RpTransactionMessage?message)?{
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"保存的消息為空");
????????}
????????if?(StringUtil.isEmpty(message.getConsumerQueue()))?{
????????????throw?new?MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL,?"消息的消費(fèi)隊(duì)列不能為空?");
????????}
????????message.addSendTimes();
????????message.setEditTime(new?Date());
????????rpTransactionMessageDao.update(message);
????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
????????notifyJmsTemplate.send(new?MessageCreator()?{
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????return?session.createTextMessage(message.getMessageBody());
????????????}
????????});
????}
?
????public?void?reSendMessageByMessageId(String?messageId)?{
????????final?RpTransactionMessage?message?=?getMessageByMessageId(messageId);
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
????????}
????????int?maxTimes?=?Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
????????if?(message.getMessageSendTimes()?>=?maxTimes)?{
????????????message.setAreadlyDead(PublicEnum.YES.name());
????????}
????????message.setEditTime(new?Date());
????????message.setMessageSendTimes(message.getMessageSendTimes()?+?1);
????????rpTransactionMessageDao.update(message);
????????notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
????????notifyJmsTemplate.send(new?MessageCreator()?{
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????return?session.createTextMessage(message.getMessageBody());
????????????}
????????});
????}
?
????public?void?setMessageToAreadlyDead(String?messageId)?{
????????RpTransactionMessage?message?=?getMessageByMessageId(messageId);
????????if?(message?==?null)?{
????????????throw?new?MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL,?"根據(jù)消息id查找的消息為空");
????????}
????????message.setAreadlyDead(PublicEnum.YES.name());
????????message.setEditTime(new?Date());
????????rpTransactionMessageDao.update(message);
????}
?
????public?RpTransactionMessage?getMessageByMessageId(String?messageId)?{
????????Map?paramMap?=?new?HashMap();
????????paramMap.put("messageId",?messageId);
????????return?rpTransactionMessageDao.getBy(paramMap);
????}
?
????public?void?deleteMessageByMessageId(String?messageId)?{
????????Map?paramMap?=?new?HashMap();
????????paramMap.put("messageId",?messageId);
????????rpTransactionMessageDao.delete(paramMap);
????}
?
????@SuppressWarnings("unchecked")
????public?void?reSendAllDeadMessageByQueueName(String?queueName,?int?batchSize)?{
????????log.info("==>reSendAllDeadMessageByQueueName");
????????int?numPerPage?=?1000;
????????if?(batchSize?>?0?&&?batchSize?100)?{
????????????numPerPage?=?100;
????????}?else?if?(batchSize?>?100?&&?batchSize?5000)?{
????????????numPerPage?=?batchSize;
????????}?else?if?(batchSize?>?5000)?{
????????????numPerPage?=?5000;
????????}?else?{
????????????numPerPage?=?1000;
????????}
????????int?pageNum?=?1;
????????Map?paramMap?=?new?HashMap();
????????paramMap.put("consumerQueue",?queueName);
????????paramMap.put("areadlyDead",?PublicEnum.YES.name());
????????paramMap.put("listPageSortType",?"ASC");
????????Map?messageMap?=?new?HashMap();
????????List 與常規(guī)MQ的ACK機(jī)制對比
Producer生成消息并發(fā)送給MQ(同步、異步);
MQ接收消息并將消息數(shù)據(jù)持久化到消息存儲(持久化操作為可選配置);
MQ向Producer返回消息的接收結(jié)果(返回值、異常);
Consumer監(jiān)聽并消費(fèi)MQ中的消息;
Consumer獲取到消息后執(zhí)行業(yè)務(wù)處理;
Consumer對已成功消費(fèi)的消息向MQ進(jìn)行ACK確認(rèn)(確認(rèn)后的消息將從MQ中刪除);
public?void?test1(){
????//1?數(shù)據(jù)庫操作
????//2?發(fā)送MQ消息
}
?
public?void?test1(){
????//1?發(fā)送MQ消息
????//2?數(shù)據(jù)庫操作
}
?
@Transactional
public?void?test1(){
????//1?發(fā)送MQ消息
????//2?數(shù)據(jù)庫操作
}
?
@Transactional
public?void?test1(){
????//1?數(shù)據(jù)庫操作
????//2?發(fā)送MQ消息
}
?
常規(guī)MQ隊(duì)列消息的處理流程無法實(shí)現(xiàn)消息發(fā)送一致性;
投遞消息的流程其實(shí)就是消息的消費(fèi)流程,可細(xì)化;
TCC (Try-Confirm-Cancel)補(bǔ)償模式(最終一致性)
Try 階段主要是對業(yè)務(wù)系統(tǒng)做檢測及資源預(yù)留
Confirm 階段主要是對業(yè)務(wù)系統(tǒng)做確認(rèn)提交,Try階段執(zhí)行成功并開始執(zhí)行 Confirm階段時(shí),默認(rèn) Confirm階段是不會出錯(cuò)的。即:只要Try成功,Confirm一定成功。
Cancel 階段主要是在業(yè)務(wù)執(zhí)行錯(cuò)誤,需要回滾的狀態(tài)下執(zhí)行的業(yè)務(wù)取消,預(yù)留資源釋放。
首先在 Try 階段,要先調(diào)用遠(yuǎn)程接口把 Smith 和 Bob 的錢
給 凍結(jié)起來。在 Confirm 階段,執(zhí)行遠(yuǎn)程調(diào)用的轉(zhuǎn)賬的操作,轉(zhuǎn)賬成功進(jìn)行解凍。
如果第2步執(zhí)行成功,那么轉(zhuǎn)賬成功,如果第二步執(zhí)行失敗,則調(diào)用遠(yuǎn)程凍結(jié)接口對應(yīng)的解凍方法 (Cancel)。
可靠消息最終一致(常用)
A系統(tǒng)先發(fā)送一個(gè)prepared消息到mq,如果這個(gè)prepared消息發(fā)送失敗那么就直接取消操作別執(zhí)行了
如果這個(gè)消息發(fā)送成功過了,那么接著執(zhí)行本地事務(wù),如果成功就告訴mq發(fā)送確認(rèn)消息,如果失敗就告訴mq回滾消息
如果發(fā)送了確認(rèn)消息,那么此時(shí)B系統(tǒng)會接收到確認(rèn)消息,然后執(zhí)行本地的事務(wù)
mq會自動定時(shí)
輪詢 所有prepared消息回調(diào)你的接口,問你,這個(gè)消息是不是本地事務(wù)處理失敗了,所有沒發(fā)送確認(rèn)消息?那是繼續(xù)重試還是回滾?一般來說這里你就可以查下數(shù)據(jù)庫看之前本地事務(wù)是否執(zhí)行,如果回滾了,那么這里也回滾吧。這個(gè)就是避免可能本地事務(wù)執(zhí)行成功了,別確認(rèn)消息發(fā)送失敗了。
最大努力通知
版權(quán)聲明:本文為博主原創(chuàng)文章,遵循?CC 4.0 BY-SA?版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接和本聲明。
本文鏈接:
https://blog.csdn.net/QAQFyl/article/details/113727579
鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?







