A Deep Dive into ShardingSphere's Support for XA Distributed Transactions
點(diǎn)擊上方藍(lán)色“程序猿DD”,選擇“設(shè)為星標(biāo)”
回復(fù)“資源”獲取獨(dú)家整理的學(xué)習(xí)資料!

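Apache ShardingSphere is an ecosystem of open-source distributed database middleware. It consists of three products, JDBC, Proxy, and Sidecar (planned), which are independent of each other but can also be deployed together. All three provide standardized data sharding, distributed transactions, and database governance, and they fit many scenarios, including homogeneous Java applications, heterogeneous languages, and cloud native environments.
ShardingSphere became a top-level project of the Apache Software Foundation on April 16, 2020.
The CAP Theorem for Distributed Systems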

Consistency
Consistency means "all nodes see the same data at the same time". Once an update succeeds and the client has been told so, the data on every node is identical at that moment, with no intermediate state. If users always see consistent data, this is called strong consistency. If intermediate states are allowed and the data only has to become consistent after some time, this is called eventual consistency. If some data is allowed to stay inconsistent, this is called weak consistency.
Availability
Availability means the service provided by the system must always be available: every user request returns a result within a bounded time.
"Bounded time" means the system must return the result of a user request within a specified time. If it takes longer than that, the system is considered unavailable. Returning a result is another very important part of availability: after processing a request, the system must return a normal response, whether the outcome is success or failure.
Partition tolerance
When a distributed system hits any network partition failure, it must still provide services that satisfy consistency and availability, unless the entire network has failed.
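The X/Open DTP Model and the XA Specification
X/Open, now The Open Group, is an independent organization that mainly develops industry technical standards. Official website: http://www.opengroup.org/. X/Open is backed mostly by well-known companies and vendors. These members follow the industry standards X/Open defines and also take part in developing them. The figure below (a screenshot from the official website) shows its current main members: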

The DTP model

Application Program (AP): defines transaction boundaries (where a transaction begins and ends) and operates on resources within those boundaries.
Resource Manager (RM, also called a transaction participant): a database, a file system, and so on. It provides access to resources.
Transaction Manager (TM, also called a transaction coordinator): assigns each transaction a unique identifier, monitors transaction progress, and is responsible for committing or rolling back transactions.
The XA specification

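The specification defines a great many interfaces. We will only cover the most important ones:
xa_start: called on the RM to start an XA transaction branch, with an XID as its argument.
xa_end: dissociates the current thread from the transaction branch. It is used as a pair with xa_start.
xa_prepare: asks the RM whether it is ready to commit the transaction branch.
xa_commit: tells the RM to commit the transaction branch.
xa_rollback: tells the RM to roll back the transaction branch.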
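XA two-phase commit
Phase 1: the TM tells each RM to prepare to commit its transaction branch. If an RM decides its work can be committed, it persists that work and replies "yes" to the TM. In any other case, it replies "no". After replying "no" and rolling back the work it has already done, the RM can discard its information about this branch.
Phase 2: the TM uses the prepare results from phase 1 to decide whether to commit or roll back. If every RM prepared successfully, the TM tells all RMs to commit. If any RM failed to prepare, the TM tells all RMs to roll back their transaction branches.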
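The two phases above can be sketched as a minimal in-memory coordinator in Java. This is an illustration under simplifying assumptions, not Atomikos or ShardingSphere code: it has no transaction log, no timeouts, and no recovery. TpcParticipant and RecordingParticipant are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical RM contract for this sketch. */
interface TpcParticipant {
    boolean prepare();  // phase 1: persist the work and vote yes (true) or no (false)
    void commit();      // phase 2: make the prepared work permanent
    void rollback();    // phase 2: undo the work
}

/** Minimal 2PC coordinator (TM): no logging, no timeouts, no recovery. */
final class TwoPhaseCoordinator {
    static boolean run(List<TpcParticipant> participants) {
        // Phase 1: ask every RM to prepare; stop at the first "no" vote.
        for (TpcParticipant p : participants) {
            if (!p.prepare()) {
                // Phase 2 (abort): tell every RM to roll back its branch.
                participants.forEach(TpcParticipant::rollback);
                return false;
            }
        }
        // Phase 2 (commit): every RM voted "yes".
        participants.forEach(TpcParticipant::commit);
        return true;
    }
}

/** Hypothetical participant that records each call and votes as configured. */
final class RecordingParticipant implements TpcParticipant {
    private final String name;
    private final boolean vote;
    private final List<String> log;

    RecordingParticipant(String name, boolean vote, List<String> log) {
        this.name = name;
        this.vote = vote;
        this.log = log;
    }

    @Override public boolean prepare() { log.add(name + ":prepare"); return vote; }
    @Override public void commit() { log.add(name + ":commit"); }
    @Override public void rollback() { log.add(name + ":rollback"); }
}
```

If rm2 votes "no" in a run with rm1, rm2, and rm3, the coordinator never asks rm3 to prepare, and all three receive rollback. A real RM would have to tolerate a rollback for a branch it already discarded.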
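MySQL support for the XA protocol
MySQL has supported XA distributed transactions since 5.0.3, and only the InnoDB storage engine supports them. In the DTP model, MySQL also acts as a resource manager (RM).
SQL syntax for MySQL XA transactions
XA START xid                 // start an XA transaction; xid is a unique value identifying the transaction branch
XA END xid                   // end the branch's work on this connection (the branch becomes IDLE)
XA PREPARE xid               // prepare to commit
XA COMMIT xid [ONE PHASE]    // commit; if only one RM takes part, two-phase commit can be optimized into a one-phase commit
XA ROLLBACK xid              // roll back
XA RECOVER [CONVERT XID]     // list all XA transactions in the PREPARED state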
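The following small Java helper makes the order of these statements concrete. It only builds the SQL strings for one branch. The caller is assumed to run them with Statement.execute on one JDBC connection per RM. The xid is passed as an already-quoted literal such as 'tx1', and the table used in the test is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Builds MySQL XA statement sequences for one branch; a sketch, not a driver API. */
final class MySqlXaScript {
    /** Phase 1 for one RM: run the work inside the branch, then prepare it. */
    static List<String> phaseOne(String xid, List<String> work) {
        List<String> sql = new ArrayList<>();
        sql.add("XA START " + xid);    // branch becomes ACTIVE
        sql.addAll(work);              // business statements
        sql.add("XA END " + xid);      // branch becomes IDLE
        sql.add("XA PREPARE " + xid);  // branch becomes PREPARED
        return sql;
    }

    /** Phase 2, sent only after the TM has collected every RM's prepare result. */
    static String phaseTwo(String xid, boolean commit) {
        return (commit ? "XA COMMIT " : "XA ROLLBACK ") + xid;
    }

    /** Single-RM optimization: skip PREPARE and commit in one phase. */
    static List<String> onePhase(String xid, List<String> work) {
        List<String> sql = new ArrayList<>();
        sql.add("XA START " + xid);
        sql.addAll(work);
        sql.add("XA END " + xid);
        sql.add("XA COMMIT " + xid + " ONE PHASE");
        return sql;
    }
}
```

Phase 1 and phase 2 are kept separate on purpose. With several RMs, the TM must not send any XA COMMIT until every branch has answered its XA PREPARE.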
MySQL xid in detail
MySQL uses an xid to identify a transaction branch. In C, the structure is described as follows:
/*
 * Transaction branch identification: XID and NULLXID:
 */
#define XIDDATASIZE 128   /* size in bytes */
#define MAXGTRIDSIZE 64   /* maximum size in bytes of gtrid */
#define MAXBQUALSIZE 64   /* maximum size in bytes of bqual */
struct xid_t {
    long formatID;      /* format identifier */
    long gtrid_length;  /* value 1-64 */
    long bqual_length;  /* value 1-64 */
    char data[XIDDATASIZE];
};
/*
 * A value of -1 in formatID means that the XID is null.
 */
typedef struct xid_t XID;
/*
 * Declarations of routines by which RMs call TMs:
 */
extern int ax_reg(int, XID *, long);
extern int ax_unreg(int, long);
gtrid: the global transaction identifier, at most 64 bytes.
bqual: the branch qualifier, at most 64 bytes.
formatID: records the format of gtrid and bqual, much like the flags field in memcached.
data: the value of the xid, which is gtrid and bqual concatenated.
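The same structure maps naturally onto JTA's javax.transaction.xa.Xid interface. The sketch below is a minimal value object, not MySQL's or Atomikos's implementation. It enforces the size limits above and rebuilds the C struct's data field as gtrid followed directly by bqual. The note about an empty bqual reflects MySQL's documented default (''), and the header's 1-64 range is relaxed for that reason. MySQL's default formatID is 1.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.transaction.xa.Xid;

/** Minimal javax.transaction.xa.Xid value object enforcing the XA size limits. */
final class SimpleXid implements Xid {
    static final int MAXGTRIDSIZE = 64;
    static final int MAXBQUALSIZE = 64;

    private final int formatId;
    private final byte[] gtrid;
    private final byte[] bqual;

    SimpleXid(int formatId, byte[] gtrid, byte[] bqual) {
        if (gtrid.length < 1 || gtrid.length > MAXGTRIDSIZE) {
            throw new IllegalArgumentException("gtrid must be 1-64 bytes");
        }
        // The C header says 1-64, but MySQL also accepts an empty bqual (its default).
        if (bqual.length > MAXBQUALSIZE) {
            throw new IllegalArgumentException("bqual must be at most 64 bytes");
        }
        this.formatId = formatId;
        this.gtrid = gtrid.clone();
        this.bqual = bqual.clone();
    }

    static SimpleXid of(int formatId, String gtrid, String bqual) {
        return new SimpleXid(formatId, gtrid.getBytes(StandardCharsets.UTF_8), bqual.getBytes(StandardCharsets.UTF_8));
    }

    @Override public int getFormatId() { return formatId; }
    @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
    @Override public byte[] getBranchQualifier() { return bqual.clone(); }

    /** The C struct's data[] field: gtrid followed directly by bqual. */
    byte[] data() {
        byte[] data = Arrays.copyOf(gtrid, gtrid.length + bqual.length);
        System.arraycopy(bqual, 0, data, gtrid.length, bqual.length);
        return data;
    }
}
```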
MySQL XA transaction states

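The state diagram for this section is not reproduced in the text. As a rough substitute, the sketch below models the branch states MySQL documents for XA transactions: ACTIVE after XA START, IDLE after XA END, and PREPARED after XA PREPARE. It rejects statements issued in the wrong state. This is a simplified model for illustration. NONE and TERMINATED are made-up names for "no branch yet" and "committed or rolled back", and the server's real error codes are not modeled.

```java
/** Simplified MySQL XA branch states; NONE and TERMINATED are made-up names for this sketch. */
enum XaBranchState { NONE, ACTIVE, IDLE, PREPARED, TERMINATED }

/** Tracks one branch and rejects XA statements issued in the wrong state. */
final class XaBranch {
    private XaBranchState state = XaBranchState.NONE;

    XaBranchState state() { return state; }

    private void move(String statement, XaBranchState next, XaBranchState... allowed) {
        for (XaBranchState s : allowed) {
            if (s == state) {
                state = next;
                return;
            }
        }
        throw new IllegalStateException(statement + " is not allowed in state " + state);
    }

    void start()          { move("XA START", XaBranchState.ACTIVE, XaBranchState.NONE); }
    void end()            { move("XA END", XaBranchState.IDLE, XaBranchState.ACTIVE); }
    void prepare()        { move("XA PREPARE", XaBranchState.PREPARED, XaBranchState.IDLE); }
    void commitOnePhase() { move("XA COMMIT ONE PHASE", XaBranchState.TERMINATED, XaBranchState.IDLE); }
    void commit()         { move("XA COMMIT", XaBranchState.TERMINATED, XaBranchState.PREPARED); }
    void rollback()       { move("XA ROLLBACK", XaBranchState.TERMINATED, XaBranchState.IDLE, XaBranchState.PREPARED); }
}
```

The model shows why XA END must come before XA PREPARE: a branch that is still ACTIVE cannot be prepared.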
JTA規(guī)范
JTA(Java Transaction API):為J2EE平臺提供了分布式事務(wù)服務(wù)(distributed transaction)的能力。某種程度上,可以認(rèn)為JTA規(guī)范是XA規(guī)范的Java版,其把XA規(guī)范中規(guī)定的DTP模型交互接口抽象成Java接口中的方法,并規(guī)定每個(gè)方法要實(shí)現(xiàn)什么樣的功能。

Interfaces defined by JTA
javax.transaction.TransactionManager: the transaction manager, responsible for begin, commit, rollback, and similar commands.
javax.transaction.UserTransaction: used to demarcate a distributed transaction.
javax.transaction.TransactionSynchronizationRegistry: the transaction synchronization registry.
javax.transaction.xa.XAResource: defines the interface an RM exposes for the TM to operate on.
javax.transaction.xa.Xid: the transaction xid interface.
TM provider:
Implements the UserTransaction, TransactionManager, Transaction, TransactionSynchronizationRegistry, Synchronization, and Xid interfaces, and carries out distributed transactions by interacting with the XAResource interface.
RM provider:
The XAResource interface must be implemented by the resource manager. It defines methods that the TM calls, such as:
start: starts a transaction branch
end: ends a transaction branch
prepare: prepares to commit
commit: commits
rollback: rolls back
recover: lists all transaction branches in the PREPARED state
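To see the RM side of this contract, consider the toy javax.transaction.xa.XAResource below. It only records the calls a TM makes and manages no data, so it is useful for tracing a TM, not as a real RM. TmDriver is a hypothetical helper that shows the order in which a TM drives one branch through two-phase commit.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Toy XAResource that only records the calls a TM makes; it manages no real data. */
final class TracingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();
    private int timeoutSeconds;

    @Override public void start(Xid xid, int flags) { calls.add("start"); }
    @Override public void end(Xid xid, int flags) { calls.add("end"); }
    @Override public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) { calls.add(onePhase ? "commit(onePhase)" : "commit"); }
    @Override public void rollback(Xid xid) { calls.add("rollback"); }
    @Override public Xid[] recover(int flag) { calls.add("recover"); return new Xid[0]; } // nothing in doubt
    @Override public void forget(Xid xid) { calls.add("forget"); }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return timeoutSeconds; }
    @Override public boolean setTransactionTimeout(int seconds) { timeoutSeconds = seconds; return true; }
}

/** Hypothetical TM-side helper showing the call order for one branch in two-phase commit. */
final class TmDriver {
    static void twoPhase(XAResource res, Xid xid) throws XAException {
        res.start(xid, XAResource.TMNOFLAGS);   // associate the branch with the connection
        // ... the application's SQL would run here ...
        res.end(xid, XAResource.TMSUCCESS);     // dissociate; the work is complete
        if (res.prepare(xid) == XAResource.XA_OK) {
            res.commit(xid, false);             // XA_RDONLY would mean there is nothing to commit
        }
    }
}
```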
ShardingSphere's support for XA distributed transactions
Following the XA distributed transaction interfaces and the JTA specification, ShardingSphere provides a standard, SPI-based interface, org.apache.shardingsphere.transaction.spi.ShardingTransactionManager:
public interface ShardingTransactionManager extends AutoCloseable {
    /**
     * Initialize sharding transaction manager.
     *
     * @param databaseType database type
     * @param resourceDataSources resource data sources
     */
    void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources);
    /**
     * Get transaction type.
     *
     * @return transaction type
     */
    TransactionType getTransactionType();
    /**
     * Judge is in transaction or not.
     *
     * @return in transaction or not
     */
    boolean isInTransaction();
    /**
     * Get transactional connection.
     *
     * @param dataSourceName data source name
     * @return connection
     * @throws SQLException SQL exception
     */
    Connection getConnection(String dataSourceName) throws SQLException;
    /**
     * Begin transaction.
     */
    void begin();
    /**
     * Commit transaction.
     */
    void commit();
    /**
     * Rollback transaction.
     */
    void rollback();
}
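The concrete implementation of XA distributed transaction support is org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager. This class calls the SPI-based org.apache.shardingsphere.transaction.xa.spi.XATransactionManager to manage XA transactions.
Summary
We have covered the CAP theorem for distributed transactions, the X/Open DTP model, the XA interface specification, and MySQL's support for the XA protocol. Finally, we covered the JTA specification and the SPI interfaces that ShardingSphere defines to integrate XA transactions. These are all important theoretical foundations. Next, we will walk through the Atomikos-based XATransactionManager implementation (AtomikosTransactionManager) and analyze its source code in detail.
Source Code Analysis: ShardingSphere's Integration with Atomikos for XA Distributed Transactions
Atomikos (https://www.atomikos.com/) is actually the name of a company that provides a JTA-compliant TM implementation for XA distributed transactions. Its best-known product is this transaction manager, which comes in two editions:
TransactionEssentials: the open-source, free edition;
ExtremeTransactions: the commercial, paid edition.
The figure below shows how the two products relate: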

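On top of TransactionEssentials, ExtremeTransactions adds the following notable features:
Support for TCC, a kind of flexible transaction
Transaction propagation over remote procedure call technologies such as RMI, IIOP, and SOAP
Cloud storage of transaction logs, transaction recovery in the cloud, and a full-featured management console
org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager in detail
Let's briefly recall the org.apache.shardingsphere.transaction.spi.ShardingTransactionManager interface introduced earlier.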
我們重點(diǎn)縣關(guān)注init方法,從它的命名,你就應(yīng)該能夠看出來,這是整個(gè)框架的初始化方法,讓我們來看看它是如何進(jìn)行初始化的。
?private?final?Map?cachedDataSources?=?new?HashMap<>();
?private?final?XATransactionManager?xaTransactionManager?=?XATransactionManagerLoader.getInstance().getTransactionManager();
????@Override
????public?void?init(final?DatabaseType?databaseType,?final?Collection?resourceDataSources) ?{
????????for?(ResourceDataSource?each?:?resourceDataSources)?{
????????????cachedDataSources.put(each.getOriginalName(),?new?XATransactionDataSource(databaseType,?each.getUniqueResourceName(),?each.getDataSource(),?xaTransactionManager));
????????}
????????xaTransactionManager.init();
????}
首先SPI的方式加載XATransactionManager的具體實(shí)現(xiàn)類,這里返回的就是
org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager。我們在關(guān)注下?
new XATransactionDataSource()?, 進(jìn)入?org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource類的構(gòu)造方法。
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
    this.databaseType = databaseType;
    this.resourceName = resourceName;
    this.dataSource = dataSource;
    if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {
        // Key point 1: build the XADataSource
        xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
        this.xaTransactionManager = xaTransactionManager;
        // Key point 2: register the recovery resource
        xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
    }
}
我們重點(diǎn)來關(guān)注? XADataSourceFactory.build(databaseType, dataSource),從名字我們就可以看出,這應(yīng)該是返回JTA規(guī)范里面的XADataSource,在ShardingSphere里面很多的功能,可以從代碼風(fēng)格的命名上就能猜出來,這就是優(yōu)雅代碼(吹一波)。不多逼逼,我們進(jìn)入該方法。
public?final?class?XADataSourceFactory?{
????public?static?XADataSource?build(final?DatabaseType?databaseType,?final?DataSource?dataSource)?{
????????return?new?DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
????}
}
首先又是一個(gè)SPI定義的? XADataSourceDefinitionFactory,它根據(jù)不同的數(shù)據(jù)庫類型,來加載不同的方言。然后我們進(jìn)入?swap方法。
?public?XADataSource?swap(final?DataSource?dataSource)?{
????????XADataSource?result?=?createXADataSource();
????????setProperties(result,?getDatabaseAccessConfiguration(dataSource));
????????return?result;
????}
Simple and clear: step one creates the XADataSource, step two sets its properties (connection URL, username, password, and so on), and then it is returned.
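The body of setProperties is not shown here. One plausible way to copy settings such as the URL, user, and password onto a vendor XADataSource is to call its JavaBean setters reflectively. The sketch below assumes that approach and is not ShardingSphere's code. FakeXaDataSource is a hypothetical stand-in for a driver class such as MysqlXADataSource.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BeanPropertySetter {
    /** Calls setXxx(String) on target for each property it supports; returns the names that were applied. */
    static List<String> apply(Object target, Map<String, String> properties) {
        List<String> applied = new ArrayList<>();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String name = entry.getKey();
            String setterName = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
            Method setter;
            try {
                setter = target.getClass().getMethod(setterName, String.class);
            } catch (NoSuchMethodException e) {
                continue; // this XADataSource has no such String property: skip it
            }
            try {
                setter.invoke(target, entry.getValue());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot set property " + name, e);
            }
            applied.add(name);
        }
        return applied;
    }
}

/** Hypothetical stand-in for a vendor XADataSource bean. */
final class FakeXaDataSource {
    private String url;
    private String user;
    private String password;

    public void setUrl(String url) { this.url = url; }
    public void setUser(String user) { this.user = user; }
    public void setPassword(String password) { this.password = password; }
    public String getUrl() { return url; }
    public String getUser() { return user; }
}
```

Skipping unknown properties lets one generic map of pool settings be applied to different vendors' XA data sources, each of which exposes a different set of setters.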
Back in the XATransactionDataSource constructor, note xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);. As the name says, it registers a resource for transaction recovery. We will explain it in detail when we cover recovery. Back in XAShardingTransactionManager.init(), the key call is xaTransactionManager.init();, which ends up in AtomikosTransactionManager.init(). The flow chart is as follows:

Code:
public final class AtomikosTransactionManager implements XATransactionManager {
    private final UserTransactionManager transactionManager = new UserTransactionManager();
    private final UserTransactionService userTransactionService = new UserTransactionServiceImp();
    @Override
    public void init() {
        userTransactionService.init();
    }
}
Stepping into UserTransactionServiceImp.init() leads to the following initialize() method:
private void initialize() {
    // Add recovery resources; not important here
    for (RecoverableResource resource : resources_) {
        Configuration.addResource(resource);
    }
    for (LogAdministrator logAdministrator : logAdministrators_) {
        Configuration.addLogAdministrator(logAdministrator);
    }
    // Register plugins; not important here
    for (TransactionServicePlugin nxt : tsListeners_) {
        Configuration.registerTransactionServicePlugin(nxt);
    }
    // Read the configuration properties: important
    ConfigProperties configProps = Configuration.getConfigProperties();
    configProps.applyUserSpecificProperties(properties_);
    // Initialize
    Configuration.init();
}
We focus on reading the configuration properties, which ends up in com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties():
@Override
public ConfigProperties initializeProperties() {
    // Read the defaults from transactions-defaults.properties on the classpath
    Properties defaults = new Properties();
    loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);
    // Read transactions.properties on the classpath; it overrides same-named keys from transactions-defaults.properties
    Properties transactionsProperties = new Properties(defaults);
    loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);
    // Read jta.properties on the classpath; it overrides same-named keys from transactions-defaults.properties and transactions.properties
    Properties jtaProperties = new Properties(transactionsProperties);
    loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);
    // Read the custom file given by java -Dcom.atomikos.icatch.file, overriding any same-named keys above
    Properties customProperties = new Properties(jtaProperties);
    loadPropertiesFromCustomFilePath(customProperties);
    // Finally build a ConfigProperties object that holds the effective configuration
    Properties finalProperties = new Properties(customProperties);
    return new ConfigProperties(finalProperties);
}
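The override order above relies on java.util.Properties defaults chaining: getProperty looks in the object itself and then walks its chain of defaults. The sketch below rebuilds that layering with in-memory Properties instead of classpath files. The two keys are Atomikos property names used only as sample data, and the values are made up.

```java
import java.util.Properties;

final class LayeredConfig {
    /** Chains layers so that each later layer overrides earlier ones, like initializeProperties(). */
    static Properties chain(Properties... layers) {
        Properties current = new Properties();
        for (Properties layer : layers) {
            Properties next = new Properties(current); // keys missing here fall back to 'current'
            for (String key : layer.stringPropertyNames()) {
                next.setProperty(key, layer.getProperty(key));
            }
            current = next;
        }
        return current;
    }
}
```

An empty layer (for example, a missing transactions.properties) simply passes lookups through to the previous one. Only getProperty consults the defaults; the Map-style get inherited from Hashtable does not.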
接下來重點(diǎn)關(guān)注,? Configuration.init(), 進(jìn)行初始化。
ublic?static?synchronized?boolean?init()?{
??boolean?startupInitiated?=?false;
??if?(service_?==?null)?{
???startupInitiated?=?true;
???????????//SPI方式加載插件注冊,無需過多關(guān)心
???addAllTransactionServicePluginServicesFromClasspath();
???ConfigProperties?configProperties?=?getConfigProperties();
??????????//調(diào)用插件的beforeInit方法進(jìn)行初始化話,無需過多關(guān)心
???notifyBeforeInit(configProperties);
??????????//進(jìn)行事務(wù)日志恢復(fù)的初始化,很重要,接下來詳解
???assembleSystemComponents(configProperties);
?????????//進(jìn)入系統(tǒng)注解的初始化,一般重要
???initializeSystemComponents(configProperties);
???notifyAfterInit();
???if?(configProperties.getForceShutdownOnVmExit())?{
????addShutdownHook(new?ForceShutdownHook());
???}
??}
??return?startupInitiated;
?}
我們先來關(guān)注? assembleSystemComponents(configProperties);?進(jìn)入它,進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:
@Override
?public?TransactionServiceProvider?assembleTransactionService(
???ConfigProperties?configProperties)?{
??RecoveryLog?recoveryLog?=null;
???????//打印日志
??logProperties(configProperties.getCompletedProperties());
???????//生成唯一名字
??String?tmUniqueName?=?configProperties.getTmUniqueName();
??long?maxTimeout?=?configProperties.getMaxTimeout();
??int?maxActives?=?configProperties.getMaxActives();
??boolean?threaded2pc?=?configProperties.getThreaded2pc();
??????//SPI方式加載OltpLog?,這是最重要的擴(kuò)展地方,如果用戶沒有SPI的方式去擴(kuò)展那么就為null
??OltpLog?oltpLog?=?createOltpLogFromClasspath();
??if?(oltpLog?==?null)?{
???LOGGER.logInfo("Using?default?(local)?logging?and?recovery...");
????????????????????????//創(chuàng)建事務(wù)日志存儲資源
???Repository?repository?=?createRepository(configProperties);
???oltpLog?=?createOltpLog(repository);
???//????Assemble?recoveryLog
???recoveryLog?=?createRecoveryLog(repository);
??}
??StateRecoveryManagerImp?recoveryManager?=?new?StateRecoveryManagerImp();
??recoveryManager.setOltpLog(oltpLog);
???????????//生成唯一id生成器,以后生成XID會用的到
??UniqueIdMgr?idMgr?=?new?UniqueIdMgr?(?tmUniqueName?);
??int?overflow?=?idMgr.getMaxIdLengthInBytes()?-?MAX_TID_LENGTH;
??if?(?overflow?>?0?)?{
???//?see?case?73086
???String?msg?=?"Value?too?long?:?"?+?tmUniqueName;
???LOGGER.logFatal?(?msg?);
???throw?new?SysException(msg);
??}
??return?new?TransactionServiceImp(tmUniqueName,?recoveryManager,?idMgr,?maxTimeout,?maxActives,?!threaded2pc,?recoveryLog);
?}
我們重點(diǎn)來分析 createOltpLogFromClasspath(), 采用SPI的加載方式來獲取,默認(rèn)這里會返回?null, 什么意思呢?就是當(dāng)沒有擴(kuò)展的時(shí)候,atomikos,會創(chuàng)建框架自定義的資源,來存儲事務(wù)日志。
private?OltpLog?createOltpLogFromClasspath()?{
??OltpLog?ret?=?null;
??ServiceLoader?loader?=?ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());
??int?i?=?0;
????????for?(OltpLogFactory?l?:?loader?)?{
???ret?=?l.createOltpLog();
???i++;
??}
????????if?(i?>?1)?{
???String?msg?=?"More?than?one?OltpLogFactory?found?in?classpath?-?error?in?configuration!";
???LOGGER.logFatal(msg);
???throw?new?SysException(msg);
????????}
????????return?ret;
?}
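The pattern here is simple: zero providers means "use the built-in default", and more than one is a configuration error. It can be written generically with java.util.ServiceLoader. In this sketch, the fallback replaces Atomikos's null return together with the later oltpLog == null branch. LogFactory is a hypothetical SPI. A real provider would be registered in a META-INF/services file named after the interface.

```java
import java.util.ServiceLoader;
import java.util.function.Supplier;

final class SingleServiceLoader {
    /**
     * Returns the single provider of `type` registered under META-INF/services,
     * the fallback when there is none, and fails when there is more than one.
     */
    static <T> T loadAtMostOne(Class<T> type, Supplier<? extends T> fallback) {
        T found = null;
        int count = 0;
        for (T provider : ServiceLoader.load(type, type.getClassLoader())) {
            found = provider;
            count++;
        }
        if (count > 1) {
            throw new IllegalStateException("More than one " + type.getName() + " found in classpath");
        }
        return count == 1 ? found : fallback.get();
    }
}

/** Hypothetical SPI standing in for OltpLogFactory. */
interface LogFactory {
    String describe();
}
```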
我們跟著進(jìn)入? Repository repository = createRepository(configProperties);
?private?CachedRepository?createCoordinatorLogEntryRepository(
???ConfigProperties?configProperties)?throws?LogException?{
????????//創(chuàng)建內(nèi)存資源存儲
??InMemoryRepository?inMemoryCoordinatorLogEntryRepository?=?new?InMemoryRepository();
???????//進(jìn)行初始化
??inMemoryCoordinatorLogEntryRepository.init();
???????//創(chuàng)建使用文件存儲資源作為backup
??FileSystemRepository?backupCoordinatorLogEntryRepository?=?new?FileSystemRepository();
???????//進(jìn)行初始化
??backupCoordinatorLogEntryRepository.init();
??????//內(nèi)存與file資源進(jìn)行合并
??CachedRepository?repository?=?new?CachedRepository(inMemoryCoordinatorLogEntryRepository,?backupCoordinatorLogEntryRepository);
??repository.init();
??return?repository;
?}
這里就會創(chuàng)建出?
CachedRepository,里面包含了?InMemoryRepository?與?FileSystemRepository回到主線?
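The internals of CachedRepository are not shown. Assume its role is the usual pairing of a fast in-memory view with a durable file copy. Under that assumption, a write-through version can be sketched as follows. LogRepository, MapRepository, and CachedLogRepository are hypothetical names, and MapRepository stands in for both the in-memory store and the file-backed store.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store of coordinator log entries: transaction id -> state. */
interface LogRepository {
    void put(String txId, String state);
    String get(String txId);
    Collection<String> ids();
}

/** Map-backed store used here for both the cache and the (simulated) durable backup. */
final class MapRepository implements LogRepository {
    private final Map<String, String> entries = new HashMap<>();
    @Override public void put(String txId, String state) { entries.put(txId, state); }
    @Override public String get(String txId) { return entries.get(txId); }
    @Override public Collection<String> ids() { return new ArrayList<>(entries.keySet()); }
}

/** Write-through cache: writes hit the backup first, reads are served from memory. */
final class CachedLogRepository implements LogRepository {
    private final LogRepository memory;
    private final LogRepository backup;

    CachedLogRepository(LogRepository memory, LogRepository backup) {
        this.memory = memory;
        this.backup = backup;
        for (String id : backup.ids()) {   // warm the cache, e.g. after a restart
            memory.put(id, backup.get(id));
        }
    }

    @Override public void put(String txId, String state) {
        backup.put(txId, state);  // write the durable copy first so an entry is never only in memory
        memory.put(txId, state);
    }
    @Override public String get(String txId) { return memory.get(txId); }
    @Override public Collection<String> ids() { return memory.ids(); }
}
```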
Back on the main line, com.atomikos.icatch.config.Configuration.init(), the last step to analyze is notifyAfterInit():
private static void notifyAfterInit() {
    // Initialize the plugins
    for (TransactionServicePlugin p : tsListenersList_) {
        p.afterInit();
    }
    for (LogAdministrator a : logAdministrators_) {
        a.registerLogControl(service_.getLogControl());
    }
    // Set the recovery service on each resource, which triggers transaction recovery
    for (RecoverableResource r : resourceList_) {
        r.setRecoveryService(recoveryService_);
    }
}
Plugin initialization goes into com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit():
public void afterInit() {
    TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(), autoRegisterResources);
    // If we provided a custom OltpLog, this returns null; if it is null, no XaResourceRecoveryManager is installed
    RecoveryLog recoveryLog = Configuration.getRecoveryLog();
    long maxTimeout = Configuration.getConfigProperties().getMaxTimeout();
    if (recoveryLog != null) {
        XaResourceRecoveryManager.installXaResourceRecoveryManager(new DefaultXaRecoveryLog(recoveryLog, maxTimeout), Configuration.getConfigProperties().getTmUniqueName());
    }
}
重點(diǎn)注意?
RecoveryLog recoveryLog = Configuration.getRecoveryLog();?,如果用戶采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog?,這里就會返回 null。如果是null,則不會對?XaResourceRecoveryManager?進(jìn)行初始化。回到?
notifyAfterInit(), 我們來分析?setRecoveryService。
public?void?setRecoveryService?(?RecoveryService?recoveryService?)
????????????throws?ResourceException
????{
????????if?(?recoveryService?!=?null?)?{
????????????if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"
????????????????????+?getName?()?);
????????????this.branchIdentifier=recoveryService.getName();
????????????recover();
????????}
????}
我們進(jìn)入? recover()?方法:
?public?void?recover()?{
?????XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();
????????//null?for?LogCloud?recovery
?????if?(xaResourceRecoveryManager?!=?null)?{
??????try?{
????xaResourceRecoveryManager.recover(getXAResource());
???}?catch?(Exception?e)?{
????refreshXAResource();?//cf?case?156968
???}
?????}
????}
看到最關(guān)鍵的注釋了嗎,如果用戶采用 SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager?為null,則就會進(jìn)行云端恢復(fù),反之則進(jìn)行事務(wù)恢復(fù)。事務(wù)恢復(fù)很復(fù)雜,我們會單獨(dú)來講。
At this point, the basic initialization of Atomikos is complete.
The Atomikos transaction begin flow
We know that a local transaction always starts with transaction.begin, and XA distributed transactions are no exception. Switching back to XAShardingTransactionManager.begin(), it calls com.atomikos.icatch.jta.TransactionManagerImp.begin(). The flow chart is as follows:

Code:
public void begin(int timeout) throws NotSupportedException, SystemException {
    CompositeTransaction ct = null;
    ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
    ct = compositeTransactionManager.getCompositeTransaction();
    if (ct != null && ct.getProperty(JTA_PROPERTY_NAME) == null) {
        LOGGER.logWarning("JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
                " (will be resumed after JTA transaction ends)");
        ct = compositeTransactionManager.suspend();
        resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant(ct);
    }
    try {
        // Create the composite transaction
        ct = compositeTransactionManager.createCompositeTransaction(((long) timeout) * 1000);
        if (resumeParticipant != null) ct.addSubTxAwareParticipant(resumeParticipant);
        if (ct.isRoot() && getDefaultSerial())
            ct.setSerial();
        ct.setProperty(JTA_PROPERTY_NAME, "true");
    } catch (SysException se) {
        String msg = "Error in begin()";
        LOGGER.logError(msg, se);
        throw new ExtendedSystemException(msg, se);
    }
    recreateCompositeTransactionAsJtaTransaction(ct);
}
這里我們主要關(guān)注? compositeTransactionManager.createCompositeTransaction(),
public?CompositeTransaction?createCompositeTransaction?(?long?timeout?)?throws?SysException
????{
????????CompositeTransaction?ct?=?null?,?ret?=?null;
????????ct?=?getCurrentTx?();
????????if?(?ct?==?null?)?{
????????????ret?=?getTransactionService().createCompositeTransaction?(?timeout?);
????????????if(LOGGER.isDebugEnabled()){
?????????????LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?):?"
????????????????????+?"created?new?ROOT?transaction?with?id?"?+?ret.getTid?());
????????????}
????????}?else?{
??????????if(LOGGER.isDebugEnabled())?LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?)");
????????????ret?=?ct.createSubTransaction?();
????????}
????????Thread?thread?=?Thread.currentThread?();
????????setThreadMappings?(?ret,?thread?);
????????return?ret;
????}
創(chuàng)建了事務(wù)補(bǔ)償點(diǎn),然后把他放到了用當(dāng)前線程作為key的Map當(dāng)中,這里思考, 為啥它不用 threadLocal。
到這里atomikos的事務(wù)begin流程已經(jīng)完成。大家可能有些疑惑,begin好像什么都沒有做,XA start 也沒調(diào)用?別慌,下一節(jié)繼續(xù)來講。
XATransactionDataSource getConnection() 流程
我們都知道想要執(zhí)行SQL語句,必須要獲取到數(shù)據(jù)庫的connection。讓我們再回到?XAShardingTransactionManager.getConnection()?最后會調(diào)用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:

代碼 :
?public?Connection?getConnection()?throws?SQLException,?SystemException,?RollbackException?{
??????//先檢查是否已經(jīng)有存在的connection,這一步很關(guān)心,也是XA的關(guān)鍵,因?yàn)閄A事務(wù),必須在同一個(gè)connection
????????if?(CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{
????????????return?dataSource.getConnection();
????????}
??????//獲取數(shù)據(jù)庫連接
????????Connection?result?=?dataSource.getConnection();
??????//轉(zhuǎn)成XAConnection,其實(shí)是同一個(gè)連接
????????XAConnection?xaConnection?=?XAConnectionFactory.createXAConnection(databaseType,?xaDataSource,?result);
??????//獲取JTA事務(wù)定義接口
????????Transaction?transaction?=?xaTransactionManager.getTransactionManager().getTransaction();
????????if?(!enlistedTransactions.get().contains(transaction))?{
??????//進(jìn)行資源注冊
????????????transaction.enlistResource(new?SingleXAResource(resourceName,?xaConnection.getXAResource()));
????????????transaction.registerSynchronization(new?Synchronization()?{
????????????????@Override
????????????????public?void?beforeCompletion()?{
????????????????????enlistedTransactions.get().remove(transaction);
????????????????}
????????????????@Override
????????????????public?void?afterCompletion(final?int?status)?{
????????????????????enlistedTransactions.get().clear();
????????????????}
????????????});
????????????enlistedTransactions.get().add(transaction);
????????}
????????return?result;
????}
首先第一步很關(guān)心,尤其是對shardingsphere來說,因?yàn)樵谝粋€(gè)事務(wù)里面,會有多個(gè)SQL語句,打到相同的數(shù)據(jù)庫,所以對相同的數(shù)據(jù)庫,必須獲取同一個(gè)XAConnection,這樣才能進(jìn)行XA事務(wù)的提交與回滾。
我們接下來關(guān)心?
transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會進(jìn)入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代碼太長,截取一部分。
try?{
????restx?=?(XAResourceTransaction)?res
??????.getResourceTransaction(this.compositeTransaction);
????//?next,?we?MUST?set?the?xa?resource?again,
????//?because?ONLY?the?instance?we?got?as?argument
????//?is?available?for?use?now?!
????//?older?instances?(set?in?restx?from?previous?sibling)
????//?have?connections?that?may?be?in?reuse?already
????//?->old?xares?not?valid?except?for?2pc?operations
????restx.setXAResource(xares);
????restx.resume();
???}?catch?(ResourceException?re)?{
????throw?new?ExtendedSystemException(
??????"Unexpected?error?during?enlist",?re);
???}?catch?(RuntimeException?e)?{
????throw?e;
???}
???addXAResourceTransaction(restx,?xares);
Let's go straight to restx.resume();:
public synchronized void resume() throws ResourceException {
    int flag = 0;
    String logFlag = "";
    if (this.state.equals(TxState.LOCALLY_DONE)) { // reused instance
        flag = XAResource.TMJOIN;
        logFlag = "XAResource.TMJOIN";
    } else if (!this.knownInResource) { // new instance
        flag = XAResource.TMNOFLAGS;
        logFlag = "XAResource.TMNOFLAGS";
    } else
        throw new IllegalStateException("Wrong state for resume: "
                + this.state);
    try {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.logDebug("XAResource.start ( " + this.xidToHexString
                    + " , " + logFlag + " ) on resource "
                    + this.resourcename
                    + " represented by XAResource instance "
                    + this.xaresource);
        }
        this.xaresource.start(this.xid, flag);
    } catch (XAException xaerr) {
        String msg = interpretErrorCode(this.resourcename, "resume",
                this.xid, xaerr.errorCode);
        LOGGER.logWarning(msg, xaerr);
        throw new ResourceException(msg, xaerr);
    }
    setState(TxState.ACTIVE);
    this.knownInResource = true;
}
Look, everyone, there it is: this.xaresource.start(this.xid, flag);! Let's step inside, assuming we use a MySQL database:
public void start(Xid xid, int flags) throws XAException {
    StringBuilder commandBuf = new StringBuilder(300);
    commandBuf.append("XA START ");
    appendXid(commandBuf, xid);
    switch (flags) {
        case 0:
            break;
        case 2097152:
            commandBuf.append(" JOIN");
            break;
        case 134217728:
            commandBuf.append(" RESUME");
            break;
        default:
            throw new XAException(-5);
    }
    this.dispatchCommand(commandBuf.toString());
    this.underlyingConnection.setInGlobalTx(true);
}
It assembles the XA START xid SQL statement and executes it.
To summarize: when the database connection is obtained, the XA protocol's XA START xid is executed.
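The magic numbers in the driver's switch are XAResource flag constants. 0 is TMNOFLAGS (a new branch), 2097152 (0x00200000) is TMJOIN, 134217728 (0x08000000) is TMRESUME, and -5 is XAException.XAER_INVAL. Written with the named constants, the same mapping looks like this. As a simplification, the xid is passed in as a prebuilt literal instead of being formatted by the driver's appendXid.

```java
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

final class XaStartSql {
    /** Same flag handling as the driver's start() above, but with named constants. */
    static String build(String xidLiteral, int flags) throws XAException {
        StringBuilder sql = new StringBuilder("XA START ").append(xidLiteral);
        switch (flags) {
            case XAResource.TMNOFLAGS:          // 0: a brand-new branch
                break;
            case XAResource.TMJOIN:             // 2097152: join a branch already known to the RM
                sql.append(" JOIN");
                break;
            case XAResource.TMRESUME:           // 134217728: resume a suspended branch
                sql.append(" RESUME");
                break;
            default:
                throw new XAException(XAException.XAER_INVAL); // -5: invalid arguments
        }
        return sql.toString();
    }
}
```

As resume() above shows, Atomikos passes TMJOIN when it reuses a LOCALLY_DONE branch and TMNOFLAGS for a new one.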
atomikos事務(wù)commit流程
好了,上面我們已經(jīng)開啟了事務(wù),現(xiàn)在我們來分析下事務(wù)commit流程,我們再把視角切換回XAShardingTransactionManager.commit(),最后我們會進(jìn)入com.atomikos.icatch.imp.CompositeTransactionImp.commit()?方法。流程圖如下:

代碼:
?public?void?commit?()?throws?HeurRollbackException,?HeurMixedException,
????????????HeurHazardException,?SysException,?SecurityException,
????????????RollbackException
????{
???????//首先更新下事務(wù)日志的狀態(tài)
????????doCommit?();
????????setSiblingInfoForIncoming1pcRequestFromRemoteClient();
????????if?(?isRoot?()?)?{
?????????//真正的commit操作
??????????coordinator.terminate?(?true?);
????????}
????}
我們關(guān)注 coordinator.terminate ( true );
?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,
????????????HeurMixedException,?SysException,?java.lang.SecurityException,
????????????HeurCommitException,?HeurHazardException,?RollbackException,
????????????IllegalStateException
????{
?????synchronized?(?fsm_?)?{
??????if?(?commit?)?{
?????????????????????//判斷有幾個(gè)參與者,如果只有一個(gè),直接提交
???????if?(?participants_.size?()?<=?1?)?{
????????commit?(?true?);
???????}?else?{
????????????????????????????????//否則,走XA?2階段提交流程,先prepare,?再提交
????????int?prepareResult?=?prepare?();
????????//?make?sure?to?only?do?commit?if?NOT?read?only
????????if?(?prepareResult?!=?Participant.READ_ONLY?)
?????????commit?(?false?);
???????}
??????}?else?{
???????rollback?();
??????}
?????}
????}
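The coordinator first checks the number of participants, which you can think of here as the number of MySQL databases involved. With only one participant, the protocol degenerates to a one-phase commit and commits directly. With several, it follows the standard XA two-phase commit flow.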
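The decision logic of terminate(true) can be reduced to a few lines. In this sketch, Branch, CountingBranch, and Terminator are hypothetical. A prepare() that returns false stands for a read-only vote (XA_RDONLY). A "no" vote is assumed to surface as an exception that the caller turns into a rollback, which is left out here. The Atomikos coordinator checks one combined prepare result. The sketch instead commits exactly the branches that voted to commit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical participant contract for this sketch. */
interface Branch {
    boolean prepare();             // true: has updates to commit; false: read-only vote
    void commit(boolean onePhase);
}

final class Terminator {
    /** Commit path of terminate(true): one-phase for <= 1 branch, otherwise prepare-then-commit. */
    static int commit(List<? extends Branch> branches) {
        if (branches.size() <= 1) {
            for (Branch b : branches) b.commit(true);   // nothing to coordinate: skip prepare
            return branches.size();
        }
        List<Branch> toCommit = new ArrayList<>();
        for (Branch b : branches) {
            if (b.prepare()) toCommit.add(b);           // read-only branches are finished after prepare
        }
        for (Branch b : toCommit) b.commit(false);      // nothing is sent if everything was read-only
        return toCommit.size();
    }
}

/** Test double that counts one-phase and two-phase commits. */
final class CountingBranch implements Branch {
    private final boolean hasUpdates;
    int onePhaseCommits;
    int twoPhaseCommits;

    CountingBranch(boolean hasUpdates) { this.hasUpdates = hasUpdates; }

    @Override public boolean prepare() { return hasUpdates; }
    @Override public void commit(boolean onePhase) {
        if (onePhase) onePhaseCommits++; else twoPhaseCommits++;
    }
}
```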
Let's look at the prepare(); flow. It eventually goes through com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare():
int ret = 0;
terminateInResource();
if (TxState.ACTIVE == this.state) {
    // tolerate non-delisting apps/servers
    suspend();
}
// duplicate prepares can happen for siblings in serial subtxs!!!
// in that case, the second prepare just returns READONLY
if (this.state == TxState.IN_DOUBT)
    return Participant.READ_ONLY;
else if (!(this.state == TxState.LOCALLY_DONE))
    throw new SysException("Wrong state for prepare: " + this.state);
try {
    // refresh xaresource for MQSeries: seems to close XAResource after
    // suspend???
    testOrRefreshXAResourceFor2PC();
    if (LOGGER.isTraceEnabled()) {
        LOGGER.logTrace("About to call prepare on XAResource instance: "
                + this.xaresource);
    }
    ret = this.xaresource.prepare(this.xid);
} catch (XAException xaerr) {
    String msg = interpretErrorCode(this.resourcename, "prepare",
            this.xid, xaerr.errorCode);
    if (XAException.XA_RBBASE <= xaerr.errorCode
            && xaerr.errorCode <= XAException.XA_RBEND) {
        LOGGER.logWarning(msg, xaerr); // see case 84253
        throw new RollbackException(msg);
    } else {
        LOGGER.logError(msg, xaerr);
        throw new SysException(msg, xaerr);
    }
}
setState(TxState.IN_DOUBT);
if (ret == XAResource.XA_RDONLY) {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                + " ) returning XAResource.XA_RDONLY " + "on resource "
                + this.resourcename
                + " represented by XAResource instance "
                + this.xaresource);
    }
    return Participant.READ_ONLY;
} else {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.logDebug("XAResource.prepare ( " + this.xidToHexString
                + " ) returning OK " + "on resource "
                + this.resourcename
                + " represented by XAResource instance "
                + this.xaresource);
    }
    return Participant.READ_ONLY + 1;
}
At last we see ret = this.xaresource.prepare(this.xid);. But wait, didn't we say earlier that XA START xid must be followed by XA END xid first? The answer is in suspend();:
public synchronized void suspend() throws ResourceException {
    // BugzID: 20545
    // State may be IN_DOUBT or TERMINATED when a connection is closed AFTER
    // commit!
    // In that case, don't call END again, and also don't generate any
    // error!
    // This is required for some hibernate connection release strategies.
    if (this.state.equals(TxState.ACTIVE)) {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.logDebug("XAResource.end ( " + this.xidToHexString
                        + " , XAResource.TMSUCCESS ) on resource "
                        + this.resourcename
                        + " represented by XAResource instance "
                        + this.xaresource);
            }
            // Executes the XA END statement
            this.xaresource.end(this.xid, XAResource.TMSUCCESS);
        } catch (XAException xaerr) {
            String msg = interpretErrorCode(this.resourcename, "end",
                    this.xid, xaerr.errorCode);
            if (LOGGER.isTraceEnabled())
                LOGGER.logTrace(msg, xaerr);
            // don't throw: fix for case 102827
        }
        setState(TxState.LOCALLY_DONE);
    }
}
At this point we have executed XA START xid --> XA END xid --> XA PREPARE xid, and the last step is the commit.
Back in terminate(true), let's look at the commit(false) step. It works like the prepare flow and ends up in com.atomikos.datasource.xa.XAResourceTransaction.commit(). Once it finishes, the data is committed:
// The surrounding code is verbose; only the core line is shown
this.xaresource.commit(this.xid, onePhase);
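Food for thought: the participants are committed one by one in a loop. If earlier participants have already committed and a later participant crashes during its commit, the data becomes inconsistent.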
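In 2PC, this kind of inconsistency is usually temporary rather than permanent. Once every participant has prepared, the decision is final, so a branch whose commit fails is not rolled back. It normally stays PREPARED in the RM (or it did commit and only the reply was lost). The TM keeps its commit decision in the log and retries later, which is what the recovery machinery set up during initialization is for. Until then, other transactions can see the committed branches without the pending one. The hypothetical helper below shows only the bookkeeping: commit each prepared branch and collect the ones to retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class CommitLoop {
    /**
     * Phase 2 over branches that all prepared successfully. tryCommit simulates XA COMMIT and
     * returns false on failure. Failed branches are returned for retry, never rolled back.
     */
    static List<String> commitAll(List<String> preparedBranches, Predicate<String> tryCommit) {
        List<String> retryLater = new ArrayList<>();
        for (String branch : preparedBranches) {
            if (!tryCommit.test(branch)) {
                retryLater.add(branch); // still in doubt: recovery will re-issue XA COMMIT
            }
        }
        return retryLater;
    }
}
```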
The Atomikos rollback() flow

We have already analyzed the commit flow, and the rollback flow works the same way. Switching back to org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback(), we eventually reach com.atomikos.icatch.imp.CompositeTransactionImp.rollback():
public void rollback() throws IllegalStateException, SysException {
    // Clean up resources, update the transaction log state, and so on
    doRollback();
    if (isRoot()) {
        try {
            coordinator.terminate(false);
        } catch (Exception e) {
            throw new SysException("Unexpected error in rollback: " + e.getMessage(), e);
        }
    }
}
Focus on coordinator.terminate ( false );. This is the same method used in the commit flow. The only difference is that the commit flow passes true.
    protected void terminate ( boolean commit ) throws HeurRollbackException,
            HeurMixedException, SysException, java.lang.SecurityException,
            HeurCommitException, HeurHazardException, RollbackException,
            IllegalStateException
    {
        synchronized ( fsm_ ) {
            if ( commit ) {
                if ( participants_.size () <= 1 ) {
                    commit ( true );
                } else {
                    int prepareResult = prepare ();
                    // make sure to only do commit if NOT read only
                    if ( prepareResult != Participant.READ_ONLY )
                        commit ( false );
                }
            } else {
                // commit == false: take the rollback path
                rollback ();
            }
        }
    }
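The branching in terminate() can be restated as a small pure function. This is a simplified model, not Atomikos code: the Action enum and decide() are hypothetical. It also assumes that prepare() returns READ_ONLY only when every participant voted read-only.

```java
// Simplified model (hypothetical names) of the branch taken by terminate(boolean commit).
// Assumption: prepare() reports READ_ONLY only when every participant voted read-only.
final class TerminateDecision {

    enum Action { ROLLBACK, ONE_PHASE_COMMIT, PREPARE_THEN_COMMIT, PREPARE_ONLY }

    static Action decide(boolean commit, int participantCount, boolean allReadOnly) {
        if (!commit) {
            return Action.ROLLBACK;            // terminate(false) -> rollback()
        }
        if (participantCount <= 1) {
            return Action.ONE_PHASE_COMMIT;    // commit(true): no prepare round trip
        }
        // prepare() runs first; commit(false) is skipped when the vote is READ_ONLY
        return allReadOnly ? Action.PREPARE_ONLY : Action.PREPARE_THEN_COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, 1, false));  // ONE_PHASE_COMMIT
        System.out.println(decide(true, 2, false));  // PREPARE_THEN_COMMIT
        System.out.println(decide(true, 2, true));   // PREPARE_ONLY
        System.out.println(decide(false, 2, false)); // ROLLBACK
    }
}
```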
We focus on rollback(), which eventually reaches com.atomikos.datasource.xa.XAResourceTransaction.rollback().
public synchronized void rollback()
        throws HeurCommitException, HeurMixedException,
        HeurHazardException, SysException {
    terminateInResource();
    if (rollbackShouldDoNothing()) {
        return;
    }
    if (this.state.equals(TxState.TERMINATED)) {
        return;
    }
    if (this.state.equals(TxState.HEUR_MIXED))
        throw new HeurMixedException();
    if (this.state.equals(TxState.HEUR_COMMITTED))
        throw new HeurCommitException();
    if (this.xaresource == null) {
        throw new HeurHazardException("XAResourceTransaction "
                + getXid() + ": no XAResource to rollback ");
    }
    try {
        if (this.state.equals(TxState.ACTIVE)) { // first suspend xid
            suspend();
        }
        // refresh xaresource for MQSeries: seems to close XAResource after
        // suspend???
        testOrRefreshXAResourceFor2PC();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.logDebug("XAResource.rollback ( " + this.xidToHexString
                    + " ) " + "on resource " + this.resourcename
                    + " represented by XAResource instance "
                    + this.xaresource);
        }
        this.xaresource.rollback(this.xid);
suspend() first issues the XA END xid statement. Then this.xaresource.rollback(this.xid); rolls the data back.
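Atomikos recover flow
Before walking through transaction recovery, let's ask why recovery is needed at all. Isn't XA two-phase commit strongly consistent? To answer that, we need to look at the weaknesses of the XA two-phase protocol.
Problem 1: single point of failure
Because the coordinator is so central, the RMs block indefinitely once the TM fails. Phase two is the worst case. If the coordinator fails then, every participant is left holding locks on its transaction resources and cannot finish the transaction. (A new coordinator can be elected after a crash, but that does not unblock the participants that the crash left waiting.)
Problem 2: data inconsistency
Suppose that in phase two the coordinator sends the commit requests and a local network failure follows, or the coordinator fails while still sending them. Then only some participants receive the commit request. Those participants commit, while the ones that never received it cannot, and the distributed system ends up with inconsistent data.
How is this solved?
The solution is simple: record the transaction state in a log at every step of the transaction. The log can live wherever we choose, on local storage or in centralized storage. As analyzed earlier, the open-source edition of Atomikos keeps it locally, in memory plus a file. In a cluster, this means that a crashed node's log sits on that node's local disk. Its transactions cannot be recovered promptly, because the service has to be restarted first.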
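Here is a minimal sketch of the idea. An in-memory map stands in for durable storage (Atomikos writes to memory plus a local file), and CoordinatorLogSketch and its State values are hypothetical names. The key rule is that the commit decision is logged before phase two starts. After a crash, the coordinator then knows which transactions it must finish committing and can presume abort for the rest.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical coordinator log: the latest state per global transaction id.
final class CoordinatorLogSketch {

    enum State { PREPARING, COMMITTING, TERMINATED }

    private final Map<String, State> entries = new LinkedHashMap<>();

    /** Records the latest state of a global transaction (a real log would fsync here). */
    void record(String gtrid, State state) {
        entries.put(gtrid, state);
    }

    /** Transactions whose commit decision was logged but whose completion never was. */
    Set<String> pendingCommits() {
        Set<String> pending = new TreeSet<>();
        for (Map.Entry<String, State> e : entries.entrySet()) {
            if (e.getValue() == State.COMMITTING) {
                pending.add(e.getKey());
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        CoordinatorLogSketch log = new CoordinatorLogSketch();
        log.record("tx-1", State.COMMITTING);  // decision logged before phase two
        log.record("tx-1", State.TERMINATED);  // every branch confirmed the commit
        log.record("tx-2", State.COMMITTING);  // crashed in the middle of phase two
        log.record("tx-3", State.PREPARING);   // crashed before deciding: presume abort
        System.out.println(log.pendingCommits()); // [tx-2]
    }
}
```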
Transaction recovery in Atomikos across scenarios
Atomikos provides two mechanisms for handling failures in different scenarios.
Scenario 1: the service node stays up, but recovery is needed for some other reason. Here a scheduled task performs the recovery. The com.atomikos.icatch.imp.TransactionServiceImp.init() method sets up a timer that periodically runs transaction recovery.
public synchronized void init ( Properties properties ) throws SysException
{
    shutdownInProgress_ = false;
    control_ = new com.atomikos.icatch.admin.imp.LogControlImp ( (AdminLog) this.recoveryLog );
    ConfigProperties configProperties = new ConfigProperties(properties);
    long recoveryDelay = configProperties.getRecoveryDelay();
    recoveryTimer = new PooledAlarmTimer(recoveryDelay);
    recoveryTimer.addAlarmTimerListener(new AlarmTimerListener() {
        @Override
        public void alarm(AlarmTimer timer) {
            // perform transaction recovery
            performRecovery();
        }
    });
    TaskManager.SINGLETON.executeTask(recoveryTimer);
    initialized_ = true;
}
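The same periodic-recovery pattern can be sketched with the JDK's ScheduledExecutorService, for readers less familiar with Atomikos's PooledAlarmTimer. RecoveryScheduler and runsAtLeast() are hypothetical, and the callback plays the role of performRecovery(). This is an analogue under those assumptions, not how Atomikos schedules internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogue (hypothetical class) of the PooledAlarmTimer wiring in init():
// run a recovery callback every recoveryDelay milliseconds until shut down.
final class RecoveryScheduler {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    void start(long recoveryDelayMillis, Runnable performRecovery) {
        // fixed delay (not fixed rate): a slow recovery pass never overlaps the next one
        timer.scheduleWithFixedDelay(performRecovery, recoveryDelayMillis,
                recoveryDelayMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    /** Returns true if the callback ran at least n times within two seconds. */
    static boolean runsAtLeast(int n, long recoveryDelayMillis) {
        CountDownLatch runs = new CountDownLatch(n);
        RecoveryScheduler scheduler = new RecoveryScheduler();
        scheduler.start(recoveryDelayMillis, runs::countDown);
        try {
            return runs.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("recovery ran 3 times: " + runsAtLeast(3, 10));
    }
}
```

A fixed delay is simply the safer choice for a task whose duration varies. Whether Atomikos uses a fixed delay or a fixed rate is not shown in the code above.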
This eventually reaches com.atomikos.datasource.xa.XATransactionalResource.recover():
public void recover() {
    XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();
    if (xaResourceRecoveryManager != null) { //null for LogCloud recovery
        try {
            xaResourceRecoveryManager.recover(getXAResource());
        } catch (Exception e) {
            refreshXAResource(); //cf case 156968
        }
    }
}
Scenario 2: recovery while a crashed service node restarts. This is implemented in com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService():
@Override
public void setRecoveryService ( RecoveryService recoveryService )
        throws ResourceException
{
    if ( recoveryService != null ) {
        if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource "
                + getName () );
        this.branchIdentifier=recoveryService.getName();
        // perform transaction recovery
        recover();
    }
}
A closer look at the com.atomikos.datasource.xa.XATransactionalResource.recover() flow.

Main code (the xaResourceRecoveryManager.recover(getXAResource()) call made from that method):
public void recover(XAResource xaResource) throws XAException {
    // fetch the prepared XIDs from the RM via the XA recovery protocol
    List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);
    Collection<XID> xidsToCommit;
    try {
        // fetch the committing XIDs from the log, to match against the RM's XIDs
        xidsToCommit = retrieveExpiredCommittingXidsFromLog();
        for (XID xid : xidsToRecover) {
            if (xidsToCommit.contains(xid)) {
                // issue XA COMMIT xid
                replayCommit(xid, xaResource);
            } else {
                attemptPresumedAbort(xid, xaResource);
            }
        }
    } catch (LogException couldNotRetrieveCommittingXids) {
        LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);
    }
}
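The core rule of recover() fits in a few lines. In this sketch XIDs are plain strings and RecoveryDecision is a hypothetical name. The logic mirrors the loop above: commit whatever the log says was committing, and presume abort for everything else the RM still holds in the prepared state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Pure-function sketch (hypothetical names) of the decision made in recover().
final class RecoveryDecision {

    enum Action { REPLAY_COMMIT, PRESUMED_ABORT }

    static Map<String, Action> decide(List<String> preparedInRm, Set<String> committingInLog) {
        Map<String, Action> plan = new LinkedHashMap<>();
        for (String xid : preparedInRm) {
            // logged as committing: finish the commit; otherwise: roll back
            plan.put(xid, committingInLog.contains(xid) ? Action.REPLAY_COMMIT : Action.PRESUMED_ABORT);
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Action> plan = decide(Arrays.asList("x1", "x2"), Collections.singleton("x1"));
        System.out.println(plan); // {x1=REPLAY_COMMIT, x2=PRESUMED_ABORT}
    }
}
```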
Now let's see how the XIDs stored on the RM side are retrieved via the XA recovery protocol. The call retrievePreparedXidsFromXaResource(xaResource) ends up in com.atomikos.datasource.xa.RecoveryScan.recoverXids():
public static List<XID> recoverXids(XAResource xaResource, XidSelector selector) throws XAException {
    List<XID> ret = new ArrayList<XID>();
    boolean done = false;
    int flags = XAResource.TMSTARTRSCAN;
    Xid[] xidsFromLastScan = null;
    List<XID> allRecoveredXidsSoFar = new ArrayList<XID>();
    do {
        xidsFromLastScan = xaResource.recover(flags);
        flags = XAResource.TMNOFLAGS;
        done = (xidsFromLastScan == null || xidsFromLastScan.length == 0);
        if (!done) {
            // TEMPTATIVELY SET done TO TRUE
            // TO TOLERATE ORACLE 8.1.7 INFINITE
            // LOOP (ALWAYS RETURNS SAME RECOVER
            // SET). IF A NEW SET OF XIDS IS RETURNED
            // THEN done WILL BE RESET TO FALSE
            done = true;
            for ( int i = 0; i < xidsFromLastScan.length; i++ ) {
                XID xid = new XID ( xidsFromLastScan[i] );
                // our own XID implements equals and hashCode properly
                if (!allRecoveredXidsSoFar.contains(xid)) {
                    // a new xid is returned -> we can not be in a recovery loop -> go on
                    allRecoveredXidsSoFar.add(xid);
                    done = false;
                    if (selector.selects(xid)) {
                        ret.add(xid);
                    }
                }
            }
        }
    } while (!done);
    return ret;
}
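The scan loop can be exercised without a database by replacing XAResource.recover(flags) with a function. This sketch keeps the real JTA flag constants from javax.transaction.xa.XAResource. Everything else is hypothetical: XIDs are strings, and a Predicate takes the place of Atomikos's XidSelector. The demo driver always returns the same batch, which is the misbehavior the Oracle 8.1.7 comment guards against, and the loop still terminates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import javax.transaction.xa.XAResource;

// Sketch of the recoverXids() scan loop with string XIDs; the "recover" function is a
// hypothetical stand-in for XAResource.recover(flags).
final class RecoveryScanSketch {

    static List<String> recoverXids(IntFunction<String[]> recover, Predicate<String> selector) {
        List<String> selected = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        int flags = XAResource.TMSTARTRSCAN; // the first call starts a recovery scan
        boolean done;
        do {
            String[] batch = recover.apply(flags);
            flags = XAResource.TMNOFLAGS;    // later calls continue the scan
            done = true;                     // stop unless this batch has something new
            if (batch != null) {
                for (String xid : batch) {
                    if (seen.add(xid)) {     // a new XID, so we are not stuck in a loop
                        done = false;
                        if (selector.test(xid)) {
                            selected.add(xid);
                        }
                    }
                }
            }
        } while (!done);
        return selected;
    }

    public static void main(String[] args) {
        // A misbehaving driver that returns the same batch on every call.
        IntFunction<String[]> stuck = flags -> new String[] {"atomikos-1", "other-tm-9"};
        System.out.println(recoverXids(stuck, xid -> xid.startsWith("atomikos"))); // [atomikos-1]
    }
}
```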
Focus on xidsFromLastScan = xaResource.recover(flags);. With MySQL, this call goes into MysqlXAConnection.recover(), which executes the XA RECOVER statement to obtain the XIDs.
protected static Xid[] recover(Connection c, int flag) throws XAException {
    /*
     * The XA RECOVER statement returns information for those XA transactions on the MySQL server that are in the PREPARED state. (See Section 13.4.7.2, "XA
     * Transaction States".) The output includes a row for each such XA transaction on the server, regardless of which client started it.
     *
     * XA RECOVER output rows look like this (for an example xid value consisting of the parts 'abc', 'def', and 7):
     *
     * mysql> XA RECOVER;
     * +----------+--------------+--------------+--------+
     * | formatID | gtrid_length | bqual_length | data   |
     * +----------+--------------+--------------+--------+
     * |        7 |            3 |            3 | abcdef |
     * +----------+--------------+--------------+--------+
     *
     * The output columns have the following meanings:
     *
     * formatID is the formatID part of the transaction xid
     * gtrid_length is the length in bytes of the gtrid part of the xid
     * bqual_length is the length in bytes of the bqual part of the xid
     * data is the concatenation of the gtrid and bqual parts of the xid
     */
    boolean startRscan = ((flag & TMSTARTRSCAN) > 0);
    boolean endRscan = ((flag & TMENDRSCAN) > 0);
    if (!startRscan && !endRscan && flag != TMNOFLAGS) {
        throw new MysqlXAException(XAException.XAER_INVAL, Messages.getString("MysqlXAConnection.001"), null);
    }
    //
    // We return all recovered XIDs at once, so if not  TMSTARTRSCAN, return no new XIDs
    //
    // We don't attempt to maintain state to check for TMNOFLAGS "outside" of a scan
    //
    if (!startRscan) {
        return new Xid[0];
    }
    ResultSet rs = null;
    Statement stmt = null;
    List<MysqlXid> recoveredXidList = new ArrayList<MysqlXid>();
    try {
        // TODO: Cache this for lifetime of XAConnection
        stmt = c.createStatement();
        rs = stmt.executeQuery("XA RECOVER");
        while (rs.next()) {
            final int formatId = rs.getInt(1);
            int gtridLength = rs.getInt(2);
            int bqualLength = rs.getInt(3);
            byte[] gtridAndBqual = rs.getBytes(4);
            final byte[] gtrid = new byte[gtridLength];
            final byte[] bqual = new byte[bqualLength];
            if (gtridAndBqual.length != (gtridLength + bqualLength)) {
                throw new MysqlXAException(XAException.XA_RBPROTO, Messages.getString("MysqlXAConnection.002"), null);
            }
            System.arraycopy(gtridAndBqual, 0, gtrid, 0, gtridLength);
            System.arraycopy(gtridAndBqual, gtridLength, bqual, 0, bqualLength);
            recoveredXidList.add(new MysqlXid(gtrid, bqual, formatId));
        }
    } catch (SQLException sqlEx) {
        throw mapXAExceptionFromSQLException(sqlEx);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException sqlEx) {
                throw mapXAExceptionFromSQLException(sqlEx);
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException sqlEx) {
                throw mapXAExceptionFromSQLException(sqlEx);
            }
        }
    }
    int numXids = recoveredXidList.size();
    Xid[] asXids = new Xid[numXids];
    Object[] asObjects = recoveredXidList.toArray();
    for (int i = 0; i < numXids; i++) {
        asXids[i] = (Xid) asObjects[i];
    }
    return asXids;
}
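Splitting the data column into gtrid and bqual is easy to check in isolation. This sketch reuses the example row from the comment above (formatID 7, gtrid 'abc', bqual 'def'). XaRecoverRow and ParsedXid are hypothetical names, not Connector/J classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical parser for one XA RECOVER row, mirroring the arraycopy logic in recover().
final class XaRecoverRow {

    static final class ParsedXid {
        final int formatId;
        final byte[] gtrid;
        final byte[] bqual;

        ParsedXid(int formatId, byte[] gtrid, byte[] bqual) {
            this.formatId = formatId;
            this.gtrid = gtrid;
            this.bqual = bqual;
        }
    }

    /** Splits the data column using gtrid_length and bqual_length. */
    static ParsedXid parse(int formatId, int gtridLength, int bqualLength, byte[] data) {
        if (data.length != gtridLength + bqualLength) {
            throw new IllegalArgumentException("data length does not match gtrid_length + bqual_length");
        }
        byte[] gtrid = Arrays.copyOfRange(data, 0, gtridLength);
        byte[] bqual = Arrays.copyOfRange(data, gtridLength, data.length);
        return new ParsedXid(formatId, gtrid, bqual);
    }

    public static void main(String[] args) {
        // The example row from the MySQL comment: formatID 7, gtrid 'abc', bqual 'def'.
        ParsedXid x = parse(7, 3, 3, "abcdef".getBytes(StandardCharsets.US_ASCII));
        System.out.println(new String(x.gtrid, StandardCharsets.US_ASCII) + " / "
                + new String(x.bqual, StandardCharsets.US_ASCII) + " / " + x.formatId); // abc / def / 7
    }
}
```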
Note that with MySQL versions below 5.7.7, this returns no data. MySQL fixed the behavior in later versions, so a MySQL server acting as an RM must be version 5.7.7 or later. The reason is as follows.
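MySQL 5.6 automatically rolls back prepared transactions when the client disconnects. The reason lies in MySQL's internal implementation. Before 5.7, MySQL did not write prepared transactions to the binlog; officially, this reduced fsync calls as an optimization. A distributed transaction's operations were written to the binlog only when it committed, so from the binlog's point of view a distributed transaction was no different from an ordinary one. Until then, the operations were held in the connection's IO_CACHE. If the client disconnected at that point, this binlog data was lost. Allowing the transaction to be committed after reconnecting would have left the binlog incomplete and made the primary and replicas diverge. So MySQL simply rolled back all prepared transactions when the client disconnected.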
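To fail fast when a data source does not meet this requirement, a version check is straightforward. This is a hypothetical helper, not part of ShardingSphere or Atomikos. With JDBC, the version string could come from DatabaseMetaData.getDatabaseProductVersion(). The helper compares only the leading major.minor.patch numbers and ignores suffixes such as -log.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: does a MySQL version string meet the 5.7.7 minimum for XA recovery?
final class MySqlXaVersionCheck {

    private static final Pattern VERSION = Pattern.compile("^(\\d+)\\.(\\d+)\\.(\\d+)");
    private static final int[] MINIMUM = {5, 7, 7};

    /** True if a version string such as "8.0.33-log" is at least 5.7.7. */
    static boolean meetsXaMinimum(String version) {
        Matcher m = VERSION.matcher(version.trim());
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognized MySQL version: " + version);
        }
        for (int i = 0; i < MINIMUM.length; i++) {
            int part = Integer.parseInt(m.group(i + 1));
            if (part != MINIMUM[i]) {
                return part > MINIMUM[i]; // the first differing component decides
            }
        }
        return true; // exactly 5.7.7
    }

    public static void main(String[] args) {
        for (String v : new String[] {"5.6.51", "5.7.6", "5.7.7", "8.0.33-log"}) {
            System.out.println(v + " -> " + meetsXaMinimum(v));
        }
    }
}
```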
Back on the main path, the next step is to fetch the XIDs from the transaction log that Atomikos keeps itself:
    Collection<XID> xidsToCommit = retrieveExpiredCommittingXidsFromLog();
Let's look at retrieveExpiredCommittingXidsFromLog(), which fetches the XIDs from the transaction log. It leads into com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants():
public Collection<ParticipantLogEntry> getCommittingParticipants()
        throws LogReadException {
    Collection<ParticipantLogEntry> committingParticipants = new HashSet<ParticipantLogEntry>();
    Collection<CoordinatorLogEntry> committingCoordinatorLogEntries = repository.findAllCommittingCoordinatorLogEntries();
    for (CoordinatorLogEntry coordinatorLogEntry : committingCoordinatorLogEntries) {
        for (ParticipantLogEntry participantLogEntry : coordinatorLogEntry.participants) {
            committingParticipants.add(participantLogEntry);
        }
    }
    return committingParticipants;
}
Let's briefly look at how the transaction log is structured. First comes CoordinatorLogEntry, the entity class that holds everything about one XA transaction:
public class CoordinatorLogEntry implements Serializable {
    // global transaction id
    public final String id;
    // whether the transaction has been committed
    public final boolean wasCommitted;
    /**
     * Only for subtransactions, null otherwise.
     */
    public final String superiorCoordinatorId;
    // the participants of this transaction
    public final ParticipantLogEntry[] participants;
}
Next, the participant entity class ParticipantLogEntry:
public class ParticipantLogEntry implements Serializable {
    private static final long serialVersionUID = 1728296701394899871L;
    /**
     * The ID of the global transaction as known by the transaction core.
     */
    public final String coordinatorId;
    /**
     * Identifies the participant within the global transaction.
     */
    public final String uri;
    /**
     * When does this participant expire (expressed in millis since Jan 1, 1970)?
     */
    public final long expires;
    /**
     * Best-known state of the participant.
     */
    public final TxState state;
    /**
     * For diagnostic purposes, null if not relevant.
     */
    public final String resourceName;
}
Back in com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids(), we can see how it obtains the XIDs that the transaction log recorded during an XA transaction:
public Set<XID> getExpiredCommittingXids() throws LogReadException {
    Set<XID> ret = new HashSet<XID>();
    Collection<ParticipantLogEntry> entries = log.getCommittingParticipants();
    for (ParticipantLogEntry entry : entries) {
        if (expired(entry) && !http(entry)) {
            XID xid = new XID(entry.coordinatorId, entry.uri);
            ret.add(xid);
        }
    }
    return ret;
}
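Here is a simplified sketch of the filtering in getExpiredCommittingXids(). It makes three assumptions that the code above does not show. First, expired(entry) is taken to mean that entry.expires is earlier than the current time. Second, the http(entry) check is omitted. Third, an XID is modeled as the string coordinatorId + ":" + uri instead of Atomikos's XID class. Waiting for expiry presumably keeps recovery from racing a commit that is still in progress.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical version of getExpiredCommittingXids().
final class ExpiredCommittingXids {

    static final class Entry {
        final String coordinatorId; // global transaction id
        final String uri;           // participant id within that transaction
        final long expires;         // millis since the epoch

        Entry(String coordinatorId, String uri, long expires) {
            this.coordinatorId = coordinatorId;
            this.uri = uri;
            this.expires = expires;
        }
    }

    static Set<String> expiredCommittingXids(List<Entry> committingParticipants, long nowMillis) {
        Set<String> ret = new LinkedHashSet<>();
        for (Entry e : committingParticipants) {
            if (e.expires < nowMillis) {                // assumed meaning of expired(entry)
                ret.add(e.coordinatorId + ":" + e.uri); // stands in for new XID(coordinatorId, uri)
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        List<Entry> entries = Arrays.asList(
                new Entry("tx-1", "ds_0", now - 1),      // past its expiry: eligible for replay
                new Entry("tx-2", "ds_0", now + 5_000)); // possibly still committing: left alone
        System.out.println(expiredCommittingXids(entries, now)); // [tx-1:ds_0]
    }
}
```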
If an XID that XA recovery retrieved from the RM is also among the XIDs taken from the transaction log, it is committed. Otherwise it is rolled back.
List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);
Collection<XID> xidsToCommit;
try {
    xidsToCommit = retrieveExpiredCommittingXidsFromLog();
    for (XID xid : xidsToRecover) {
        if (xidsToCommit.contains(xid)) {
            replayCommit(xid, xaResource);
        } else {
            attemptPresumedAbort(xid, xaResource);
        }
    }
} catch (LogException couldNotRetrieveCommittingXids) {
    LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);
}
The replayCommit method:
private void replayCommit(XID xid, XAResource xaResource) {
    if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Replaying commit of xid: " + xid);
    try {
        // commit the transaction branch
        xaResource.commit(xid, false);
        // update the transaction log
        log.terminated(xid);
    } catch (XAException e) {
        if (alreadyHeuristicallyTerminatedByResource(e)) {
            handleHeuristicTerminationByResource(xid, xaResource, e, true);
        } else if (xidTerminatedInResourceByConcurrentCommit(e)) {
            log.terminated(xid);
        } else {
            LOGGER.logWarning("Transient error while replaying commit - will retry later...", e);
        }
    }
}
And the attemptPresumedAbort(xid, xaResource) method:
private void attemptPresumedAbort(XID xid, XAResource xaResource) {
    try {
        log.presumedAborting(xid);
        if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Presumed abort of xid: " + xid);
        try {
            // roll back the transaction branch
            xaResource.rollback(xid);
            // update the log state
            log.terminated(xid);
        } catch (XAException e) {
            if (alreadyHeuristicallyTerminatedByResource(e)) {
                handleHeuristicTerminationByResource(xid, xaResource, e, false);
            } else if (xidTerminatedInResourceByConcurrentRollback(e)) {
                log.terminated(xid);
            } else {
                LOGGER.logWarning("Unexpected exception during recovery - ignoring to retry later...", e);
            }
        }
    } catch (IllegalStateException presumedAbortNotAllowedInCurrentLogState) {
        // ignore to retry later if necessary
    } catch (LogException logWriteException) {
        LOGGER.logWarning("log write failed for Xid: "+xid+", ignoring to retry later", logWriteException);
    }
}
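The catch blocks in replayCommit() and attemptPresumedAbort() sort XAException error codes into three outcomes. The sketch below expresses that triage with the standard JTA constants from javax.transaction.xa.XAException. The exact codes are an assumption, not copied from Atomikos: here the heuristic codes stand for alreadyHeuristicallyTerminatedByResource(), and XAER_NOTA for xidTerminatedInResourceByConcurrentCommit() and its Rollback counterpart. RecoveryErrorTriage is a hypothetical name.

```java
import javax.transaction.xa.XAException;

// Hypothetical triage of XA error codes seen while replaying commit or rollback.
final class RecoveryErrorTriage {

    enum Outcome { HEURISTIC, ALREADY_TERMINATED, RETRY_LATER }

    static Outcome triage(XAException e) {
        switch (e.errorCode) {
            case XAException.XA_HEURCOM:
            case XAException.XA_HEURRB:
            case XAException.XA_HEURMIX:
            case XAException.XA_HEURHAZ:
                return Outcome.HEURISTIC;          // the RM already decided on its own
            case XAException.XAER_NOTA:
                return Outcome.ALREADY_TERMINATED; // the RM no longer knows this xid
            default:
                return Outcome.RETRY_LATER;        // treat as transient: next pass retries
        }
    }

    public static void main(String[] args) {
        System.out.println(triage(new XAException(XAException.XA_HEURMIX)));  // HEURISTIC
        System.out.println(triage(new XAException(XAException.XAER_NOTA)));   // ALREADY_TERMINATED
        System.out.println(triage(new XAException(XAException.XAER_RMFAIL))); // RETRY_LATER
    }
}
```

Retrying by default matches the design seen in both methods: an unexpected error is only logged, and the periodic recovery task gets another chance later.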
Summary
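This article has grown quite long. We analyzed how ShardingSphere provides an SPI-based solution for XA that integrates Atomikos. We then walked through Atomikos's initialization, transaction begin, connection acquisition, commit, rollback, and recovery flows. I hope this helps you understand how XA works.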