SpringBoot 多數(shù)據(jù)源及事務(wù)解決方案
程序員的成長之路互聯(lián)網(wǎng)/程序員/技術(shù)/資料共享?
關(guān)注
閱讀本文大概需要 10?分鐘。
來自: blog.csdn.net/qq381332153/article/ details/126541276
1. 背景
一個主庫和N個應(yīng)用庫的數(shù)據(jù)源,并且會同時操作主庫和應(yīng)用庫的數(shù)據(jù),需要解決以下兩個問題:- 如何動態(tài)管理多個數(shù)據(jù)源以及切換?
- 如何保證多數(shù)據(jù)源場景下的數(shù)據(jù)一致性(事務(wù))?
2. 數(shù)據(jù)源切換原理
通過擴展Spring提供的抽象類AbstractRoutingDataSource,可以實現(xiàn)切換數(shù)據(jù)源。其類結(jié)構(gòu)如下圖所示:
- targetDataSources&defaultTargetDataSource
- resolvedDataSources&resolvedDefaultDataSource
AbstractRoutingDataSource對象時,通過調(diào)用afterPropertiesSet復(fù)制上述目標(biāo)數(shù)據(jù)源。由此可見,一旦數(shù)據(jù)源實例對象創(chuàng)建完畢,業(yè)務(wù)無法再添加新的數(shù)據(jù)源。- determineCurrentLookupKey
Map<Object, DataSource>其key為lookup key。我們來看官方對這個方法的注釋:
lookup key通常是綁定在線程上下文中,根據(jù)這個key去resolvedDataSources中取出DataSource。根據(jù)目標(biāo)數(shù)據(jù)源的管理方式不同,可以使用基于配置文件和數(shù)據(jù)庫表兩種方式。基于配置文件管理方案無法后續(xù)添加新的數(shù)據(jù)源,而基于數(shù)據(jù)庫表方案管理,則更加靈活。3. 配置文件解決方案
根據(jù)上面的分析,我們可以按照下面的步驟去實現(xiàn):-
定義
DynamicDataSource類繼承AbstractRoutingDataSource,重寫determineCurrentLookupKey()方法。 -
配置多個數(shù)據(jù)源注入
targetDataSources和defaultTargetDataSource,通過afterPropertiesSet()方法將數(shù)據(jù)源寫入resolvedDataSources和resolvedDefaultDataSource。 -
調(diào)用
AbstractRoutingDataSource的getConnection()方法時,determineTargetDataSource()方法返回DataSource執(zhí)行底層的getConnection()。

3.1 創(chuàng)建數(shù)據(jù)源
DynamicDataSource數(shù)據(jù)源的注入,目前業(yè)界主流實現(xiàn)步驟如下:在配置文件中定義數(shù)據(jù)源
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driverClassName=com.mysql.jdbc.Driver
#?主數(shù)據(jù)源
spring.datasource.druid.master.url=jdbcUrl
spring.datasource.druid.master.username=***
spring.datasource.druid.master.password=***
#?其他數(shù)據(jù)源
spring.datasource.druid.second.url=jdbcUrl
spring.datasource.druid.second.username=***
spring.datasource.druid.second.password=***
@Configuration
public?class?DynamicDataSourceConfig?{
????@Bean
????@ConfigurationProperties("spring.datasource.druid.master")
????public?DataSource?firstDataSource(){
????????return?DruidDataSourceBuilder.create().build();
????}
?
????@Bean
????@ConfigurationProperties("spring.datasource.druid.second")
????public?DataSource?secondDataSource(){
????????return?DruidDataSourceBuilder.create().build();
????}
?
????@Bean
????@Primary
????public?DynamicDataSource?dataSource(DataSource?firstDataSource,?DataSource?secondDataSource)?{
????????Map<Object,?Object>?targetDataSources?=?new?HashMap<>(5);
????????targetDataSources.put(DataSourceNames.FIRST,?firstDataSource);
????????targetDataSources.put(DataSourceNames.SECOND,?secondDataSource);
????????return?new?DynamicDataSource(firstDataSource,?targetDataSources);
????}
}
3.2 AOP處理
通過DataSourceAspect切面技術(shù)來簡化業(yè)務(wù)上的使用,只需要在業(yè)務(wù)方法添加@SwitchDataSource注解即可完成動態(tài)切換:
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public?@interface?SwitchDataSource?{
????String?value();
}
DataSourceAspect攔截業(yè)務(wù)方法,更新當(dāng)前線程上下文DataSourceContextHolder中存儲的key,即可實現(xiàn)數(shù)據(jù)源切換。3.3 方案不足
基于AbstractRoutingDataSource的多數(shù)據(jù)源動態(tài)切換,有個明顯的缺點,無法動態(tài)添加和刪除數(shù)據(jù)源。在我們的產(chǎn)品中,不能把應(yīng)用數(shù)據(jù)源寫死在配置文件。接下來分享一下基于數(shù)據(jù)庫表的實現(xiàn)方案。4. 數(shù)據(jù)庫表解決方案
我們需要實現(xiàn)可視化的數(shù)據(jù)源管理,并實時查看數(shù)據(jù)源的運行狀態(tài)。所以我們不能把數(shù)據(jù)源全部配置在文件中,應(yīng)該將數(shù)據(jù)源定義保存到數(shù)據(jù)庫表。參考AbstractRoutingDataSource的設(shè)計思路,實現(xiàn)自定義數(shù)據(jù)源管理。4.1 設(shè)計數(shù)據(jù)源表
主庫的數(shù)據(jù)源信息仍然配置在項目配置文件中,應(yīng)用庫數(shù)據(jù)源配置參數(shù),則設(shè)計對應(yīng)的數(shù)據(jù)表。表結(jié)構(gòu)如下所示:
這個表主要就是DataSource的相關(guān)配置參數(shù),其相應(yīng)的ORM操作代碼在此不再贅述,主要是實現(xiàn)數(shù)據(jù)源的增刪改查操作。4.2 自定義數(shù)據(jù)源管理
4.2.1 定義管理接口
通過繼承AbstractDataSource即可實現(xiàn)DynamicDataSource。為了方便對數(shù)據(jù)源進行操作,我們定義一個接口DataSourceManager,為業(yè)務(wù)提供操作數(shù)據(jù)源的統(tǒng)一接口。
public?interface?DataSourceManager?{
????void?put(String?var1,?DataSource?var2);
?
????DataSource?get(String?var1);
?
????Boolean?hasDataSource(String?var1);
?
????void?remove(String?var1);
?
????void?closeDataSource(String?var1);
?
????Collection<DataSource>?all();
}
4.2.2 自定義數(shù)據(jù)源
DynamicDataSource的實現(xiàn)如下圖所示:
根據(jù)前面的分析,AbstractRoutingDataSource是在容器啟動的時候,執(zhí)行afterPropertiesSet注入數(shù)據(jù)源對象,完成之后無法對數(shù)據(jù)源進行修改。DynamicDataSource則實現(xiàn)DataSourceManager接口,可以將數(shù)據(jù)表中的數(shù)據(jù)源加載到dataSources。4.2.3 切面處理
這一塊的處理跟配置文件數(shù)據(jù)源方案處理方式相同,都是通過AOP技術(shù)切換lookup key。
public?DataSource?determineTargetDataSource()?{
????????String?lookupKey?=?DataSourceContextHolder.getKey();
????????DataSource?dataSource?=?Optional.ofNullable(lookupKey)
????????????????.map(dataSources::get)
????????????????.orElse(defaultDataSource);
????????if?(dataSource?==?null)?{
????????????throw?new?IllegalStateException("Cannot?determine?DataSource?for?lookup?key?["?+?lookupKey?+?"]");
????????}
????????return?dataSource;
????}
4.2.4 管理數(shù)據(jù)源狀態(tài)
在項目啟動的時候,加載數(shù)據(jù)表中的所有數(shù)據(jù)源,并執(zhí)行初始化。初始化操作主要是使用SpringBoot提供的DataSourceBuilder類,根據(jù)數(shù)據(jù)源表的定義創(chuàng)建DataSource。在項目運行過程中,可以使用定時任務(wù)對數(shù)據(jù)源進行保活,為了提升性能再添加一層緩存。
AbstractRoutingDataSource?只支持單庫事務(wù),切換數(shù)據(jù)源是在開啟事務(wù)之前執(zhí)行。Spring使用?DataSourceTransactionManager進行事務(wù)管理。開啟事務(wù),會將數(shù)據(jù)源緩存到DataSourceTransactionObject對象中,后續(xù)的commit和 rollback事務(wù)操作實際上是使用的同一個數(shù)據(jù)源。如何解決切庫事務(wù)問題?借助Spring的聲明式事務(wù)處理,我們可以在多次切庫操作時強制開啟新的事務(wù):
@SwitchDataSource????
@Transactional(rollbackFor?=?Exception.class,?propagation?=?Propagation.REQUIRES_NEW)
假若ServiceB正常執(zhí)行提交事務(wù),接著返回ServiceA執(zhí)行并且發(fā)生異常。因為兩次處理是不同的事務(wù),ServiceA這個事務(wù)執(zhí)行回滾,而ServiceA事務(wù)已經(jīng)提交。這樣的話,數(shù)據(jù)就不一致了。接下來,我們主要討論如何解決多庫的事務(wù)問題。6. 多庫事務(wù)處理
6.1 關(guān)于事務(wù)的理解
首先有必要理解事務(wù)的本質(zhì)。1.提到Spring事務(wù),就離不開事務(wù)的四大特性和隔離級別、七大傳播特性。事務(wù)特性和離級別是屬于數(shù)據(jù)庫范疇。Spring事務(wù)的七大傳播特性是什么呢?它是Spring在當(dāng)前線程內(nèi),處理多個事務(wù)操作時的事務(wù)應(yīng)用策略,數(shù)據(jù)庫事務(wù)本身并不存在傳播特性。
2.Spring事務(wù)的定義包括:begin、commit、rollback、close、suspend、resume等動作。-
begin(事務(wù)開始):?可以認(rèn)為存在于數(shù)據(jù)庫的命令中,比如Mysql的
start transaction命令,但是在JDBC編程方式中不存在。 -
close(事務(wù)關(guān)閉):?Spring事務(wù)的close()方法,是把
Connection對象歸還給數(shù)據(jù)庫連接池,與事務(wù)無關(guān)。 -
suspend(事務(wù)掛起):?Spring中事務(wù)掛起的語義是:需要新事務(wù)時,將現(xiàn)有的
Connection保存起來(還有尚未提交的事務(wù)),然后創(chuàng)建新的Connection2,Connection2提交、回滾、關(guān)閉完畢后,再把Connection1取出來繼續(xù)執(zhí)行。 - resume(事務(wù)恢復(fù)):?嵌套事務(wù)執(zhí)行完畢,返回上層事務(wù)重新綁定連接對象到事務(wù)管理器的過程。
setAutoCommit()、commit()、rollback()。close()語義為:- 關(guān)閉一個數(shù)據(jù)庫連接,這已經(jīng)不再是事務(wù)的方法了。
6.2 自定義管理事務(wù)
為了保證在多個數(shù)據(jù)源中事務(wù)的一致性,我們可以手動管理Connetion的事務(wù)提交和回滾。考慮到不同ORM框架的事務(wù)管理實現(xiàn)差異,要求實現(xiàn)自定義事務(wù)管理不影響框架層的事務(wù)。這可以通過使用裝飾器設(shè)計模式,對Connection進行包裝重寫commit和rolllback屏蔽其默認(rèn)行為,這樣就不會影響到原生Connection和ORM框架的默認(rèn)事務(wù)行為。其整體思路如下圖所示:
這里并沒有使用前面提到的@SwitchDataSource,這是因為我們在TransactionAop中已經(jīng)執(zhí)行了lookupKey的切換。6.2.1 定義多事務(wù)注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public?@interface?MultiTransaction?{
????String?transactionManager()?default?"multiTransactionManager";
????//?默認(rèn)數(shù)據(jù)隔離級別,隨數(shù)據(jù)庫本身默認(rèn)值
????IsolationLevel?isolationLevel()?default?IsolationLevel.DEFAULT;
????//?默認(rèn)為主庫數(shù)據(jù)源
????String?datasourceId()?default?"default";
????//?只讀事務(wù),若有更新操作會拋出異常
????boolean?readOnly()?default?false;
datasourceId指定事務(wù)用到的數(shù)據(jù)源,不指定默認(rèn)為主庫。6.2.3 包裝Connection
自定義事務(wù)我們使用包裝過的Connection,屏蔽其中的commit&rollback方法。這樣我們就可以在主事務(wù)里進行統(tǒng)一的事務(wù)提交和回滾操作。
public?class?ConnectionProxy?implements?Connection?{
?
????private?final?Connection?connection;
?
????public?ConnectionProxy(Connection?connection)?{
????????this.connection?=?connection;
????}
?
????@Override
????public?void?commit()?throws?SQLException?{
????????//?connection.commit();
????}
?
????public?void?realCommit()?throws?SQLException?{
????????connection.commit();
????}
?
????@Override
????public?void?close()?throws?SQLException?{
????????//connection.close();
????}
?
????public?void?realClose()?throws?SQLException?{
????????if?(!connection.getAutoCommit())?{
????????????connection.setAutoCommit(true);
????????}
????????connection.close();
????}
?
????@Override
????public?void?rollback()?throws?SQLException?{
????????if(!connection.isClosed())
????????????connection.rollback();
????}
????...
}
commit&close方法不執(zhí)行操作,rollback執(zhí)行的前提是連接執(zhí)行close才生效。這樣不管是使用哪個ORM框架,其自身事務(wù)管理都將失效。事務(wù)的控制就交由MultiTransaction控制了。6.2.4 事務(wù)上下文管理
public?class?TransactionHolder?{
????//?是否開啟了一個MultiTransaction
????private?boolean?isOpen;
????//?是否只讀事務(wù)
????private?boolean?readOnly;
????//?事務(wù)隔離級別
????private?IsolationLevel?isolationLevel;
????//?維護當(dāng)前線程事務(wù)ID和連接關(guān)系
????private?ConcurrentHashMap<String,?ConnectionProxy>?connectionMap;
????//?事務(wù)執(zhí)行棧
????private?Stack<String>?executeStack;
????//?數(shù)據(jù)源切換棧
????private?Stack<String>?datasourceKeyStack;
????//?主事務(wù)ID
????private?String?mainTransactionId;
????//?執(zhí)行次數(shù)
????private?AtomicInteger?transCount;
?
????//?事務(wù)和數(shù)據(jù)源key關(guān)系
????private?ConcurrentHashMap<String,?String>?executeIdDatasourceKeyMap;
?
}
ConnectionProxy。事務(wù)嵌套調(diào)用,保存事務(wù)ID和lookupKey至棧中,當(dāng)內(nèi)層事務(wù)執(zhí)行完畢執(zhí)行pop。這樣的話,外層事務(wù)只需在棧中執(zhí)行peek即可獲取事務(wù)ID和lookupKey。6.2.5 數(shù)據(jù)源兼容處理
為了不影響原生事務(wù)的使用,需要重寫getConnection方法。當(dāng)前線程沒有啟動自定義事務(wù),則直接從數(shù)據(jù)源中返回連接。
@Override
????public?Connection?getConnection()?throws?SQLException?{
????????TransactionHolder?transactionHolder?=?MultiTransactionManager.TRANSACTION_HOLDER_THREAD_LOCAL.get();
????????if?(Objects.isNull(transactionHolder))?{
????????????return?determineTargetDataSource().getConnection();
????????}
????????ConnectionProxy?ConnectionProxy?=?transactionHolder.getConnectionMap()
????????????????.get(transactionHolder.getExecuteStack().peek());
????????if?(ConnectionProxy?==?null)?{
????????????//?沒開跨庫事務(wù),直接返回
????????????return?determineTargetDataSource().getConnection();
????????}?else?{
????????????transactionHolder.addCount();
????????????//?開了跨庫事務(wù),從當(dāng)前線程中拿包裝過的Connection
????????????return?ConnectionProxy;
????????}
????}
6.2.6 切面處理
切面處理的核心邏輯是:維護一個嵌套事務(wù)棧,當(dāng)業(yè)務(wù)方法執(zhí)行結(jié)束,或者發(fā)生異常時,判斷當(dāng)前棧頂事務(wù)ID是否為主事務(wù)ID。如果是的話這時候已經(jīng)到了最外層事務(wù),這時才執(zhí)行提交和回滾。詳細(xì)流程如下圖所示:
package?com.github.mtxn.transaction.aop;
?
import?com.github.mtxn.application.Application;
import?com.github.mtxn.transaction.MultiTransactionManager;
import?com.github.mtxn.transaction.annotation.MultiTransaction;
import?com.github.mtxn.transaction.context.DataSourceContextHolder;
import?com.github.mtxn.transaction.support.IsolationLevel;
import?com.github.mtxn.transaction.support.TransactionHolder;
import?com.github.mtxn.utils.ExceptionUtils;
import?lombok.extern.slf4j.Slf4j;
import?org.aspectj.lang.ProceedingJoinPoint;
import?org.aspectj.lang.annotation.Around;
import?org.aspectj.lang.annotation.Aspect;
import?org.aspectj.lang.annotation.Pointcut;
import?org.aspectj.lang.reflect.MethodSignature;
import?org.springframework.core.annotation.Order;
import?org.springframework.stereotype.Component;
?
import?java.lang.reflect.Method;
?
?
@Aspect
@Component
@Slf4j
@Order(99999)
public?class?MultiTransactionAop?{
?
????@Pointcut("@annotation(com.github.mtxn.transaction.annotation.MultiTransaction)")
????public?void?pointcut()?{
????????if?(log.isDebugEnabled())?{
????????????log.debug("start?in?transaction?pointcut...");
????????}
????}
?
?
????@Around("pointcut()")
????public?Object?aroundTransaction(ProceedingJoinPoint?point)?throws?Throwable?{
????????MethodSignature?signature?=?(MethodSignature)?point.getSignature();
????????//?從切面中獲取當(dāng)前方法
????????Method?method?=?signature.getMethod();
????????MultiTransaction?multiTransaction?=?method.getAnnotation(MultiTransaction.class);
????????if?(multiTransaction?==?null)?{
????????????return?point.proceed();
????????}
????????IsolationLevel?isolationLevel?=?multiTransaction.isolationLevel();
????????boolean?readOnly?=?multiTransaction.readOnly();
????????String?prevKey?=?DataSourceContextHolder.getKey();
????????MultiTransactionManager?multiTransactionManager?=?Application.resolve(multiTransaction.transactionManager());
????????//?切數(shù)據(jù)源,如果失敗使用默認(rèn)庫
????????if?(multiTransactionManager.switchDataSource(point,?signature,?multiTransaction))?return?point.proceed();
????????//?開啟事務(wù)棧
????????TransactionHolder?transactionHolder?=?multiTransactionManager.startTransaction(prevKey,?isolationLevel,?readOnly,?multiTransactionManager);
????????Object?proceed;
?
????????try?{
????????????proceed?=?point.proceed();
????????????multiTransactionManager.commit();
????????}?catch?(Throwable?ex)?{
????????????log.error("execute?method:{}#{},err:",?method.getDeclaringClass(),?method.getName(),?ex);
????????????multiTransactionManager.rollback();
????????????throw?ExceptionUtils.api(ex,?"系統(tǒng)異常:%s",?ex.getMessage());
????????}?finally?{
????????????//?當(dāng)前事務(wù)結(jié)束出棧
????????????String?transId?=?multiTransactionManager.getTrans().getExecuteStack().pop();
????????????transactionHolder.getDatasourceKeyStack().pop();
????????????//?恢復(fù)上一層事務(wù)
????????????DataSourceContextHolder.setKey(transactionHolder.getDatasourceKeyStack().peek());
????????????//?最后回到主事務(wù),關(guān)閉此次事務(wù)
????????????multiTransactionManager.close(transId);
????????}
????????return?proceed;
?
????}
?
?
}
7.總結(jié)
本文主要介紹了多數(shù)據(jù)源管理的解決方案(應(yīng)用層事務(wù),而非XA二段提交保證),以及對多個庫同時操作的事務(wù)管理。需要注意的是,這種方式只適用于單體架構(gòu)的應(yīng)用。因為多個庫的事務(wù)參與者都是運行在同一個JVM進行。如果是在微服務(wù)架構(gòu)的應(yīng)用中,則需要使用分布式事務(wù)管理(譬如:Seata)。<END>
推薦閱讀:
船新 IDEA 2023.1 正式發(fā)布,新特性真香!
互聯(lián)網(wǎng)初中高級大廠面試題(9個G)
內(nèi)容包含Java基礎(chǔ)、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務(wù)、Zookeeper......等技術(shù)棧!
?戳閱讀原文領(lǐng)取!
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??朕已閱?
![]()
