搞懂Druid之連接獲取和歸還
共 59404字,需瀏覽 119分鐘
·
2024-07-01 18:51
來源:juejin.cn/post/7201800584310210615
推薦:https://t.zsxq.com/8GXMB
前言
Druid是阿里開源的數(shù)據(jù)庫連接池,是阿里監(jiān)控系統(tǒng)Dragoon的副產(chǎn)品,提供了強(qiáng)大的可監(jiān)控性和基于Filter-Chain的可擴(kuò)展性。
本篇文章將對(duì)Druid數(shù)據(jù)庫連接池的連接獲取,歸還和連接泄漏檢測進(jìn)行分析。分析Druid數(shù)據(jù)庫連接池的源碼前,需要明確幾個(gè)概念。
-
Druid數(shù)據(jù)庫連接池中可用的連接存放在一個(gè)數(shù)組connections中; -
Druid數(shù)據(jù)庫連接池做并發(fā)控制,主要靠一把可重入鎖以及和這把鎖關(guān)聯(lián)的兩個(gè)Condition對(duì)象;
public DruidAbstractDataSource(boolean lockFair) {
lock = new ReentrantLock(lockFair);
notEmpty = lock.newCondition();
empty = lock.newCondition();
}
-
連接池沒有可用連接時(shí),應(yīng)用線程會(huì)在notEmpty上等待,連接池已滿時(shí),生產(chǎn)連接的線程會(huì)在empty上等待; -
對(duì)連接?;?,就是每間隔一定時(shí)間,對(duì)達(dá)到了?;铋g隔周期的連接進(jìn)行有效性校驗(yàn),可以將無效連接銷毀,也可以防止連接長時(shí)間不與數(shù)據(jù)庫服務(wù)端通信。
Druid版本:1.2.11
正文
DruidDataSource連接獲取
DruidDataSource獲取連接的入口方法是DruidDataSource#getConnection方法,實(shí)現(xiàn)如下。
public DruidPooledConnection getConnection() throws SQLException {
// maxWait表示獲取連接時(shí)最大等待時(shí)間,單位毫秒,默認(rèn)值為-1
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
// 首次獲取連接時(shí)觸發(fā)數(shù)據(jù)庫連接池初始化
init();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
// 直接獲取連接
return getConnectionDirect(maxWaitMillis);
}
}
DruidDataSource#getConnection方法會(huì)調(diào)用到DruidDataSource#getConnectionDirect方法來獲取連接,實(shí)現(xiàn)如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
DruidPooledConnection poolableConnection;
try {
// 從連接池拿到連接
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
// 拿連接時(shí)有異常,可以重試
// 重試次數(shù)由notFullTimeoutRetryCount指定
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex;
}
// 如果配置了testOnBorrow = true,那么每次拿到連接后,都需要校驗(yàn)這個(gè)連接的有效性
if (testOnBorrow) {
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
// 如果連接不可用,則銷毀連接,然后重新從池中獲取
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
} else {
if (poolableConnection.conn.isClosed()) {
discardConnection(poolableConnection.holder);
continue;
}
// 如果配置testOnBorrow = fasle但testWhileIdle = true
// 則判斷連接空閑時(shí)間是否大于等于timeBetweenEvictionRunsMillis
// 如果是,則校驗(yàn)連接的有效性
if (testWhileIdle) {
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
// lastActiveTimeMillis是連接最近一次活躍時(shí)間
// 新建連接,歸還連接到連接池,都會(huì)更新這個(gè)時(shí)間
long lastActiveTimeMillis = holder.lastActiveTimeMillis;
// lastExecTimeMillis是連接最近一次執(zhí)行時(shí)間
// 新建連接,設(shè)置連接的事務(wù)是否自動(dòng)提交,記錄SQL到事務(wù)信息中,都會(huì)更新這個(gè)時(shí)間
long lastExecTimeMillis = holder.lastExecTimeMillis;
// lastKeepTimeMillis是連接最近一次保活時(shí)間
// 在連接被?;畈⒎呕剡B接池時(shí),會(huì)更新這個(gè)時(shí)間
long lastKeepTimeMillis = holder.lastKeepTimeMillis;
// 如果配置checkExecuteTime為true,則最近活躍時(shí)間取值為最近執(zhí)行時(shí)間
if (checkExecuteTime
&& lastExecTimeMillis != lastActiveTimeMillis) {
lastActiveTimeMillis = lastExecTimeMillis;
}
// 如果連接最近一次做的操作是?;睿敲醋罱钴S時(shí)間取值為最近?;顣r(shí)間
if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}
// 計(jì)算空閑時(shí)間
long idleMillis = currentTimeMillis - lastActiveTimeMillis;
// testWhileIdle為true時(shí)的判斷時(shí)間間隔
long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;
if (timeBetweenEvictionRunsMillis <= 0) {
// timeBetweenEvictionRunsMillis如果小于等于0,那么重置為60秒
timeBetweenEvictionRunsMillis
= DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
// 如果空閑時(shí)間大于等于timeBetweenEvictionRunsMillis,則執(zhí)行連接的有效性校驗(yàn)
if (idleMillis >= timeBetweenEvictionRunsMillis
|| idleMillis < 0
) {
boolean validate = testConnectionInternal(poolableConnection.holder,
poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
discardConnection(poolableConnection.holder);
continue;
}
}
}
}
// 如果設(shè)置removeAbandoned為true
// 則將連接放到activeConnections活躍連接map中
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
DruidDataSource#getConnectionDirect方法中會(huì)先調(diào)用getConnectionInternal() 方法從連接池中拿連接,然后如果開啟了testOnBorrow,則校驗(yàn)一下連接的有效性,如果無效則重新調(diào)用getConnectionInternal() 方法拿連接,直到拿到的連接通過校驗(yàn)。如果沒有開啟testOnBorrow但是開啟了testWhileIdle,則會(huì)判斷連接的空閑時(shí)間是否大于等于timeBetweenEvictionRunsMillis參數(shù),如果滿足則校驗(yàn)一下連接的有效性,若沒有通過校驗(yàn),那么需要重新調(diào)用getConnectionInternal() 方法拿連接,直到拿到的連接通過校驗(yàn)或者連接的空閑時(shí)間小于timeBetweenEvictionRunsMillis。
下面看一下實(shí)際從連接池拿連接的getConnectionInternal() 方法的實(shí)現(xiàn),如下所示。
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
// 省略
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
final int maxWaitThreadCount = this.maxWaitThreadCount;
DruidConnectionHolder holder;
// 在死循環(huán)中從連接池拿連接
// 一開始createDirect為false,表示先從池子中拿
for (boolean createDirect = false; ; ) {
if (createDirect) {
// createDirect為true表示直接創(chuàng)建連接
createStartNanosUpdater.set(this, System.nanoTime());
// creatingCount為0表示當(dāng)前沒有其它連接正在被創(chuàng)建
if (creatingCountUpdater.compareAndSet(this, 0, 1)) {
// 創(chuàng)建物理連接
PhysicalConnectionInfo pyConnInfo
= DruidDataSource.this.createPhysicalConnection();
holder = new DruidConnectionHolder(this, pyConnInfo);
holder.lastActiveTimeMillis = System.currentTimeMillis();
creatingCountUpdater.decrementAndGet(this);
directCreateCountUpdater.incrementAndGet(this);
// 省略
boolean discard;
lock.lock();
try {
// 如果當(dāng)前正在使用的連接數(shù)未達(dá)到最大連接數(shù)
// 則當(dāng)前正在使用的連接數(shù)加1
// 否則銷毀剛剛創(chuàng)建出來的連接
if (activeCount < maxActive) {
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
break;
} else {
discard = true;
}
} finally {
lock.unlock();
}
if (discard) {
JdbcUtils.close(pyConnInfo.getPhysicalConnection());
}
}
}
// 上鎖
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
try {
// maxWaitThreadCount表示允許的最大等待連接的應(yīng)用線程數(shù)
// notEmptyWaitThreadCount表示正在等待連接的應(yīng)用線程數(shù)
// 等待連接的應(yīng)用線程數(shù)達(dá)到最大值時(shí),拋出異常
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
// 發(fā)生了致命錯(cuò)誤,且設(shè)置了致命錯(cuò)誤數(shù)最大值大于0,且正在使用的連接數(shù)大于等于致命錯(cuò)誤數(shù)最大值
if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
// 拼接異常并拋出
// 省略
throw new SQLException(
errorMsg.toString(), lastFatalError);
}
connectCount++;
// 如果配置的創(chuàng)建連接的線程池是一個(gè)定時(shí)線程池
// 且連接池已經(jīng)沒有可用連接,
// 且當(dāng)前借出的連接數(shù)未達(dá)到允許的最大連接數(shù)
// 且當(dāng)前沒有其它線程(應(yīng)用線程,創(chuàng)建連接的線程,創(chuàng)建連接的線程池里的線程)在創(chuàng)建連接
// 此時(shí)將createDirect置為true,讓當(dāng)前應(yīng)用線程直接創(chuàng)建連接
if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor
= (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true;
continue;
}
}
if (maxWait > 0) {
// 如果設(shè)置了等待連接的最大等待時(shí)間,則調(diào)用pollLast()方法來拿連接
// pollLast()方法執(zhí)行時(shí)如果池中沒有連接,則應(yīng)用線程會(huì)在notEmpty上最多等待maxWait的時(shí)間
holder = pollLast(nanos);
} else {
// 調(diào)用takeLast()方法拿連接時(shí),如果池中沒有連接,則會(huì)在notEmpty上一直等待,直到池中有連接
holder = takeLast();
}
if (holder != null) {
if (holder.discard) {
continue;
}
// 正在使用的連接數(shù)加1
activeCount++;
holder.active = true;
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}
break;
}
// 如果拿到的連接為null,說明拿連接時(shí)等待超時(shí)了
// 此時(shí)拋出連接超時(shí)異常
if (holder == null) {
// 省略
final Throwable createError;
try {
lock.lock();
// 省略
createError = this.createError;
} finally {
lock.unlock();
}
// 省略
if (createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
getConnectionInternal() 方法中拿到連接的方式有三種,如下所示。
-
直接創(chuàng)建連接。需要滿足配置的創(chuàng)建連接的線程池是一個(gè)定時(shí)線程池,且連接池已經(jīng)沒有可用連接,且當(dāng)前借出的連接數(shù)未達(dá)到允許的最大連接數(shù),且當(dāng)前沒有其它線程在創(chuàng)建連接; -
從池中拿連接,并最多等待maxWait的時(shí)間。需要設(shè)置了maxWait; -
從池中拿連接,并一直等待直到拿到連接。
下面最后看一下超時(shí)等待拿連接的DruidDataSource#pollLast方法的實(shí)現(xiàn)。
private DruidConnectionHolder pollLast(long nanos)
throws InterruptedException, SQLException {
long estimate = nanos;
for (; ; ) {
if (poolingCount == 0) {
// 如果池中已經(jīng)沒有連接,則喚醒在empty上等待的創(chuàng)建連接線程來創(chuàng)建連接
emptySignal();
if (failFast && isFailContinuous()) {
throw new DataSourceNotAvailableException(createError);
}
// 等待時(shí)間耗盡,返回null
if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}
// 應(yīng)用線程即將在下面的notEmpty上等待
// 這里先把等待獲取連接的應(yīng)用線程數(shù)加1
notEmptyWaitThreadCount++;
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
// 應(yīng)用線程在notEmpty上等待
// 有連接被創(chuàng)建或者被歸還時(shí),會(huì)喚醒在notEmpty上等待的應(yīng)用線程
estimate = notEmpty.awaitNanos(estimate);
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);
if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (disableException != null) {
throw disableException;
}
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal();
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
if (estimate > 0) {
// 若喚醒后池中還是沒有連接,且此時(shí)等待時(shí)間還有剩余
// 則重新在notEmpty上等待
continue;
}
waitNanosLocal.set(nanos - estimate);
return null;
}
}
// poolingCount--
decrementPoolingCount();
// 從池中拿到連接
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
二. DruidDataSource連接歸還
Druid 數(shù)據(jù)庫連接池中,每一個(gè)物理連接都會(huì)被包裝成DruidConnectionHolder,在提供給應(yīng)用線程前,還會(huì)將DruidConnectionHolder包裝成DruidPooledConnection,類圖如下所示。
應(yīng)用線程中使用連接完畢后,會(huì)調(diào)用DruidPooledConnection的close() 方法來歸還連接,也就是將連接放回連接池。DruidPooledConnection#close方法如下所示。
public void close() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
// 判斷歸還連接的線程和獲取連接的線程是否是同一個(gè)線程
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
// 如果不是同一個(gè)線程,則設(shè)置asyncCloseConnectionEnable為true
if (!isSameThread) {
dataSource.setAsyncCloseConnectionEnable(true);
}
// 如果開啟了removeAbandoned機(jī)制
// 或者asyncCloseConnectionEnable為true
// 則調(diào)用syncClose()方法來歸還連接
// syncClose()方法中會(huì)先加鎖,然后調(diào)用recycle()方法來回收連接
if (dataSource.isAsyncCloseConnectionEnable()) {
syncClose();
return;
}
if (!CLOSING_UPDATER.compareAndSet(this, 0, 1)) {
return;
}
try {
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
// 回收連接
recycle();
}
} finally {
CLOSING_UPDATER.set(this, 0);
}
this.disable = true;
}
在DruidPooledConnection#close方法中,會(huì)先判斷本次歸還連接的線程和獲取連接的線程是否是同一個(gè)線程,如果不是,則先加鎖然后再調(diào)用recycle() 方法來回收連接,如果是則直接調(diào)用recycle() 方法來回收連接。當(dāng)開啟了removeAbandoned機(jī)制時(shí),就可能會(huì)出現(xiàn)歸還連接的線程和獲取連接的線程不是同一個(gè)線程的情況,這是因?yàn)橐坏╅_啟了removeAbandoned機(jī)制,那么每一個(gè)被借出的連接都會(huì)被放到activeConnections活躍連接map中,并且在銷毀連接的線程DestroyConnectionThread 中會(huì)每間隔timeBetweenEvictionRunsMillis的時(shí)間就遍歷一次activeConnections活躍連接map,一旦有活躍連接被借出的時(shí)間大于了removeAbandonedTimeoutMillis,那么銷毀連接的線程DestroyConnectionThread就會(huì)主動(dòng)去回收這個(gè)連接,以防止連接泄漏。
下面看一下DruidPooledConnection#recycle方法的實(shí)現(xiàn)。
public void recycle() throws SQLException {
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
// 調(diào)用DruidAbstractDataSource#recycle回收當(dāng)前連接
dataSource.recycle(this);
}
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
在DruidPooledConnection#recycle方法中會(huì)調(diào)用到DruidDataSource#recycle方法來回收連接。DruidDataSource#recycle方法實(shí)現(xiàn)如下所示。
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;
// 省略
final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;
try {
// 如果是非自動(dòng)提交且存在事務(wù)
// 則回滾事務(wù)
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}
// 重置連接信息(配置還原為默認(rèn)值,關(guān)閉Statement,清除連接的Warnings等)
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset();
} finally {
lock.unlock();
}
} else {
holder.reset();
}
// 省略
// 開啟了testOnReturn機(jī)制,則校驗(yàn)連接有效性
if (testOnReturn) {
boolean validate = testConnectionInternal(holder, physicalConnection);
// 校驗(yàn)不通過則關(guān)閉物理連接
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCountUpdater.incrementAndGet(this);
lock.lock();
try {
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
// 省略
lock.lock();
try {
// 連接即將放回連接池,需要將active設(shè)置為false
if (holder.active) {
activeCount--;
holder.active = false;
}
closeCount++;
// 將連接放到connections數(shù)組的poolingCount位置
// 然后poolingCount加1
// 然后喚醒在notEmpty上等待連接的一個(gè)應(yīng)用線程
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
// 省略
}
}
DruidDataSource#recycle方法中會(huì)先重置連接信息,即將連接的一些配置重置為默認(rèn)值,然后關(guān)閉連接的Statement和Warnings,如果開啟了testOnReturn機(jī)制,則還需要校驗(yàn)一下連接的有效性,校驗(yàn)不通過則直接關(guān)閉物理連接,最后,將連接放回到connections數(shù)組的poolingCount位置,然后喚醒一個(gè)在notEmpty上等待連接的應(yīng)用線程。
removeAbandoned機(jī)制
Druid數(shù)據(jù)庫連接池提供了removeAbandoned機(jī)制來防止連接泄漏。要開啟removeAbandoned機(jī)制,需要設(shè)置如下參數(shù)。
| 參數(shù) | 說明 |
|---|---|
| removeAbandoned | 發(fā)生連接泄漏時(shí),是否需要回收泄漏的連接。默認(rèn)為false,表示不回收。 |
| removeAbandonedTimeoutMillis | 判斷發(fā)生連接泄漏的超時(shí)時(shí)間。默認(rèn)為300秒。 |
| logAbandoned | 是否打印堆棧。默認(rèn)為false,包是不打印。 |
下面將對(duì)開啟removeAbandoned機(jī)制后,如何回收發(fā)生了泄漏的連接進(jìn)行說明。當(dāng)應(yīng)用線程從連接池獲取到一個(gè)連接后,如果開啟了removeAbandoned機(jī)制,那么會(huì)將這個(gè)連接放到activeConnections活躍連接map中,對(duì)應(yīng)的方法為DruidDataSource#getConnectionDirect,源碼片段如下所示。
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (; ; ) {
DruidPooledConnection poolableConnection;
// 省略
if (removeAbandoned) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
// 設(shè)置connectedTimeNano,用于后續(xù)判斷連接借出時(shí)間是否大于removeAbandonedTimeoutMillis
poolableConnection.setConnectedTimeNano();
poolableConnection.traceEnable = true;
activeConnectionLock.lock();
try {
// 將從連接池獲取到的連接放到activeConnections中
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}
if (!this.defaultAutoCommit) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
又已知Druid數(shù)據(jù)庫連接池有一個(gè)銷毀連接的線程會(huì)每間隔timeBetweenEvictionRunsMillis執(zhí)行一次DestroyTask#run方法來銷毀連接,DestroyTask#run方法如下所示。
public void run() {
shrink(true, keepAlive);
// 如果開啟了removeAbandoned機(jī)制
// 則執(zhí)行removeAbandoned()方法來檢測發(fā)生了泄漏的連接并回收
if (isRemoveAbandoned()) {
removeAbandoned();
}
}
DestroyTask#run方法的最后會(huì)判斷是否開啟了removeAbandoned機(jī)制,如果開啟了則會(huì)執(zhí)行DruidDataSource#removeAbandoned方法來檢測哪些連接發(fā)生了泄漏,并主動(dòng)回收這些連接。DruidDataSource#removeAbandoned方法如下所示。
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
activeConnectionLock.lock();
try {
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext(); ) {
DruidPooledConnection pooledConnection = iter.next();
// 運(yùn)行中的連接不會(huì)被判定為發(fā)生了泄漏
if (pooledConnection.isRunning()) {
continue;
}
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
// 判斷連接借出時(shí)間是否達(dá)到連接泄漏的超時(shí)時(shí)間
if (timeMillis >= removeAbandonedTimeoutMillis) {
// 將發(fā)生了泄漏的連接從activeConnections中移除
iter.remove();
pooledConnection.setTraceEnable(false);
// 將發(fā)生了泄露的連接添加到abandonedList集合中
abandonedList.add(pooledConnection);
}
}
} finally {
activeConnectionLock.unlock();
}
if (abandonedList.size() > 0) {
// 遍歷abandonedList集合
// 主動(dòng)調(diào)用每個(gè)發(fā)生了泄漏的DruidPooledConnection的close()方法來回收連接
for (DruidPooledConnection pooledConnection : abandonedList) {
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
if (pooledConnection.isDisable()) {
continue;
}
} finally {
lock.unlock();
}
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
// 省略
}
}
return removeCount;
}
DruidDataSource#removeAbandoned方法中主要完成的事情就是將每個(gè)發(fā)生了泄漏的連接從activeConnections中移動(dòng)到abandonedList中,然后遍歷abandonedList中的每個(gè)連接并調(diào)用DruidPooledConnection#close方法,最終完成泄漏連接的回收。
總結(jié)
Druid數(shù)據(jù)庫連接池中,應(yīng)用線程向連接池獲取連接時(shí),如果池中沒有連接,則應(yīng)用線程會(huì)在notEmpty上等待,同時(shí)Druid數(shù)據(jù)庫連接池中有一個(gè)創(chuàng)建連接的線程,會(huì)持續(xù)的向連接池創(chuàng)建連接,如果連接池已滿,則創(chuàng)建連接的線程會(huì)在empty上等待。
當(dāng)有連接被生產(chǎn),或者有連接被歸還,會(huì)喚醒在notEmpty上等待的應(yīng)用線程,同理有連接被銷毀時(shí),會(huì)喚醒在empty上等待的生產(chǎn)連接的線程。
Druid數(shù)據(jù)庫連接池中還有一個(gè)銷毀連接的線程,會(huì)每間隔timeBetweenEvictionRunsMillis的時(shí)間執(zhí)行一次DestroyTask任務(wù)來銷毀連接,這些被銷毀的連接可以是存活時(shí)間達(dá)到最大值的連接,也可以是空閑時(shí)間達(dá)到指定值的連接。如果還開啟了?;顧C(jī)制,那么空閑時(shí)間大于keepAliveBetweenTimeMillis的連接都會(huì)被校驗(yàn)一次有效性,校驗(yàn)不通過的連接會(huì)被銷毀。
最后,Druid數(shù)據(jù)庫連接池提供了removeAbandoned機(jī)制來防止連接泄漏,當(dāng)開啟了removeAbandoned機(jī)制時(shí),每一個(gè)被應(yīng)用線程獲取的連接都會(huì)被添加到activeConnections活躍連接map中,如果這個(gè)連接在應(yīng)用線程中使用完畢后沒有被關(guān)閉,那么Druid數(shù)據(jù)庫連接池會(huì)從activeConnections中將其識(shí)別出來并主動(dòng)回收。
