分布式事務中使用RocketMQ的事務消息機制優(yōu)化事務的處理邏輯
點擊上方藍色字體,選擇“標星公眾號”
優(yōu)質文章,第一時間送達
1、事務消費介紹
我們經常支付寶轉賬余額寶,這是日常生活的一件普通小事,但是我們思考支付寶扣除轉賬的錢之后,如果系統(tǒng)掛掉怎么辦,這時余額寶賬戶并沒有增加相應的金額,數據就會出現不一致狀況了。
上述場景在各個類型的系統(tǒng)中都能找到相似影子,比如在電商系統(tǒng)中,當有用戶下單后,除了在訂單表插入一條記錄外,對應商品表的這個商品數量必須減1吧,怎么保證?!在搜索廣告系統(tǒng)中,當用戶點擊某廣告后,除了在點擊事件表中增加一條記錄外,還得去商家賬戶表中找到這個商家并扣除廣告費吧,怎么保證?!等等,相信大家或多或多少都能碰到相似情景。
本質上問題可以抽象為:當一個表數據更新后,怎么保證另一個表的數據也必須要更新成功。
如果是單機系統(tǒng)(數據庫實例也在同一個系統(tǒng)上)的話,我們可以用本地事務輕松解決:
還是以支付寶轉賬余額寶為例(比如轉賬10000塊錢),假設有
支付寶賬戶表:A(id,userId,amount)
余額寶賬戶表:B(id,userId,amount)
用戶的userId=1;從支付寶轉賬1萬塊錢到余額寶的動作分為兩步:
1)支付寶表扣除1萬:update A set amount=amount-10000 where userId=1;
2)余額寶表增加1萬:update B set amount=amount+10000 where userId=1;如何確保支付寶余額寶收支平衡呢?
有人說這個很簡單嘛,可以用事務解決。
Begin transaction
update A set amount=amount-10000 where userId=1;
update B set amount=amount+10000 where userId=1;
End transaction
commit;這樣確實能解決,如果你使用spring的話一個注解就能搞定上述事務功能。
@Transactional(rollbackFor=Exception.class)
public void update() {
//更新A表
updateATable();
//更新B表
updateBTable();
}如果系統(tǒng)規(guī)模較小,數據表都在一個數據庫實例上,上述本地事務方式可以很好地運行,但是如果系統(tǒng)規(guī)模較大,比如支付寶賬戶表和余額寶賬戶表顯然不會在同一個數據庫實例上,他們往往分布在不同的物理節(jié)點上,這時本地事務已經失去用武之地。
下面我們來看看比較主流的兩種方案:
2、分布式事務—————— 兩階段提交協(xié)議
兩階段提交協(xié)議(Two-phase Commit,2PC)經常被用來實現分布式事務。一般分為協(xié)調器TC和若干事務執(zhí)行者兩種角色,這里的事務執(zhí)行者就是具體的數據庫,協(xié)調器可以和事務執(zhí)行器在一臺機器上。

我們根據上面的圖來看看主要流程:
1) 我們的應用程序(client)發(fā)起一個開始請求到TC(transaction);
2) TC先將prepare消息寫到本地日志,之后向所有的Si發(fā)起prepare消息。以支付寶轉賬到余額寶為例,TC給A的prepare消息是通知支付寶數據庫相應賬目扣款1萬,TC給B的prepare消息是通知余額寶數據庫相應賬目增加1w。為什么在執(zhí)行任務前需要先寫本地日志,主要是為了故障后恢復用,本地日志起到現實生活中憑證的效果,如果沒有本地日志(憑證),出問題容易死無對證;
3) Si收到prepare消息后,執(zhí)行具體本機事務,但不會進行commit,如果成功返回yes,不成功返回no。同理,返回前都應把要返回的消息寫到日志里,當作憑證。
4) TC收集所有執(zhí)行器返回的消息,如果所有執(zhí)行器都返回yes,那么給所有執(zhí)行器發(fā)生送commit消息,執(zhí)行器收到commit后執(zhí)行本地事務的commit操作;如果有任一個執(zhí)行器返回no,那么給所有執(zhí)行器發(fā)送abort消息,執(zhí)行器收到abort消息后執(zhí)行事務abort操作。
注:TC或Si把發(fā)送或接收到的消息先寫到日志里,主要是為了故障后恢復用。如某一Si從故障中恢復后,先檢查本機的日志,如果已收到commit,則提交,如果abort則回滾。如果是yes,則再向TC詢問一下,確定下一步。如果什么都沒有,則很可能在prepare階段Si就崩潰了,因此需要回滾。
現如今實現基于兩階段提交的分布式事務也沒那么困難了,如果使用java,那么可以使用開源軟件atomikos(http://www.atomikos.com/),來快速實現。)
不過但凡使用過的上述兩階段提交的同學都可以發(fā)現性能實在是太差,根本不適合高并發(fā)的系統(tǒng)。為什么?
1)兩階段提交涉及多次節(jié)點間的網絡通信,通信時間太長!
2)事務時間相對于變長了,鎖定的資源的時間也變長了,造成資源等待時間也增加好多!正是由于分布式事務存在很嚴重的性能問題,大部分高并發(fā)服務都在避免使用,往往通過其他途徑來解決數據一致性問題。
3、使用消息隊列來避免分布式事務
如果仔細觀察生活的話,生活的很多場景已經給了我們提示。
比如在北京很有名的姚記炒肝點了炒肝并付了錢后,他們并不會直接把你點的炒肝給你,而是給你一張小票,然后讓你拿著小票到出貨區(qū)排隊去取。為什么他們要將付錢和取貨兩個動作分開呢?原因很多,其中一個很重要的原因是為了使他們接待能力增強(并發(fā)量更高)。
還是回到我們的問題,只要這張小票在,你最終是能拿到炒肝的。同理轉賬服務也是如此,當支付寶賬戶扣除1萬后,我們只要生成一個憑證(消息)即可,這個憑證(消息)上寫著“讓余額寶賬戶增加1萬”,只要這個憑證(消息)能可靠保存,我們最終是可以拿著這個憑證(消息)讓余額寶賬戶增加1萬的,即我們能依靠這個憑證(消息)完成最終一致性。
那么我們如何可靠保存憑證(消息)有兩種方法:
1)業(yè)務與消息耦合的方式
支付寶在完成扣款的同時,同時記錄消息數據,這個消息數據與業(yè)務數據保存在同一數據庫實例里(消息記錄表表名為message)。
Begin transaction
update A set amount=amount-10000 where userId=1;
insert into message(userId, amount,status) values(1, 10000, 1);
End transaction
commit;上述事務能保證只要支付寶賬戶里被扣了錢,消息一定能保存下來。
當上述事務提交成功后,我們通過實時消息服務將此消息通知余額寶,余額寶處理成功后發(fā)送回復成功消息,支付寶收到回復后刪除該條消息數據。
2)業(yè)務與消息解耦方式
上述保存消息的方式使得消息數據和業(yè)務數據緊耦合在一起,從架構上看不夠優(yōu)雅,而且容易誘發(fā)其他問題。為了解耦,可以采用以下方式。
a)支付寶在扣款事務提交之前,向實時消息服務請求發(fā)送消息,實時消息服務只記錄消息數據,而不真正發(fā)送,只有消息發(fā)送成功后才會提交事務;
b)當支付寶扣款事務被提交成功后,向實時消息服務確認發(fā)送。只有在得到確認發(fā)送指令后,實時消息服務才真正發(fā)送該消息;
c)當支付寶扣款事務提交失敗回滾后,向實時消息服務取消發(fā)送。在得到取消發(fā)送指令后,該消息將不會被發(fā)送;
d)對于那些未確認的消息或者取消的消息,需要有一個消息狀態(tài)確認系統(tǒng)定時去支付寶系統(tǒng)查詢這個消息的狀態(tài)并進行更新。為什么需要這一步驟,舉個例子:假設在第2步支付寶扣款事務被成功提交后,系統(tǒng)掛了,此時消息狀態(tài)并未被更新為“確認發(fā)送”,從而導致消息不能被發(fā)送。
優(yōu)點:消息數據獨立存儲,降低業(yè)務系統(tǒng)與消息系統(tǒng)間的耦合;
缺點:一次消息發(fā)送需要兩次請求;業(yè)務處理服務需要實現消息狀態(tài)回查接口。
4、那么如何解決消息重復投遞的問題?
還有一個很嚴重的問題就是消息重復投遞,以我們支付寶轉賬到余額寶為例,如果相同的消息被重復投遞兩次,那么我們余額寶賬戶將會增加2萬而不是1萬了(上面講順序消費是講過,這里再提一下)。
為什么相同的消息會被重復投遞?比如余額寶處理完消息msg后,發(fā)送了處理成功的消息給支付寶,正常情況下支付寶應該要刪除消息msg,但如果支付寶這時候悲劇的掛了,重啟后一看消息msg還在,就會繼續(xù)發(fā)送消息msg。
解決方法很簡單,在余額寶這邊增加消息應用狀態(tài)表(message_apply),通俗來說就是個賬本,用于記錄消息的消費情況,每次來一個消息,在真正執(zhí)行之前,先去消息應用狀態(tài)表中查詢一遍,如果找到說明是重復消息,丟棄即可,如果沒找到才執(zhí)行,同時插入到消息應用狀態(tài)表(同一事務) 。
For each msg in queue
Begin transaction
select count(*) as cnt from message_apply where msg_id=msg.msg_id;
if cnt==0 then
update B set amount=amount+10000 where userId=1;
insert into message_apply(msg_id) values(msg.msg_id);
end if
End transaction
commit;
End For為了方便大家理解,我們再來舉一個銀行轉賬的示例(和上一個例子差不多):
比如,Bob向Smith轉賬100塊。
在單機環(huán)境下,執(zhí)行事務的情況,大概是下面這個樣子:

當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經不在同一臺服務器上了,那么上面的流程就變成了這樣:

這時候你會發(fā)現,同樣是一個轉賬的業(yè)務,在集群環(huán)境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規(guī)避這個問題?
5、大事務 = 小事務 + 異步
將大事務拆分成多個小事務異步執(zhí)行。這樣基本上能夠將跨機事務的執(zhí)行效率優(yōu)化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:

圖中執(zhí)行本地事務(Bob賬戶扣款)和發(fā)送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發(fā)送消息一定要成功,如果扣款失敗了,就不能再發(fā)送消息。那問題是:我們是先扣款還是先發(fā)送消息呢?
首先看下先發(fā)送消息的情況,大致的示意圖如下:

存在的問題是:如果消息發(fā)送成功,但是扣款失敗,消費端就會消費此消息,進而向Smith賬戶加錢。
先發(fā)消息不行,那就先扣款吧,大致的示意圖如下:

存在的問題跟上面類似:如果扣款成功,發(fā)送消息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。
可能大家會有很多的方法來解決這個問題,比如:直接將發(fā)消息放到Bob扣款的事務中去,如果發(fā)送失敗,拋出異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。
RocketMQ支持事務消息,下面來看看RocketMQ是怎樣來實現的?

RocketMQ第一階段發(fā)送Prepared消息時,會拿到消息的地址,第二階段執(zhí)行本地事物,第三階段通過第一階段拿到的地址去訪問消息,并修改消息的狀態(tài)。
細心的你可能又發(fā)現問題了,如果確認消息發(fā)送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發(fā)現了Prepared消息,它會向消息發(fā)送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續(xù)發(fā)送確認消息呢?
RocketMQ會根據發(fā)送端設置的策略來決定是回滾還是繼續(xù)發(fā)送確認消息。這樣就保證了消息發(fā)送與本地事務同時成功或同時失敗。
6、Rocket事務流程處理分析
那我們來看下RocketMQ源碼,是如何處理事務消息的。
客戶端發(fā)送事務消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer)
// =============================發(fā)送事務消息的一系列準備工作========================================
// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發(fā)現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設置事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,相當于示例中檢查Bob賬戶并扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發(fā)送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();接著查看sendMessageInTransaction方法的源碼,總共分為3個階段:發(fā)送Prepared消息、執(zhí)行本地事務、發(fā)送確認消息。
// ================================事務消息的發(fā)送過程=============================================
public TransactionSendResult sendMessageInTransaction(.....) {
// 邏輯代碼,非實際代碼
// 1.發(fā)送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.如果消息發(fā)送成功,處理與消息關聯的本地事務單元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.結束事務
this.endTransaction(sendResult, localTransactionState, localException);
}
endTransaction方法會將請求發(fā)往broker(mq server)去更新事務消息的最終狀態(tài):
根據sendResult找到Prepared消息 ,sendResult包含事務消息的ID
根據localTransaction更新消息的最終狀態(tài)
如果endTransaction方法執(zhí)行失敗,數據沒有發(fā)送到broker,導致事務消息的 狀態(tài)更新失敗,broker會有回查線程定時(默認1分鐘)掃描每個存儲事務狀態(tài)的表格文件,如果是已經提交或者回滾的消息直接跳過,如果是prepared狀態(tài)則會向Producer發(fā)起CheckTransaction請求,Producer會調用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調請求,而checkTransactionState會調用我們的事務設置的決斷方法來決定是回滾事務還是繼續(xù)執(zhí)行,最后調用endTransactionOneway讓broker來更新消息的最終狀態(tài)。
再回到轉賬的例子,如果Bob的賬戶的余額已經減少,且消息已經發(fā)送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程中有可能會出現消息重復的問題,按照前面的思路解決即可。

消費事務消息
這樣基本上可以解決消費端超時問題,但是如果消費失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那么需要回滾整個流程。如果消息系統(tǒng)要實現這個回滾流程的話,系統(tǒng)復雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現消息系統(tǒng)時,我們需要衡量是否值得花這么大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。
我們需要注意的是,在3.2.6版本中移除了事務消息的實現,所以此版本不支持事務消息。也就是說,消息失敗不會進行檢查。
7、交易事務處理示例
下面我們來看一個簡單的例子:
消息生產者:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");
//設置用于事務消息的處理線程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setExecutorService(executorService);
//設置事務監(jiān)聽器,監(jiān)聽器實現接口org.apache.rocketmq.client.producer.TransactionListener
//監(jiān)聽器中實現需要處理的交易業(yè)務邏輯的處理,以及MQ Broker中未確認的事務與業(yè)務的確認邏輯
producer.setTransactionListener(transactionListener);
producer.start();
//生成不同的Tag,用于模擬不同的處理場景
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
//組裝產生消息
Message msg =
new Message("TopicTransaction", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//以事務發(fā)送消息,并在事務消息被成功預寫入到RocketMQ中后,執(zhí)行用戶定義的交易邏輯,
//交易邏輯執(zhí)行成功后,再實現實現業(yè)務消息的提交邏輯
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
System.out.printf("%s%n", sendResult.getTransactionId());
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
業(yè)務實現類TransactionListenerImpl:
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 該方法會在消息成功預寫入RocketMQ后被執(zhí)行
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("開始處理業(yè)務邏輯...");
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
switch (status) {
case 0:
//LocalTransactionState.UNKNOW表示未知的事件,需要RocketMQ進一步服務業(yè)務進行確認該交易的處理
//結果,確認消息被調用的方法為下方的checkLocalTransaction。
//注:RocketMQ與業(yè)務確認消息的執(zhí)行狀態(tài)的功能已經被移除了,在早期3.0.8的版本中有該功能,因而如果
//返回的狀態(tài)為UNKNOW,則該消息不會被提交
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
/**
* 該方法用于RocketMQ與業(yè)務確認未提交事務的消息的狀態(tài),不過該方法已經的實現在RocketMQ中已經
* 被刪除了,因而其功能也就沒有意義了。
* 不過如果使用阿里云的企業(yè)的RocketMQ服務,該功能會起作用。
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
int mod = msg.getTransactionId().hashCode() % 2;
if (null != status) {
switch (mod) {
case 0:
return LocalTransactionState.ROLLBACK_MESSAGE;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
消息消費者:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_producer");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
* 如果非第一次啟動,那么按照上次消費的位置繼續(xù)消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTransaction", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
private Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 設置自動提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println("獲取到消息開始消費:"+msg + " , content : " + new String(msg.getBody()));
}
try {
// 模擬業(yè)務處理
TimeUnit.SECONDS.sleep(random.nextInt(5));
} catch (Exception e) {
e.printStackTrace();
//返回處理失敗,該消息后續(xù)可以繼續(xù)被消費
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//返回處理成功,該消息就不會再次投遞過來了
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("consumer start ! ");
}
}
我們先啟動消費端,然后啟動生產端:
在運行之前,我們先來看一下,web控制臺的消息:

Topic transaction_producer中沒有未消費的消息,下面開始執(zhí)行代碼邏輯。
生產端:

通過日志可以看到其發(fā)送了四條消息,交易邏輯被調用了4次,其中只有一條消息反饋的結果為COMMIT_MESSAGE。
再次查看控制臺:

可以看到其Topic transaction_producer只有一條待消費的消息,這個和發(fā)送端只一條消息被COMMIT的結論相符合。
消費端:

啟動消費端,在控制臺只看到消費了一條消息。
生產者總共生產了四條消息,原因如下:
這就是為什么我們生產了四條消息,最后卻只消費了一條,再次確認業(yè)務實現類TransactionListenerImpl中的方法checkLocalTransaction被沒有調用。
8、如何保證扣錢與加錢的事務的最終一致性
在上面的轉賬交易邏輯中,存在兩個問題:
1)如果沒有使用RocketMQ的企業(yè)版本,那就可能會發(fā)生扣錢的事務成功了,但是扣錢的消息由于生產方發(fā)生了故障,導致交易消息沒有在扣錢的事務提交成功后往RocketMQ中確認該條消息可以被提交,就會導致該條消息不會遞交給消費方,導致Bob的錢被扣了,但是Smith的錢卻沒有增加。
2)生產方的全部邏輯都處理完成了,扣錢的事務在數據庫中被成功的提交,扣錢的消息在RocketMQ被成功的確認,但是消費方在消費消息的時候,自己本身發(fā)生了故障,或者處理該條消息發(fā)生了邏輯錯誤,導致Smith的錢沒有被正確的加上。
以上兩個問題雖然發(fā)生的機率都很低,但是只要存在著發(fā)生的機率就會一定在某個時間點發(fā)生,只是故障發(fā)生時間點的早晚問題。在金融系統(tǒng)中,每日都會跑系統(tǒng)日志執(zhí)行對賬操作,用于核對當日總共的支付與收入是否是平衡的、每個單筆交易結果是否都滿足借貸平衡等,因而為了避免以上兩個問題的發(fā)生,我的處理方式還是引入金融系統(tǒng)對賬的業(yè)務邏輯來進行處理。
其業(yè)務處理邏輯如下:

在發(fā)送交易事務消息過后,發(fā)送一個交易對賬消息到對賬Topic中,該對賬消息為非事務消息,發(fā)送成功即表示成功保存到了RocketMQ中,該交易對賬消息不會用于消費者消費,后續(xù)的交易對賬系統(tǒng)會消費該隊列中的對賬信息,其分別會和交易的生產方和消費方進行交易核對,核對邏輯如下:

交易對賬系統(tǒng)首先和交易的消費方進行核對,如果消費方消費成功,則可以說明整個交易結果滿足最終一致性,因為消息是生產者成功處理后,然后再發(fā)送的交易確認消息,因而只要產生了事務確認的交易消息,則可以肯定生產方已經正常執(zhí)行完了扣款的邏輯。
只有交易消息在消費方處理失敗或者消息方沒有消費該消息的情況下,才需要再次和生產方進行確認,如果生產方成功執(zhí)行了扣款操作,則需要回滾這筆扣款交易;如果沒有扣除成功,則表示兩邊都沒有消費這邊交易,就不用做任何操作了。
————————————————
版權聲明:本文為CSDN博主「馮立彬」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權協(xié)議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/fenglibing/article/details/92417739
鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長按上方微信二維碼 2 秒
感謝點贊支持下哈 
