SpringBoot 分布式事務的解決方案(JTA+Atomic+多數(shù)據(jù)源
來源:blog.csdn.net/jaryle/article/details/88638780
首先,到底啥是分布式事務呢,比如我們在執(zhí)行一個業(yè)務邏輯的時候有兩步分別操作A數(shù)據(jù)源和B數(shù)據(jù)源,當我們在A數(shù)據(jù)源執(zhí)行數(shù)據(jù)更改后,在B數(shù)據(jù)源執(zhí)行時出現(xiàn)運行時異常,那么我們必須要讓B數(shù)據(jù)源的操作回滾,并回滾對A數(shù)據(jù)源的操作;這種情況在支付業(yè)務時常常出現(xiàn);比如買票業(yè)務在最后支付失敗,那之前的操作必須全部回滾,如果之前的操作分布在多個數(shù)據(jù)源中,那么這就是典型的分布式事務回滾;
了解了什么是分布式事務,那分布式事務在java的解決方案就是JTA(即Java Transaction API);springboot官方提供了 Atomikos or Bitronix的解決思路;
其實,大多數(shù)情況下很多公司是使用消息隊列的方式實現(xiàn)分布式事務。
本篇文章重點講解springboot環(huán)境下,整合 Atomikos +mysql+mybatis+tomcat/jetty;
一、項目依賴
pom.xml中添加atomikos的springboot相關依賴:
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-jta-atomikosartifactId>
dependency>
點進去會發(fā)現(xiàn)里面整合好了:transactions-jms、transactions-jta、transactions-jdbc、javax.transaction-api
二、把數(shù)據(jù)源的相關配置項單獨提煉到一個application.yml中:
注意:
這回我們的
spring.datasource.type?是com.alibaba.druid.pool.xa.DruidXADataSource;spring.jta.transaction-manager-id的值在你的電腦中是唯一的,這個詳細請閱讀官方文檔;

完整的yml文件如下:
spring:
??datasource:
????type:?com.alibaba.druid.pool.xa.DruidXADataSource
????druid:
????
??????systemDB:
????????name:?systemDB
????????url:?jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
????????username:?root
????????password:?root
????????#?下面為連接池的補充設置,應用到上面所有數(shù)據(jù)源中
????????#?初始化大小,最小,最大
????????initialSize:?5
????????minIdle:?5
????????maxActive:?20
????????#?配置獲取連接等待超時的時間
????????maxWait:?60000
????????#?配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒
????????timeBetweenEvictionRunsMillis:?60000
????????#?配置一個連接在池中最小生存的時間,單位是毫秒
????????minEvictableIdleTimeMillis:?30
????????validationQuery:?SELECT?1
????????validationQueryTimeout:?10000
????????testWhileIdle:?true
????????testOnBorrow:?false
????????testOnReturn:?false
????????#?打開PSCache,并且指定每個連接上PSCache的大小
????????poolPreparedStatements:?true
????????maxPoolPreparedStatementPerConnectionSize:?20
????????filters:?stat,wall
????????#?通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
????????connectionProperties:?druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
????????#?合并多個DruidDataSource的監(jiān)控數(shù)據(jù)
????????useGlobalDataSourceStat:?true
?
??????businessDB:
????????name:?businessDB
?
????????url:?jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
????????username:?root
????????password:?root
????????#?下面為連接池的補充設置,應用到上面所有數(shù)據(jù)源中
????????#?初始化大小,最小,最大
????????initialSize:?5
????????minIdle:?5
????????maxActive:?20
????????#?配置獲取連接等待超時的時間
????????maxWait:?60000
????????#?配置間隔多久才進行一次檢測,檢測需要關閉的空閑連接,單位是毫秒
????????timeBetweenEvictionRunsMillis:?60000
????????#?配置一個連接在池中最小生存的時間,單位是毫秒
????????minEvictableIdleTimeMillis:?30
????????validationQuery:?SELECT?1
????????validationQueryTimeout:?10000
????????testWhileIdle:?true
????????testOnBorrow:?false
????????testOnReturn:?false
????????#?打開PSCache,并且指定每個連接上PSCache的大小
????????poolPreparedStatements:?true
????????maxPoolPreparedStatementPerConnectionSize:?20
????????filters:?stat,wall
????????#?通過connectProperties屬性來打開mergeSql功能;慢SQL記錄
????????connectionProperties:?druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
????????#?合并多個DruidDataSource的監(jiān)控數(shù)據(jù)
????????useGlobalDataSourceStat:?true
?
??#jta相關參數(shù)配置
??jta:
????log-dir:?classpath:tx-logs
????transaction-manager-id:?txManager
三、在DruidConfig.java中實現(xiàn)多個數(shù)據(jù)源的注冊;分布式事務管理器的注冊;druid的注冊;
package?com.zjt.config;
?
import?com.alibaba.druid.filter.stat.StatFilter;
import?com.alibaba.druid.support.http.StatViewServlet;
import?com.alibaba.druid.support.http.WebStatFilter;
import?com.alibaba.druid.wall.WallConfig;
import?com.alibaba.druid.wall.WallFilter;
import?com.atomikos.icatch.jta.UserTransactionImp;
import?com.atomikos.icatch.jta.UserTransactionManager;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import?org.springframework.boot.web.servlet.FilterRegistrationBean;
import?org.springframework.boot.web.servlet.ServletRegistrationBean;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.context.annotation.Primary;
import?org.springframework.core.env.Environment;
import?org.springframework.transaction.jta.JtaTransactionManager;
?
import?javax.sql.DataSource;
import?javax.transaction.UserTransaction;
import?java.util.Properties;
?
/**
?*?Druid配置
?*
?*?
?*/
@Configuration
public?class?DruidConfig?{
????@Bean(name?=?"systemDataSource")
????@Primary
????@Autowired
????public?DataSource?systemDataSource(Environment?env)?{
????????AtomikosDataSourceBean?ds?=?new?AtomikosDataSourceBean();
????????Properties?prop?=?build(env,?"spring.datasource.druid.systemDB.");
????????ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
????????ds.setUniqueResourceName("systemDB");
????????ds.setPoolSize(5);
????????ds.setXaProperties(prop);
????????return?ds;
?
????}
?
????@Autowired
????@Bean(name?=?"businessDataSource")
????public?AtomikosDataSourceBean?businessDataSource(Environment?env)?{
?
????????AtomikosDataSourceBean?ds?=?new?AtomikosDataSourceBean();
????????Properties?prop?=?build(env,?"spring.datasource.druid.businessDB.");
????????ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
????????ds.setUniqueResourceName("businessDB");
????????ds.setPoolSize(5);
????????ds.setXaProperties(prop);
?
????????return?ds;
????}
?
?
????/**
?????*?注入事物管理器
?????*?@return
?????*/
????@Bean(name?=?"xatx")
????public?JtaTransactionManager?regTransactionManager?()?{
????????UserTransactionManager?userTransactionManager?=?new?UserTransactionManager();
????????UserTransaction?userTransaction?=?new?UserTransactionImp();
????????return?new?JtaTransactionManager(userTransaction,?userTransactionManager);
????}
?
?
????private?Properties?build(Environment?env,?String?prefix)?{
?
????????Properties?prop?=?new?Properties();
????????prop.put("url",?env.getProperty(prefix?+?"url"));
????????prop.put("username",?env.getProperty(prefix?+?"username"));
????????prop.put("password",?env.getProperty(prefix?+?"password"));
????????prop.put("driverClassName",?env.getProperty(prefix?+?"driverClassName",?""));
????????prop.put("initialSize",?env.getProperty(prefix?+?"initialSize",?Integer.class));
????????prop.put("maxActive",?env.getProperty(prefix?+?"maxActive",?Integer.class));
????????prop.put("minIdle",?env.getProperty(prefix?+?"minIdle",?Integer.class));
????????prop.put("maxWait",?env.getProperty(prefix?+?"maxWait",?Integer.class));
????????prop.put("poolPreparedStatements",?env.getProperty(prefix?+?"poolPreparedStatements",?Boolean.class));
?
????????prop.put("maxPoolPreparedStatementPerConnectionSize",
????????????????env.getProperty(prefix?+?"maxPoolPreparedStatementPerConnectionSize",?Integer.class));
?
????????prop.put("maxPoolPreparedStatementPerConnectionSize",
????????????????env.getProperty(prefix?+?"maxPoolPreparedStatementPerConnectionSize",?Integer.class));
????????prop.put("validationQuery",?env.getProperty(prefix?+?"validationQuery"));
????????prop.put("validationQueryTimeout",?env.getProperty(prefix?+?"validationQueryTimeout",?Integer.class));
????????prop.put("testOnBorrow",?env.getProperty(prefix?+?"testOnBorrow",?Boolean.class));
????????prop.put("testOnReturn",?env.getProperty(prefix?+?"testOnReturn",?Boolean.class));
????????prop.put("testWhileIdle",?env.getProperty(prefix?+?"testWhileIdle",?Boolean.class));
????????prop.put("timeBetweenEvictionRunsMillis",
????????????????env.getProperty(prefix?+?"timeBetweenEvictionRunsMillis",?Integer.class));
????????prop.put("minEvictableIdleTimeMillis",?env.getProperty(prefix?+?"minEvictableIdleTimeMillis",?Integer.class));
????????prop.put("filters",?env.getProperty(prefix?+?"filters"));
?
????????return?prop;
????}
?
????@Bean
????public?ServletRegistrationBean?druidServlet()?{
????????ServletRegistrationBean?servletRegistrationBean?=?new?ServletRegistrationBean(new?StatViewServlet(),?"/druid/*");
?
????????//控制臺管理用戶,加入下面2行?進入druid后臺就需要登錄
????????//servletRegistrationBean.addInitParameter("loginUsername",?"admin");
????????//servletRegistrationBean.addInitParameter("loginPassword",?"admin");
????????return?servletRegistrationBean;
????}
?
????@Bean
????public?FilterRegistrationBean?filterRegistrationBean()?{
????????FilterRegistrationBean?filterRegistrationBean?=?new?FilterRegistrationBean();
????????filterRegistrationBean.setFilter(new?WebStatFilter());
????????filterRegistrationBean.addUrlPatterns("/*");
????????filterRegistrationBean.addInitParameter("exclusions",?"*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
????????filterRegistrationBean.addInitParameter("profileEnable",?"true");
????????return?filterRegistrationBean;
????}
?
????@Bean
????public?StatFilter?statFilter(){
????????StatFilter?statFilter?=?new?StatFilter();
????????statFilter.setLogSlowSql(true);?//slowSqlMillis用來配置SQL慢的標準,執(zhí)行時間超過slowSqlMillis的就是慢。
????????statFilter.setMergeSql(true);?//SQL合并配置
????????statFilter.setSlowSqlMillis(1000);//slowSqlMillis的缺省值為3000,也就是3秒。
????????return?statFilter;
????}
?
????@Bean
????public?WallFilter?wallFilter(){
????????WallFilter?wallFilter?=?new?WallFilter();
????????//允許執(zhí)行多條SQL
????????WallConfig?config?=?new?WallConfig();
????????config.setMultiStatementAllow(true);
????????wallFilter.setConfig(config);
????????return?wallFilter;
????}
?
}
四、分別配置每個數(shù)據(jù)源對應的sqlSessionFactory,以及MapperScan掃描的包:
MybatisDatasourceConfig.java
package?com.zjt.config;
?
import?com.zjt.util.MyMapper;
import?org.apache.ibatis.session.SqlSessionFactory;
import?org.mybatis.spring.SqlSessionFactoryBean;
import?org.mybatis.spring.SqlSessionTemplate;
import?org.mybatis.spring.annotation.MapperScan;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Qualifier;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import?org.springframework.core.io.support.ResourcePatternResolver;
?
import?javax.sql.DataSource;
?
/**
?*?
?*?@description
?*/
@Configuration
//?精確到?mapper?目錄,以便跟其他數(shù)據(jù)源隔離
@MapperScan(basePackages?=?"com.zjt.mapper",?markerInterface?=?MyMapper.class,?sqlSessionFactoryRef?=?"sqlSessionFactory")
public?class?MybatisDatasourceConfig?{
?
????@Autowired
????@Qualifier("systemDataSource")
????private?DataSource?ds;
?
????@Bean
????public?SqlSessionFactory?sqlSessionFactory()?throws?Exception?{
????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????factoryBean.setDataSource(ds);
????????//指定mapper?xml目錄
????????ResourcePatternResolver?resolver?=?new?PathMatchingResourcePatternResolver();
????????factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
????????return?factoryBean.getObject();
?
????}
?
????@Bean
????public?SqlSessionTemplate?sqlSessionTemplate()?throws?Exception?{
????????SqlSessionTemplate?template?=?new?SqlSessionTemplate(sqlSessionFactory());?//?使用上面配置的Factory
????????return?template;
????}
?
????//關于事務管理器,不管是JPA還是JDBC等都實現(xiàn)自接口?PlatformTransactionManager
????//?如果你添加的是 spring-boot-starter-jdbc 依賴,框架會默認注入 DataSourceTransactionManager 實例。
????//在Spring容器中,我們手工注解@Bean 將被優(yōu)先加載,框架不會重新實例化其他的 PlatformTransactionManager 實現(xiàn)類。
????/*@Bean(name?=?"transactionManager")
????@Primary
????public?DataSourceTransactionManager?masterTransactionManager()?{
????????//MyBatis自動參與到spring事務管理中,無需額外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的數(shù)據(jù)源
????????//?與DataSourceTransactionManager引用的數(shù)據(jù)源一致即可,否則事務管理會不起作用。
????????return?new?DataSourceTransactionManager(ds);
????}*/
?
}
MybatisDatasource2Config.java
package?com.zjt.config;
?
import?com.zjt.util.MyMapper;
import?org.apache.ibatis.session.SqlSessionFactory;
import?org.mybatis.spring.SqlSessionFactoryBean;
import?org.mybatis.spring.SqlSessionTemplate;
import?org.mybatis.spring.annotation.MapperScan;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Qualifier;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import?org.springframework.core.io.support.ResourcePatternResolver;
?
import?javax.sql.DataSource;
?
/**
?*?
?*?@description
?*/
@Configuration
//?精確到?mapper?目錄,以便跟其他數(shù)據(jù)源隔離
@MapperScan(basePackages?=?"com.zjt.mapper2",?markerInterface?=?MyMapper.class,?sqlSessionFactoryRef?=?"sqlSessionFactory2")
public?class?MybatisDatasource2Config?{
?
????@Autowired
????@Qualifier("businessDataSource")
????private?DataSource?ds;
?
????@Bean
????public?SqlSessionFactory?sqlSessionFactory2()?throws?Exception?{
????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????factoryBean.setDataSource(ds);
????????//指定mapper?xml目錄
????????ResourcePatternResolver?resolver?=?new?PathMatchingResourcePatternResolver();
????????factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
????????return?factoryBean.getObject();
?
????}
?
????@Bean
????public?SqlSessionTemplate?sqlSessionTemplate2()?throws?Exception?{
????????SqlSessionTemplate?template?=?new?SqlSessionTemplate(sqlSessionFactory2());?//?使用上面配置的Factory
????????return?template;
????}
?
????//關于事務管理器,不管是JPA還是JDBC等都實現(xiàn)自接口?PlatformTransactionManager
????//?如果你添加的是 spring-boot-starter-jdbc 依賴,框架會默認注入 DataSourceTransactionManager 實例。
????//在Spring容器中,我們手工注解@Bean 將被優(yōu)先加載,框架不會重新實例化其他的 PlatformTransactionManager 實現(xiàn)類。
????/*@Bean(name?=?"transactionManager2")
????@Primary
????public?DataSourceTransactionManager?masterTransactionManager()?{
????????//MyBatis自動參與到spring事務管理中,無需額外配置,只要org.mybatis.spring.SqlSessionFactoryBean引用的數(shù)據(jù)源
????????//?與DataSourceTransactionManager引用的數(shù)據(jù)源一致即可,否則事務管理會不起作用。
????????return?new?DataSourceTransactionManager(ds);
????}*/
?
}
由于我們本例中只使用一個事務管理器:xatx,故就不在使用TxAdviceInterceptor.java和TxAdvice2Interceptor.java中配置的事務管理器了;有需求的童鞋可以自己配置其他的事務管理器;(見DruidConfig.java中查看)
五、新建分布式業(yè)務測試接口JtaTestService.java和實現(xiàn)類JtaTestServiceImpl.java
其實就是一個很簡單的test01()方法,在該方法中我們分別先后調(diào)用classService.saveOrUpdateTClass(tClass);和teacherService.saveOrUpdateTeacher(teacher);
實現(xiàn)先后操作兩個數(shù)據(jù)源:然后我們可以自己debug跟蹤事務的提交時機,此外,也可以在在兩個方法全執(zhí)行結(jié)束之后,手動制造一個運行時異常,來檢查分布式事務是否全部回滾;
注意:
在實現(xiàn)類的方法中我使用的是:
@Transactional(transactionManager?=?"xatx",?propagation?=?Propagation.REQUIRED,?rollbackFor?=?{?java.lang.RuntimeException.class?})
從而指定了使用哪個事務管理器,事務隔離級別(一般都用我這個默認的),回滾的條件(一般可以使用Exception),這三個可以自己根據(jù)業(yè)務實際修改;
package?com.zjt.service3;
?
import?java.util.Map;
?
public?interface?JtaTestService?{
?
????public?Map?test01() ;
?
}
package?com.zjt.service3.impl;
?
?
import?com.zjt.entity.TClass;
import?com.zjt.entity.Teacher;
import?com.zjt.service.TClassService;
import?com.zjt.service2.TeacherService;
import?com.zjt.service3.JtaTestService;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Qualifier;
import?org.springframework.stereotype.Service;
import?org.springframework.transaction.annotation.Propagation;
import?org.springframework.transaction.annotation.Transactional;
?
import?java.util.LinkedHashMap;
import?java.util.Map;
?
@Service("jtaTestServiceImpl")
public?class?JtaTestServiceImpl?implements?JtaTestService{
?
????@Autowired
????@Qualifier("teacherServiceImpl")
????private?TeacherService?teacherService;
????@Autowired
????@Qualifier("tclassServiceImpl")
????private?TClassService?tclassService;
?
????@Override
????@Transactional(transactionManager?=?"xatx",?propagation?=?Propagation.REQUIRED,?rollbackFor?=?{?java.lang.RuntimeException.class?})
????public?Map<String,?Object>?test01()?{
????????LinkedHashMap?resultMap=new?LinkedHashMap();
????????TClass?tClass=new?TClass();
????????tClass.setName("8888");
????????tclassService.saveOrUpdateTClass(tClass);
?
????????Teacher?teacher=new?Teacher();
????????teacher.setName("8888");
????????teacherService.saveOrUpdateTeacher(teacher);
?
????????System.out.println(1/0);
?
????????resultMap.put("state","success");
????????resultMap.put("message","分布式事務同步成功");
????????return?resultMap;
????}
}
六、建立JtaTestContoller.java,接受一個來自前端的http請求,觸發(fā)JtaTestService 的test01方法:
package?com.zjt.web;
?
?
import?com.zjt.service3.JtaTestService;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.annotation.Qualifier;
import?org.springframework.stereotype.Controller;
import?org.springframework.web.bind.annotation.RequestMapping;
import?org.springframework.web.bind.annotation.ResponseBody;
?
import?java.util.LinkedHashMap;
import?java.util.Map;
?
@Controller
@RequestMapping("/jtaTest")
public?class?JtaTestContoller?{
?
????@Autowired
????@Qualifier("jtaTestServiceImpl")
????private?JtaTestService?taTestService;
?
?
?
????@ResponseBody
????@RequestMapping("/test01")
????public?Map?test01() {
????????LinkedHashMap?resultMap=new?LinkedHashMap();
????????try?{
????????????return?taTestService.test01();
????????}catch?(Exception?e){
????????????resultMap.put("state","fail");
????????????resultMap.put("message","分布式事務同步失敗");
????????????return?resultMap;
????????}
????}
}
七、在test.ftl中增加一個按鈕來測試;
//分布式事務測試
$("#JTATest").click(function(){
$.ajax({
type: "POST",
url: "${basePath!}/jtaTest/test01",
data: {} ,
async: false,
error: function (request) {
layer.alert("與服務器連接失敗/(ㄒoㄒ)/~~");
return false;
},
success: function (data) {
if (data.state == 'fail') {
layer.alert(data.message);
return false;
}else if(data.state == 'success'){
layer.alert(data.message);
}
}
});
});
八、啟動服務,驗證結(jié)果:

點擊這個按鈕,跳轉(zhuǎn)到controller:

當正常執(zhí)行了sql語句之后,我們可以發(fā)現(xiàn)數(shù)據(jù)庫并沒有變化,因為整個方法的事務還沒有走完,當我們走到1/0這步時:

拋出運行時異常,并被spring事務攔截器攔截,并捕獲異常:

在this.completeTransactionAfterThrowing(txInfo, var16);方法中會將事務全部回滾:
22:09:04.243?logback?[http-nio-8080-exec-5]?INFO?c.a.i.imp.CompositeTransactionImp?-?rollback()?done?of?transaction?192.168.1.103.tm0000400006
此時,當我們再次打開數(shù)據(jù)庫驗證,依舊沒有變化,證明分布式事務配置成功;
大家可以基于我的代碼自己練習一下,自己嘗試著使用多事務管理器的情況下的靈活配置;
九、后記:
本文源代碼:
https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git
代碼在tomcat和jetty環(huán)境下均可完成事務回滾;
在事務回滾時可能報一個Transactional not active的警告,我google后,老外也說不出這個具體作用,大部分人認為這只是一個警告,可以忽略;
END
END
