老板今天讓我搞 分布式定時(shí)任務(wù)....
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
? 作者?|??荒野猛獸
來源 |? urlify.cn/vqiYva
66套java從入門到精通實(shí)戰(zhàn)課程分享
第一步:引入依賴
????????
????????????org.quartz-scheduler
????????????quartz
????????????${quartz.version}
????????
????????
????????????org.quartz-scheduler
????????????quartz-jobs
????????????${quartz.version}
????????
????????
????????
????????????org.springframework
????????????spring-context-support
????????
第二步:創(chuàng)建MySQL表,Quartz是基于表來感知其他定時(shí)任務(wù)節(jié)點(diǎn)的,節(jié)點(diǎn)間不會(huì)直接通信。建表語句在jar包中自帶了。
org\quartz-scheduler\quartz\2.3.0\quartz-2.3.0.jar!\org\quartz\impl\jdbcjobstore\tables_mysql_innodb.sql
?
?第三步:配置線程池,我這里是因?yàn)轫?xiàng)目的其他地方有用到線程池,你也可以選擇在Quartz的配置類中注入。
(我在其他位置使用了線程池,占用了一個(gè)線程,所以當(dāng)我將核心線程數(shù)量設(shè)置為1時(shí),定時(shí)任務(wù)不會(huì)執(zhí)行;需確保有足夠的線程來執(zhí)行)
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import?java.util.concurrent.Executor;
import?java.util.concurrent.ThreadPoolExecutor;
/**
?*?@Author?1
?*?@Description?配置線程池交給Spring容器管理
?*?@Date?2020/8/26?18:23
?**/
@Configuration
public?class?ExecturConfig?{
????@Bean("taskExector")
????public?Executor?taskExector()?{
????????ThreadPoolTaskExecutor?executor?=?new?ThreadPoolTaskExecutor();
????????//核心線程池?cái)?shù)量
????????executor.setCorePoolSize(2);
????????//最大線程數(shù)量
????????executor.setMaxPoolSize(5);
????????//線程池的隊(duì)列容量
????????executor.setQueueCapacity(10);
????????//線程名稱的前綴
????????executor.setThreadNamePrefix("expireOrderHandle-");
????????//配置拒絕策略
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.AbortPolicy());
????????executor.initialize();
????????return?executor;
????}
}
第四步:因?yàn)槎〞r(shí)任務(wù)業(yè)務(wù)中需要使用到注入Spring容器的類,所以配置注入,否則報(bào)空指針異常。
參考了一位大佬的博客:https://blog.csdn.net/qq_39513430/article/details/104996237
import?org.quartz.spi.TriggerFiredBundle;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import?org.springframework.scheduling.quartz.AdaptableJobFactory;
import?org.springframework.stereotype.Component;
@Component("myAdaptableJobFactory")
public?class?MyAdaptableJobFactory?extends?AdaptableJobFactory?{
????//AutowireCapableBeanFactory?可以將一個(gè)對(duì)象添加到SpringIOC容器中,并且完成該對(duì)象注入
????@Autowired
????private?AutowireCapableBeanFactory?autowireCapableBeanFactory;
????
????/**
?????*?該方法需要將實(shí)例化的任務(wù)對(duì)象手動(dòng)的添加到springIOC容器中并且完成對(duì)象的注入
?????*/
????@Override
????protected?Object?createJobInstance(TriggerFiredBundle?bundle)?throws?Exception?{
????????Object?obj?=?super.createJobInstance(bundle);
????????//將obj對(duì)象添加Spring?IOC容器中,并完成注入
????????this.autowireCapableBeanFactory.autowireBean(obj);
????????return?obj;
????}
}
第五步:添加Quartz屬性文件
關(guān)于屬性配置解釋參考https://blog.csdn.net/github_36429631/article/details/63254055
#============================================================================
#?Configure?JobStore
#?Using?Spring?datasource?in?SchedulerConfig.java
#?Spring?uses?LocalDataSourceJobStore?extension?of?JobStoreCMT
#============================================================================
#設(shè)置為TRUE不會(huì)出現(xiàn)序列化非字符串類到?BLOB?時(shí)產(chǎn)生的類版本問題
org.quartz.jobStore.useProperties=true
#quartz相關(guān)數(shù)據(jù)表前綴名
org.quartz.jobStore.tablePrefix?=?QRTZ_
#開啟分布式部署
org.quartz.jobStore.isClustered?=?true
#分布式節(jié)點(diǎn)有效性檢查時(shí)間間隔,單位:毫秒
org.quartz.jobStore.clusterCheckinInterval?=?20000
#信息保存時(shí)間?默認(rèn)值60秒
org.quartz.jobStore.misfireThreshold?=?60000
#事務(wù)隔離級(jí)別為“讀已提交”
org.quartz.jobStore.txIsolationLevelReadCommitted?=?true
#配置線程池實(shí)現(xiàn)類
org.quartz.jobStore.class?=?org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass?=?org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#============================================================================
#?Configure?Main?Scheduler?Properties
#?Needed?to?manage?cluster?instances
#============================================================================
org.quartz.scheduler.instanceName?=?ClusterQuartz
org.quartz.scheduler.instanceId=?AUTO
#如果你想quartz-scheduler出口本身通過RMI作為服務(wù)器,然后設(shè)置“出口”標(biāo)志true(默認(rèn)值為false)。
org.quartz.scheduler.rmi.export?=?false
#true:鏈接遠(yuǎn)程服務(wù)調(diào)度(客戶端),這個(gè)也要指定registryhost和registryport,默認(rèn)為false
#?如果export和proxy同時(shí)指定為true,則export的設(shè)置將被忽略
org.quartz.scheduler.rmi.proxy?=?false
org.quartz.scheduler.wrapJobExecutionInUserTransaction?=?false
#============================================================================
#?Configure?ThreadPool
#?Can?also?be?configured?in?spring?configuration
#============================================================================
#線程池的實(shí)現(xiàn)類(一般使用SimpleThreadPool即可滿足幾乎所有用戶的需求)
#org.quartz.threadPool.class?=?org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount?=?5
#org.quartz.threadPool.threadPriority?=?5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread?=?true
第六步:寫任務(wù)類
package?com.website.task;
import?com.website.mapper.WebTeacherMapper;
import?com.website.pojo.ao.TeacherSalaryRuleAO;
import?com.website.pojo.bo.TeacherSalaryRuleDetailBO;
import?com.website.pojo.bo.TeacherSalaryRuleRelationBO;
import?com.website.pojo.bo.TeacherSalaryStatTempBO;
import?com.website.pojo.bo.TeacherSalaryStatisticBO;
import?io.jsonwebtoken.lang.Collections;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.commons.lang3.StringUtils;
import?org.joda.time.DateTime;
import?org.quartz.DisallowConcurrentExecution;
import?org.quartz.JobExecutionContext;
import?org.quartz.JobExecutionException;
import?org.quartz.PersistJobDataAfterExecution;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.scheduling.quartz.QuartzJobBean;
import?java.math.BigDecimal;
import?java.util.ArrayList;
import?java.util.HashSet;
import?java.util.List;
import?java.util.Map;
import?java.util.stream.Collectors;
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
@Slf4j
public?class?QuartzJob?extends?QuartzJobBean?{
????@Autowired
????private?WebTeacherMapper?webTeacherMapper;
????@Override
????protected?void?executeInternal(JobExecutionContext?context)?throws?JobExecutionException?{
????????log.info("統(tǒng)計(jì)老師月薪資定時(shí)任務(wù)開始執(zhí)行。");
????????System.out.println("任務(wù)編寫位置");
????????log.info("統(tǒng)計(jì)老師月薪資定時(shí)任務(wù)執(zhí)行完畢。");
????}
}
第七步:配置定時(shí)器
import?com.website.task.QuartzJob;
import?org.quartz.Scheduler;
import?org.quartz.TriggerKey;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.beans.factory.config.PropertiesFactoryBean;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
import?org.springframework.core.io.ClassPathResource;
import?org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import?org.springframework.scheduling.quartz.JobDetailFactoryBean;
import?org.springframework.scheduling.quartz.SchedulerFactoryBean;
import?javax.sql.DataSource;
import?java.io.IOException;
import?java.util.Properties;
import?java.util.concurrent.Executor;
@Configuration
public?class?SchedulerConfig?{
????@Autowired
????private?DataSource?dataSource;
????@Autowired
????private?Executor?taskExector;
????@Autowired
????private?MyAdaptableJobFactory?myAdaptableJobFactory;
????@Bean
????public?Scheduler?scheduler()?throws?Exception?{
????????Scheduler?scheduler?=?schedulerFactoryBean().getScheduler();
????????TriggerKey?triggerKey1?=?TriggerKey.triggerKey("trigger1",?"TriggerTest111");
????????/*========如果有必要可以配置刪除任務(wù),開始====================*/
????????//停止觸發(fā)器
//????????scheduler.pauseTrigger(triggerKey1);
????????//移除觸發(fā)器
//????????scheduler.unscheduleJob(triggerKey1);
//????????JobKey?jobKey1?=?JobKey.jobKey("job1111------",?"quartzTest--------");
????????//刪除任務(wù)
//????????boolean?b?=?scheduler.deleteJob(jobKey1);
//????????System.out.println(b);
????????/*=========結(jié)束====================*/
????????scheduler.start();
????????return?scheduler;
????}
????@Bean
????public?SchedulerFactoryBean?schedulerFactoryBean()?throws?IOException?{
????????SchedulerFactoryBean?factory?=?new?SchedulerFactoryBean();
????????//開啟更新job
????????factory.setOverwriteExistingJobs(true);
????????//如果不配置就會(huì)使用quartz.properties中的instanceName
????????//factory.setSchedulerName("Cluster_Scheduler");
????????//配置數(shù)據(jù)源,這是quartz使用的表的數(shù)據(jù)庫存放位置
????????factory.setDataSource(dataSource);
????????//設(shè)置實(shí)例在spring容器中的key
????????factory.setApplicationContextSchedulerContextKey("applicationContext");
????????//配置線程池
????????factory.setTaskExecutor(taskExector);
????????//配置配置文件
????????factory.setQuartzProperties(quartzProperties());
????????//設(shè)置調(diào)度器自動(dòng)運(yùn)行
????????factory.setAutoStartup(true);
????????//配置任務(wù)執(zhí)行規(guī)則,參數(shù)是一個(gè)可變數(shù)組
????????factory.setTriggers(trigger1().getObject());
????????//?解決mapper無法注入問題,此處配合第四步的配置。
????????factory.setJobFactory(myAdaptableJobFactory);
????????return?factory;
????}
????@Bean
????public?Properties?quartzProperties()?throws?IOException?{
????????PropertiesFactoryBean?propertiesFactoryBean?=?new?PropertiesFactoryBean();
????????propertiesFactoryBean.setLocation(new?ClassPathResource("/quartz.properties"));
????????//?在quartz.properties中的屬性被讀取并注入后再初始化對(duì)象
????????propertiesFactoryBean.afterPropertiesSet();
????????return?propertiesFactoryBean.getObject();
????}
????@Bean
????public?JobDetailFactoryBean?job1()?{
????????JobDetailFactoryBean?jobDetail?=?new?JobDetailFactoryBean();
????????//配置任務(wù)的具體實(shí)現(xiàn)
????????jobDetail.setJobClass(QuartzJob.class);
????????//是否持久化
????????jobDetail.setDurability(true);
????????//出現(xiàn)異常是否重新執(zhí)行
????????jobDetail.setRequestsRecovery(true);
????????//配置定時(shí)任務(wù)信息
????????jobDetail.setName("TeacherSalaryJob");
????????jobDetail.setGroup("TeacherSalaryJobGroup");
????????jobDetail.setDescription("這是每月1號(hào)凌晨統(tǒng)計(jì)教師薪資任務(wù)");
????????return?jobDetail;
????}
????@Bean
????public?CronTriggerFactoryBean?trigger1()?{
????????CronTriggerFactoryBean?cronTrigger?=?new?CronTriggerFactoryBean();
????????//定時(shí)規(guī)則的分組
????????cronTrigger.setGroup("TeacherSalaryTriggerGroup");
????????cronTrigger.setName("TeacherSalaryTrigger");
????????//配置執(zhí)行的任務(wù)jobdetail
????????cronTrigger.setJobDetail(job1().getObject());
????????//配置執(zhí)行規(guī)則?每月一號(hào)0點(diǎn)過1分執(zhí)行一次
????????cronTrigger.setCronExpression("0?1?0?1?*???");
????????return?cronTrigger;
????}
}
到此完畢,另外發(fā)現(xiàn)如果執(zhí)行任務(wù)的代碼中報(bào)錯(cuò),會(huì)導(dǎo)致定時(shí)任務(wù)停止循環(huán),重啟也不會(huì)再執(zhí)行。建議任務(wù)內(nèi)容用try...catch代碼塊包裹起來,打印好日志。
已中斷的任務(wù)清空Quartz所有表格,再啟動(dòng)項(xiàng)目即可再次觸發(fā)啟動(dòng)任務(wù)。
如果某一天定時(shí)任務(wù)突然不執(zhí)行了,網(wǎng)上很多情況都是遠(yuǎn)程調(diào)用沒有加超時(shí)中斷,從而導(dǎo)致線程阻塞引起的。
粉絲福利:Java從入門到入土學(xué)習(xí)路線圖
???

?長按上方微信二維碼?2 秒
感謝點(diǎn)贊支持下哈?
