SpringBoot+Mybatis配置多數(shù)據(jù)源及事務(wù)方案
前言
可能由于業(yè)務(wù)上的某些需求,我們的系統(tǒng)中有時(shí)往往要連接多個(gè)數(shù)據(jù)庫,這就產(chǎn)生了多數(shù)據(jù)源問題。
多數(shù)據(jù)源的情況下,一般我們要做到可以自動(dòng)切換,此時(shí)會(huì)涉及到事務(wù)注解 Transactional 不生效問題和分布式事務(wù)問題。
關(guān)于多數(shù)據(jù)源方案,筆者在網(wǎng)上看過一些例子,然而大部分都是錯(cuò)誤示例,根本跑不通,或者沒辦法兼容事務(wù)。
今天,我們就一點(diǎn)點(diǎn)來分析這些問題產(chǎn)生的根源和相應(yīng)的解決方法。
多數(shù)據(jù)源
為了劇情的順利開展,我們模擬的業(yè)務(wù)是創(chuàng)建訂單和扣減庫存。
所以,我們先創(chuàng)建訂單表和庫存表。注意,把他們分別放到兩個(gè)數(shù)據(jù)庫中。
CREATE?TABLE?`t_storage`?(
??`id`?int(11)?NOT?NULL?AUTO_INCREMENT,
??`commodity_code`?varchar(255)?DEFAULT?NULL,
??`count`?int(11)?DEFAULT?'0',
??PRIMARY?KEY?(`id`),
??UNIQUE?KEY?`commodity_code`?(`commodity_code`)
)?ENGINE=InnoDB?AUTO_INCREMENT=2?DEFAULT?CHARSET=utf8;
CREATE?TABLE?`t_order`?(
??`id`?bigint(16)?NOT?NULL,
??`commodity_code`?varchar(255)?DEFAULT?NULL,
??`count`?int(11)?DEFAULT?'0',
??`amount`?double(14,2)?DEFAULT?'0.00',
??PRIMARY?KEY?(`id`)
)?ENGINE=InnoDB?DEFAULT?CHARSET=utf8;
1、數(shù)據(jù)庫連接
通過YML文件先把兩個(gè)數(shù)據(jù)庫都配置一下。
spring:
??datasource:
????ds1:
??????jdbc_url:?jdbc:mysql://127.0.0.1:3306/db1
??????username:?root
??????password:?root
????ds2:
??????jdbc_url:?jdbc:mysql://127.0.0.1:3306/db2
??????username:?root
??????password:?root
2、配置DataSource
我們知道,Mybatis執(zhí)行一條SQL語句的時(shí)候,需要先獲取一個(gè)Connection。這時(shí)候,就交由Spring管理器到DataSource中獲取連接。
Spring中有個(gè)具有路由功能的DataSource,它可以通過查找鍵調(diào)用不同的數(shù)據(jù)源,這就是AbstractRoutingDataSource。
public?abstract?class?AbstractRoutingDataSource{
????//數(shù)據(jù)源的集合
????@Nullable
????private?Map可以看到,該抽象類的核心就是先設(shè)置多個(gè)數(shù)據(jù)源到Map集合中,然后根據(jù)Key可以獲取不同的數(shù)據(jù)源。
那么,我們就可以重寫這個(gè)determineCurrentLookupKey方法,它返回的是一個(gè)數(shù)據(jù)源的名稱。
public?class?DynamicDataSource?extends?AbstractRoutingDataSource?{
????@Override
????protected?Object?determineCurrentLookupKey()?{
????????DataSourceType.DataBaseType?dataBaseType?=?DataSourceType.getDataBaseType();
????????return?dataBaseType;
????}
}
然后還需要一個(gè)工具類,來保存當(dāng)前線程的數(shù)據(jù)源類型。
public?class?DataSourceType?{
????public?enum?DataBaseType?{
????????ds1,?ds2
????}
????//?使用ThreadLocal保證線程安全
????private?static?final?ThreadLocal?TYPE?=?new?ThreadLocal();
????//?往當(dāng)前線程里設(shè)置數(shù)據(jù)源類型
????public?static?void?setDataBaseType(DataBaseType?dataBaseType)?{
????????if?(dataBaseType?==?null)?{
????????????throw?new?NullPointerException();
????????}
????????TYPE.set(dataBaseType);
????}
????//?獲取數(shù)據(jù)源類型
????public?static?DataBaseType?getDataBaseType()?{
????????DataBaseType?dataBaseType?=?TYPE.get()?==?null???DataBaseType.ds1?:?TYPE.get();
????????return?dataBaseType;
????}
}
這些都搞定之后,我們還需要把這個(gè)DataSource配置到Spring容器中去。下面這個(gè)配置類的作用如下:
創(chuàng)建多個(gè)數(shù)據(jù)源DataSource,ds1 和 ds2;
將ds1 和 ds2 數(shù)據(jù)源放入動(dòng)態(tài)數(shù)據(jù)源DynamicDataSource;
將DynamicDataSource注入到SqlSessionFactory。
@Configuration
public?class?DataSourceConfig?{
????/**
?????*?創(chuàng)建多個(gè)數(shù)據(jù)源?ds1?和?ds2
?????*?此處的Primary,是設(shè)置一個(gè)Bean的優(yōu)先級(jí)
?????*?@return
?????*/
????@Primary
????@Bean(name?=?"ds1")
????@ConfigurationProperties(prefix?=?"spring.datasource.ds1")
????public?DataSource?getDateSource1()?{
????????return?DataSourceBuilder.create().build();
????}
????@Bean(name?=?"ds2")
????@ConfigurationProperties(prefix?=?"spring.datasource.ds2")
????public?DataSource?getDateSource2()?{
????????return?DataSourceBuilder.create().build();
????}
????/**
?????*?將多個(gè)數(shù)據(jù)源注入到DynamicDataSource
?????*?@param?dataSource1
?????*?@param?dataSource2
?????*?@return
?????*/
????@Bean(name?=?"dynamicDataSource")
????public?DynamicDataSource?DataSource(@Qualifier("ds1")?DataSource?dataSource1,
????????????????????????????????????????@Qualifier("ds2")?DataSource?dataSource2)?{
????????Map3、設(shè)置路由鍵
上面的配置都完成之后,我們還需要想辦法動(dòng)態(tài)的改變數(shù)據(jù)源的鍵值,這個(gè)就跟系統(tǒng)的業(yè)務(wù)相關(guān)了。
比如在這里,我們有兩個(gè)Mapper接口,創(chuàng)建訂單和扣減庫存。
public?interface?OrderMapper?{
????void?createOrder(Order?order);
}
public?interface?StorageMapper?{
????void?decreaseStorage(Order?order);
}
那么,我們就可以搞一個(gè)切面,在執(zhí)行訂單的操作時(shí),切到數(shù)據(jù)源ds1,執(zhí)行庫存操作時(shí),切到數(shù)據(jù)源ds2。
@Component
@Aspect
public?class?DataSourceAop?{
????@Before("execution(*?cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..))")
????public?void?setDataSource1()?{
????????DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
????}
????@Before("execution(*?cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..))")
????public?void?setDataSource2()?{
????????DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2);
????}
}
4、測(cè)試
現(xiàn)在就可以寫一個(gè)Service方法,通過REST接口測(cè)試一下啦。
public?class?OrderServiceImpl?implements?OrderService?{
????@Override
????public?void?createOrder(Order?order)?{
????????storageMapper.decreaseStorage(order);
????????logger.info("庫存已扣減,商品代碼:{},購(gòu)買數(shù)量:{}。創(chuàng)建訂單中...",order.getCommodityCode(),order.getCount());
????????orderMapper.createOrder(order);
????}
}
不出意外的話,業(yè)務(wù)執(zhí)行完成后,兩個(gè)數(shù)據(jù)庫的表都已經(jīng)有了變化。
但此時(shí),我們會(huì)想到,這兩個(gè)操作是需要保證原子性的。所以,我們需要依賴事務(wù),在Service方法上標(biāo)注Transactional。
如果我們?cè)赾reateOrder方法上添加了Transactional注解,然后在運(yùn)行代碼,就會(huì)拋出異常。
###?Cause:?java.sql.SQLSyntaxErrorException:?Table?'db2.t_order'?doesn't?exist
;?bad?SQL?grammar?[];?nested?exception?is?java.sql.SQLSyntaxErrorException:?
????Table?'db2.t_order'?doesn't?exist]?with?root?cause
這就說明,如果加上了 Spring 的事務(wù),我們的數(shù)據(jù)源切換不過去了。這又是咋回事呢?
事務(wù)模式,為啥不能切換數(shù)據(jù)源
要想搞清楚原因,我們就得來分析分析如果加上了Spring事務(wù),它又干了哪些事情呢 ?
我們知道,Spring的自動(dòng)事務(wù)是基于AOP實(shí)現(xiàn)的。在調(diào)用包含事務(wù)的方法時(shí),會(huì)進(jìn)入一個(gè)攔截器。
public?class?TransactionInterceptor{
????public?Object?invoke(MethodInvocation?invocation)?throws?Throwable?{
????????//獲取目標(biāo)類
????????Class>?targetClass?=?AopUtils.getTargetClass(invocation.getThis());
????????//事務(wù)調(diào)用
????????return?invokeWithinTransaction(invocation.getMethod(),?targetClass,?invocation::proceed);
????}
}
1、創(chuàng)建事務(wù)
在這里面呢,首先就是開始創(chuàng)建一個(gè)事務(wù)。
protected?Object?doGetTransaction()?{
????//DataSource的事務(wù)對(duì)象
????DataSourceTransactionObject?txObject?=?new?DataSourceTransactionObject();
????//設(shè)置事務(wù)自動(dòng)保存
????txObject.setSavepointAllowed(isNestedTransactionAllowed());
????//給事務(wù)對(duì)象設(shè)置ConnectionHolder
????ConnectionHolder?conHolder?=?TransactionSynchronizationManager.getResource(obtainDataSource());
????txObject.setConnectionHolder(conHolder,?false);
????return?txObject;
}
在這一步,重點(diǎn)是給事務(wù)對(duì)象設(shè)置了ConnectionHolder屬性,不過此時(shí)還是為空。
2、開啟事務(wù)
接下來,就是開啟一個(gè)事務(wù),這里主要是通過ThreadLocal將資源和當(dāng)前的事務(wù)對(duì)象綁定,然后設(shè)置一些事務(wù)狀態(tài)。
protected?void?doBegin(Object?txObject,?TransactionDefinition?definition)?{
????Connection?con?=?null;
????//從數(shù)據(jù)源中獲取一個(gè)連接
????Connection?newCon?=?obtainDataSource().getConnection();
????//重新設(shè)置事務(wù)對(duì)象中的connectionHolder,此時(shí)已經(jīng)引用了一個(gè)連接
????txObject.setConnectionHolder(new?ConnectionHolder(newCon),?true);
????//將這個(gè)connectionHolder標(biāo)記為與事務(wù)同步
????txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
????con?=?txObject.getConnectionHolder().getConnection();
????con.setAutoCommit(false);
????//激活事務(wù)活動(dòng)狀態(tài)
????txObject.getConnectionHolder().setTransactionActive(true);
????//將connection?holder綁定到當(dāng)前線程,通過threadlocal
????if?(txObject.isNewConnectionHolder())?{
????????TransactionSynchronizationManager.bindResource(obtainDataSource(),?txObject.getConnectionHolder());
????}
????//事務(wù)管理器,激活事務(wù)同步狀態(tài)
????TransactionSynchronizationManager.initSynchronization();
}
3、執(zhí)行Mapper接口
開啟事務(wù)之后,就開始執(zhí)行目標(biāo)類真實(shí)方法。在這里,就會(huì)開始進(jìn)入Mybatis的代理對(duì)象。。哈哈,框架嘛,就各種代理。
我們知道,Mybatis在執(zhí)行SQL的之前,需要先獲取到SqlSession對(duì)象。
public?static?SqlSession?getSqlSession(SqlSessionFactory?sessionFactory,?ExecutorType?executorType,
????????????????PersistenceExceptionTranslator?exceptionTranslator)?{
????//從ThreadLocal中獲取SqlSessionHolder,第一次獲取不到為空
????SqlSessionHolder?holder?=?TransactionSynchronizationManager.getResource(sessionFactory);
????//如果SqlSessionHolder為空,那也肯定獲取不到SqlSession;
????//如果SqlSessionHolder不為空,直接通過它來拿到SqlSession
????SqlSession?session?=?sessionHolder(executorType,?holder);
????if?(session?!=?null)?{
????????return?session;
????}
????//創(chuàng)建一個(gè)新的SqlSession
????session?=?sessionFactory.openSession(executorType);
????//如果當(dāng)前線程的事務(wù)處于激活狀態(tài),就將SqlSessionHolder綁定到ThreadLocal
????registerSessionHolder(sessionFactory,?executorType,?exceptionTranslator,?session);
????return?session;
}
拿到SqlSession之后,就開始調(diào)用Mybatis的執(zhí)行器,準(zhǔn)備執(zhí)行SQL語句。在執(zhí)行SQL之前呢,當(dāng)然需要先拿到Connection連接。
public?Connection?getConnection()?throws?SQLException?{
????//通過數(shù)據(jù)源獲取連接
????//比如我們配置了多數(shù)據(jù)源,此時(shí)還會(huì)正常切換
????if?(this.connection?==?null)?{
????????openConnection();
????}
????return?this.connection;
}
我們看openConnection方法,它的作用就是從數(shù)據(jù)源中獲取一個(gè)Connection連接。如果我們配置了多數(shù)據(jù)源,此時(shí)是可以正常切換的。如果加了事務(wù),之所以沒有切換數(shù)據(jù)源,是因?yàn)榈诙握{(diào)用時(shí),this.connection != null,返回的還是上一次的連接。
這是因?yàn)椋诘诙潍@取SqlSession的時(shí)候,當(dāng)前線程是從ThreadLocal中拿到的,所以不會(huì)重復(fù)獲取Connection連接。
至此,在多數(shù)據(jù)源情況下,如果加了Spring事務(wù),不能動(dòng)態(tài)切換數(shù)據(jù)源的原因,我們應(yīng)該都明白了。
在這里,筆者插播一道面試題:
Spring是如何保證事務(wù)的?
那就是將多個(gè)業(yè)務(wù)操作,放到同一個(gè)數(shù)據(jù)庫連接中,一起提交或回滾。
怎么做到,都在一個(gè)連接中呢?
這里就是各種ThreadlLocal的運(yùn)用,想辦法將數(shù)據(jù)庫資源和當(dāng)前事務(wù)綁定到一起。
事務(wù)模式,怎么支持切換數(shù)據(jù)源
上面我們已經(jīng)把原因搞清楚了,接下來就看怎么支持它動(dòng)態(tài)切換數(shù)據(jù)源。
其他配置都不變的情況下,我們需要?jiǎng)?chuàng)建兩個(gè)不同的sqlSessionFactory。
@Bean(name?=?"sqlSessionFactory1")
public?SqlSessionFactory?sqlSessionFactory1(@Qualifier("ds1")?DataSource?dataSource){
????return?createSqlSessionFactory(dataSource);
}
@Bean(name?=?"sqlSessionFactory2")
public?SqlSessionFactory?sqlSessionFactory2(@Qualifier("ds2")?DataSource?dataSource){
????return?createSqlSessionFactory(dataSource);
}
然后自定義一個(gè)CustomSqlSessionTemplate,來代替Mybatis中原有的sqlSessionTemplate,把上面定義的兩個(gè)SqlSessionFactory注入進(jìn)去。
@Bean(name?=?"sqlSessionTemplate")
public?CustomSqlSessionTemplate?sqlSessionTemplate(){
????Map在定義的CustomSqlSessionTemplate中,其他都一樣,主要看獲取SqlSessionFactory的方法。
public?class?CustomSqlSessionTemplate?extends?SqlSessionTemplate?{
????@Override
????public?SqlSessionFactory?getSqlSessionFactory()?{
????????//當(dāng)前數(shù)據(jù)源的名稱
????????String?currentDsName?=?DataSourceType.getDataBaseType().name();
????????SqlSessionFactory?targetSqlSessionFactory?=?targetSqlSessionFactorys.get(currentDsName);
????????if?(targetSqlSessionFactory?!=?null)?{
????????????return?targetSqlSessionFactory;
????????}?else?if?(defaultTargetSqlSessionFactory?!=?null)?{
????????????return?defaultTargetSqlSessionFactory;
????????}
????????return?this.sqlSessionFactory;
????}
}
在這里,重點(diǎn)就是我們可以根據(jù)不同的數(shù)據(jù)源獲取不同的SqlSessionFactory。如果SqlSessionFactory不一樣,那么在獲取SqlSession的時(shí)候,就不會(huì)在ThreadLocal中拿到,從而每次都是新的SqlSession對(duì)象。
既然SqlSession也不一樣,那么在獲取Connection連接的時(shí)候,每次都會(huì)去動(dòng)態(tài)數(shù)據(jù)源中去獲取。
原理就是這么個(gè)原理,我們來走一把。
修改完配置之后,我們把Service方法加上事務(wù)的注解,此時(shí)數(shù)據(jù)也是可以正常更新的。
@Transactional
@Override
public?void?createOrder(Order?order)?{
????storageMapper.decreaseStorage(order);
????orderMapper.createOrder(order);
}
可以切換數(shù)據(jù)源只是第一步,我們需要的保證可以保證事務(wù)操作。假如在上面的代碼中,庫存扣減完成,但是創(chuàng)建訂單失敗,庫存是不會(huì)回滾的。因?yàn)樗鼈兎謩e屬于不同的數(shù)據(jù)源,根本不是同一個(gè)連接。
XA協(xié)議分布式事務(wù)
要解決上面那個(gè)問題,我們只能考慮XA協(xié)議。
關(guān)于XA協(xié)議是啥,筆者不再過多的描述。我們只需知道,MySQL InnoDB存儲(chǔ)引擎是支持XA事務(wù)的。
那么XA協(xié)議的實(shí)現(xiàn),在Java中叫做Java Transaction Manager,簡(jiǎn)稱JTA。
如何實(shí)現(xiàn)JTA呢?我們借助Atomikos框架,先引入它的依賴。
<dependency>
????<groupId>org.springframework.bootgroupId>
????<artifactId>spring-boot-starter-jta-atomikosartifactId>
????<version>2.2.7.RELEASEversion>
dependency>
然后,只需把DataSource對(duì)象改成AtomikosDataSourceBean。
public?DataSource?getDataSource(Environment?env,?String?prefix,?String?dataSourceName){
????Properties?prop?=?build(env,prefix);
????AtomikosDataSourceBean?ds?=?new?AtomikosDataSourceBean();
????ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
????ds.setUniqueResourceName(dataSourceName);
????ds.setXaProperties(prop);
????return?ds;
}
這樣配完之后,獲取Connection連接的時(shí)候,拿到的其實(shí)是MysqlXAConnection對(duì)象。在提交或者回滾的時(shí)候,走的就是MySQL的XA協(xié)議了。
public?void?commit(Xid?xid,?boolean?onePhase)?throws?XAException?{
????//封裝?XA?COMMIT?請(qǐng)求
????StringBuilder?commandBuf?=?new?StringBuilder(300);
????commandBuf.append("XA?COMMIT?");
????appendXid(commandBuf,?xid);
????try?{
????????//交給MySQL執(zhí)行XA事務(wù)操作
????????dispatchCommand(commandBuf.toString());
????}?finally?{
????????this.underlyingConnection.setInGlobalTx(false);
????}
}
通過引入Atomikos和修改DataSource,在多數(shù)據(jù)源情況下,即便業(yè)務(wù)操作中間發(fā)生錯(cuò)誤,多個(gè)數(shù)據(jù)庫也是可以正常回滾的。
另外一個(gè)問題,是否應(yīng)該使用XA協(xié)議?
XA協(xié)議看起來看起來比較簡(jiǎn)單,但它也有一些缺點(diǎn)。比如:
性能問題,所有參與者在事務(wù)提交階段處于同步阻塞狀態(tài),占用系統(tǒng)資源,容易導(dǎo)致性能瓶頸,無法滿足高并發(fā)場(chǎng)景;
如果協(xié)調(diào)者存在單點(diǎn)故障問題,如果協(xié)調(diào)者出現(xiàn)故障,參與者將一直處于鎖定狀態(tài);
主從復(fù)制可能產(chǎn)生事務(wù)狀態(tài)不一致。
在MySQL官方文檔中也列舉了一些XA協(xié)議的限制項(xiàng):
https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html
另外,筆者在實(shí)際的項(xiàng)目里,其實(shí)也沒有用過,通過這樣的方式來解決分布式事務(wù)問題,此例僅做可行性方案探討。
總結(jié)
本文通過引入SpringBoot+Mybatis的多數(shù)據(jù)源場(chǎng)景,分析了如下問題:
多數(shù)據(jù)源的配置和實(shí)現(xiàn);
Spring事務(wù)模式,多數(shù)據(jù)源不生效的原因和解決方法;
多數(shù)據(jù)源,基于XA協(xié)議的分布式事務(wù)實(shí)現(xiàn)。
由于篇幅有限,本文示例不包含所有的代碼。如有需要,請(qǐng)到GitHub自取。
https://github.com/taoxun/multipledb2.git
后臺(tái)回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻
如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝
