如何動(dòng)態(tài)管理Spring Boot定時(shí)任務(wù)
??Java大聯(lián)盟 ? 致力于最高效的Java學(xué)習(xí)
關(guān)注
原文鏈接 blog.csdn.net/qq_34886352/article/details/106494637
B 站搜索:楠哥教你學(xué)Java
獲取更多優(yōu)質(zhì)視頻教程
1、功能說明
SpringBoot的定時(shí)任務(wù)的加強(qiáng)工具,實(shí)現(xiàn)對SpringBoot原生的定時(shí)任務(wù)進(jìn)行動(dòng)態(tài)管理,完全兼容原生@Scheduled注解,無需對原本的定時(shí)任務(wù)進(jìn)行修改。
2、快速使用
具體的功能已經(jīng)封裝成SpringBoot-starter即插即用
<dependency><groupId>com.github.guoyixinggroupId><artifactId>spring-boot-starter-super-scheduledartifactId><version>0.3.1version>dependency>
使用方法和源碼:
碼云:https://gitee.com/qiaodaimadewangcai/super-scheduled
github:https://github.com/guoyixing/super-scheduled
3、實(shí)現(xiàn)原理
1、動(dòng)態(tài)管理實(shí)現(xiàn)
(1) 配置管理介紹
("superScheduledConfig")public class SuperScheduledConfig {/*** 執(zhí)行定時(shí)任務(wù)的線程池*/private ThreadPoolTaskScheduler taskScheduler;/*** 定時(shí)任務(wù)名稱與定時(shí)任務(wù)回調(diào)鉤子 的關(guān)聯(lián)關(guān)系容器*/private Map<String, ScheduledFuture> nameToScheduledFuture = new ConcurrentHashMap<>();/*** 定時(shí)任務(wù)名稱與定時(shí)任務(wù)需要執(zhí)行的邏輯 的關(guān)聯(lián)關(guān)系容器*/private Map<String, Runnable> nameToRunnable = new ConcurrentHashMap<>();/*** 定時(shí)任務(wù)名稱與定時(shí)任務(wù)的源信息 的關(guān)聯(lián)關(guān)系容器*/private Map<String, ScheduledSource> nameToScheduledSource = new ConcurrentHashMap<>();/* 普通的get/sets省略 */}
(2) 使用后處理器攔截SpringBoot原本的定時(shí)任務(wù)
實(shí)現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文 實(shí)現(xiàn)BeanPostProcessor接口,將這個(gè)類標(biāo)記為后處理器,后處理器會(huì)在每個(gè)bean實(shí)例化之后執(zhí)行 使用@DependsOn注解強(qiáng)制依賴SuperScheduledConfig類,讓SpringBoot實(shí)例化SuperScheduledPostProcessor類之前先實(shí)例化SuperScheduledConfig類 主要實(shí)現(xiàn)邏輯在postProcessAfterInitialization()方法中

public class SuperScheduledPostProcessor implements BeanPostProcessor, ApplicationContextAware {protected final Log logger = LogFactory.getLog(getClass());private ApplicationContext applicationContext;/*** 實(shí)例化bean之前的操作* @param bean bean實(shí)例* @param beanName bean的Name*/public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {return bean;}/*** 實(shí)例化bean之后的操作* @param bean bean實(shí)例* @param beanName bean的Name*/public Object postProcessAfterInitialization(Object bean,String beanName) throws BeansException {//1.獲取配置管理器SuperScheduledConfig superScheduledConfig = applicationContext.getBean(SuperScheduledConfig.class);//2.獲取當(dāng)前實(shí)例化完成的bean的所有方法Method[] methods = bean.getClass().getDeclaredMethods();//循環(huán)處理對每個(gè)方法逐一處理if (methods.length > 0) {for (Method method : methods) {//3.嘗試在該方法上獲取@Scheduled注解(SpringBoot的定時(shí)任務(wù)注解)Scheduled annotation = method.getAnnotation(Scheduled.class);//如果無法獲取到@Scheduled注解,就跳過這個(gè)方法if (annotation == null) {continue;}//4.創(chuàng)建定時(shí)任務(wù)的源屬性//創(chuàng)建定時(shí)任務(wù)的源屬性(用來記錄定時(shí)任務(wù)的配置,初始化的時(shí)候記錄的是注解上原本的屬性)ScheduledSource scheduledSource = new ScheduledSource(annotation, method, bean);//對注解上獲取到源屬性中的屬性進(jìn)行檢測if (!scheduledSource.check()) {throw new SuperScheduledException("在" + beanName + "Bean中" + method.getName() + "方法的注解參數(shù)錯(cuò)誤");}//生成定時(shí)任務(wù)的名稱(id),使用beanName+“.”+方法名String name = beanName + "." + method.getName();//將以key-value的形式,將源數(shù)據(jù)存入配置管理器中,key:定時(shí)任務(wù)的名稱 value:源數(shù)據(jù)superScheduledConfig.addScheduledSource(name, scheduledSource);try {//5.將原本SpringBoot的定時(shí)任務(wù)取消掉clearOriginalScheduled(annotation);} catch (Exception e) {throw new SuperScheduledException("在關(guān)閉原始方法" + beanName + method.getName() + "時(shí)出現(xiàn)錯(cuò)誤");}}}//最后bean保持原有返回return bean;}/*** 修改注解原先的屬性* @param annotation 注解實(shí)例對象* @throws Exception*/private void clearOriginalScheduled(Scheduled annotation) throws Exception {changeAnnotationValue(annotation, "cron", Scheduled.CRON_DISABLED);changeAnnotationValue(annotation, "fixedDelay", -1L);changeAnnotationValue(annotation, "fixedDelayString", "");changeAnnotationValue(annotation, "fixedRate", -1L);changeAnnotationValue(annotation, "fixedRateString", "");changeAnnotationValue(annotation, "initialDelay", -1L);changeAnnotationValue(annotation, "initialDelayString", "");}/*** 獲取SpringBoot的上下文* @param applicationContext SpringBoot的上下文*/public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
(3) 使用ApplicationRunner初始化自定義的定時(shí)任務(wù)運(yùn)行器
實(shí)現(xiàn)ApplicationContextAware接口拿到SpringBoot的上下文 使用@DependsOn注解強(qiáng)制依賴threadPoolTaskScheduler類 實(shí)現(xiàn)ApplicationRunner接口,在所有bean初始化結(jié)束之后,運(yùn)行自定義邏輯 主要實(shí)現(xiàn)邏輯在run()方法中

("threadPoolTaskScheduler")public class SuperScheduledApplicationRunner implements ApplicationRunner, ApplicationContextAware {protected final Log logger = LogFactory.getLog(getClass());private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private ApplicationContext applicationContext;/*** 定時(shí)任務(wù)配置管理器*/private SuperScheduledConfig superScheduledConfig;/*** 定時(shí)任務(wù)執(zhí)行線程*/private ThreadPoolTaskScheduler threadPoolTaskScheduler;public void run(ApplicationArguments args) {//1.定時(shí)任務(wù)配置管理器中緩存 定時(shí)任務(wù)執(zhí)行線程superScheduledConfig.setTaskScheduler(threadPoolTaskScheduler);//2.獲取所有定時(shí)任務(wù)源數(shù)據(jù)Map<String, ScheduledSource> nameToScheduledSource = superScheduledConfig.getNameToScheduledSource();//逐一處理定時(shí)任務(wù)for (String name : nameToScheduledSource.keySet()) {//3.獲取定時(shí)任務(wù)源數(shù)據(jù)ScheduledSource scheduledSource = nameToScheduledSource.get(name);//4.獲取所有增強(qiáng)類String[] baseStrengthenBeanNames = applicationContext.getBeanNamesForType(BaseStrengthen.class);//5.創(chuàng)建執(zhí)行控制器SuperScheduledRunnable runnable = new SuperScheduledRunnable();//配置執(zhí)行控制器runnable.setMethod(scheduledSource.getMethod());runnable.setBean(scheduledSource.getBean());//6.逐一處理增強(qiáng)類(增強(qiáng)器實(shí)現(xiàn)原理后面具體分析)Listpoints = new ArrayList<>(baseStrengthenBeanNames.length); for (String baseStrengthenBeanName : baseStrengthenBeanNames) {//7.將增強(qiáng)器代理成pointObject baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);//創(chuàng)建代理Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));proxy.setSuperScheduledName(name);//8.所有的points連成起來points.add(proxy);}//將point形成調(diào)用鏈runnable.setChain(new Chain(points));//將執(zhí)行邏輯封裝并緩存到定時(shí)任務(wù)配置管理器中superScheduledConfig.addRunnable(name, runnable::invoke);try {//8.啟動(dòng)定時(shí)任務(wù)ScheduledFuture> schedule = ScheduledFutureFactory.create(threadPoolTaskScheduler, scheduledSource, runnable::invoke);//將線程回調(diào)鉤子存到任務(wù)配置管理器中superScheduledConfig.addScheduledFuture(name, schedule);logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)啟動(dòng)...");} catch (Exception e) {throw new SuperScheduledException("任務(wù)" + name + "啟動(dòng)失敗,錯(cuò)誤信息:" + e.getLocalizedMessage());}}}public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
(4) 進(jìn)行動(dòng)態(tài)管理
public class SuperScheduledManager {protected final Log logger = LogFactory.getLog(getClass());private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");private SuperScheduledConfig superScheduledConfig;/*** 修改Scheduled的執(zhí)行周期** @param name scheduled的名稱* @param cron cron表達(dá)式*/public void setScheduledCron(String name, String cron) {//終止原先的任務(wù)cancelScheduled(name);//創(chuàng)建新的任務(wù)ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);scheduledSource.clear();scheduledSource.setCron(cron);addScheduled(name, scheduledSource);}/*** 修改Scheduled的fixedDelay** @param name scheduled的名稱* @param fixedDelay 上一次執(zhí)行完畢時(shí)間點(diǎn)之后多長時(shí)間再執(zhí)行*/public void setScheduledFixedDelay(String name, Long fixedDelay) {//終止原先的任務(wù)cancelScheduled(name);//創(chuàng)建新的任務(wù)ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);scheduledSource.clear();scheduledSource.setFixedDelay(fixedDelay);addScheduled(name, scheduledSource);}/*** 修改Scheduled的fixedRate** @param name scheduled的名稱* @param fixedRate 上一次開始執(zhí)行之后多長時(shí)間再執(zhí)行*/public void setScheduledFixedRate(String name, Long fixedRate) {//終止原先的任務(wù)cancelScheduled(name);//創(chuàng)建新的任務(wù)ScheduledSource scheduledSource = superScheduledConfig.getScheduledSource(name);scheduledSource.clear();scheduledSource.setFixedRate(fixedRate);addScheduled(name, scheduledSource);}/*** 查詢所有啟動(dòng)的Scheduled*/public ListgetRunScheduledName() {Setnames = superScheduledConfig.getNameToScheduledFuture().keySet(); return new ArrayList<>(names);}/*** 查詢所有的Scheduled*/public ListgetAllSuperScheduledName() {Setnames = superScheduledConfig.getNameToRunnable().keySet(); return new ArrayList<>(names);}/*** 終止Scheduled** @param name scheduled的名稱*/public void cancelScheduled(String name) {ScheduledFuture scheduledFuture = superScheduledConfig.getScheduledFuture(name);scheduledFuture.cancel(true);superScheduledConfig.removeScheduledFuture(name);logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)終止...");}/*** 啟動(dòng)Scheduled** @param name scheduled的名稱* @param scheduledSource 定時(shí)任務(wù)的源信息*/public void addScheduled(String name, ScheduledSource scheduledSource) {if (getRunScheduledName().contains(name)) {throw new SuperScheduledException("定時(shí)任務(wù)" + name + "已經(jīng)被啟動(dòng)過了");}if (!scheduledSource.check()) {throw new SuperScheduledException("定時(shí)任務(wù)" + name + "源數(shù)據(jù)內(nèi)容錯(cuò)誤");}scheduledSource.refreshType();Runnable runnable = superScheduledConfig.getRunnable(name);ThreadPoolTaskScheduler taskScheduler = superScheduledConfig.getTaskScheduler();ScheduledFuture> schedule = ScheduledFutureFactory.create(taskScheduler, scheduledSource, runnable);logger.info(df.format(LocalDateTime.now()) + "任務(wù)" + name + "已經(jīng)啟動(dòng)...");superScheduledConfig.addScheduledSource(name, scheduledSource);superScheduledConfig.addScheduledFuture(name, schedule);}/*** 以cron類型啟動(dòng)Scheduled** @param name scheduled的名稱* @param cron cron表達(dá)式*/public void addCronScheduled(String name, String cron) {ScheduledSource scheduledSource = new ScheduledSource();scheduledSource.setCron(cron);addScheduled(name, scheduledSource);}/*** 以fixedDelay類型啟動(dòng)Scheduled** @param name scheduled的名稱* @param fixedDelay 上一次執(zhí)行完畢時(shí)間點(diǎn)之后多長時(shí)間再執(zhí)行* @param initialDelay 第一次執(zhí)行的延遲時(shí)間*/public void addFixedDelayScheduled(String name, Long fixedDelay, Long... initialDelay) {ScheduledSource scheduledSource = new ScheduledSource();scheduledSource.setFixedDelay(fixedDelay);if (initialDelay != null && initialDelay.length == 1) {scheduledSource.setInitialDelay(initialDelay[0]);} else if (initialDelay != null && initialDelay.length > 1) {throw new SuperScheduledException("第一次執(zhí)行的延遲時(shí)間只能傳入一個(gè)參數(shù)");}addScheduled(name, scheduledSource);}/*** 以fixedRate類型啟動(dòng)Scheduled** @param name scheduled的名稱* @param fixedRate 上一次開始執(zhí)行之后多長時(shí)間再執(zhí)行* @param initialDelay 第一次執(zhí)行的延遲時(shí)間*/public void addFixedRateScheduled(String name, Long fixedRate, Long... initialDelay) {ScheduledSource scheduledSource = new ScheduledSource();scheduledSource.setFixedRate(fixedRate);if (initialDelay != null && initialDelay.length == 1) {scheduledSource.setInitialDelay(initialDelay[0]);} else if (initialDelay != null && initialDelay.length > 1) {throw new SuperScheduledException("第一次執(zhí)行的延遲時(shí)間只能傳入一個(gè)參數(shù)");}addScheduled(name, scheduledSource);}/*** 手動(dòng)執(zhí)行一次任務(wù)** @param name scheduled的名稱*/public void runScheduled(String name) {Runnable runnable = superScheduledConfig.getRunnable(name);runnable.run();}}
2、增強(qiáng)接口實(shí)現(xiàn)
增強(qiáng)器實(shí)現(xiàn)的整體思路與SpringAop的思路一致,實(shí)現(xiàn)沒有Aop復(fù)雜
(1) 增強(qiáng)接口
(Ordered.HIGHEST_PRECEDENCE)public interface BaseStrengthen {/*** 前置強(qiáng)化方法** @param bean bean實(shí)例(或者是被代理的bean)* @param method 執(zhí)行的方法對象* @param args 方法參數(shù)*/void before(Object bean, Method method, Object[] args);/*** 后置強(qiáng)化方法* 出現(xiàn)異常不會(huì)執(zhí)行* 如果未出現(xiàn)異常,在afterFinally方法之后執(zhí)行** @param bean bean實(shí)例(或者是被代理的bean)* @param method 執(zhí)行的方法對象* @param args 方法參數(shù)*/void after(Object bean, Method method, Object[] args);/*** 異常強(qiáng)化方法** @param bean bean實(shí)例(或者是被代理的bean)* @param method 執(zhí)行的方法對象* @param args 方法參數(shù)*/void exception(Object bean, Method method, Object[] args);/*** Finally強(qiáng)化方法,出現(xiàn)異常也會(huì)執(zhí)行** @param bean bean實(shí)例(或者是被代理的bean)* @param method 執(zhí)行的方法對象* @param args 方法參數(shù)*/void afterFinally(Object bean, Method method, Object[] args);}
(2) 代理抽象類
public abstract class Point {/*** 定時(shí)任務(wù)名*/private String superScheduledName;/*** 抽象的執(zhí)行方法,使用代理實(shí)現(xiàn)* @param runnable 定時(shí)任務(wù)執(zhí)行器*/public abstract Object invoke(SuperScheduledRunnable runnable);/* 普通的get/sets省略 */}
(3) 調(diào)用鏈類
public class Chain {private Listlist; private int index = -1;/*** 索引自增1*/public int incIndex() {return ++index;}/*** 索引還原*/public void resetIndex() {this.index = -1;}}
(4) cglib動(dòng)態(tài)代理實(shí)現(xiàn)
使用cglib代理增強(qiáng)器,將增強(qiáng)器全部代理成調(diào)用鏈節(jié)點(diǎn)Point
public class RunnableBaseInterceptor implements MethodInterceptor {/*** 定時(shí)任務(wù)執(zhí)行器*/private SuperScheduledRunnable runnable;/*** 定時(shí)任務(wù)增強(qiáng)類*/private BaseStrengthen strengthen;@Overridepublic Object intercept(Object obj, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {Object result;//如果執(zhí)行的是invoke()方法if ("invoke".equals(method.getName())) {//前置強(qiáng)化方法strengthen.before(obj, method, args);try {//調(diào)用執(zhí)行器中的invoke()方法result = runnable.invoke();} catch (Exception e) {//異常強(qiáng)化方法strengthen.exception(obj, method, args);throw new SuperScheduledException(strengthen.getClass() + "中強(qiáng)化執(zhí)行時(shí)發(fā)生錯(cuò)誤", e);} finally {//Finally強(qiáng)化方法,出現(xiàn)異常也會(huì)執(zhí)行strengthen.afterFinally(obj, method, args);}//后置強(qiáng)化方法strengthen.after(obj, method, args);} else {//直接執(zhí)行方法result = methodProxy.invokeSuper(obj, args);}return result;}public RunnableBaseInterceptor(Object object, SuperScheduledRunnable runnable) {this.runnable = runnable;if (BaseStrengthen.class.isAssignableFrom(object.getClass())) {this.strengthen = (BaseStrengthen) object;} else {throw new SuperScheduledException(object.getClass() + "對象不是BaseStrengthen類型");}}public RunnableBaseInterceptor() {}}
(5) 定時(shí)任務(wù)執(zhí)行器實(shí)現(xiàn)
public class SuperScheduledRunnable {/*** 原始的方法*/private Method method;/*** 方法所在的bean*/private Object bean;/*** 增強(qiáng)器的調(diào)用鏈*/private Chain chain;public Object invoke() {Object result;//索引自增1if (chain.incIndex() == chain.getList().size()) {//調(diào)用鏈中的增強(qiáng)方法已經(jīng)全部執(zhí)行結(jié)束try {//調(diào)用鏈索引初始化chain.resetIndex();//增強(qiáng)器全部執(zhí)行完畢,執(zhí)行原本的方法result = method.invoke(bean);} catch (IllegalAccessException | InvocationTargetException e) {throw new SuperScheduledException(e.getLocalizedMessage());}} else {//獲取被代理后的方法增強(qiáng)器Point point = chain.getList().get(chain.getIndex());//執(zhí)行增強(qiáng)器代理//增強(qiáng)器代理中,會(huì)回調(diào)方法執(zhí)行器,形成調(diào)用鏈,逐一運(yùn)行調(diào)用鏈中的增強(qiáng)器result = point.invoke(this);}return result;}/* 普通的get/sets省略 */}
(6) 增強(qiáng)器代理邏輯
com.gyx.superscheduled.core.SuperScheduledApplicationRunner類中的代碼片段
//創(chuàng)建執(zhí)行控制器SuperScheduledRunnable runnable = new SuperScheduledRunnable();runnable.setMethod(scheduledSource.getMethod());runnable.setBean(scheduledSource.getBean());//用來存放 增強(qiáng)器的代理對象Listpoints = new ArrayList<>(baseStrengthenBeanNames.length); //循環(huán)所有的增強(qiáng)器的beanNamefor (String baseStrengthenBeanName : baseStrengthenBeanNames) {//獲取增強(qiáng)器的bean對象Object baseStrengthenBean = applicationContext.getBean(baseStrengthenBeanName);//將增強(qiáng)器代理成Point節(jié)點(diǎn)Point proxy = ProxyUtils.getInstance(Point.class, new RunnableBaseInterceptor(baseStrengthenBean, runnable));proxy.setSuperScheduledName(name);//增強(qiáng)器的代理對象緩存到list中points.add(proxy);}//將增強(qiáng)器代理實(shí)例的集合生成調(diào)用鏈//執(zhí)行控制器中設(shè)置調(diào)用鏈runnable.setChain(new Chain(points));
推薦閱讀 1、Spring Boot+Vue項(xiàng)目實(shí)戰(zhàn)
楠哥簡介
資深 Java 工程師,微信號(hào) nnsouthwind
《Java零基礎(chǔ)實(shí)戰(zhàn)》一書作者
騰訊課程官方 Java 面試官,今日頭條認(rèn)證大V
GitChat認(rèn)證作者,B站認(rèn)證UP主(楠哥教你學(xué)Java)
致力于幫助萬千 Java 學(xué)習(xí)者持續(xù)成長。
有收獲,就點(diǎn)個(gè)在看?



