<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Spring 官方批處理框架真香!Spring 全家桶永遠滴神!

          共 26945字,需瀏覽 54分鐘

           ·

          2021-10-08 20:22

          假期余額嚴(yán)重不足,難受呀!今天早上 8 點半出門坐車,晚上 9 點才到家,差不多坐了一天車才到家,屁股都坐疼了......

          推薦一個很多小伙伴沒注意到的 Spring 官方的批處理框架。

          Spring Batch 是一個輕量級但功能又十分全面的批處理框架,主要用于批處理場景比如從數(shù)據(jù)庫、文件或隊列中讀取大量記錄。不過,需要注意的是:Spring Batch 不是調(diào)度框架。商業(yè)和開源領(lǐng)域都有許多優(yōu)秀的企業(yè)調(diào)度框架比如 Quartz、XXL-JOB、Elastic-Job。它旨在與調(diào)度程序一起工作,而不是取代調(diào)度程序。

          目前,Spring Batch 也已經(jīng)被收錄進了開源項目 awesome-java (非常棒的 Java 開源項目集合)。

          項目地址:https://github.com/CodingDocs/awesome-java

          關(guān)于 Spring Batch 的詳細介紹可以參考 Spring Batch 官方文檔[1],入門教程可以參考下面的內(nèi)容,原文地址:https://mrbird.cc/Spring-Batch 入門.html?。

          項目搭建

          新建一個 Spring Boot 項目,版本為 2.2.4.RELEASE,artifactId 為 spring-batch-start,項目結(jié)構(gòu)如下圖所示:

          然后在 pom.xml 中引入 Spring Batch、MySQL 和 JDBC 依賴,引入后 pom.xml 內(nèi)容如下所示:


          <project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          ?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">

          ????<modelVersion>4.0.0modelVersion>
          ????<parent>
          ????????<groupId>org.springframework.bootgroupId>
          ????????<artifactId>spring-boot-starter-parentartifactId>
          ????????<version>2.2.5.RELEASEversion>
          ????????<relativePath/>?
          ????parent>
          ????<groupId>cc.mrbirdgroupId>
          ????<artifactId>spring-batch-startartifactId>
          ????<version>0.0.1-SNAPSHOTversion>
          ????<name>spring-batch-startname>
          ????<description>Demo?project?for?Spring?Bootdescription>

          ????<properties>
          ????????<java.version>1.8java.version>
          ????properties>

          ????<dependencies>
          ????????<dependency>
          ????????????<groupId>org.springframework.bootgroupId>
          ????????????<artifactId>spring-boot-starter-batchartifactId>
          ????????dependency>

          ????????<dependency>
          ????????????<groupId>mysqlgroupId>
          ????????????<artifactId>mysql-connector-javaartifactId>
          ????????dependency>
          ????????<dependency>
          ????????????<groupId>org.springframework.bootgroupId>
          ????????????<artifactId>spring-boot-starter-jdbcartifactId>
          ????????dependency>
          ????dependencies>

          ????<build>
          ????????<plugins>
          ????????????<plugin>
          ????????????????<groupId>org.springframework.bootgroupId>
          ????????????????<artifactId>spring-boot-maven-pluginartifactId>
          ????????????plugin>
          ????????plugins>
          ????build>
          project>

          在編寫代碼之前,我們先來簡單了解下 Spring Batch 的組成:


          • Spring Batch 里最基本的單元就是任務(wù) Job,一個 Job 由若干個步驟 Step 組成。
          • 任務(wù)啟動器 Job Launcher 負(fù)責(zé)運行 Job。
          • 任務(wù)存儲倉庫 Job Repository 存儲著 Job 的執(zhí)行狀態(tài),參數(shù)和日志等信息。Job 處理任務(wù)又可以分為三大類:
            • 數(shù)據(jù)讀取 Item Reader
            • 數(shù)據(jù)中間處理 Item Processor
            • 數(shù)據(jù)輸出 Item Writer。

          任務(wù)存儲倉庫可以是關(guān)系型數(shù)據(jù)庫 MySQL,非關(guān)系型數(shù)據(jù)庫 MongoDB 或者直接存儲在內(nèi)存中,本篇使用的是 MySQL 作為任務(wù)存儲倉庫。

          新建一個名稱為 springbatch 的 MySQL 數(shù)據(jù)庫,然后導(dǎo)入 org.springframework.batch.core 目錄下的 schema-mysql.sql 文件:

          導(dǎo)入后,庫表如下圖所示:


          然后在項目的配置文件 application.yml 里添加 MySQL 相關(guān)配置:

          spring:
          ??datasource:
          ????driver-class-name:?com.mysql.cj.jdbc.Driver
          ????url:?jdbc:mysql://127.0.0.1:3306/springbatch
          ????username:?root
          ????password:?123456

          接著在 Spring Boot 的入口類上添加 @EnableBatchProcessing 注解,表示開啟 Spring Batch 批處理功能:

          @SpringBootApplication
          @EnableBatchProcessing
          public?class?SpringBatchStartApplication?{
          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(SpringBatchStartApplication.class,?args);
          ????}
          }

          至此,基本框架搭建好了,下面開始配置一個簡單的任務(wù)。

          編寫第一個任務(wù)

          cc.mrbird.batch 目錄下新建 job 包,然后在該包下新建一個 FirstJobDemo 類,代碼如下所示:

          @Component
          public?class?FirstJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;

          ????@Bean
          ????public?Job?firstJob()?{
          ????????return?jobBuilderFactory.get("firstJob")
          ????????????????.start(step())
          ????????????????.build();
          ????}

          ????private?Step?step()?{
          ????????return?stepBuilderFactory.get("step")
          ????????????????.tasklet((contribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟....");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}
          }

          上面代碼中,我們注入了JobBuilderFactory任務(wù)創(chuàng)建工廠和StepBuilderFactory步驟創(chuàng)建工廠,分別用于創(chuàng)建任務(wù) Job 和步驟 Step。JobBuilderFactoryget方法用于創(chuàng)建一個指定名稱的任務(wù),start方法指定任務(wù)的開始步驟,步驟通過StepBuilderFactory構(gòu)建。

          步驟 Step 由若干個小任務(wù) Tasklet 組成,所以我們通過tasklet方法創(chuàng)建。tasklet方法接收一個Tasklet類型參數(shù),Tasklet是一個函數(shù)是接口,源碼如下:

          public?interface?Tasklet?{
          ????@Nullable
          ????RepeatStatus?execute(StepContribution?contribution,?ChunkContext?chunkContext)?throws?Exception;
          }

          所以我們可以使用 lambda 表達式創(chuàng)建一個匿名實現(xiàn):

          (contribution,?chunkContext)?->?{
          ????System.out.println("執(zhí)行步驟....");
          ????return?RepeatStatus.FINISHED;
          }

          該匿名實現(xiàn)必須返回一個明確的執(zhí)行狀態(tài),這里返回RepeatStatus.FINISHED表示該小任務(wù)執(zhí)行成功,正常結(jié)束。

          此外,需要注意的是,我們配置的任務(wù) Job 必須注冊到 Spring IOC 容器中,并且任務(wù)的名稱和步驟的名稱組成唯一。比如上面的例子,我們的任務(wù)名稱為 firstJob,步驟的名稱為 step,如果存在別的任務(wù)和步驟組合也叫這個名稱的話,則會執(zhí)行失敗。

          啟動項目,控制臺打印日志如下:

          ...
          2020-03-06?11:01:11.785??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?11:01:11.846??INFO?17324?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step]
          執(zhí)行步驟....
          2020-03-06?11:01:11.886??INFO?17324?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step]?executed?in?40ms
          2020-03-06?11:01:11.909??INFO?17324?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=firstJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?101ms

          可以看到,任務(wù)成功執(zhí)行了,數(shù)據(jù)庫的庫表也將記錄相關(guān)運行日志。

          重新啟動項目,控制臺并不會再次打印出任務(wù)執(zhí)行日志,因為 Job 名稱和 Step 名稱組成唯一,執(zhí)行完的不可重復(fù)的任務(wù),不會再次執(zhí)行。

          多步驟任務(wù)

          一個復(fù)雜的任務(wù)一般包含多個步驟,下面舉個多步驟任務(wù)的例子。在 job 包下新建MultiStepJobDemo類:

          @Component
          public?class?MultiStepJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;

          ????@Bean
          ????public?Job?multiStepJob()?{
          ????????return?jobBuilderFactory.get("multiStepJob")
          ????????????????.start(step1())
          ????????????????.next(step2())
          ????????????????.next(step3())
          ????????????????.build();
          ????}

          ????private?Step?step1()?{
          ????????return?stepBuilderFactory.get("step1")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step2()?{
          ????????return?stepBuilderFactory.get("step2")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step3()?{
          ????????return?stepBuilderFactory.get("step3")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}
          }

          上面代碼中,我們通過step1()step2()step3()三個方法創(chuàng)建了三個步驟。Job 里要使用這些步驟,只需要通過JobBuilderFactorystart方法指定第一個步驟,然后通過next方法不斷地指定下一個步驟即可。

          啟動項目,控制臺打印日志如下:

          2020-03-06?13:52:52.188??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?13:52:52.222??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
          執(zhí)行步驟一操作。。。
          2020-03-06?13:52:52.251??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
          2020-03-06?13:52:52.292??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
          執(zhí)行步驟二操作。。。
          2020-03-06?13:52:52.323??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?30ms
          2020-03-06?13:52:52.375??INFO?18472?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
          執(zhí)行步驟三操作。。。
          2020-03-06?13:52:52.405??INFO?18472?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?29ms
          2020-03-06?13:52:52.428??INFO?18472?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=multiStepJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?231ms

          三個步驟依次執(zhí)行成功。

          多個步驟在執(zhí)行過程中也可以通過上一個步驟的執(zhí)行狀態(tài)來決定是否執(zhí)行下一個步驟,修改上面的代碼:

          @Component
          public?class?MultiStepJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;

          ????@Bean
          ????public?Job?multiStepJob()?{
          ????????return?jobBuilderFactory.get("multiStepJob2")
          ????????????????.start(step1())
          ????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step2())
          ????????????????.from(step2())
          ????????????????.on(ExitStatus.COMPLETED.getExitCode()).to(step3())
          ????????????????.from(step3()).end()
          ????????????????.build();
          ????}

          ????private?Step?step1()?{
          ????????return?stepBuilderFactory.get("step1")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step2()?{
          ????????return?stepBuilderFactory.get("step2")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step3()?{
          ????????return?stepBuilderFactory.get("step3")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}
          }

          multiStepJob()方法的含義是:multiStepJob2 任務(wù)先執(zhí)行 step1,當(dāng) step1 狀態(tài)為完成時,接著執(zhí)行 step2,當(dāng) step2 的狀態(tài)為完成時,接著執(zhí)行 step3。ExitStatus.COMPLETED常量表示任務(wù)順利執(zhí)行完畢,正常退出,該類還包含以下幾種退出狀態(tài):

          public?class?ExitStatus?implements?Serializable,?Comparable<ExitStatus>?{

          ????/**
          ?????*?Convenient?constant?value?representing?unknown?state?-?assumed?not
          ?????*?continuable.
          ?????*/

          ????public?static?final?ExitStatus?UNKNOWN?=?new?ExitStatus("UNKNOWN");

          ????/**
          ?????*?Convenient?constant?value?representing?continuable?state?where?processing
          ?????*?is?still?taking?place,?so?no?further?action?is?required.?Used?for
          ?????*?asynchronous?execution?scenarios?where?the?processing?is?happening?in
          ?????*?another?thread?or?process?and?the?caller?is?not?required?to?wait?for?the
          ?????*?result.
          ?????*/

          ????public?static?final?ExitStatus?EXECUTING?=?new?ExitStatus("EXECUTING");

          ????/**
          ?????*?Convenient?constant?value?representing?finished?processing.
          ?????*/

          ????public?static?final?ExitStatus?COMPLETED?=?new?ExitStatus("COMPLETED");

          ????/**
          ?????*?Convenient?constant?value?representing?job?that?did?no?processing?(e.g.
          ?????*?because?it?was?already?complete).
          ?????*/

          ????public?static?final?ExitStatus?NOOP?=?new?ExitStatus("NOOP");

          ????/**
          ?????*?Convenient?constant?value?representing?finished?processing?with?an?error.
          ?????*/

          ????public?static?final?ExitStatus?FAILED?=?new?ExitStatus("FAILED");

          ????/**
          ?????*?Convenient?constant?value?representing?finished?processing?with
          ?????*?interrupted?status.
          ?????*/

          ????public?static?final?ExitStatus?STOPPED?=?new?ExitStatus("STOPPED");

          ????...
          }

          啟動項目,控制臺日志打印如下:

          2020-03-06?14:21:49.384??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?14:21:49.427??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
          執(zhí)行步驟一操作。。。
          2020-03-06?14:21:49.456??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?29ms
          2020-03-06?14:21:49.501??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
          執(zhí)行步驟二操作。。。
          2020-03-06?14:21:49.527??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?26ms
          2020-03-06?14:21:49.576??INFO?18745?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
          執(zhí)行步驟三操作。。。
          2020-03-06?14:21:49.604??INFO?18745?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?28ms
          2020-03-06?14:21:49.629??INFO?18745?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=multiStepJob2]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?238ms

          Flow 的用法

          Flow 的作用就是可以將多個步驟 Step 組合在一起然后再組裝到任務(wù) Job 中。舉個 Flow 的例子,在 job 包下新建FlowJobDemo類:

          @Component
          public?class?FlowJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;

          ????@Bean
          ????public?Job?flowJob()?{
          ????????return?jobBuilderFactory.get("flowJob")
          ????????????????.start(flow())
          ????????????????.next(step3())
          ????????????????.end()
          ????????????????.build();
          ????}

          ????private?Step?step1()?{
          ????????return?stepBuilderFactory.get("step1")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step2()?{
          ????????return?stepBuilderFactory.get("step2")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step3()?{
          ????????return?stepBuilderFactory.get("step3")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????//?創(chuàng)建一個flow對象,包含若干個step
          ????private?Flow?flow()?{
          ????????return?new?FlowBuilder("flow")
          ????????????????.start(step1())
          ????????????????.next(step2())
          ????????????????.build();
          ????}
          }

          上面代碼中,我們通過FlowBuilder將 step1 和 step2 組合在一起,創(chuàng)建了一個名為 flow 的 Flow,然后再將其賦給任務(wù) Job。使用 Flow 和 Step 構(gòu)建 Job 的區(qū)別是,Job 流程中包含 Flow 類型的時候需要在build()方法前調(diào)用end()方法。

          啟動程序,控制臺日志打印如下:

          2020-03-06?14:36:42.621??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?14:36:42.667??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
          執(zhí)行步驟一操作。。。
          2020-03-06?14:36:42.697??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?30ms
          2020-03-06?14:36:42.744??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
          執(zhí)行步驟二操作。。。
          2020-03-06?14:36:42.771??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?27ms
          2020-03-06?14:36:42.824??INFO?18865?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
          執(zhí)行步驟三操作。。。
          2020-03-06?14:36:42.850??INFO?18865?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?25ms
          2020-03-06?14:36:42.874??INFO?18865?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=flowJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?245ms

          并行執(zhí)行

          任務(wù)中的步驟除了可以串行執(zhí)行(一個接著一個執(zhí)行)外,還可以并行執(zhí)行,并行執(zhí)行在特定的業(yè)務(wù)需求下可以提供任務(wù)執(zhí)行效率。

          將任務(wù)并行化只需兩個簡單步驟:

          1. 將步驟 Step 轉(zhuǎn)換為 Flow;
          2. 任務(wù) Job 中指定并行 Flow。

          舉個例子,在 job 包下新建SplitJobDemo類:

          @Component
          public?class?SplitJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;

          ????@Bean
          ????public?Job?splitJob()?{
          ????????return?jobBuilderFactory.get("splitJob")
          ????????????????.start(flow1())
          ????????????????.split(new?SimpleAsyncTaskExecutor()).add(flow2())
          ????????????????.end()
          ????????????????.build();

          ????}

          ????private?Step?step1()?{
          ????????return?stepBuilderFactory.get("step1")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step2()?{
          ????????return?stepBuilderFactory.get("step2")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step3()?{
          ????????return?stepBuilderFactory.get("step3")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Flow?flow1()?{
          ????????return?new?FlowBuilder("flow1")
          ????????????????.start(step1())
          ????????????????.next(step2())
          ????????????????.build();
          ????}

          ????private?Flow?flow2()?{
          ????????return?new?FlowBuilder("flow2")
          ????????????????.start(step3())
          ????????????????.build();
          ????}
          }

          上面例子中,我們創(chuàng)建了兩個 Flow:flow1(包含 step1 和 step2)和 flow2(包含 step3)。然后通過JobBuilderFactorysplit方法,指定一個異步執(zhí)行器,將 flow1 和 flow2 異步執(zhí)行(也就是并行)。

          啟動項目,控制臺日志打印如下:

          2020-03-06?15:25:43.602??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?15:25:43.643??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
          2020-03-06?15:25:43.650??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
          執(zhí)行步驟三操作。。。
          執(zhí)行步驟一操作。。。
          2020-03-06?15:25:43.673??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?23ms
          2020-03-06?15:25:43.674??INFO?19449?---?[cTaskExecutor-1]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
          2020-03-06?15:25:43.714??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step2]
          執(zhí)行步驟二操作。。。
          2020-03-06?15:25:43.738??INFO?19449?---?[cTaskExecutor-2]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step2]?executed?in?24ms
          2020-03-06?15:25:43.758??INFO?19449?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=splitJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?146ms

          可以看到 step3 并沒有在 step2 后才執(zhí)行,說明步驟已經(jīng)是并行化的(開啟并行化后,并行的步驟執(zhí)行順序并不能 100%確定,因為線程調(diào)度具有不確定性)。

          任務(wù)決策器

          決策器的作用就是可以指定程序在不同的情況下運行不同的任務(wù)流程,比如今天是周末,則讓任務(wù)執(zhí)行 step1 和 step2,如果是工作日,則之心 step1 和 step3。

          使用決策器前,我們需要自定義一個決策器的實現(xiàn)。在 cc.mrbird.batch 包下新建 decider 包,然后創(chuàng)建MyDecider類,實現(xiàn)JobExecutionDecider接口:

          @Component
          public?class?MyDecider?implements?JobExecutionDecider?{
          ????@Override
          ????public?FlowExecutionStatus?decide(JobExecution?jobExecution,?StepExecution?stepExecution)?{
          ????????LocalDate?now?=?LocalDate.now();
          ????????DayOfWeek?dayOfWeek?=?now.getDayOfWeek();

          ????????if?(dayOfWeek?==?DayOfWeek.SATURDAY?||?dayOfWeek?==?DayOfWeek.SUNDAY)?{
          ????????????return?new?FlowExecutionStatus("weekend");
          ????????}?else?{
          ????????????return?new?FlowExecutionStatus("workingDay");
          ????????}
          ????}
          }

          MyDecider實現(xiàn)JobExecutionDecider接口的decide方法,該方法返回FlowExecutionStatus。上面的邏輯是:判斷今天是否是周末,如果是,返回FlowExecutionStatus("weekend")狀態(tài),否則返回FlowExecutionStatus("workingDay")狀態(tài)。

          下面演示如何在任務(wù) Job 里使用決策器。在 job 包下新建DeciderJobDemo

          @Component
          public?class?DeciderJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;
          ????@Autowired
          ????private?MyDecider?myDecider;

          ????@Bean
          ????public?Job?deciderJob()?{
          ????????return?jobBuilderFactory.get("deciderJob")
          ????????????????.start(step1())
          ????????????????.next(myDecider)
          ????????????????.from(myDecider).on("weekend").to(step2())
          ????????????????.from(myDecider).on("workingDay").to(step3())
          ????????????????.from(step3()).on("*").to(step4())
          ????????????????.end()
          ????????????????.build();
          ????}

          ????private?Step?step1()?{
          ????????return?stepBuilderFactory.get("step1")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟一操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step2()?{
          ????????return?stepBuilderFactory.get("step2")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟二操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          ????private?Step?step3()?{
          ????????return?stepBuilderFactory.get("step3")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟三操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}


          ????private?Step?step4()?{
          ????????return?stepBuilderFactory.get("step4")
          ????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????System.out.println("執(zhí)行步驟四操作。。。");
          ????????????????????return?RepeatStatus.FINISHED;
          ????????????????}).build();
          ????}

          上面代碼中,我們注入了自定義決策器MyDecider,然后在jobDecider()方法里使用了該決策器:

          @Bean
          public?Job?deciderJob()?{
          ????return?jobBuilderFactory.get("deciderJob")
          ????????????.start(step1())
          ????????????.next(myDecider)
          ????????????.from(myDecider).on("weekend").to(step2())
          ????????????.from(myDecider).on("workingDay").to(step3())
          ????????????.from(step3()).on("*").to(step4())
          ????????????.end()
          ????????????.build();
          }

          這段代碼的含義是:任務(wù) deciderJob 首先執(zhí)行 step1,然后指定自定義決策器,如果決策器返回 weekend,那么執(zhí)行 step2,如果決策器返回 workingDay,那么執(zhí)行 step3。如果執(zhí)行了 step3,那么無論 step3 的結(jié)果是什么,都將執(zhí)行 step4。

          啟動項目,控制臺輸出如下所示:

          2020-03-06?16:09:10.541??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?16:09:10.609??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step1]
          執(zhí)行步驟一操作。。。
          2020-03-06?16:09:10.641??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step1]?executed?in?32ms
          2020-03-06?16:09:10.692??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step3]
          執(zhí)行步驟三操作。。。
          2020-03-06?16:09:10.723??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step3]?executed?in?31ms
          2020-03-06?16:09:10.769??INFO?19873?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[step4]
          執(zhí)行步驟四操作。。。
          2020-03-06?16:09:10.797??INFO?19873?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[step4]?executed?in?27ms
          2020-03-06?16:09:10.818??INFO?19873?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[FlowJob:?[name=deciderJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?256ms

          因為今天是 2020 年 03 月 06 日星期五,是工作日,所以任務(wù)執(zhí)行了 step1、step3 和 step4。

          任務(wù)嵌套

          任務(wù) Job 除了可以由 Step 或者 Flow 構(gòu)成外,我們還可以將多個任務(wù) Job 轉(zhuǎn)換為特殊的 Step,然后再賦給另一個任務(wù) Job,這就是任務(wù)的嵌套。

          舉個例子,在 job 包下新建NestedJobDemo類:

          @Component
          public?class?NestedJobDemo?{

          ????@Autowired
          ????private?JobBuilderFactory?jobBuilderFactory;
          ????@Autowired
          ????private?StepBuilderFactory?stepBuilderFactory;
          ????@Autowired
          ????private?JobLauncher?jobLauncher;
          ????@Autowired
          ????private?JobRepository?jobRepository;
          ????@Autowired
          ????private?PlatformTransactionManager?platformTransactionManager;

          ????//?父任務(wù)
          ????@Bean
          ????public?Job?parentJob()?{
          ????????return?jobBuilderFactory.get("parentJob")
          ????????????????.start(childJobOneStep())
          ????????????????.next(childJobTwoStep())
          ????????????????.build();
          ????}


          ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
          ????private?Step?childJobOneStep()?{
          ????????return?new?JobStepBuilder(new?StepBuilder("childJobOneStep"))
          ????????????????.job(childJobOne())
          ????????????????.launcher(jobLauncher)
          ????????????????.repository(jobRepository)
          ????????????????.transactionManager(platformTransactionManager)
          ????????????????.build();
          ????}

          ????//?將任務(wù)轉(zhuǎn)換為特殊的步驟
          ????private?Step?childJobTwoStep()?{
          ????????return?new?JobStepBuilder(new?StepBuilder("childJobTwoStep"))
          ????????????????.job(childJobTwo())
          ????????????????.launcher(jobLauncher)
          ????????????????.repository(jobRepository)
          ????????????????.transactionManager(platformTransactionManager)
          ????????????????.build();
          ????}

          ????//?子任務(wù)一
          ????private?Job?childJobOne()?{
          ????????return?jobBuilderFactory.get("childJobOne")
          ????????????????.start(
          ????????????????????stepBuilderFactory.get("childJobOneStep")
          ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????????????????System.out.println("子任務(wù)一執(zhí)行步驟。。。");
          ????????????????????????????????return?RepeatStatus.FINISHED;
          ????????????????????????????}).build()
          ????????????????).build();
          ????}

          ????//?子任務(wù)二
          ????private?Job?childJobTwo()?{
          ????????return?jobBuilderFactory.get("childJobTwo")
          ????????????????.start(
          ????????????????????stepBuilderFactory.get("childJobTwoStep")
          ????????????????????????????.tasklet((stepContribution,?chunkContext)?->?{
          ????????????????????????????????System.out.println("子任務(wù)二執(zhí)行步驟。。。");
          ????????????????????????????????return?RepeatStatus.FINISHED;
          ????????????????????????????}).build()
          ????????????????).build();
          ????}
          }

          上面代碼中,我們通過childJobOne()childJobTwo()方法創(chuàng)建了兩個任務(wù) Job,這里沒什么好說的,前面都介紹過。關(guān)鍵在于childJobOneStep()方法和childJobTwoStep()方法。在childJobOneStep()方法中,我們通過JobStepBuilder構(gòu)建了一個名稱為childJobOneStep的 Step,顧名思義,它是一個任務(wù)型 Step 的構(gòu)造工廠,可以將任務(wù)轉(zhuǎn)換為“特殊”的步驟。在構(gòu)建過程中,我們還需要傳入任務(wù)執(zhí)行器 JobLauncher、任務(wù)倉庫 JobRepository 和事務(wù)管理器 PlatformTransactionManager。

          將任務(wù)轉(zhuǎn)換為特殊的步驟后,將其賦給父任務(wù) parentJob 即可,流程和前面介紹的一致。

          配置好后,啟動項目,控制臺輸出如下所示:

          2020-03-06?16:58:39.771??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?16:58:39.812??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
          2020-03-06?16:58:39.866??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?16:58:39.908??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobOneStep]
          子任務(wù)一執(zhí)行步驟。。。
          2020-03-06?16:58:39.940??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?32ms
          2020-03-06?16:58:39.960??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobOne]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?86ms
          2020-03-06?16:58:39.983??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobOneStep]?executed?in?171ms
          2020-03-06?16:58:40.019??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
          2020-03-06?16:58:40.067??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?launched?with?the?following?parameters:?[{}]
          2020-03-06?16:58:40.102??INFO?21588?---?[???????????main]?o.s.batch.core.job.SimpleStepHandler?????:?Executing?step:?[childJobTwoStep]
          子任務(wù)二執(zhí)行步驟。。。
          2020-03-06?16:58:40.130??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?28ms
          2020-03-06?16:58:40.152??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=childJobTwo]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?75ms
          2020-03-06?16:58:40.157??INFO?21588?---?[???????????main]?o.s.batch.core.step.AbstractStep?????????:?Step:?[childJobTwoStep]?executed?in?138ms
          2020-03-06?16:58:40.177??INFO?21588?---?[???????????main]?o.s.b.c.l.support.SimpleJobLauncher??????:?Job:?[SimpleJob:?[name=parentJob]]?completed?with?the?following?parameters:?[{}]?and?the?following?status:?[COMPLETED]?in?398ms

          本節(jié)源碼鏈接:https://github.com/wuyouzhuguli/SpringAll/tree/master/67.spring-batch-start。

          參考資料

          [1]

          Spring Batch 官方文檔: https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/spring-batch-intro.html#spring-batch-intro

          [2]

          https://mrbird.cc/Spring-Batch 入門.html: https://mrbird.cc/Spring-Batch入門.html


          < END >

          也許你還想看
          ? |?我在 B 站淘了 2 個 Java 實戰(zhàn)項目! 小破站,YYDS!
          ??|?我常用的20+個學(xué)習(xí)編程的網(wǎng)站!蕪湖起飛!
          ? |?1w+字的 Dubbo 面試題/知識點總結(jié)?。?021 最新版)
          ? |?7年前,24歲,出版了一本 Redis 神書
          ? |?京東二面:為什么需要分布式ID?你項目中是怎么做的?
          ? |?再見 Spring Task,這個定時任務(wù)框架真香!
          ? |?一鍵生成數(shù)據(jù)庫文檔,堪稱數(shù)據(jù)庫界的Swagger
          ? |?來看看這個超好用的項目腳手架吧!5分鐘搭建一個Spring Boot 前后端分離系統(tǒng)!
          ? |?Spring 官宣,干掉 Spring 5.3.x!

          我是 Guide哥,一個工作2年有余,接觸編程已經(jīng)6年有余的程序員。大三開源 JavaGuide,目前已經(jīng) 100k+ Star。未來幾年,希望持續(xù)完善 JavaGuide,爭取能夠幫助更多學(xué)習(xí) Java 的小伙伴!共勉!凎!點擊即可了解我的個人經(jīng)歷

          歡迎點贊分享。咱們下期再會!

          瀏覽 31
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  干片网| avtt在线看 | 国产激情AV在线播放 | 午夜理理伦电影A片无码蜜桃av | 成人黄色AV网站 |