坑爹!Quartz 重復調(diào)度問題,你遇到過么?
點擊關注公眾號,Java干貨及時送達
作者:Lavender
來源:https://segmentfault.com/a/1190000015492260
1. 引子
公司前期改用quartz做任務調(diào)度,一日的調(diào)度量均在兩百萬次以上。隨著調(diào)度量的增加,突然開始出現(xiàn)job重復調(diào)度的情況,且沒有規(guī)律可循。網(wǎng)上也沒有說得較為清楚的解決辦法,于是我們開始調(diào)試Quartz源碼,并最終找到了問題所在。
如果沒有耐性看完源碼解析,可以直接拉到文章最末,有直接簡單的解決辦法。注:本文中使用的quartz版本為2.3.0,且使用JDBC模式存儲Job。
2. 準備
首先,因為本文是代碼級別的分析文章,因而需要提前了解Quartz的用途和用法,網(wǎng)上還是有很多不錯的文章,可以提前自行了解。
其次,在用法之外,我們還需要了解一些Quartz框架的基礎概念:
1)Quartz把觸發(fā)job,叫做fire。TRIGGER_STATE是當前trigger的狀態(tài),PREV_FIRE_TIME是上一次觸發(fā)時間,NEXT_FIRE_TIME是下一次觸發(fā)時間,misfire是指這個job在某一時刻要觸發(fā),卻因為某些原因沒有觸發(fā)的情況。
2)Quartz在運行時,會起兩類線程(不止兩類),一類用于調(diào)度job的調(diào)度線程(單線程),一類是用于執(zhí)行job具體業(yè)務的工作池。
3)Quartz自帶的表里面,本文主要涉及以下3張表:
triggers表。triggers表里記錄了,某個trigger的PREV_FIRE_TIME(上次觸發(fā)時間),NEXT_FIRE_TIME(下一次觸發(fā)時間),TRIGGER_STATE(當前狀態(tài))。雖未盡述,但是本文用到的只有這些。 locks表。Quartz支持分布式,也就是會存在多個線程同時搶占相同資源的情況,而Quartz正是依賴這張表,處理這種狀況,至于如何做到,參見3.1。 fired_triggers表,記錄正在觸發(fā)的triggers信息。
4)TRIGGER_STATE,也就是trigger的狀態(tài),主要有以下幾類:
trigger的初始狀態(tài)是WAITING,處于WAITING狀態(tài)的trigger等待被觸發(fā)。調(diào)度線程會不停地掃triggers表,根據(jù)NEXT_FIRE_TIME提前拉取即將觸發(fā)的trigger,如果這個trigger被該調(diào)度線程拉取到,它的狀態(tài)就會變?yōu)?strong style="color: rgb(53, 179, 120);">ACQUIRED。
因為是提前拉取trigger,并未到達trigger真正的觸發(fā)時刻,所以調(diào)度線程會等到真正觸發(fā)的時刻,再將trigger狀態(tài)由ACQUIRED改為EXECUTING。
如果這個trigger不再執(zhí)行,就將狀態(tài)改為COMPLETE,否則為WAITING,開始新的周期。如果這個周期中的任何環(huán)節(jié)拋出異常,trigger的狀態(tài)會變成ERROR。如果手動暫停這個trigger,狀態(tài)會變成PAUSED。
3. 開始排查
3.1分布式狀態(tài)下的數(shù)據(jù)訪問
前文提到,trigger的狀態(tài)儲存在數(shù)據(jù)庫,Quartz支持分布式,所以如果起了多個quartz服務,會有多個調(diào)度線程來搶奪觸發(fā)同一個trigger。mysql在默認情況下執(zhí)行select 語句,是不上鎖的,那么如果同時有1個以上的調(diào)度線程搶到同一個trigger,是否會導致這個trigger重復調(diào)度呢?我們來看看,Quartz是如何解決這個問題的。
推薦一個 Spring Boot 基礎教程及實戰(zhàn)示例:https://github.com/javastacks/spring-boot-best-practice
首先,我們先來看下JobStoreSupport類的executeInNonManagedTXLock()方法:

/**
*Execute the given callback having acquired the given lock.
*Depending on the JobStore,the surrounding transaction maybe
*assumed to be already present(managed).
*
*@param lockName The name of the lock to acquire,for example
*"TRIGGER_ACCESS".If null, then no lock is acquired ,but the
*lockCallback is still executed in a transaction.
*/
這意味著,我們使用這個方法,不僅可以保證事務,還可以選擇保證,callback方法的線程安全。
接下來,我們來看一下executeInNonManagedTXLock(…)中的obtainLock(conn,lockName)方法,即搶鎖的過程。這個方法是在Semaphore接口中定義的,Semaphore接口通過鎖住線程或者資源,來保護資源不被其他線程修改,由于我們的調(diào)度信息是存在數(shù)據(jù)庫的,所以現(xiàn)在查看DBSemaphore.java中obtainLock方法的具體實現(xiàn):

我們通過調(diào)試查看expandedSQL和expandedInsertSQL這兩個變量:

圖3-3可以看出,obtainLock方法通過locks表的一個行鎖(lockName確定)來保證callback方法的事務和線程安全。拿到鎖后,obtainLock方法將lockName寫入threadlocal。當然在releaseLock的時候,會將lockName從threadlocal中刪除。
總而言之,executeInNonManagedTXLock()方法,保證了在分布式的情況,同一時刻,只有一個線程可以執(zhí)行這個方法。
3.2 quartz的調(diào)度過程

QuartzSchedulerThread是調(diào)度線程的具體實現(xiàn),圖3-4 是這個線程run()方法的主要內(nèi)容,圖中只提到了正常的情況下,也就是流程中沒有出現(xiàn)異常的情況下的處理過程。由圖可以看出,調(diào)度流程主要分為以下三步:最新面試題整理好了,大家可以在Java面試庫小程序在線刷題。
1)拉取待觸發(fā)trigger:
調(diào)度線程會一次性拉取距離現(xiàn)在,一定時間窗口內(nèi)的,一定數(shù)量內(nèi)的,即將觸發(fā)的trigger信息。那么,時間窗口和數(shù)量信息如何確定呢,我們先來看一下,以下幾個參數(shù):
idleWaitTime:默認30s,可通過配置屬性org.quartz.scheduler.idleWaitTime設置。availThreadCount:獲取可用(空閑)的工作線程數(shù)量,總會大于1,因為該方法會一直阻塞,直到有工作線程空閑下來。maxBatchSize:一次拉取trigger的最大數(shù)量,默認是1,可通過org.quartz.scheduler.batchTriggerAcquisitionMaxCount改寫batchTimeWindow:時間窗口調(diào)節(jié)參數(shù),默認是0,可通過org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow改寫misfireThreshold:超過這個時間還未觸發(fā)的trigger,被認為發(fā)生了misfire,默認60s,可通過org.quartz.jobStore.misfireThreshold設置。
調(diào)度線程一次會拉取NEXT_FIRE_TIME小于(now + idleWaitTime +batchTimeWindow),大于(now - misfireThreshold)的,min(availThreadCount,maxBatchSize)個triggers,默認情況下,會拉取未來30s,過去60s之間還未fire的1個trigger。隨后將這些triggers的狀態(tài)由WAITING改為ACQUIRED,并插入fired_triggers表。
2)觸發(fā)trigger:
首先,我們會檢查每個trigger的狀態(tài)是不是ACQUIRED,如果是,則將狀態(tài)改為EXECUTING,然后更新trigger的NEXT_FIRE_TIME,如果這個trigger的NEXT_FIRE_TIME為空,也就是未來不再觸發(fā),就將其狀態(tài)改為COMPLETE。如果trigger不允許并發(fā)執(zhí)行(即Job的實現(xiàn)類標注了@DisallowConcurrentExecution),則將狀態(tài)變?yōu)?strong style="color: rgb(53, 179, 120);">BLOCKED,否則就將狀態(tài)改為WAITING。
3)包裝trigger,丟給工作線程池:
遍歷triggers,如果其中某個trigger在第二步出錯,即返回值里面有exception或者為null,就會做一些triggers表,fired_triggers表的內(nèi)容修正,跳過這個trigger,繼續(xù)檢查下一個。否則,則根據(jù)trigger信息實例化JobRunShell(實現(xiàn)了Thread接口),同時依據(jù)JOB_CLASS_NAME實例化Job,隨后我們將JobRunShell實例丟入工作線。
另外,關注公眾號Java技術棧,在后臺回復:Java,可以獲取我整理的 Java 多線程系列教程,非常齊全。
在JobRunShell的run()方法,Quartz會在執(zhí)行job.execute()的前后通知之前綁定的監(jiān)聽器,如果job.execute()執(zhí)行的過程中有異常拋出,則執(zhí)行結(jié)果jobExEx會保存異常信息,反之如果沒有異常拋出,則jobExEx為null。然后根據(jù)jobExEx的不同,得到不同的執(zhí)行指令instCode。
JobRunShell將trigger信息,job信息和執(zhí)行指令傳給triggeredJobComplete()方法來完成最后的數(shù)據(jù)表更新操作。例如如果job執(zhí)行過程有異常拋出,就將這個trigger狀態(tài)變?yōu)?strong style="color: rgb(53, 179, 120);">ERROR,如果是BLOCKED狀態(tài),就將其變?yōu)?strong style="color: rgb(53, 179, 120);">WAITING等等,最后從fired_triggers表中刪除這個已經(jīng)執(zhí)行完成的trigger。注意,這些是在工作線程池異步完成。
點擊關注公眾號,Java干貨及時送達
3.3 排查問題
在前文,我們可以看到,Quartz的調(diào)度過程中有3次(可選的)上鎖行為,為什么稱為可選?因為這三個步驟雖然在executeInNonManagedTXLock方法的保護下,但executeInNonManagedTXLock方法可以通過設置傳入?yún)?shù)lockName為空,取消上鎖。在翻閱代碼時,我們看到第一步拉取待觸發(fā)的trigger時:
public?List?acquireNextTriggers(final?long?noLaterThan,?final?int?maxCount,?final?long?timeWindow)throws?JobPersistenceException? {
????String?lockName;
????//判斷是否需要上鎖
????if?(isAcquireTriggersWithinLock()?||?maxCount?>?1)?{
????????lockName?=?LOCK_TRIGGER_ACCESS;
????}?else?{
????????lockName?=?null;
????}
????return?executeInNonManagedTXLock(lockName,
?????????????????????????????????????new?TransactionCallback>(){
????????public?List?execute(Connection?conn)?throws?JobPersistenceException? {
????????????return?acquireNextTrigger(conn,?noLaterThan,?maxCount,?timeWindow);
????????}
????},?new?TransactionValidator>()?{
?????????//省略
????});
}
在加鎖之前對lockName做了一次判斷,而非像其他加鎖方法一樣,默認傳入的就是LOCK_TRIGGER_ACCESS:
public?List?triggersFired(final?List?triggers) ?throws?JobPersistenceException? {
????//默認上鎖
????return?executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
????????new?TransactionCallback>()?{
????????//省略
????????},new?TransactionValidator>()?{
????????????//省略
???????????});
}
通過調(diào)試發(fā)現(xiàn)isAcquireTriggersWithinLock()的值是false,因而導致傳入的lockName是null。我在代碼中加入日志,可以更清楚的看到這個過程。最新面試題整理好了,大家可以在Java面試庫小程序在線刷題。

protected?TriggerFiredBundle?triggerFired(Connection?conn,?OperableTrigger?trigger)
????throws?JobPersistenceException?{
????JobDetail?job;
????Calendar?cal?=?null;
????//?Make?sure?trigger?wasn't?deleted,?paused,?or?completed...
????try?{?//?if?trigger?was?deleted,?state?will?be?STATE_DELETED
????????String?state?=?getDelegate().selectTriggerState(conn,trigger.getKey());
?????????if?(!state.equals(STATE_ACQUIRED))?{
????????????return?null;
????????}
????}?catch?(SQLException?e)?{
????????????throw?new?JobPersistenceException("Couldn't?select?trigger?state:?"
????????????????????+?e.getMessage(),?e);
????}
調(diào)度線程如果發(fā)現(xiàn)當前trigger的狀態(tài)不是ACQUIRED,也就是說,這個trigger被其他線程fire了,就會返回null。在3.2,我們提到,在調(diào)度流程的第三步,如果發(fā)現(xiàn)某個trigger第二步的返回值是null,就會跳過第三步,取消fire。在通常的情況下,樂觀鎖能保證不發(fā)生重復調(diào)度,但是難免發(fā)生ABA問題,我們看一下這是發(fā)生重復調(diào)度時的日志:

在第一步時,也就是quartz在拉取到符合條件的triggers 到將他們的狀態(tài)由WAITING改為ACQUIRED之間停頓了有超過9ms的時間,而另一臺服務器正是趁著這9ms的空檔完成了WAITING-->ACQUIRED-->EXECUTING-->WAITING(也就是一個完整的狀態(tài)變化周期)的全部過程,圖示參見圖3-6。

3.4 解決辦法
如何去解決這個問題呢?在配置文件加上org.quartz.jobStore.acquireTriggersWithinLock=true,這樣,在調(diào)度流程的第一步,也就是拉取待即將觸發(fā)的triggers時,是上鎖的狀態(tài),即不會同時存在多個線程拉取到相同的trigger的情況,也就避免的重復調(diào)度的危險。
3.5 心得
此次排查過程并非一帆風順,走過一些坑,也有一些非技術相關的體會:
3)日志很重要。雖然我們可以調(diào)試,但是沒有日志,我們是無法發(fā)現(xiàn)并證明,程序發(fā)生了ABA問題。
4)最重要的是,不要害怕問題,即使是Quartz這樣大型的框架,解決問題也不一定需要把2.4MB的源碼通通讀懂。只要有時間,問題都能解決,只是好的技巧能縮短這個時間,而我們需要在一次次實戰(zhàn)中磨練技巧。

關注Java技術棧看更多干貨


