Spring 官方批處理框架真香!Spring 全家桶永遠滴神!
假期余額嚴(yán)重不足,難受呀!今天早上 8 點半出門坐車,晚上 9 點才到家,差不多坐了一天車才到家,屁股都坐疼了......
推薦一個很多小伙伴沒注意到的 Spring 官方的批處理框架。
Spring Batch 是一個輕量級但功能又十分全面的批處理框架,主要用于批處理場景比如從數(shù)據(jù)庫、文件或隊列中讀取大量記錄。不過,需要注意的是:Spring Batch 不是調(diào)度框架。商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度框架比如 Quartz、XXL-JOB、Elastic-Job。它旨在與調(diào)度程序一起工作,而不是取代調(diào)度程序。
目前,Spring Batch 也已經(jīng)被收錄進了開源項目 awesome-java (非常棒的 Java 開源項目集合)。
項目地址:https://github.com/CodingDocs/awesome-java

關(guān)于 Spring Batch 的詳細介紹可以參考 Spring Batch 官方文檔[1],入門教程可以參考下面的內(nèi)容,原文地址:https://mrbird.cc/Spring-Batch 入門.html?。
項目搭建
新建一個 Spring Boot 項目,版本為 2.2.4.RELEASE,artifactId 為 spring-batch-start,項目結(jié)構(gòu)如下圖所示:

然后在 pom.xml 中引入 Spring Batch、MySQL 和 JDBC 依賴,引入后 pom.xml 內(nèi)容如下所示:
<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0modelVersion>
????<parent>
????????<groupId>org.springframework.bootgroupId>
????????<artifactId>spring-boot-starter-parentartifactId>
????????<version>2.2.5.RELEASEversion>
????????<relativePath/>?
????parent>
????<groupId>cc.mrbirdgroupId>
????<artifactId>spring-batch-startartifactId>
????<version>0.0.1-SNAPSHOTversion>
????<name>spring-batch-startname>
????<description>Demo?project?for?Spring?Bootdescription>
????<properties>
????????<java.version>1.8java.version>
????properties>
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.bootgroupId>
????????????<artifactId>spring-boot-starter-batchartifactId>
????????dependency>
????????<dependency>
????????????<groupId>mysqlgroupId>
????????????<artifactId>mysql-connector-javaartifactId>
????????dependency>
????????<dependency>
????????????<groupId>org.springframework.bootgroupId>
????????????<artifactId>spring-boot-starter-jdbcartifactId>
????????dependency>
????dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.bootgroupId>
????????????????<artifactId>spring-boot-maven-pluginartifactId>
????????????plugin>
????????plugins>
????build>
project>
在編寫代碼之前,我們先來簡單了解下 Spring Batch 的組成:

Spring Batch 里最基本的單元就是任務(wù) Job,一個 Job 由若干個步驟 Step 組成。 任務(wù)啟動器 Job Launcher 負(fù)責(zé)運行 Job。 任務(wù)存儲倉庫 Job Repository 存儲著 Job 的執(zhí)行狀態(tài),參數(shù)和日志等信息。Job 處理任務(wù)又可以分為三大類: 數(shù)據(jù)讀取 Item Reader 數(shù)據(jù)中間處理 Item Processor 數(shù)據(jù)輸出 Item Writer。
任務(wù)存儲倉庫可以是關(guān)系型數(shù)據(jù)庫 MySQL,非關(guān)系型數(shù)據(jù)庫 MongoDB 或者直接存儲在內(nèi)存中,本篇使用的是 MySQL 作為任務(wù)存儲倉庫。
新建一個名稱為 springbatch 的 MySQL 數(shù)據(jù)庫,然后導(dǎo)入 org.springframework.batch.core 目錄下的 schema-mysql.sql 文件:


導(dǎo)入后,庫表如下圖所示:

然后在項目的配置文件 application.yml 里添加 MySQL 相關(guān)配置:
spring:
??datasource:
????driver-class-name:?com.mysql.cj.jdbc.Driver
????url:?jdbc:mysql://127.0.0.1:3306/springbatch
????username:?root
????password:?123456
接著在 Spring Boot 的入口類上添加 @EnableBatchProcessing 注解,表示開啟 Spring Batch 批處理功能:
@SpringBootApplication
@EnableBatchProcessing
public?class?SpringBatchStartApplication?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(SpringBatchStartApplication.class,?args);
????}
}
至此,基本框架搭建好了,下面開始配置一個簡單的任務(wù)。
編寫第一個任務(wù)
在 cc.mrbird.batch 目錄下新建 job 包,然后在該包下新建一個 FirstJobDemo 類,代碼如下所示:
@Component
public?class?FirstJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?firstJob()?{
????????return?jobBuilderFactory.get("firstJob")
????????????????.start(step())
????????????????.build();
????}
????private?Step?step()?{
????????return?stepBuilderFactory.get("step")
????????????????.tasklet((contribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟....");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
}
上面代碼中,我們注入了JobBuilderFactory任務(wù)創(chuàng)建工廠和StepBuilderFactory步驟創(chuàng)建工廠,分別用于創(chuàng)建任務(wù) Job 和步驟 Step。JobBuilderFactory的get方法用于創(chuàng)建一個指定名稱的任務(wù),start方法指定任務(wù)的開始步驟,步驟通過StepBuilderFactory構(gòu)建。
步驟 Step 由若干個小任務(wù) Tasklet 組成,所以我們通過tasklet方法創(chuàng)建。tasklet方法接收一個Tasklet類型參數(shù),Tasklet是一個函數(shù)是接口,源碼如下:
public?interface?Tasklet?{
????@Nullable
????RepeatStatus?execute(StepContribution?contribution,?ChunkContext?chunkContext)?throws?Exception;
}
所以我們可以使用 lambda 表達式創(chuàng)建一個匿名實現(xiàn):
(contribution,?chunkContext)?->?{
????System.out.println("執(zhí)行步驟....");
????return?RepeatStatus.FINISHED;
}
該匿名實現(xiàn)必須返回一個明確的執(zhí)行狀態(tài),這里返回RepeatStatus.FINISHED表示該小任務(wù)執(zhí)行成功,正常結(jié)束。
此外,需要注意的是,我們配置的任務(wù) Job 必須注冊到 Spring IOC 容器中,并且任務(wù)的名稱和步驟的名稱組成唯一。比如上面的例子,我們的任務(wù)名稱為 firstJob,步驟的名稱為 step,如果存在別的任務(wù)和步驟組合也叫這個名稱的話,則會執(zhí)行失敗。
啟動項目,控制臺打印日志如下:
...
2020-03-06?11:01:11.785??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?11:01:11.846??INFO?17324?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step]
執(zhí)行步驟....
2020-03-06?11:01:11.886??INFO?17324?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step]?executed?in?40ms
2020-03-06?11:01:11.909??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?101ms
可以看到,任務(wù)成功執(zhí)行了,數(shù)據(jù)庫的庫表也將記錄相關(guān)運行日志。
重新啟動項目,控制臺并不會再次打印出任務(wù)執(zhí)行日志,因為 Job 名稱和 Step 名稱組成唯一,執(zhí)行完的不可重復(fù)的任務(wù),不會再次執(zhí)行。
多步驟任務(wù)
一個復(fù)雜的任務(wù)一般包含多個步驟,下面舉個多步驟任務(wù)的例子。在 job 包下新建MultiStepJobDemo類:
@Component
public?class?MultiStepJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?multiStepJob()?{
????????return?jobBuilderFactory.get("multiStepJob")
????????????????.start(step1())
????????????????.next(step2())
????????????????.next(step3())
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
}
上面代碼中,我們通過step1()、step2()和step3()三個方法創(chuàng)建了三個步驟。Job 里要使用這些步驟,只需要通過JobBuilderFactory的start方法指定第一個步驟,然后通過next方法不斷地指定下一個步驟即可。
啟動項目,控制臺打印日志如下:
2020-03-06?13:52:52.188??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?13:52:52.222??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
執(zhí)行步驟一操作。。。
2020-03-06?13:52:52.251??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
2020-03-06?13:52:52.292??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
執(zhí)行步驟二操作。。。
2020-03-06?13:52:52.323??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?30ms
2020-03-06?13:52:52.375??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
執(zhí)行步驟三操作。。。
2020-03-06?13:52:52.405??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?29ms
2020-03-06?13:52:52.428??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?231ms
三個步驟依次執(zhí)行成功。
多個步驟在執(zhí)行過程中也可以通過上一個步驟的執(zhí)行狀態(tài)來決定是否執(zhí)行下一個步驟,修改上面的代碼:
@Component
public?class?MultiStepJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?multiStepJob()?{
????????return?jobBuilderFactory.get("multiStepJob2")
????????????????.start(step1())
????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
????????????????.from(step2())
????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
????????????????.from(step3()).end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
}
multiStepJob()方法的含義是:multiStepJob2 任務(wù)先執(zhí)行 step1,當(dāng) step1 狀態(tài)為完成時,接著執(zhí)行 step2,當(dāng) step2 的狀態(tài)為完成時,接著執(zhí)行 step3。ExitStatus.COMPLETED常量表示任務(wù)順利執(zhí)行完畢,正常退出,該類還包含以下幾種退出狀態(tài):
public?class?ExitStatus?implements?Serializable,?Comparable<ExitStatus>?{
????/**
?????*?Convenient?constant?value?representing?unknown?state?-?assumed?not
?????*?continuable.
?????*/
????public?static?final?ExitStatus?UNKNOWN?=?new?ExitStatus("UNKNOWN");
????/**
?????*?Convenient?constant?value?representing?continuable?state?where?processing
?????*?is?still?taking?place,?so?no?further?action?is?required.?Used?for
?????*?asynchronous?execution?scenarios?where?the?processing?is?happening?in
?????*?another?thread?or?process?and?the?caller?is?not?required?to?wait?for?the
?????*?result.
?????*/
????public?static?final?ExitStatus?EXECUTING?=?new?ExitStatus("EXECUTING");
????/**
?????*?Convenient?constant?value?representing?finished?processing.
?????*/
????public?static?final?ExitStatus?COMPLETED?=?new?ExitStatus("COMPLETED");
????/**
?????*?Convenient?constant?value?representing?job?that?did?no?processing?(e.g.
?????*?because?it?was?already?complete).
?????*/
????public?static?final?ExitStatus?NOOP?=?new?ExitStatus("NOOP");
????/**
?????*?Convenient?constant?value?representing?finished?processing?with?an?error.
?????*/
????public?static?final?ExitStatus?FAILED?=?new?ExitStatus("FAILED");
????/**
?????*?Convenient?constant?value?representing?finished?processing?with
?????*?interrupted?status.
?????*/
????public?static?final?ExitStatus?STOPPED?=?new?ExitStatus("STOPPED");
????...
}
啟動項目,控制臺日志打印如下:
2020-03-06?14:21:49.384??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?launched?with?the?following?parameters:?[{}]
2020-03-06?14:21:49.427??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
執(zhí)行步驟一操作。。。
2020-03-06?14:21:49.456??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
2020-03-06?14:21:49.501??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
執(zhí)行步驟二操作。。。
2020-03-06?14:21:49.527??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?26ms
2020-03-06?14:21:49.576??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
執(zhí)行步驟三操作。。。
2020-03-06?14:21:49.604??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?28ms
2020-03-06?14:21:49.629??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?238ms
Flow 的用法
Flow 的作用就是可以將多個步驟 Step 組合在一起然后再組裝到任務(wù) Job 中。舉個 Flow 的例子,在 job 包下新建FlowJobDemo類:
@Component
public?class?FlowJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?flowJob()?{
????????return?jobBuilderFactory.get("flowJob")
????????????????.start(flow())
????????????????.next(step3())
????????????????.end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????//?創(chuàng)建一個flow對象,包含若干個step
????private?Flow?flow()?{
????????return?new?FlowBuilder("flow")
????????????????.start(step1())
????????????????.next(step2())
????????????????.build();
????}
}
上面代碼中,我們通過FlowBuilder將 step1 和 step2 組合在一起,創(chuàng)建了一個名為 flow 的 Flow,然后再將其賦給任務(wù) Job。使用 Flow 和 Step 構(gòu)建 Job 的區(qū)別是,Job 流程中包含 Flow 類型的時候需要在build()方法前調(diào)用end()方法。
啟動程序,控制臺日志打印如下:
2020-03-06?14:36:42.621??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?14:36:42.667??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
執(zhí)行步驟一操作。。。
2020-03-06?14:36:42.697??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?30ms
2020-03-06?14:36:42.744??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
執(zhí)行步驟二操作。。。
2020-03-06?14:36:42.771??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?27ms
2020-03-06?14:36:42.824??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
執(zhí)行步驟三操作。。。
2020-03-06?14:36:42.850??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?25ms
2020-03-06?14:36:42.874??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?245ms
并行執(zhí)行
任務(wù)中的步驟除了可以串行執(zhí)行(一個接著一個執(zhí)行)外,還可以并行執(zhí)行,并行執(zhí)行在特定的業(yè)務(wù)需求下可以提供任務(wù)執(zhí)行效率。
將任務(wù)并行化只需兩個簡單步驟:
將步驟 Step 轉(zhuǎn)換為 Flow; 任務(wù) Job 中指定并行 Flow。
舉個例子,在 job 包下新建SplitJobDemo類:
@Component
public?class?SplitJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Bean
????public?Job?splitJob()?{
????????return?jobBuilderFactory.get("splitJob")
????????????????.start(flow1())
????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2())
????????????????.end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Flow?flow1()?{
????????return?new?FlowBuilder("flow1")
????????????????.start(step1())
????????????????.next(step2())
????????????????.build();
????}
????private?Flow?flow2()?{
????????return?new?FlowBuilder("flow2")
????????????????.start(step3())
????????????????.build();
????}
}
上面例子中,我們創(chuàng)建了兩個 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactory的split方法,指定一個異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)。
啟動項目,控制臺日志打印如下:
2020-03-06?15:25:43.602??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?15:25:43.643??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
2020-03-06?15:25:43.650??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
執(zhí)行步驟三操作。。。
執(zhí)行步驟一操作。。。
2020-03-06?15:25:43.673??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?23ms
2020-03-06?15:25:43.674??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
2020-03-06?15:25:43.714??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
執(zhí)行步驟二操作。。。
2020-03-06?15:25:43.738??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?24ms
2020-03-06?15:25:43.758??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?146ms
可以看到 step3 并沒有在 step2 后才執(zhí)行,說明步驟已經(jīng)是并行化的(開啟并行化后,并行的步驟執(zhí)行順序并不能 100%確定,因為線程調(diào)度具有不確定性)。
任務(wù)決策器
決策器的作用就是可以指定程序在不同的情況下運行不同的任務(wù)流程,比如今天是周末,則讓任務(wù)執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。
使用決策器前,我們需要自定義一個決策器的實現(xiàn)。在 cc.mrbird.batch 包下新建 decider 包,然后創(chuàng)建MyDecider類,實現(xiàn)JobExecutionDecider接口:
@Component
public?class?MyDecider?implements?JobExecutionDecider?{
????@Override
????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{
????????LocalDate?now?=?LocalDate.now();
????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek();
????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{
????????????return?new?FlowExecutionStatus("weekend");
????????}?else?{
????????????return?new?FlowExecutionStatus("workingDay");
????????}
????}
}
MyDecider實現(xiàn)JobExecutionDecider接口的decide方法,該方法返回FlowExecutionStatus。上面的邏輯是:判斷今天是否是周末,如果是,返回FlowExecutionStatus("weekend")狀態(tài),否則返回FlowExecutionStatus("workingDay")狀態(tài)。
下面演示如何在任務(wù) Job 里使用決策器。在 job 包下新建DeciderJobDemo:
@Component
public?class?DeciderJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Autowired
????private?MyDecider?myDecider;
????@Bean
????public?Job?deciderJob()?{
????????return?jobBuilderFactory.get("deciderJob")
????????????????.start(step1())
????????????????.next(myDecider)
????????????????.from(myDecider).on("weekend").to(step2())
????????????????.from(myDecider).on("workingDay").to(step3())
????????????????.from(step3()).on("*").to(step4())
????????????????.end()
????????????????.build();
????}
????private?Step?step1()?{
????????return?stepBuilderFactory.get("step1")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟一操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step2()?{
????????return?stepBuilderFactory.get("step2")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟二操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step3()?{
????????return?stepBuilderFactory.get("step3")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟三操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
????private?Step?step4()?{
????????return?stepBuilderFactory.get("step4")
????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????System.out.println("執(zhí)行步驟四操作。。。");
????????????????????return?RepeatStatus.FINISHED;
????????????????}).build();
????}
上面代碼中,我們注入了自定義決策器MyDecider,然后在jobDecider()方法里使用了該決策器:
@Bean
public?Job?deciderJob()?{
????return?jobBuilderFactory.get("deciderJob")
????????????.start(step1())
????????????.next(myDecider)
????????????.from(myDecider).on("weekend").to(step2())
????????????.from(myDecider).on("workingDay").to(step3())
????????????.from(step3()).on("*").to(step4())
????????????.end()
????????????.build();
}
這段代碼的含義是:任務(wù) deciderJob 首先執(zhí)行 step1,然后指定自定義決策器,如果決策器返回 weekend,那么執(zhí)行 step2,如果決策器返回 workingDay,那么執(zhí)行 step3。如果執(zhí)行了 step3,那么無論 step3 的結(jié)果是什么,都將執(zhí)行 step4。
啟動項目,控制臺輸出如下所示:
2020-03-06?16:09:10.541??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?16:09:10.609??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
執(zhí)行步驟一操作。。。
2020-03-06?16:09:10.641??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?32ms
2020-03-06?16:09:10.692??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
執(zhí)行步驟三操作。。。
2020-03-06?16:09:10.723??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
2020-03-06?16:09:10.769??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step4]
執(zhí)行步驟四操作。。。
2020-03-06?16:09:10.797??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step4]?executed?in?27ms
2020-03-06?16:09:10.818??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?256ms
因為今天是 2020 年 03 月 06 日星期五,是工作日,所以任務(wù)執(zhí)行了 step1、step3 和 step4。
任務(wù)嵌套
任務(wù) Job 除了可以由 Step 或者 Flow 構(gòu)成外,我們還可以將多個任務(wù) Job 轉(zhuǎn)換為特殊的 Step,然后再賦給另一個任務(wù) Job,這就是任務(wù)的嵌套。
舉個例子,在 job 包下新建NestedJobDemo類:
@Component
public?class?NestedJobDemo?{
????@Autowired
????private?JobBuilderFactory?jobBuilderFactory;
????@Autowired
????private?StepBuilderFactory?stepBuilderFactory;
????@Autowired
????private?JobLauncher?jobLauncher;
????@Autowired
????private?JobRepository?jobRepository;
????@Autowired
????private?PlatformTransactionManager?platformTransactionManager;
????//?父任務(wù)
????@Bean
????public?Job?parentJob()?{
????????return?jobBuilderFactory.get("parentJob")
????????????????.start(childJobOneStep())
????????????????.next(childJobTwoStep())
????????????????.build();
????}
????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
????private?Step?childJobOneStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep"))
????????????????.job(childJobOne())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
????private?Step?childJobTwoStep()?{
????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep"))
????????????????.job(childJobTwo())
????????????????.launcher(jobLauncher)
????????????????.repository(jobRepository)
????????????????.transactionManager(platformTransactionManager)
????????????????.build();
????}
????//?子任務(wù)一
????private?Job?childJobOne()?{
????????return?jobBuilderFactory.get("childJobOne")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobOneStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務(wù)一執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
????//?子任務(wù)二
????private?Job?childJobTwo()?{
????????return?jobBuilderFactory.get("childJobTwo")
????????????????.start(
????????????????????stepBuilderFactory.get("childJobTwoStep")
????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
????????????????????????????????System.out.println("子任務(wù)二執(zhí)行步驟。。。");
????????????????????????????????return?RepeatStatus.FINISHED;
????????????????????????????}).build()
????????????????).build();
????}
}
上面代碼中,我們通過childJobOne()和childJobTwo()方法創(chuàng)建了兩個任務(wù) Job,這里沒什么好說的,前面都介紹過。關(guān)鍵在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我們通過JobStepBuilder構(gòu)建了一個名稱為childJobOneStep的 Step,顧名思義,它是一個任務(wù)型 Step 的構(gòu)造工廠,可以將任務(wù)轉(zhuǎn)換為“特殊”的步驟。在構(gòu)建過程中,我們還需要傳入任務(wù)執(zhí)行器 JobLauncher、任務(wù)倉庫 JobRepository 和事務(wù)管理器 PlatformTransactionManager。
將任務(wù)轉(zhuǎn)換為特殊的步驟后,將其賦給父任務(wù) parentJob 即可,流程和前面介紹的一致。
配置好后,啟動項目,控制臺輸出如下所示:
2020-03-06?16:58:39.771??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?launched?with?the?following?parameters:?[{}]
2020-03-06?16:58:39.812??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
2020-03-06?16:58:39.866??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?launched?with?the?following?parameters:?[{}]
2020-03-06?16:58:39.908??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
子任務(wù)一執(zhí)行步驟。。。
2020-03-06?16:58:39.940??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?32ms
2020-03-06?16:58:39.960??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?86ms
2020-03-06?16:58:39.983??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?171ms
2020-03-06?16:58:40.019??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
2020-03-06?16:58:40.067??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?launched?with?the?following?parameters:?[{}]
2020-03-06?16:58:40.102??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
子任務(wù)二執(zhí)行步驟。。。
2020-03-06?16:58:40.130??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?28ms
2020-03-06?16:58:40.152??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?75ms
2020-03-06?16:58:40.157??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?138ms
2020-03-06?16:58:40.177??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?398ms
本節(jié)源碼鏈接:https://github.com/wuyouzhuguli/SpringAll/tree/master/67.spring-batch-start。
參考資料
Spring Batch 官方文檔: https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/spring-batch-intro.html#spring-batch-intro
[2]https://mrbird.cc/Spring-Batch 入門.html: https://mrbird.cc/Spring-Batch入門.html
我是 Guide哥,一個工作2年有余,接觸編程已經(jīng)6年有余的程序員。大三開源 JavaGuide,目前已經(jīng) 100k+ Star。未來幾年,希望持續(xù)完善 JavaGuide,爭取能夠幫助更多學(xué)習(xí) Java 的小伙伴!共勉!凎!點擊即可了解我的個人經(jīng)歷。
歡迎點贊分享。咱們下期再會!
