<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          萬字長文詳解Shardingsphere對XA分布式事務(wù)的支持

          共 25790字,需瀏覽 52分鐘

           ·

          2020-12-01 12:42

          點(diǎn)擊上方藍(lán)色“程序猿DD”,選擇“設(shè)為星標(biāo)”

          回復(fù)“資源”獲取獨(dú)家整理的學(xué)習(xí)資料!

          萬字長文詳解Shardingsphere對XA分布式事務(wù)的支持

          Apache ShardingSphere 是一套開源的分布式數(shù)據(jù)庫中間件解決方案組成的生態(tài)圈,它由 JDBC、Proxy 和 Sidecar(規(guī)劃中)這 3 款相互獨(dú)立,卻又能夠混合部署配合使用的產(chǎn)品組成。它們均提供標(biāo)準(zhǔn)化的數(shù)據(jù)分片、分布式事務(wù)和數(shù)據(jù)庫治理功能,可適用于如 Java 同構(gòu)、異構(gòu)語言、云原生等各種多樣化的應(yīng)用場景。

          ShardingSphere 已于2020年4月16日成為 Apache 軟件基金會的頂級項(xiàng)目。

          分布式系統(tǒng)CAP理論

          一致性(Consistency)
          • 一致性指?all nodes see the same data at the same time,即更新操作成功并返回客戶端完成后,所有節(jié)點(diǎn)在同一時(shí)間的數(shù)據(jù)完全一致,不能存在中間狀態(tài)。

          • 關(guān)于一致性,如果用戶時(shí)刻看到的數(shù)據(jù)都是一致的,那么稱之為強(qiáng)一致性。如果允許存在中間狀態(tài),只要求經(jīng)過一段時(shí)間后,數(shù)據(jù)最終是一致的,則稱之為最終一致性。此外,如果允許存在部分?jǐn)?shù)據(jù)不一致,那么就稱之為弱一致性

          可用性(Availability)
          • 可用性是指系統(tǒng)提供的服務(wù)必須一直處于可用的狀態(tài),對于用戶的每一個(gè)操作請求總是能夠在有限的時(shí)間內(nèi)返回結(jié)果。有限的時(shí)間內(nèi)是指:對于用戶的一個(gè)操作請求,系統(tǒng)必須能夠在指定的時(shí)間內(nèi)返回對應(yīng)的處理結(jié)果,如果超過了這個(gè)時(shí)間范圍,那么系統(tǒng)就被認(rèn)為是不可用的。

          • 返回結(jié)果是可用性的另一個(gè)非常重要的指標(biāo),它要求系統(tǒng)在完成對用戶請求的處理后,返回一個(gè)正常的響應(yīng)結(jié)果,不論這個(gè)結(jié)果是成功還是失敗。

          分區(qū)容錯(cuò)性(Partition tolerance )
          • 布式系統(tǒng)在遇到任何網(wǎng)絡(luò)分區(qū)故障的時(shí)候,仍然需要能夠保證對外提供滿足一致性和可用性的服務(wù),除非是整個(gè)網(wǎng)絡(luò)環(huán)境都發(fā)生了故障。

          X/Open DTP模型與XA規(guī)范

          X/Open,即現(xiàn)在的open group,是一個(gè)獨(dú)立的組織,主要負(fù)責(zé)制定各種行業(yè)技術(shù)標(biāo)準(zhǔn)。官網(wǎng)地址:http://www.opengroup.org/。X/Open組織主要由各大知名公司或者廠商進(jìn)行支持,這些組織不光遵循X/Open組織定義的行業(yè)技術(shù)標(biāo)準(zhǔn),也參與到標(biāo)準(zhǔn)的制定。下圖展示了open group目前主要成員(官網(wǎng)截圖):

          DTP模型

          • 應(yīng)用程序(Application Program ,簡稱AP):用于定義事務(wù)邊界(即定義事務(wù)的開始和結(jié)束),并且在事務(wù)邊界內(nèi)對資源進(jìn)行操作。

          • 資源管理器(Resource Manager,簡稱RM,一般也稱為事務(wù)參與者):如數(shù)據(jù)庫、文件系統(tǒng)等,并提供訪問資源的方式。

          • 事務(wù)管理器(Transaction Manager ,簡稱TM,一般也稱為事務(wù)協(xié)調(diào)者):負(fù)責(zé)分配事務(wù)唯一標(biāo)識,監(jiān)控事務(wù)的執(zhí)行進(jìn)度,并負(fù)責(zé)事務(wù)的提交、回滾等。

          XA規(guī)范

          這里的接口規(guī)范特別多,我們只要來講講幾個(gè)最重要的。

          • xa_start?: 在?RM端調(diào)用此接口開啟一個(gè)XA事務(wù),后面需要接上XID?作為參數(shù)。
          • xa_end?: 取消當(dāng)前線程與事務(wù)的關(guān)聯(lián), 與?xa_start是配對使用。
          • xa_prepare?: 詢問RM?是否已經(jīng)準(zhǔn)備好了提交事務(wù)。
          • xa_commit?: 通知RM?提交事務(wù)分支。
          • xa_rollback?: 通知RM?提交回滾事務(wù)分支。
          XA二階段提交
          • 階段一?:TM通知各個(gè)RM準(zhǔn)備提交它們的事務(wù)分支。如果RM判斷自己進(jìn)行的工作可以被提交,那就就對工作內(nèi)容進(jìn)行持久化,再給TM肯定答復(fù);要是發(fā)生了其他情況,那給TM的都是否定答復(fù)。在發(fā)送了否定答復(fù)并回滾了已經(jīng)的工作后,RM就可以丟棄這個(gè)事務(wù)分支信息。

          • 階段二?:TM根據(jù)階段1各個(gè)RM prepare的結(jié)果,決定是提交還是回滾事務(wù)。如果所有的RM都prepare成功,那么TM通知所有的RM進(jìn)行提交;如果有RM prepare失敗的話,則TM通知所有RM回滾自己的事務(wù)分支。

          MySQL對XA協(xié)議的支持

          MySQL?從5.0.3開始支持XA分布式事務(wù),且只有InnoDB存儲引擎支持XA事務(wù)。?MySQL?在DTP模型中也是屬于資源管理器RM。

          MySQL XA 事務(wù)的 SQL語法
          XA?START?xid????//開啟XA事務(wù),xid是一個(gè)唯一值,表示事務(wù)分支標(biāo)識符
          XA?END?xid??//結(jié)束一個(gè)XA事務(wù),
          XA?PREPARE?xid?準(zhǔn)備提交
          XA?COMMIT?xid [ONE PHASE]?//提交事務(wù)。兩階段提交協(xié)議中,如果只有一個(gè)RM參與,那么可以優(yōu)化為一階段提交
          XA?ROLLBACK?xid??//回滾
          XA?RECOVER?[CONVERT?XID]??//列出所有處于PREPARE階段的XA事務(wù)
          MySQL xid詳解

          mysql中使用xid來作為一個(gè)事務(wù)分支的標(biāo)識符。通過C語言進(jìn)行描述,如下:

          /?
          ??Transaction?branch?identification:?XID?and?NULLXID:
          ?/
          #define?XIDDATASIZE?128??/??size?in?bytes??/
          #define?MAXGTRIDSIZE?64??/??maximum?size?in?bytes?of?gtrid??/
          #define?MAXBQUALSIZE?64??/??maximum?size?in?bytes?of?bqual??/
          struct?xid_t?{
          ????long?formatID;?????/*?format?identifier?*/
          ????long?gtrid_length;?/*?value?1-64?*/
          ????long?bqual_length;?/*?value?1-64?*/
          ????char?data[XIDDATASIZE];
          ????};
          /?
          ??A?value?of?-1?in?formatID?means?that?the?XID?is?null.
          ?/
          typedef?struct?xid_t?XID;
          /?
          ??Declarations?of?routines?by?which?RMs?call?TMs:
          ?/
          extern?int?ax_reg(int,?XID??,?long);
          extern?int?ax_unreg(int,?long);
          • gtrid?:全局事務(wù)標(biāo)識符(global transaction identifier),最大不能超過64字節(jié)。
          • bqual?:分支限定符(branch qualifier),最大不能超過64字節(jié)。
          • formatId:記錄gtrid、bqual的格式,類似于memcached中flags字段的作用。
          • data?:xid的值,其是 gtrid和bqual拼接后的內(nèi)容。。
          MySQL XA事務(wù)狀態(tài)

          JTA規(guī)范

          JTA(Java Transaction API):為J2EE平臺提供了分布式事務(wù)服務(wù)(distributed transaction)的能力。某種程度上,可以認(rèn)為JTA規(guī)范是XA規(guī)范的Java版,其把XA規(guī)范中規(guī)定的DTP模型交互接口抽象成Java接口中的方法,并規(guī)定每個(gè)方法要實(shí)現(xiàn)什么樣的功能。

          JTA 定義的接口
          • javax.transaction.TransactionManager?: 事務(wù)管理器,負(fù)責(zé)事務(wù)的begin,?commit,rollback?等命令。

          • javax.transaction.UserTransaction:用于聲明一個(gè)分布式事務(wù)。

          • javax.transaction.TransactionSynchronizationRegistry:事務(wù)同步注冊

          • javax.transaction.xa.XAResource:定義RM提供給TM操作的接口

          • javax.transaction.xa.Xid:事務(wù)xid接口。

          TM provider:
          • 實(shí)現(xiàn)UserTransaction、TransactionManager、Transaction、TransactionSynchronizationRegistry、Synchronization、Xid接口,通過與XAResource接口交互來實(shí)現(xiàn)分布式事務(wù)。
          RM provider:
          • XAResource接口需要由資源管理器者來實(shí)現(xiàn),XAResource接口中定義了一些方法,這些方法將會被TM進(jìn)行調(diào)用,如:

            • start方法:開啟事務(wù)分支

            • end方法:結(jié)束事務(wù)分支

            • prepare方法:準(zhǔn)備提交

            • commit方法:提交

            • rollback方法:回滾

            • recover方法:列出所有處于PREPARED狀態(tài)的事務(wù)分支

          ShardingSphere對XA分布式事務(wù)的支持

          ShardingSphere針對XA分布式事務(wù)的接口以及JTA規(guī)范,提供了標(biāo)準(zhǔn)的,基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.spi.ShardingTransactionManager

          public?interface?ShardingTransactionManager?extends?AutoCloseable?{

          ????/**
          ?????*?Initialize?sharding?transaction?manager.
          ?????*
          ?????*?@param?databaseType?database?type
          ?????*?@param?resourceDataSources?resource?data?sources
          ?????*/

          ????void?init(DatabaseType?databaseType,?Collection?resourceDataSources);

          ????/**
          ?????*?Get?transaction?type.
          ?????*
          ?????*?@return?transaction?type
          ?????*/

          ????TransactionType?getTransactionType();

          ????/**
          ?????*?Judge?is?in?transaction?or?not.
          ?????*
          ?????*?@return?in?transaction?or?not
          ?????*/

          ????boolean?isInTransaction();

          ????/**
          ?????*?Get?transactional?connection.
          ?????*
          ?????*?@param?dataSourceName?data?source?name
          ?????*?@return?connection
          ?????*?@throws?SQLException?SQL?exception
          ?????*/

          ????Connection?getConnection(String?dataSourceName)?throws?SQLException;

          ????/**
          ?????*?Begin?transaction.
          ?????*/

          ????void?begin();

          ????/**
          ?????*?Commit?transaction.
          ?????*/

          ????void?commit();

          ????/**
          ?????*?Rollback?transaction.
          ?????*/

          ????void?rollback();
          }

          對于XA分布式事務(wù)的支持的具體實(shí)現(xiàn)類為 :org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager。在此類中,會調(diào)用基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.xa.spi.XATransactionManager,來進(jìn)行XA事務(wù)的管理操作。

          總結(jié)

          我們了解了分布式事務(wù)的CAP理論,了解了X/Open的DTP模型,以及XA的接口規(guī)范,MySQL對XA協(xié)議的支持。最好我們講解了JTA的規(guī)范,以及ShardingSphere對XA事務(wù)進(jìn)行整合的時(shí)候定義的SPI接口,這些都是很重要的理論基礎(chǔ),接下來,我們將詳細(xì)來講解基于AtomkikosXATransactionManager的具體實(shí)現(xiàn),以及源碼解析。

          Shardingsphere整合Atomikos對XA分布式事務(wù)的源碼解析

          Atomikos(https://www.atomikos.com/),其實(shí)是一家公司的名字,提供了基于JTA規(guī)范的XA分布式事務(wù)TM的實(shí)現(xiàn)。其旗下最著名的產(chǎn)品就是事務(wù)管理器。產(chǎn)品分兩個(gè)版本:

          • TransactionEssentials:開源的免費(fèi)產(chǎn)品;

          • ExtremeTransactions:上商業(yè)版,需要收費(fèi)。

          這兩個(gè)產(chǎn)品的關(guān)系如下圖所示:

          ExtremeTransactions在TransactionEssentials的基礎(chǔ)上額外提供了以下功能(重要的):

          • 支持TCC:這是一種柔性事務(wù)

          • 支持通過RMI、IIOP、SOAP這些遠(yuǎn)程過程調(diào)用技術(shù),進(jìn)行事務(wù)傳播。

          • 事務(wù)日志云存儲,云端對事務(wù)進(jìn)行恢復(fù),并且提供了完善的管理后臺。

          org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager詳解

          我們簡單的來回顧下org.apache.shardingsphere.transaction.spi.ShardingTransactionManager

          public?interface?ShardingTransactionManager?extends?AutoCloseable?{

          ????/**
          ?????*?Initialize?sharding?transaction?manager.
          ?????*
          ?????*?@param?databaseType?database?type
          ?????*?@param?resourceDataSources?resource?data?sources
          ?????*/

          ????void?init(DatabaseType?databaseType,?Collection?resourceDataSources);

          ????/**
          ?????*?Get?transaction?type.
          ?????*
          ?????*?@return?transaction?type
          ?????*/

          ????TransactionType?getTransactionType();

          ????/**
          ?????*?Judge?is?in?transaction?or?not.
          ?????*
          ?????*?@return?in?transaction?or?not
          ?????*/

          ????boolean?isInTransaction();

          ????/**
          ?????*?Get?transactional?connection.
          ?????*
          ?????*?@param?dataSourceName?data?source?name
          ?????*?@return?connection
          ?????*?@throws?SQLException?SQL?exception
          ?????*/

          ????Connection?getConnection(String?dataSourceName)?throws?SQLException;

          ????/**
          ?????*?Begin?transaction.
          ?????*/

          ????void?begin();

          ????/**
          ?????*?Commit?transaction.
          ?????*/

          ????void?commit();

          ????/**
          ?????*?Rollback?transaction.
          ?????*/

          ????void?rollback();
          }

          我們重點(diǎn)縣關(guān)注init方法,從它的命名,你就應(yīng)該能夠看出來,這是整個(gè)框架的初始化方法,讓我們來看看它是如何進(jìn)行初始化的。


          ?private?final?Map?cachedDataSources?=?new?HashMap<>();

          ?private?final?XATransactionManager?xaTransactionManager?=?XATransactionManagerLoader.getInstance().getTransactionManager();

          ????@Override
          ????public?void?init(final?DatabaseType?databaseType,?final?Collection?resourceDataSources)?{
          ????????for?(ResourceDataSource?each?:?resourceDataSources)?{
          ????????????cachedDataSources.put(each.getOriginalName(),?new?XATransactionDataSource(databaseType,?each.getUniqueResourceName(),?each.getDataSource(),?xaTransactionManager));
          ????????}
          ????????xaTransactionManager.init();
          ????}
          • 首先SPI的方式加載XATransactionManager的具體實(shí)現(xiàn)類,這里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager

          • 我們在關(guān)注下?new XATransactionDataSource()?, 進(jìn)入?org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource類的構(gòu)造方法。

          public?XATransactionDataSource(final?DatabaseType?databaseType,?final?String?resourceName,?final?DataSource?dataSource,?final?XATransactionManager?xaTransactionManager)?{
          ????????this.databaseType?=?databaseType;
          ????????this.resourceName?=?resourceName;
          ????????this.dataSource?=?dataSource;
          ????????if?(!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{
          ????????????//?重點(diǎn)關(guān)注?1?,返回了xaDatasource
          ????????????xaDataSource?=?XADataSourceFactory.build(databaseType,?dataSource);
          ????????????this.xaTransactionManager?=?xaTransactionManager;
          ????????????//?重點(diǎn)關(guān)注2?注冊資源
          ????????????xaTransactionManager.registerRecoveryResource(resourceName,?xaDataSource);
          ????????}
          ????}
          • 我們重點(diǎn)來關(guān)注?XADataSourceFactory.build(databaseType, dataSource),從名字我們就可以看出,這應(yīng)該是返回JTA規(guī)范里面的XADataSource,在ShardingSphere里面很多的功能,可以從代碼風(fēng)格的命名上就能猜出來,這就是優(yōu)雅代碼(吹一波)。不多逼逼,我們進(jìn)入該方法。
          public?final?class?XADataSourceFactory?{

          ????public?static?XADataSource?build(final?DatabaseType?databaseType,?final?DataSource?dataSource)?{
          ????????return?new?DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);
          ????}
          }
          • 首先又是一個(gè)SPI定義的?XADataSourceDefinitionFactory,它根據(jù)不同的數(shù)據(jù)庫類型,來加載不同的方言。然后我們進(jìn)入?swap方法。
          ?public?XADataSource?swap(final?DataSource?dataSource)?{
          ????????XADataSource?result?=?createXADataSource();
          ????????setProperties(result,?getDatabaseAccessConfiguration(dataSource));
          ????????return?result;
          ????}
          • 很簡明,第一步創(chuàng)建,XADataSource,第二步給它設(shè)置屬性(包含數(shù)據(jù)的連接,用戶名密碼等),然后返回。

          • 返回?XATransactionDataSource?類,關(guān)注xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);?從名字可以看出,這是注冊事務(wù)恢復(fù)資源。這個(gè)我們在事務(wù)恢復(fù)的時(shí)候詳解。

          • 返回?XAShardingTransactionManager.init()?,我們重點(diǎn)來關(guān)注:?xaTransactionManager.init();,最后進(jìn)入AtomikosTransactionManager.init()。流程圖如下:

          代碼:

          public?final?class?AtomikosTransactionManager?implements?XATransactionManager?{

          ????private?final?UserTransactionManager?transactionManager?=?new?UserTransactionManager();

          ????private?final?UserTransactionService?userTransactionService?=?new?UserTransactionServiceImp();

          ????@Override
          ????public?void?init()?{
          ????????userTransactionService.init();
          ????}

          }
          • 進(jìn)入UserTransactionServiceImp.init()
          private?void?initialize()?{
          ???????//添加恢復(fù)資源?不用關(guān)心
          ??for?(RecoverableResource?resource?:?resources_)?{
          ???Configuration.addResource?(?resource?);
          ??}
          ??for?(LogAdministrator?logAdministrator?:?logAdministrators_)?{
          ???Configuration.addLogAdministrator?(?logAdministrator?);
          ??}
          ?????????//注冊插件?不用關(guān)心
          ????????for?(TransactionServicePlugin?nxt?:?tsListeners_)?{
          ?????????Configuration.registerTransactionServicePlugin?(?nxt?);
          ??}
          ????????//獲取配置屬性?重點(diǎn)關(guān)心
          ????????ConfigProperties?configProps?=?Configuration.getConfigProperties();
          ????????configProps.applyUserSpecificProperties(properties_);
          ????????//進(jìn)行初始化
          ????????Configuration.init();
          ?}
          • 我們重點(diǎn)關(guān)注,獲取配置屬性。最后進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties()方法。
          ?@Override
          ?public?ConfigProperties?initializeProperties()?{
          ???//讀取classpath下的默認(rèn)配置transactions-defaults.properties
          ????????Properties?defaults?=?new?Properties();
          ????????loadPropertiesFromClasspath(defaults,?DEFAULT_PROPERTIES_FILE_NAME);
          ????????//讀取classpath下,transactions.properties配置,覆蓋transactions-defaults.properties中相同key的值
          ????????Properties?transactionsProperties?=?new?Properties(defaults);
          ????????loadPropertiesFromClasspath(transactionsProperties,?TRANSACTIONS_PROPERTIES_FILE_NAME);
          ????????//讀取classpath下,jta.properties,覆蓋transactions-defaults.properties、transactions.properties中相同key的值
          ????????Properties?jtaProperties?=?new?Properties(transactionsProperties);
          ????????loadPropertiesFromClasspath(jtaProperties,?JTA_PROPERTIES_FILE_NAME);

          ????????//讀取通過java?-Dcom.atomikos.icatch.file方式指定的自定義配置文件路徑,覆蓋之前的同名配置
          ????????Properties?customProperties?=?new?Properties(jtaProperties);
          ????????loadPropertiesFromCustomFilePath(customProperties);
          ????????//最終構(gòu)造一個(gè)ConfigProperties對象,來表示實(shí)際要使用的配置
          ????????Properties?finalProperties?=?new?Properties(customProperties);
          ????????return?new?ConfigProperties(finalProperties);
          ?}
          • 接下來重點(diǎn)關(guān)注,?Configuration.init(), 進(jìn)行初始化。
          ublic?static?synchronized?boolean?init()?{
          ??boolean?startupInitiated?=?false;
          ??if?(service_?==?null)?{
          ???startupInitiated?=?true;
          ???????????//SPI方式加載插件注冊,無需過多關(guān)心
          ???addAllTransactionServicePluginServicesFromClasspath();
          ???ConfigProperties?configProperties?=?getConfigProperties();
          ??????????//調(diào)用插件的beforeInit方法進(jìn)行初始化話,無需過多關(guān)心
          ???notifyBeforeInit(configProperties);
          ??????????//進(jìn)行事務(wù)日志恢復(fù)的初始化,很重要,接下來詳解
          ???assembleSystemComponents(configProperties);
          ?????????//進(jìn)入系統(tǒng)注解的初始化,一般重要
          ???initializeSystemComponents(configProperties);
          ???notifyAfterInit();
          ???if?(configProperties.getForceShutdownOnVmExit())?{
          ????addShutdownHook(new?ForceShutdownHook());
          ???}
          ??}
          ??return?startupInitiated;
          ?}
          • 我們先來關(guān)注?assembleSystemComponents(configProperties);?進(jìn)入它,進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:
          @Override
          ?public?TransactionServiceProvider?assembleTransactionService(
          ???ConfigProperties?configProperties)
          ?
          {
          ??RecoveryLog?recoveryLog?=null;
          ???????//打印日志
          ??logProperties(configProperties.getCompletedProperties());
          ???????//生成唯一名字
          ??String?tmUniqueName?=?configProperties.getTmUniqueName();

          ??long?maxTimeout?=?configProperties.getMaxTimeout();
          ??int?maxActives?=?configProperties.getMaxActives();
          ??boolean?threaded2pc?=?configProperties.getThreaded2pc();
          ??????//SPI方式加載OltpLog?,這是最重要的擴(kuò)展地方,如果用戶沒有SPI的方式去擴(kuò)展那么就為null
          ??OltpLog?oltpLog?=?createOltpLogFromClasspath();
          ??if?(oltpLog?==?null)?{
          ???LOGGER.logInfo("Using?default?(local)?logging?and?recovery...");
          ????????????????????????//創(chuàng)建事務(wù)日志存儲資源
          ???Repository?repository?=?createRepository(configProperties);
          ???oltpLog?=?createOltpLog(repository);
          ???//????Assemble?recoveryLog
          ???recoveryLog?=?createRecoveryLog(repository);
          ??}
          ??StateRecoveryManagerImp?recoveryManager?=?new?StateRecoveryManagerImp();
          ??recoveryManager.setOltpLog(oltpLog);
          ???????????//生成唯一id生成器,以后生成XID會用的到
          ??UniqueIdMgr?idMgr?=?new?UniqueIdMgr?(?tmUniqueName?);
          ??int?overflow?=?idMgr.getMaxIdLengthInBytes()?-?MAX_TID_LENGTH;
          ??if?(?overflow?>?0?)?{
          ???//?see?case?73086
          ???String?msg?=?"Value?too?long?:?"?+?tmUniqueName;
          ???LOGGER.logFatal?(?msg?);
          ???throw?new?SysException(msg);
          ??}
          ??return?new?TransactionServiceImp(tmUniqueName,?recoveryManager,?idMgr,?maxTimeout,?maxActives,?!threaded2pc,?recoveryLog);
          ?}
          • 我們重點(diǎn)來分析createOltpLogFromClasspath(), 采用SPI的加載方式來獲取,默認(rèn)這里會返回?null, 什么意思呢?就是當(dāng)沒有擴(kuò)展的時(shí)候,atomikos,會創(chuàng)建框架自定義的資源,來存儲事務(wù)日志。
          private?OltpLog?createOltpLogFromClasspath()?{
          ??OltpLog?ret?=?null;
          ??ServiceLoader?loader?=?ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());
          ??int?i?=?0;
          ????????for?(OltpLogFactory?l?:?loader?)?{
          ???ret?=?l.createOltpLog();
          ???i++;
          ??}
          ????????if?(i?>?1)?{
          ???String?msg?=?"More?than?one?OltpLogFactory?found?in?classpath?-?error?in?configuration!";
          ???LOGGER.logFatal(msg);
          ???throw?new?SysException(msg);
          ????????}
          ????????return?ret;
          ?}
          • 我們跟著進(jìn)入?Repository repository = createRepository(configProperties);
          ?private?CachedRepository?createCoordinatorLogEntryRepository(
          ???ConfigProperties?configProperties)
          ?throws?LogException?
          {
          ????????//創(chuàng)建內(nèi)存資源存儲
          ??InMemoryRepository?inMemoryCoordinatorLogEntryRepository?=?new?InMemoryRepository();
          ???????//進(jìn)行初始化
          ??inMemoryCoordinatorLogEntryRepository.init();
          ???????//創(chuàng)建使用文件存儲資源作為backup
          ??FileSystemRepository?backupCoordinatorLogEntryRepository?=?new?FileSystemRepository();
          ???????//進(jìn)行初始化
          ??backupCoordinatorLogEntryRepository.init();
          ??????//內(nèi)存與file資源進(jìn)行合并
          ??CachedRepository?repository?=?new?CachedRepository(inMemoryCoordinatorLogEntryRepository,?backupCoordinatorLogEntryRepository);
          ??repository.init();
          ??return?repository;
          ?}
          • 這里就會創(chuàng)建出?CachedRepository,里面包含了?InMemoryRepository?與?FileSystemRepository

          • 回到主線?com.atomikos.icatch.config.Configuration.init(), 最后來分析下notifyAfterInit();

          ?private?static?void?notifyAfterInit()?{
          ?????????//進(jìn)行插件的初始化
          ??for?(TransactionServicePlugin?p?:?tsListenersList_)?{
          ???p.afterInit();
          ??}
          ??for?(LogAdministrator?a?:?logAdministrators_)?{
          ???a.registerLogControl(service_.getLogControl());
          ??}
          ?????????//設(shè)置事務(wù)恢復(fù)服務(wù),進(jìn)行事務(wù)的恢復(fù)
          ??for?(RecoverableResource?r?:?resourceList_?)?{
          ???r.setRecoveryService(recoveryService_);
          ??}

          ?}
          • 插件的初始化會進(jìn)入com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit()
          ?public?void?afterInit()?{
          ??TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(),?autoRegisterResources);
          ??????????//如果我們自定義擴(kuò)展了?OltpLog?,這里就會返回null,如果是null,那么XaResourceRecoveryManager就是null
          ??RecoveryLog?recoveryLog?=?Configuration.getRecoveryLog();
          ??long?maxTimeout?=?Configuration.getConfigProperties().getMaxTimeout();
          ??if?(recoveryLog?!=?null)?{
          ???XaResourceRecoveryManager.installXaResourceRecoveryManager(new?DefaultXaRecoveryLog(recoveryLog,?maxTimeout),Configuration.getConfigProperties().getTmUniqueName());
          ??}

          ?}
          • 重點(diǎn)注意?RecoveryLog recoveryLog = Configuration.getRecoveryLog();?,如果用戶采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog?,這里就會返回 null。如果是null,則不會對?XaResourceRecoveryManager?進(jìn)行初始化。

          • 回到?notifyAfterInit(), 我們來分析?setRecoveryService。

          public?void?setRecoveryService?(?RecoveryService?recoveryService?)
          ????????????throws?ResourceException
          ????
          {

          ????????if?(?recoveryService?!=?null?)?{
          ????????????if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"
          ????????????????????+?getName?()?);
          ????????????this.branchIdentifier=recoveryService.getName();
          ????????????recover();
          ????????}
          ????}
          • 我們進(jìn)入?recover()?方法:
          ?public?void?recover()?{
          ?????XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();
          ????????//null?for?LogCloud?recovery
          ?????if?(xaResourceRecoveryManager?!=?null)?{
          ??????try?{
          ????xaResourceRecoveryManager.recover(getXAResource());
          ???}?catch?(Exception?e)?{
          ????refreshXAResource();?//cf?case?156968
          ???}

          ?????}
          ????}
          • 看到最關(guān)鍵的注釋了嗎,如果用戶采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager?為null,則就會進(jìn)行云端恢復(fù),反之則進(jìn)行事務(wù)恢復(fù)。事務(wù)恢復(fù)很復(fù)雜,我們會單獨(dú)來講。

          到這里atomikos的基本的初始化已經(jīng)完成。

          atomikos事務(wù)begin流程

          我們知道,本地的事務(wù),都會有一個(gè)?trainsaction.begin, 對應(yīng)XA分布式事務(wù)來說也不另外,我們再把思路切換回XAShardingTransactionManager.begin(), 會調(diào)用com.atomikos.icatch.jta.TransactionManagerImp.begin()。流程圖如下:

          代碼:

          ??public?void?begin?(?int?timeout?)?throws?NotSupportedException,
          ????????????SystemException
          ????
          {
          ????????CompositeTransaction?ct?=?null;
          ????????ResumePreviousTransactionSubTxAwareParticipant?resumeParticipant?=?null;

          ????????ct?=?compositeTransactionManager.getCompositeTransaction();
          ????????if?(?ct?!=?null?&&?ct.getProperty?(??JTA_PROPERTY_NAME?)?==?null?)?{
          ????????????LOGGER.logWarning?(?"JTA:?temporarily?suspending?incompatible?transaction:?"?+?ct.getTid()?+
          ????????????????????"?(will?be?resumed?after?JTA?transaction?ends)"?);
          ????????????ct?=?compositeTransactionManager.suspend();
          ????????????resumeParticipant?=?new?ResumePreviousTransactionSubTxAwareParticipant?(?ct?);
          ????????}

          ????????try?{
          ??????//創(chuàng)建事務(wù)補(bǔ)償點(diǎn)
          ????????????ct?=?compositeTransactionManager.createCompositeTransaction?(?(?(?long?)?timeout?)?*?1000?);
          ????????????if?(?resumeParticipant?!=?null?)?ct.addSubTxAwareParticipant?(?resumeParticipant?);
          ????????????if?(?ct.isRoot?()?&&?getDefaultSerial?()?)
          ????????????????ct.setSerial?();
          ????????????ct.setProperty?(?JTA_PROPERTY_NAME?,?"true"?);
          ????????}?catch?(?SysException?se?)?{
          ?????????String?msg?=?"Error?in?begin()";
          ?????????LOGGER.logError(?msg?,?se?);
          ????????????throw?new?ExtendedSystemException?(?msg?,?se?);
          ????????}
          ????????recreateCompositeTransactionAsJtaTransaction(ct);
          ????}
          • 這里我們主要關(guān)注?compositeTransactionManager.createCompositeTransaction(),
          public?CompositeTransaction?createCompositeTransaction?(?long?timeout?)?throws?SysException
          ????
          {
          ????????CompositeTransaction?ct?=?null?,?ret?=?null;

          ????????ct?=?getCurrentTx?();
          ????????if?(?ct?==?null?)?{
          ????????????ret?=?getTransactionService().createCompositeTransaction?(?timeout?);
          ????????????if(LOGGER.isDebugEnabled()){
          ?????????????LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?):?"
          ????????????????????+?"created?new?ROOT?transaction?with?id?"?+?ret.getTid?());
          ????????????}
          ????????}?else?{
          ??????????if(LOGGER.isDebugEnabled())?LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?)");
          ????????????ret?=?ct.createSubTransaction?();

          ????????}

          ????????Thread?thread?=?Thread.currentThread?();
          ????????setThreadMappings?(?ret,?thread?);

          ????????return?ret;
          ????}
          • 創(chuàng)建了事務(wù)補(bǔ)償點(diǎn),然后把他放到了用當(dāng)前線程作為key的Map當(dāng)中,這里思考,為啥它不用 threadLocal。

          到這里atomikos的事務(wù)begin流程已經(jīng)完成。大家可能有些疑惑,begin好像什么都沒有做,XA start 也沒調(diào)用?別慌,下一節(jié)繼續(xù)來講。

          XATransactionDataSource getConnection() 流程

          我們都知道想要執(zhí)行SQL語句,必須要獲取到數(shù)據(jù)庫的connection。讓我們再回到?XAShardingTransactionManager.getConnection()?最后會調(diào)用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:

          代碼 :

          ?public?Connection?getConnection()?throws?SQLException,?SystemException,?RollbackException?{
          ??????//先檢查是否已經(jīng)有存在的connection,這一步很關(guān)心,也是XA的關(guān)鍵,因?yàn)閄A事務(wù),必須在同一個(gè)connection
          ????????if?(CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{
          ????????????return?dataSource.getConnection();
          ????????}
          ??????//獲取數(shù)據(jù)庫連接
          ????????Connection?result?=?dataSource.getConnection();
          ??????//轉(zhuǎn)成XAConnection,其實(shí)是同一個(gè)連接
          ????????XAConnection?xaConnection?=?XAConnectionFactory.createXAConnection(databaseType,?xaDataSource,?result);
          ??????//獲取JTA事務(wù)定義接口
          ????????Transaction?transaction?=?xaTransactionManager.getTransactionManager().getTransaction();
          ????????if?(!enlistedTransactions.get().contains(transaction))?{
          ??????//進(jìn)行資源注冊
          ????????????transaction.enlistResource(new?SingleXAResource(resourceName,?xaConnection.getXAResource()));
          ????????????transaction.registerSynchronization(new?Synchronization()?{
          ????????????????@Override
          ????????????????public?void?beforeCompletion()?{
          ????????????????????enlistedTransactions.get().remove(transaction);
          ????????????????}

          ????????????????@Override
          ????????????????public?void?afterCompletion(final?int?status)?{
          ????????????????????enlistedTransactions.get().clear();
          ????????????????}
          ????????????});
          ????????????enlistedTransactions.get().add(transaction);
          ????????}
          ????????return?result;
          ????}
          • 首先第一步很關(guān)心,尤其是對shardingsphere來說,因?yàn)樵谝粋€(gè)事務(wù)里面,會有多個(gè)SQL語句,打到相同的數(shù)據(jù)庫,所以對相同的數(shù)據(jù)庫,必須獲取同一個(gè)XAConnection,這樣才能進(jìn)行XA事務(wù)的提交與回滾。

          • 我們接下來關(guān)心?transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會進(jìn)入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代碼太長,截取一部分。

          try?{
          ????restx?=?(XAResourceTransaction)?res
          ??????.getResourceTransaction(this.compositeTransaction);

          ????//?next,?we?MUST?set?the?xa?resource?again,
          ????//?because?ONLY?the?instance?we?got?as?argument
          ????//?is?available?for?use?now?!
          ????//?older?instances?(set?in?restx?from?previous?sibling)
          ????//?have?connections?that?may?be?in?reuse?already
          ????//?->old?xares?not?valid?except?for?2pc?operations

          ????restx.setXAResource(xares);
          ????restx.resume();
          ???}?catch?(ResourceException?re)?{
          ????throw?new?ExtendedSystemException(
          ??????"Unexpected?error?during?enlist",?re);
          ???}?catch?(RuntimeException?e)?{
          ????throw?e;
          ???}

          ???addXAResourceTransaction(restx,?xares);
          • 我們直接看?restx.resume();
          public?synchronized?void?resume()?throws?ResourceException?{
          ??int?flag?=?0;
          ??String?logFlag?=?"";
          ??if?(this.state.equals(TxState.LOCALLY_DONE))?{//?reused?instance
          ???flag?=?XAResource.TMJOIN;
          ???logFlag?=?"XAResource.TMJOIN";
          ??}?else?if?(!this.knownInResource)?{//?new?instance
          ???flag?=?XAResource.TMNOFLAGS;
          ???logFlag?=?"XAResource.TMNOFLAGS";
          ??}?else
          ???throw?new?IllegalStateException("Wrong?state?for?resume:?"
          ?????+?this.state);

          ??try?{
          ???if?(LOGGER.isDebugEnabled())?{
          ????LOGGER.logDebug("XAResource.start?(?"?+?this.xidToHexString
          ??????+?"?,?"?+?logFlag?+?"?)?on?resource?"
          ??????+?this.resourcename
          ??????+?"?represented?by?XAResource?instance?"
          ??????+?this.xaresource);
          ???}
          ???this.xaresource.start(this.xid,?flag);

          ??}?catch?(XAException?xaerr)?{
          ???String?msg?=?interpretErrorCode(this.resourcename,?"resume",
          ?????this.xid,?xaerr.errorCode);
          ???LOGGER.logWarning(msg,?xaerr);
          ???throw?new?ResourceException(msg,?xaerr);
          ??}
          ??setState(TxState.ACTIVE);
          ??this.knownInResource?=?true;
          ?}
          • 哦多尅,看見了嗎,各位,看見了?this.xaresource.start(this.xid, flag);?了嗎????,我們進(jìn)去,假設(shè)我們使用的Mysql數(shù)據(jù)庫:
          ?public?void?start(Xid?xid,?int?flags)?throws?XAException?{
          ????????StringBuilder?commandBuf?=?new?StringBuilder(300);
          ????????commandBuf.append("XA?START?");
          ????????appendXid(commandBuf,?xid);
          ????????switch(flags)?{
          ????????case?0:
          ????????????break;
          ????????case?2097152:
          ????????????commandBuf.append("?JOIN");
          ????????????break;
          ????????case?134217728:
          ????????????commandBuf.append("?RESUME");
          ????????????break;
          ????????default:
          ????????????throw?new?XAException(-5);
          ????????}

          ????????this.dispatchCommand(commandBuf.toString());
          ????????this.underlyingConnection.setInGlobalTx(true);
          ????}
          • 組裝XA start Xid?SQL語句,進(jìn)行執(zhí)行。

          到這里,我們總結(jié)下,在獲取數(shù)據(jù)庫連接的時(shí)候,我們執(zhí)行了XA協(xié)議接口中的?XA start xid

          atomikos事務(wù)commit流程

          好了,上面我們已經(jīng)開啟了事務(wù),現(xiàn)在我們來分析下事務(wù)commit流程,我們再把視角切換回XAShardingTransactionManager.commit(),最后我們會進(jìn)入com.atomikos.icatch.imp.CompositeTransactionImp.commit()?方法。流程圖如下:

          代碼:

          ?public?void?commit?()?throws?HeurRollbackException,?HeurMixedException,
          ????????????HeurHazardException,?SysException,?SecurityException,
          ????????????RollbackException
          ????
          {
          ???????//首先更新下事務(wù)日志的狀態(tài)
          ????????doCommit?();
          ????????setSiblingInfoForIncoming1pcRequestFromRemoteClient();

          ????????if?(?isRoot?()?)?{
          ?????????//真正的commit操作
          ??????????coordinator.terminate?(?true?);
          ????????}
          ????}
          • 我們關(guān)注coordinator.terminate ( true );
          ?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,
          ????????????HeurMixedException,?SysException,?java.lang.SecurityException,
          ????????????HeurCommitException,?HeurHazardException,?RollbackException,
          ????????????IllegalStateException

          ????
          {
          ?????synchronized?(?fsm_?)?{
          ??????if?(?commit?)?{
          ?????????????????????//判斷有幾個(gè)參與者,如果只有一個(gè),直接提交
          ???????if?(?participants_.size?()?<=?1?)?{
          ????????commit?(?true?);
          ???????}?else?{
          ????????????????????????????????//否則,走XA?2階段提交流程,先prepare,?再提交
          ????????int?prepareResult?=?prepare?();
          ????????//?make?sure?to?only?do?commit?if?NOT?read?only
          ????????if?(?prepareResult?!=?Participant.READ_ONLY?)
          ?????????commit?(?false?);
          ???????}
          ??????}?else?{
          ???????rollback?();
          ??????}
          ?????}
          ????}
          • 首先會判斷參與者的個(gè)數(shù),這里我們可以理解為MySQL的database數(shù)量,如果只有一個(gè),退化成一階段,直接提交。如果有多個(gè),則走標(biāo)準(zhǔn)的XA二階段提交流程。

          • 我們來看?prepare ();?流程,最后會走到com.atomikos.icatch.imp.PrepareMessage.send()?--->?com.atomikos.datasource.xa.XAResourceTransaction.prepare()

          int?ret?=?0;
          ??terminateInResource();

          ??if?(TxState.ACTIVE?==?this.state)?{
          ???//?tolerate?non-delisting?apps/servers
          ???suspend();
          ??}

          ??//?duplicate?prepares?can?happen?for?siblings?in?serial?subtxs!!!
          ??//?in?that?case,?the?second?prepare?just?returns?READONLY
          ??if?(this.state?==?TxState.IN_DOUBT)
          ???return?Participant.READ_ONLY;
          ??else?if?(!(this.state?==?TxState.LOCALLY_DONE))
          ???throw?new?SysException("Wrong?state?for?prepare:?"?+?this.state);
          ??try?{
          ???//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after
          ???//?suspend???
          ???testOrRefreshXAResourceFor2PC();
          ???if?(LOGGER.isTraceEnabled())?{
          ????LOGGER.logTrace("About?to?call?prepare?on?XAResource?instance:?"
          ??????+?this.xaresource);
          ???}
          ???ret?=?this.xaresource.prepare(this.xid);

          ??}?catch?(XAException?xaerr)?{
          ???String?msg?=?interpretErrorCode(this.resourcename,?"prepare",
          ?????this.xid,?xaerr.errorCode);
          ???if?(XAException.XA_RBBASE?<=?xaerr.errorCode
          ?????&&?xaerr.errorCode?<=?XAException.XA_RBEND)?{
          ????LOGGER.logWarning(msg,?xaerr);?//?see?case?84253
          ????throw?new?RollbackException(msg);
          ???}?else?{
          ????LOGGER.logError(msg,?xaerr);
          ????throw?new?SysException(msg,?xaerr);
          ???}
          ??}
          ??setState(TxState.IN_DOUBT);
          ??if?(ret?==?XAResource.XA_RDONLY)?{
          ???if?(LOGGER.isDebugEnabled())?{
          ????LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString
          ??????+?"?)?returning?XAResource.XA_RDONLY?"?+?"on?resource?"
          ??????+?this.resourcename
          ??????+?"?represented?by?XAResource?instance?"
          ??????+?this.xaresource);
          ???}
          ???return?Participant.READ_ONLY;
          ??}?else?{
          ???if?(LOGGER.isDebugEnabled())?{
          ????LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString
          ??????+?"?)?returning?OK?"?+?"on?resource?"
          ??????+?this.resourcename
          ??????+?"?represented?by?XAResource?instance?"
          ??????+?this.xaresource);
          ???}
          ???return?Participant.READ_ONLY?+?1;
          ??}
          • 終于,我們看到了這么一句?ret = this.xaresource.prepare(this.xid);?但是等等,我們之前不是說了,XA start xid?以后要先?XA end xid?嗎?答案就在?suspend();?里面。
          public?synchronized?void?suspend()?throws?ResourceException?{

          ??//?BugzID:?20545
          ??//?State?may?be?IN_DOUBT?or?TERMINATED?when?a?connection?is?closed?AFTER
          ??//?commit!
          ??//?In?that?case,?don't?call?END?again,?and?also?don't?generate?any
          ??//?error!
          ??//?This?is?required?for?some?hibernate?connection?release?strategies.
          ??if?(this.state.equals(TxState.ACTIVE))?{
          ???try?{
          ????if?(LOGGER.isDebugEnabled())?{
          ?????LOGGER.logDebug("XAResource.end?(?"?+?this.xidToHexString
          ???????+?"?,?XAResource.TMSUCCESS?)?on?resource?"
          ???????+?this.resourcename
          ???????+?"?represented?by?XAResource?instance?"
          ???????+?this.xaresource);
          ????}
          ?????????????????//執(zhí)行了?xa?end?語句
          ????this.xaresource.end(this.xid,?XAResource.TMSUCCESS);

          ???}?catch?(XAException?xaerr)?{
          ????String?msg?=?interpretErrorCode(this.resourcename,?"end",
          ??????this.xid,?xaerr.errorCode);
          ????if?(LOGGER.isTraceEnabled())
          ?????LOGGER.logTrace(msg,?xaerr);
          ????//?don't?throw:?fix?for?case?102827
          ???}
          ???setState(TxState.LOCALLY_DONE);
          ??}
          ?}

          到了這里,我們已經(jīng)執(zhí)行了 XA start xid -> XA end xid --> XA prepare xid, 接下來就是最后一步 commit

          • 我們再回到?terminate(false)?方法,來看 commit()流程。其實(shí)和 prepare流程一樣,最后會走到?com.atomikos.datasource.xa.XAResourceTransaction.commit()。commit執(zhí)行完,數(shù)據(jù)提交
          //繁雜代碼過多,就顯示核心的
          this.xaresource.commit(this.xid,?onePhase);

          思考:這里的參與者提交是在一個(gè)循環(huán)里面,一個(gè)一個(gè)提交的,如果之前的提交了,后面的參與者提交的時(shí)候,掛了,就會造成數(shù)據(jù)的不一致性。

          Atomikos rollback() 流程


          上面我們已經(jīng)分析了commit流程,其實(shí)rollback流程和commit流程一樣,我們在把目光切換回?org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback()?,最后會執(zhí)行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()。

          ????public?void?rollback?()?throws?IllegalStateException,?SysException
          ????
          {
          ????????//清空資源,更新事務(wù)日志狀態(tài)等
          ?????doRollback?();
          ????????if?(?isRoot?()?)?{
          ????????????try?{
          ????????????????coordinator.terminate?(?false?);
          ????????????}?catch?(?Exception?e?)?{
          ????????????????throw?new?SysException?(?"Unexpected?error?in?rollback:?"?+?e.getMessage?(),?e?);
          ????????????}
          ????????}
          ????}
          • 重點(diǎn)關(guān)注?coordinator.terminate ( false );?,這個(gè)和 commit流程是一樣的,只不過在 commit流程里面,參數(shù)傳的是true。
          ?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,
          ????????????HeurMixedException,?SysException,?java.lang.SecurityException,
          ????????????HeurCommitException,?HeurHazardException,?RollbackException,
          ????????????IllegalStateException

          ????
          {
          ?????synchronized?(?fsm_?)?{
          ??????if?(?commit?)?{
          ???????if?(?participants_.size?()?<=?1?)?{
          ????????commit?(?true?);
          ???????}?else?{
          ????????int?prepareResult?=?prepare?();
          ????????//?make?sure?to?only?do?commit?if?NOT?read?only
          ????????if?(?prepareResult?!=?Participant.READ_ONLY?)
          ?????????commit?(?false?);
          ???????}
          ??????}?else?{
          ?????????????????//如果是false,走的是rollback
          ???????rollback?();
          ??????}
          ?????}
          ????}
          • 我們重點(diǎn)關(guān)注?rollback()?,最后會走到com.atomikos.datasource.xa.XAResourceTransaction.rollback()。
          public?synchronized?void?rollback()
          ???throws?HeurCommitException,?HeurMixedException,
          ???HeurHazardException,?SysException?
          {
          ??terminateInResource();

          ??if?(rollbackShouldDoNothing())?{
          ???return;
          ??}
          ??if?(this.state.equals(TxState.TERMINATED))?{
          ???return;
          ??}

          ??if?(this.state.equals(TxState.HEUR_MIXED))
          ???throw?new?HeurMixedException();
          ??if?(this.state.equals(TxState.HEUR_COMMITTED))
          ???throw?new?HeurCommitException();
          ??if?(this.xaresource?==?null)?{
          ???throw?new?HeurHazardException("XAResourceTransaction?"
          ?????+?getXid()?+?":?no?XAResource?to?rollback?");
          ??}

          ??try?{
          ???if?(this.state.equals(TxState.ACTIVE))?{?//?first?suspend?xid
          ????suspend();
          ???}

          ???//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after
          ???//?suspend???
          ???testOrRefreshXAResourceFor2PC();
          ???if?(LOGGER.isDebugEnabled())?{
          ????LOGGER.logDebug("XAResource.rollback?(?"?+?this.xidToHexString
          ??????+?"?)?"?+?"on?resource?"?+?this.resourcename
          ??????+?"?represented?by?XAResource?instance?"
          ??????+?this.xaresource);
          ???}
          ???this.xaresource.rollback(this.xid);

          先在supend()方法里面執(zhí)行了?XA end xid?語句, 接下來執(zhí)行?this.xaresource.rollback(this.xid);?進(jìn)行數(shù)據(jù)的回滾。

          Atomikos-recover 流程

          說事務(wù)恢復(fù)流程之前,我們來討論下,會啥會出現(xiàn)事務(wù)恢復(fù)?XA二階段提交協(xié)議不是強(qiáng)一致性的嗎?要解答這個(gè)問題,我們就要來看看XA二階段協(xié)議有什么問題?

          問題一 :單點(diǎn)故障

          由于協(xié)調(diào)者的重要性,一旦協(xié)調(diào)者TM發(fā)生故障。參與者RM會一直阻塞下去。尤其在第二階段,協(xié)調(diào)者發(fā)生故障,那么所有的參與者還都處于鎖定事務(wù)資源的狀態(tài)中,而無法繼續(xù)完成事務(wù)操作。(如果是協(xié)調(diào)者掛掉,可以重新選舉一個(gè)協(xié)調(diào)者,但是無法解決因?yàn)閰f(xié)調(diào)者宕機(jī)導(dǎo)致的參與者處于阻塞狀態(tài)的問題)

          問題二 :數(shù)據(jù)不一致

          數(shù)據(jù)不一致。在二階段提交的階段二中,當(dāng)協(xié)調(diào)者向參與者發(fā)送commit請求之后,發(fā)生了局部網(wǎng)絡(luò)異?;蛘咴诎l(fā)送commit請求過程中協(xié)調(diào)者發(fā)生了故障,這回導(dǎo)致只有一部分參與者接受到了commit請求。而在這部分參與者接到commit請求之后就會執(zhí)行commit操作。但是其他部分未接到commit請求的機(jī)器則無法執(zhí)行事務(wù)提交。于是整個(gè)分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致性的現(xiàn)象。

          如何解決?

          解決的方案簡單,就是我們在事務(wù)的操作的每一步,我們都需要對事務(wù)狀態(tài)的日志進(jìn)行人為的記錄,我們可以把日志記錄存儲在我們想存儲的地方,可以是本地存儲,也可以中心化的存儲。atomikos的開源版本,我們之前也分析了,它是使用內(nèi)存 + file的方式,存儲在本地,這樣的話,如果在一個(gè)集群系統(tǒng)里面,如果有節(jié)點(diǎn)宕機(jī),日志又存儲在本地,所以事務(wù)不能及時(shí)的恢復(fù)(需要重啟服務(wù))。

          Atomikos 多場景下事務(wù)恢復(fù)。

          Atomikos 提供了二種方式,來應(yīng)對不同場景下的異常情況。

          • 場景一:服務(wù)節(jié)點(diǎn)不宕機(jī),因?yàn)槠渌脑?,產(chǎn)生需要事務(wù)恢復(fù)的情況。這個(gè)時(shí)候才要定時(shí)任務(wù)進(jìn)行恢復(fù)。具體的代碼?com.atomikos.icatch.imp.TransactionServiceImp.init()?方法,會初始化一個(gè)定時(shí)任務(wù),進(jìn)行事務(wù)的恢復(fù)。
          public?synchronized?void?init?(?Properties?properties?)?throws?SysException
          ????
          {
          ????????shutdownInProgress_?=?false;
          ????????control_?=?new?com.atomikos.icatch.admin.imp.LogControlImp?(?(AdminLog)?this.recoveryLog?);
          ??ConfigProperties?configProperties?=?new?ConfigProperties(properties);
          ??long?recoveryDelay?=?configProperties.getRecoveryDelay();
          ????????recoveryTimer?=?new?PooledAlarmTimer(recoveryDelay);
          ????????recoveryTimer.addAlarmTimerListener(new?AlarmTimerListener()?{
          ???@Override
          ???public?void?alarm(AlarmTimer?timer)?{
          ????//進(jìn)行事務(wù)恢復(fù)
          ????performRecovery();

          ???}
          ??});

          ????????TaskManager.SINGLETON.executeTask(recoveryTimer);
          ????????initialized_?=?true;
          ????}
          • 最終會進(jìn)入com.atomikos.datasource.xa.XATransactionalResource.recover()?方法。
          ???public?void?recover()?{
          ?????XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();
          ?????if?(xaResourceRecoveryManager?!=?null)?{?//null?for?LogCloud?recovery
          ??????try?{
          ????xaResourceRecoveryManager.recover(getXAResource());
          ???}?catch?(Exception?e)?{
          ????refreshXAResource();?//cf?case?156968
          ???}

          ?????}
          ????}
          • 場景二: 當(dāng)服務(wù)節(jié)點(diǎn)宕機(jī)重啟動(dòng)過程中進(jìn)行事務(wù)的恢復(fù)。具體實(shí)現(xiàn)在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面
          ?@Override
          ?public?void?setRecoveryService?(?RecoveryService?recoveryService?)
          ????????????throws?ResourceException
          ????
          {

          ????????if?(?recoveryService?!=?null?)?{
          ????????????if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"
          ????????????????????+?getName?()?);
          ????????????this.branchIdentifier=recoveryService.getName();
          ?????????//進(jìn)行事務(wù)恢復(fù)
          ????????????recover();
          ????????}

          ????}

          com.atomikos.datasource.xa.XATransactionalResource.recover() 流程詳解。

          主代碼:

          ?public?void?recover(XAResource?xaResource)?throws?XAException?{
          ??????//?根據(jù)XA?recovery?協(xié)議獲取?xid
          ??List?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);
          ??Collection?xidsToCommit;
          ??try?{
          ????????????//?xid?與日志記錄的xid進(jìn)行匹配
          ???xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();
          ???for?(XID?xid?:?xidsToRecover)?{
          ????if?(xidsToCommit.contains(xid))?{
          ????????????//執(zhí)行?XA?commit?xid?進(jìn)行提交
          ?????replayCommit(xid,?xaResource);
          ????}?else?{
          ?????attemptPresumedAbort(xid,?xaResource);
          ????}
          ???}
          ??}?catch?(LogException?couldNotRetrieveCommittingXids)?{
          ???LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);
          ??}
          ?}
          • 我們來看一下如何根據(jù)?XA recovery 協(xié)議獲取RM端存儲的xid。進(jìn)入方法?retrievePreparedXidsFromXaResource(xaResource), 最后進(jìn)入?com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。
          public?static?List?recoverXids(XAResource?xaResource,?XidSelector?selector)?throws?XAException?{
          ??List?ret?=?new?ArrayList();

          ????????boolean?done?=?false;
          ????????int?flags?=?XAResource.TMSTARTRSCAN;
          ????????Xid[]?xidsFromLastScan?=?null;
          ????????List?allRecoveredXidsSoFar?=?new?ArrayList();
          ????????do?{
          ?????????xidsFromLastScan?=?xaResource.recover(flags);
          ????????????flags?=?XAResource.TMNOFLAGS;
          ????????????done?=?(xidsFromLastScan?==?null?||?xidsFromLastScan.length?==?0);
          ????????????if?(!done)?{
          ????????????????//?TEMPTATIVELY?SET?done?TO?TRUE
          ????????????????//?TO?TOLERATE?ORACLE?8.1.7?INFINITE
          ????????????????//?LOOP?(ALWAYS?RETURNS?SAME?RECOVER
          ????????????????//?SET).?IF?A?NEW?SET?OF?XIDS?IS?RETURNED
          ????????????????//?THEN?done?WILL?BE?RESET?TO?FALSE
          ????????????????done?=?true;
          ????????????????for?(?int?i?=?0;?i??????????????????XID?xid?=?new?XID?(?xidsFromLastScan[i]?);
          ????????????????????//?our?own?XID?implements?equals?and?hashCode?properly
          ????????????????????if?(!allRecoveredXidsSoFar.contains(xid))?{
          ????????????????????????//?a?new?xid?is?returned?->?we?can?not?be?in?a?recovery?loop?->?go?on
          ????????????????????????allRecoveredXidsSoFar.add(xid);
          ????????????????????????done?=?false;
          ????????????????????????if?(selector.selects(xid))?{
          ?????????????????????????ret.add(xid);
          ????????????????????????}
          ????????????????????}
          ????????????????}
          ????????????}
          ????????}?while?(!done);

          ??return?ret;
          ?}
          • 我們重點(diǎn)關(guān)注xidsFromLastScan = xaResource.recover(flags);?這個(gè)方法,如果我們使用MySQL,那么久會進(jìn)入 MysqlXAConnection.recover()方法。執(zhí)行?XA recovery xid?語句來獲取 xid。
          ?protected?static?Xid[]?recover(Connection?c,?int?flag)?throws?XAException?{
          ????????/*
          ?????????*?The?XA?RECOVER?statement?returns?information?for?those?XA?transactions?on?the?MySQL?server?that?are?in?the?PREPARED?state.?(See?Section?13.4.7.2,????XA
          ?????????*?Transaction?States???.)?The?output?includes?a?row?for?each?such?XA?transaction?on?the?server,?regardless?of?which?client?started?it.
          ?????????*
          ?????????*?XA?RECOVER?output?rows?look?like?this?(for?an?example?xid?value?consisting?of?the?parts?'abc',?'def',?and?7):
          ?????????*
          ?????????*?mysql>?XA?RECOVER;
          ?????????*?+----------+--------------+--------------+--------+
          ?????????*?|?formatID?|?gtrid_length?|?bqual_length?|?data?|
          ?????????*?+----------+--------------+--------------+--------+
          ?????????*?|?7?|?3?|?3?|?abcdef?|
          ?????????*?+----------+--------------+--------------+--------+
          ?????????*
          ?????????*?The?output?columns?have?the?following?meanings:
          ?????????*
          ?????????*?formatID?is?the?formatID?part?of?the?transaction?xid
          ?????????*?gtrid_length?is?the?length?in?bytes?of?the?gtrid?part?of?the?xid
          ?????????*?bqual_length?is?the?length?in?bytes?of?the?bqual?part?of?the?xid
          ?????????*?data?is?the?concatenation?of?the?gtrid?and?bqual?parts?of?the?xid
          ?????????*/


          ????????boolean?startRscan?=?((flag?&?TMSTARTRSCAN)?>?0);
          ????????boolean?endRscan?=?((flag?&?TMENDRSCAN)?>?0);

          ????????if?(!startRscan?&&?!endRscan?&&?flag?!=?TMNOFLAGS)?{
          ????????????throw?new?MysqlXAException(XAException.XAER_INVAL,?Messages.getString("MysqlXAConnection.001"),?null);
          ????????}

          ????????//
          ????????//?We?return?all?recovered?XIDs?at?once,?so?if?not??TMSTARTRSCAN,?return?no?new?XIDs
          ????????//
          ????????//?We?don't?attempt?to?maintain?state?to?check?for?TMNOFLAGS?"outside"?of?a?scan
          ????????//

          ????????if?(!startRscan)?{
          ????????????return?new?Xid[0];
          ????????}

          ????????ResultSet?rs?=?null;
          ????????Statement?stmt?=?null;

          ????????List?recoveredXidList?=?new?ArrayList();

          ????????try?{
          ????????????//?TODO:?Cache?this?for?lifetime?of?XAConnection
          ????????????stmt?=?c.createStatement();

          ????????????rs?=?stmt.executeQuery("XA?RECOVER");

          ????????????while?(rs.next())?{
          ????????????????final?int?formatId?=?rs.getInt(1);
          ????????????????int?gtridLength?=?rs.getInt(2);
          ????????????????int?bqualLength?=?rs.getInt(3);
          ????????????????byte[]?gtridAndBqual?=?rs.getBytes(4);

          ????????????????final?byte[]?gtrid?=?new?byte[gtridLength];
          ????????????????final?byte[]?bqual?=?new?byte[bqualLength];

          ????????????????if?(gtridAndBqual.length?!=?(gtridLength?+?bqualLength))?{
          ????????????????????throw?new?MysqlXAException(XAException.XA_RBPROTO,?Messages.getString("MysqlXAConnection.002"),?null);
          ????????????????}

          ????????????????System.arraycopy(gtridAndBqual,?0,?gtrid,?0,?gtridLength);
          ????????????????System.arraycopy(gtridAndBqual,?gtridLength,?bqual,?0,?bqualLength);

          ????????????????recoveredXidList.add(new?MysqlXid(gtrid,?bqual,?formatId));
          ????????????}
          ????????}?catch?(SQLException?sqlEx)?{
          ????????????throw?mapXAExceptionFromSQLException(sqlEx);
          ????????}?finally?{
          ????????????if?(rs?!=?null)?{
          ????????????????try?{
          ????????????????????rs.close();
          ????????????????}?catch?(SQLException?sqlEx)?{
          ????????????????????throw?mapXAExceptionFromSQLException(sqlEx);
          ????????????????}
          ????????????}

          ????????????if?(stmt?!=?null)?{
          ????????????????try?{
          ????????????????????stmt.close();
          ????????????????}?catch?(SQLException?sqlEx)?{
          ????????????????????throw?mapXAExceptionFromSQLException(sqlEx);
          ????????????????}
          ????????????}
          ????????}

          ????????int?numXids?=?recoveredXidList.size();

          ????????Xid[]?asXids?=?new?Xid[numXids];
          ????????Object[]?asObjects?=?recoveredXidList.toArray();

          ????????for?(int?i?=?0;?i?????????????asXids[i]?=?(Xid)?asObjects[i];
          ????????}

          ????????return?asXids;
          ????}
          • 這里要注意如果Mysql的版本 <5.7.7 ,則不會有任何數(shù)據(jù),在以后的版本中Mysql進(jìn)行了修復(fù),因此如果我們想要使用MySQL充當(dāng)RM,版本必須 >= 5.7.7?,原因是:

          MySQL 5.6版本在客戶端退出的時(shí)候,自動(dòng)把已經(jīng)prepare的事務(wù)回滾了,那么MySQL為什么要這樣做?這主要取決于MySQL的內(nèi)部實(shí)現(xiàn),MySQL 5.7以前的版本,對于prepare的事務(wù),MySQL是不會記錄binlog的(官方說是減少fsync,起到了優(yōu)化的作用)。只有當(dāng)分布式事務(wù)提交的時(shí)候才會把前面的操作寫入binlog信息,所以對于binlog來說,分布式事務(wù)與普通的事務(wù)沒有區(qū)別,而prepare以前的操作信息都保存在連接的IO_CACHE中,如果這個(gè)時(shí)候客戶端退出了,以前的binlog信息都會被丟失,再次重連后允許提交的話,會造成Binlog丟失,從而造成主從數(shù)據(jù)的不一致,所以官方在客戶端退出的時(shí)候直接把已經(jīng)prepare的事務(wù)都回滾了!

          • 回到主線再從自己記錄的事務(wù)日志里面獲取XID
          ??Collection?xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();
          • 我們來看下獲取事務(wù)日志里面的XID的retrieveExpiredCommittingXidsFromLog()方法。然后進(jìn)入com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants()方法。
          public?Collection?getCommittingParticipants()
          ???throws?LogReadException?
          {
          ??Collection?committingParticipants?=?new?HashSet();
          ??Collection?committingCoordinatorLogEntries?=?repository.findAllCommittingCoordinatorLogEntries();

          ??for?(CoordinatorLogEntry?coordinatorLogEntry?:?committingCoordinatorLogEntries)?{
          ???for?(ParticipantLogEntry?participantLogEntry?:?coordinatorLogEntry.participants)?{
          ????committingParticipants.add(participantLogEntry);
          ???}
          ??}
          ??return?committingParticipants;
          ?}

          到這里我們來簡單介紹一下,事務(wù)日志的存儲結(jié)構(gòu)。首先是?CoordinatorLogEntry,這是一次XA事務(wù)的所有信息實(shí)體類。

          public?class?CoordinatorLogEntry?implements?Serializable?{

          ??//全局事務(wù)id
          ??public?final?String?id;

          ???//是否已經(jīng)提交
          ?public?final?boolean?wasCommitted;

          ?/**
          ??*?Only?for?subtransactions,?null?otherwise.
          ??*/

          ?public?final?String?superiorCoordinatorId;

          ???//參與者集合
          ?public?final?ParticipantLogEntry[]?participants;
          }
          • 再來看一下參與者實(shí)體類?ParticipantLogEntry?:
          public?class?ParticipantLogEntry?implements?Serializable?{

          ?private?static?final?long?serialVersionUID?=?1728296701394899871L;

          ?/**
          ??*?The?ID?of?the?global?transaction?as?known?by?the?transaction?core.
          ??*/


          ?public?final?String?coordinatorId;

          ?/**
          ??*?Identifies?the?participant?within?the?global?transaction.
          ??*/


          ?public?final?String?uri;

          ?/**
          ??*?When?does?this?participant?expire?(expressed?in?millis?since?Jan?1,?1970)?
          ??*/


          ?public?final?long?expires;

          ?/**
          ??*?Best-known?state?of?the?participant.
          ??*/

          ?public?final?TxState?state;

          ?/**
          ??*?For?diagnostic?purposes,?null?if?not?relevant.
          ??*/

          ?public?final?String?resourceName;
          }
          • 回到com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids()?方法,可以到獲取了一次XA事務(wù)過程中,存儲的事務(wù)日志中的xid。
          public?Set?getExpiredCommittingXids()?throws?LogReadException?{
          ??Set?ret?=?new?HashSet();
          ??Collection?entries?=?log.getCommittingParticipants();
          ??for?(ParticipantLogEntry?entry?:?entries)?{
          ???if?(expired(entry)?&&?!http(entry))?{
          ????XID?xid?=?new?XID(entry.coordinatorId,?entry.uri);
          ????ret.add(xid);
          ???}
          ??}
          ??return?ret;
          ?}
          • 如果從RM中通過XA recovery取出的XID,包含在從事務(wù)日志中取出的XID,則進(jìn)行commit,否則進(jìn)行rollback.
          List?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);
          ??Collection?xidsToCommit;
          ??try?{
          ???xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();
          ???for?(XID?xid?:?xidsToRecover)?{
          ????if?(xidsToCommit.contains(xid))?{
          ?????replayCommit(xid,?xaResource);
          ????}?else?{
          ?????attemptPresumedAbort(xid,?xaResource);
          ????}
          ???}
          ??}?catch?(LogException?couldNotRetrieveCommittingXids)?{
          ???LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);
          ??}
          • replayCommit 方法如下:
          private?void?replayCommit(XID?xid,?XAResource?xaResource)?{
          ??if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Replaying?commit?of?xid:?"?+?xid);
          ??try?{
          ??????//進(jìn)行事務(wù)提交
          ???xaResource.commit(xid,?false);
          ?????//更新事務(wù)日志
          ???log.terminated(xid);
          ??}?catch?(XAException?e)?{
          ???if?(alreadyHeuristicallyTerminatedByResource(e))?{
          ????handleHeuristicTerminationByResource(xid,?xaResource,?e,?true);
          ???}?else?if?(xidTerminatedInResourceByConcurrentCommit(e))?{
          ????log.terminated(xid);
          ???}?else?{
          ????LOGGER.logWarning("Transient?error?while?replaying?commit?-?will?retry?later...",?e);
          ???}
          ??}
          ?}
          • attemptPresumedAbort(xid, xaResource); 方法如下:
          private?void?attemptPresumedAbort(XID?xid,?XAResource?xaResource)?{
          ??try?{
          ???log.presumedAborting(xid);
          ???if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Presumed?abort?of?xid:?"?+?xid);
          ???try?{
          ?????????//進(jìn)行回滾
          ????xaResource.rollback(xid);
          ????????//更新日志狀態(tài)
          ????log.terminated(xid);
          ???}?catch?(XAException?e)?{
          ????if?(alreadyHeuristicallyTerminatedByResource(e))?{
          ?????handleHeuristicTerminationByResource(xid,?xaResource,?e,?false);
          ????}?else?if?(xidTerminatedInResourceByConcurrentRollback(e))?{
          ?????log.terminated(xid);
          ????}?else?{
          ?????LOGGER.logWarning("Unexpected?exception?during?recovery?-?ignoring?to?retry?later...",?e);
          ????}
          ???}
          ??}?catch?(IllegalStateException?presumedAbortNotAllowedInCurrentLogState)?{
          ???//?ignore?to?retry?later?if?necessary
          ??}?catch?(LogException?logWriteException)?{
          ???LOGGER.logWarning("log?write?failed?for?Xid:?"+xid+",?ignoring?to?retry?later",?logWriteException);
          ??}
          ?}

          總結(jié)

          文章到此,已經(jīng)寫的很長很多了,我們分析了ShardingSphere對于XA方案,提供了一套SPI解決方案,對Atomikos進(jìn)行了整合,也分析了Atomikos初始化流程,開始事務(wù)流程,獲取連接流程,提交事務(wù)流程,回滾事務(wù)流程,事務(wù)恢復(fù)流程。希望對大家理解XA的原理有所幫助。

          DD自研的滬牌代拍業(yè)務(wù),點(diǎn)擊直達(dá)



          【往期推薦】

          TiDB 數(shù)據(jù)庫的 4 大應(yīng)用場景分析

          2020-11-27

          雖然這些代碼很少,就幾行,但卻很牛逼!

          2020-11-27

          都2020 了,最流行的密碼居然依舊是...

          2020-11-26

          不講武德的微信,又來一波新功能!

          2020-11-25

          其他職業(yè)越老越值錢,程序員越老越貶值?

          2020-11-24




          掃一掃,關(guān)注我

          一起學(xué)習(xí),一起進(jìn)步

          每周贈書,福利不斷

          深度內(nèi)容

          推薦加入




          瀏覽 38
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  囯产黄色视频 | 国产内射久久 | 欧美激情综合五月色丁香 | 尤物视频高清无码在线观看 | 人人摸人人干人人操 |