<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          批處理框架 Spring Batch 這么強(qiáng),你真的會用嗎?

          共 40558字,需瀏覽 82分鐘

           ·

          2024-10-24 12:00

          來源:blog.csdn.net/qq_35387940/article/details/108193473

          • 前言
          • 正文

          前言

          概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 方便使用的 較健全的 批處理 框架。

          為什么說是方便使用的,因?yàn)檫@是 基于spring的一個框架,接入簡單、易理解、流程分明。

          為什么說是較健全的, 因?yàn)樗峁┝送N覀冊趯Υ笈繑?shù)據(jù)進(jìn)行處理時(shí)需要考慮到的 日志跟蹤、事務(wù)粒度調(diào)配、可控執(zhí)行、失敗機(jī)制、重試機(jī)制、數(shù)據(jù)讀寫等。

          正文

          那么回到文章,我們該篇文章將會帶來給大家的是什么?(結(jié)合實(shí)例講解那是當(dāng)然的)

          從實(shí)現(xiàn)的業(yè)務(wù)場景來說,有以下兩個:

          1. 從  csv文件 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲
          2. 從 數(shù)據(jù)庫 讀取數(shù)據(jù),進(jìn)行業(yè)務(wù)處理再存儲

          也就是平時(shí)經(jīng)常遇到的數(shù)據(jù)清理或者數(shù)據(jù)過濾,又或者是數(shù)據(jù)遷移備份等等。大批量的數(shù)據(jù),自己實(shí)現(xiàn)分批處理需要考慮的東西太多了,又不放心,那么使用 Spring Batch 框架 是一個很好的選擇。

          首先,在進(jìn)入實(shí)例教程前,我們看看這次的實(shí)例里,我們使用springboot 整合spring batch 框架,要編碼的東西有什么?

          通過一張簡單的圖來了解:

          圖片

          可能大家看到這個圖,是不是多多少少想起來定時(shí)任務(wù)框架?確實(shí)有那么點(diǎn)像,但是我必須在這告訴大家,這是一個批處理框架,不是一個schuedling 框架。但是前面提到它提供了可執(zhí)行控制,也就是說,啥時(shí)候執(zhí)行是可控的,那么顯然就是自己可以進(jìn)行擴(kuò)展結(jié)合定時(shí)任務(wù)框架,實(shí)現(xiàn)你心中所想。

          ok,回到主題,相信大家能從圖中簡單明了地看到我們這次實(shí)例,需要實(shí)現(xiàn)的東西有什么了。所以我就不在對各個小組件進(jìn)行大批量文字的描述了。

          那么我們事不宜遲,開始我們的實(shí)例教程。

          首先準(zhǔn)備一個數(shù)據(jù)庫,里面建一張簡單的表,用于實(shí)例數(shù)據(jù)的寫入存儲或者說是讀取等等。

          bloginfo表

          圖片

          相關(guān)建表sql語句:

          CREATE TABLE `bloginfo`  (
            `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
            `blogAuthor` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客作者標(biāo)識',
            `blogUrl` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客鏈接',
            `blogTitle` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客標(biāo)題',
            `blogItem` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '博客欄目',
            PRIMARY KEY (`id`) USING BTREE
          ) ENGINE = InnoDB AUTO_INCREMENT = 89031 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

          pom文件里的核心依賴:

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
          </dependency>

          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
          </dependency>

          <!--  spring batch -->
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-batch</artifactId>
          </dependency>


          <!-- hibernate validator -->
          <dependency>
              <groupId>org.hibernate</groupId>
              <artifactId>hibernate-validator</artifactId>
              <version>6.0.7.Final</version>
          </dependency>
          <!--  mybatis -->
          <dependency>
              <groupId>org.mybatis.spring.boot</groupId>
              <artifactId>mybatis-spring-boot-starter</artifactId>
              <version>2.0.0</version>
          </dependency>
          <!--  mysql -->
          <dependency>
              <groupId>mysql</groupId>
              <artifactId>mysql-connector-java</artifactId>
              <scope>runtime</scope>
          </dependency>


          <!-- druid數(shù)據(jù)源驅(qū)動 1.1.10解決springboot從1.0——2.0版本問題-->
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>druid-spring-boot-starter</artifactId>
              <version>1.1.18</version>
          </dependency>

          yml文件:

          spring:
            batch:
              job:
          #設(shè)置為 false -需要jobLaucher.run執(zhí)行
                enabled: false
              initialize-schema: always

          #    table-prefix: my-batch
           
            datasource:
              druid:
                username: root
                password: root
                url: jdbc:mysql://localhost:3306/hellodemo?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&zeroDateTimeBehavior=convertToNull
                driver-class-name: com.mysql.cj.jdbc.Driver
                initialSize: 5
                minIdle: 5
                maxActive: 20
                maxWait: 60000
                timeBetweenEvictionRunsMillis: 60000
                minEvictableIdleTimeMillis: 300000
                validationQuery: SELECT 1 FROM DUAL
                testWhileIdle: true
                testOnBorrow: false
                testOnReturn: false
                poolPreparedStatements: true
                maxPoolPreparedStatementPerConnectionSize: 20
                useGlobalDataSourceStat: true
                connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
          server:
            port: 8665

          圖片

          ps:這里我們用到了druid數(shù)據(jù)庫連接池,其實(shí)有個小坑,后面文章會講到。

          因?yàn)槲覀冞@次的實(shí)例最終數(shù)據(jù)處理完之后,是寫入數(shù)據(jù)庫存儲(當(dāng)然你也可以輸出到文件等等)。

          所以我們前面也建了一張表,pom文件里面我們也整合的mybatis,那么我們在整合spring batch 主要編碼前,我們先把這些關(guān)于數(shù)據(jù)庫打通用到的簡單過一下。

          pojo 層

          BlogInfo.java :

          /**
           * @Author : JCccc
           * @Description :
           **/
          public class BlogInfo {
           
              private Integer id;
              private String blogAuthor;
              private String blogUrl;
              private String blogTitle;
              private String blogItem;
           
              @Override
              public String toString() {
                  return "BlogInfo{" +
                          "id=" + id +
                          ", blogAuthor='" + blogAuthor + '\'' +
                          ", blogUrl='
          " + blogUrl + '\'' +
                          "
          , blogTitle='" + blogTitle + '\'' +
                          ", blogItem='" + blogItem + '\'' +
                          '
          }';
              }
           
              public Integer getId() {
                  return id;
              }
           
              public void setId(Integer id) {
                  this.id = id;
              }
           
              public String getBlogAuthor() {
                  return blogAuthor;
              }
           
              public void setBlogAuthor(String blogAuthor) {
                  this.blogAuthor = blogAuthor;
              }
           
              public String getBlogUrl() {
                  return blogUrl;
              }
           
              public void setBlogUrl(String blogUrl) {
                  this.blogUrl = blogUrl;
              }
           
              public String getBlogTitle() {
                  return blogTitle;
              }
           
              public void setBlogTitle(String blogTitle) {
                  this.blogTitle = blogTitle;
              }
           
              public String getBlogItem() {
                  return blogItem;
              }
           
              public void setBlogItem(String blogItem) {
                  this.blogItem = blogItem;
              }
          }
          mapper層

          BlogMapper.java :

          ps:可以看到這個實(shí)例我用的是注解的方式,哈哈為了省事,而且我還不寫servcie層和impl層,也是為了省事,因?yàn)樵撈恼轮攸c(diǎn)不在這些,所以這些不好的大家不要學(xué)。

          import com.example.batchdemo.pojo.BlogInfo;
          import org.apache.ibatis.annotations.*;
          import java.util.List;
          import java.util.Map;
           
          /**
           * @Author : JCccc
           * @Description :
           **/
          @Mapper
          public interface BlogMapper {
              @Insert("INSERT INTO bloginfo ( blogAuthor, blogUrl, blogTitle, blogItem )   VALUES ( #{blogAuthor}, #{blogUrl},#{blogTitle},#{blogItem}) ")
              @Options(useGeneratedKeys = true, keyProperty = "id")
              int insert(BlogInfo bloginfo);
           
           
              @Select("select blogAuthor, blogUrl, blogTitle, blogItem from bloginfo where blogAuthor < #{authorId}")
               List<BlogInfo> queryInfoById(Map<String , Integer> map);
           
          }

          接下來 ,重頭戲,我們開始對前邊那張圖里涉及到的各個小組件進(jìn)行編碼。

          首先創(chuàng)建一個 配置類, MyBatchConfig.java

          從我起名來看,可以知道這基本就是咱們整合spring batch 涉及到的一些配置組件都會寫在這里了。

          首先我們按照咱們上面的圖來看,里面包含內(nèi)容有:

          JobRepository job的注冊/存儲器
          JobLauncher job的執(zhí)行器 
          Job job任務(wù),包含一個或多個Step
          Step 包含(ItemReader、ItemProcessor和ItemWriter) 
          ItemReader 數(shù)據(jù)讀取器 
          ItemProcessor 數(shù)據(jù)處理器
          ItemWriter 數(shù)據(jù)輸出器

          首先,在MyBatchConfig類前加入注解:

          @Configuration 用于告訴spring,咱們這個類是一個自定義配置類,里面很多bean都需要加載到spring容器里面

          @EnableBatchProcessing 開啟批處理支持

          圖片

          然后開始往MyBatchConfig類里,編寫各個小組件。

          JobRepository

          寫在MyBatchConfig類里

          /**
           * JobRepository定義:Job的注冊容器以及和數(shù)據(jù)庫打交道(事務(wù)管理等)
           * @param dataSource
           * @param transactionManager
           * @return
           * @throws Exception
           */
          @Bean
          public JobRepository myJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
              JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
              jobRepositoryFactoryBean.setDatabaseType("mysql");
              jobRepositoryFactoryBean.setTransactionManager(transactionManager);
              jobRepositoryFactoryBean.setDataSource(dataSource);
              return jobRepositoryFactoryBean.getObject();
          }
          JobLauncher

          寫在MyBatchConfig類里

          /**
           * jobLauncher定義:job的啟動器,綁定相關(guān)的jobRepository
           * @param dataSource
           * @param transactionManager
           * @return
           * @throws Exception
           */
          @Bean
          public SimpleJobLauncher myJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception{
              SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
              // 設(shè)置jobRepository
              jobLauncher.setJobRepository(myJobRepository(dataSource, transactionManager));
              return jobLauncher;
          }
          Job

          寫在MyBatchConfig類里

          /**
           * 定義job
           * @param jobs
           * @param myStep
           * @return
           */
          @Bean
          public Job myJob(JobBuilderFactory jobs, Step myStep){
              return jobs.get("myJob")
                      .incrementer(new RunIdIncrementer())
                      .flow(myStep)
                      .end()
                      .listener(myJobListener())
                      .build();
          }

          對于Job的運(yùn)行,是可以配置監(jiān)聽器的

          JobListener

          寫在MyBatchConfig類里

          /**
           * 注冊job監(jiān)聽器
           * @return
           */
          @Bean
          public MyJobListener myJobListener(){
              return new MyJobListener();
          }

          這是一個我們自己自定義的監(jiān)聽器,所以是單獨(dú)創(chuàng)建的,MyJobListener.java

          /**
           * @Author : JCccc
           * @Description :監(jiān)聽Job執(zhí)行情況,實(shí)現(xiàn)JobExecutorListener,且在batch配置類里,Job的Bean上綁定該監(jiān)聽器
           **/
           
          public class MyJobListener implements JobExecutionListener {
           
              private Logger logger = LoggerFactory.getLogger(MyJobListener.class);
           
              @Override
              public void beforeJob(JobExecution jobExecution) {
                  logger.info("job 開始, id={}",jobExecution.getJobId());
              }
           
              @Override
              public void afterJob(JobExecution jobExecution) {
                  logger.info("job 結(jié)束, id={}",jobExecution.getJobId());
              }
          }
          Step(ItemReader  ItemProcessor  ItemWriter)

          step里面包含數(shù)據(jù)讀取器,數(shù)據(jù)處理器,數(shù)據(jù)輸出器三個小組件的的實(shí)現(xiàn)。

          我們也是一個個拆解來進(jìn)行編寫。

          文章前邊說到,該篇實(shí)現(xiàn)的場景包含兩種,一種是從csv文件讀入大量數(shù)據(jù)進(jìn)行處理,另一種是從數(shù)據(jù)庫表讀入大量數(shù)據(jù)進(jìn)行處理。

          從CSV文件讀取數(shù)據(jù)
          ItemReader

          寫在MyBatchConfig類里

          /**
           * ItemReader定義:讀取文件數(shù)據(jù)+entirty實(shí)體類映射
           * @return
           */
          @Bean
          public ItemReader<BlogInfo> reader(){
              // 使用FlatFileItemReader去讀cvs文件,一行即一條數(shù)據(jù)
              FlatFileItemReader<BlogInfo> reader = new FlatFileItemReader<>();
              // 設(shè)置文件處在路徑
              reader.setResource(new ClassPathResource("static/bloginfo.csv"));
              // entity與csv數(shù)據(jù)做映射
              reader.setLineMapper(new DefaultLineMapper<BlogInfo>() {
                  {
                      setLineTokenizer(new DelimitedLineTokenizer() {
                          {
                              setNames(new String[]{"blogAuthor","blogUrl","blogTitle","blogItem"});
                          }
                      });
                      setFieldSetMapper(new BeanWrapperFieldSetMapper<BlogInfo>() {
                          {
                              setTargetType(BlogInfo.class);
                          }
                      });
                  }
              });
              return reader;
          }

          簡單代碼解析:

          圖片

          對于數(shù)據(jù)讀取器 ItemReader ,我們給它安排了一個讀取監(jiān)聽器,創(chuàng)建 MyReadListener.java

          /**
           * @Author : JCccc
           * @Description :
           **/
           
          public class MyReadListener implements ItemReadListener<BlogInfo> {
           
              private Logger logger = LoggerFactory.getLogger(MyReadListener.class);
           
              @Override
              public void beforeRead() {
              }
           
              @Override
              public void afterRead(BlogInfo item) {
              }
           
              @Override
              public void onReadError(Exception ex) {
                  try {
                      logger.info(format("%s%n", ex.getMessage()));
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
          ItemProcessor

          寫在MyBatchConfig類里

          /**
           * 注冊ItemProcessor: 處理數(shù)據(jù)+校驗(yàn)數(shù)據(jù)
           * @return
           */
          @Bean
          public ItemProcessor<BlogInfo, BlogInfo> processor(){
              MyItemProcessor myItemProcessor = new MyItemProcessor();
              // 設(shè)置校驗(yàn)器
              myItemProcessor.setValidator(myBeanValidator());
              return myItemProcessor;
          }

          數(shù)據(jù)處理器,是我們自定義的,里面主要是包含我們對數(shù)據(jù)處理的業(yè)務(wù)邏輯,并且我們設(shè)置了一些數(shù)據(jù)校驗(yàn)器,我們這里使用 JSR-303的Validator來作為校驗(yàn)器。

          校驗(yàn)器

          寫在MyBatchConfig類里

          /**
           * 注冊校驗(yàn)器
           * @return
           */
          @Bean
          public MyBeanValidator myBeanValidator(){
              return new MyBeanValidator<BlogInfo>();
          }

          創(chuàng)建MyItemProcessor.java

          ps:里面我的數(shù)據(jù)處理邏輯是,獲取出讀取數(shù)據(jù)里面的每條數(shù)據(jù)的blogItem字段,如果是springboot,那就對title字段值進(jìn)行替換。

          其實(shí)也就是模擬一個簡單地?cái)?shù)據(jù)處理場景。

          import com.example.batchdemo.pojo.BlogInfo;
          import org.springframework.batch.item.validator.ValidatingItemProcessor;
          import org.springframework.batch.item.validator.ValidationException;
           
          /**
           * @Author : JCccc
           * @Description :
           **/
          public class MyItemProcessor extends ValidatingItemProcessor<BlogInfo> {
              @Override
              public BlogInfo process(BlogInfo item) throws ValidationException {
                  /**
                   * 需要執(zhí)行super.process(item)才會調(diào)用自定義校驗(yàn)器
                   */
                  super.process(item);
                  /**
                   * 對數(shù)據(jù)進(jìn)行簡單的處理
                   */
                  if (item.getBlogItem().equals("springboot")) {
                      item.setBlogTitle("springboot 系列還請看看我Jc");
                  } else {
                      item.setBlogTitle("未知系列");
                  }
                  return item;
              }
          }

          創(chuàng)建MyBeanValidator.java:

          import org.springframework.batch.item.validator.ValidationException;
          import org.springframework.batch.item.validator.Validator;
          import org.springframework.beans.factory.InitializingBean;
          import javax.validation.ConstraintViolation;
          import javax.validation.Validation;
          import javax.validation.ValidatorFactory;
          import java.util.Set;
           
          /**
           * @Author : JCccc
           * @Description :
           **/
          public class MyBeanValidator<T> implements Validator<T>, InitializingBean {
           
              private javax.validation.Validator validator;
           
              @Override
              public void validate(T value) throws ValidationException {
                  /**
                   * 使用Validator的validate方法校驗(yàn)數(shù)據(jù)
                   */
                  Set<ConstraintViolation<T>> constraintViolations =
                          validator.validate(value);
                  if (constraintViolations.size() > 0) {
                      StringBuilder message = new StringBuilder();
                      for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                          message.append(constraintViolation.getMessage() + "\n");
                      }
                      throw new ValidationException(message.toString());
                  }
              }
           
              /**
               * 使用JSR-303的Validator來校驗(yàn)我們的數(shù)據(jù),在此進(jìn)行JSR-303的Validator的初始化
               * @throws Exception
               */
              @Override
              public void afterPropertiesSet() throws Exception {
                  ValidatorFactory validatorFactory =
                          Validation.buildDefaultValidatorFactory();
                  validator = validatorFactory.usingContext().getValidator();
              }
           
          }

          ps:其實(shí)該篇文章沒有使用這個數(shù)據(jù)校驗(yàn)器,大家想使用的話,可以在實(shí)體類上添加一些校驗(yàn)器的注解@NotNull @Max @Email等等。我偏向于直接在處理器里面進(jìn)行處理,想把關(guān)于數(shù)據(jù)處理的代碼都寫在一塊。

          ItemWriter

          寫在MyBatchConfig類里

          /**
           * ItemWriter定義:指定datasource,設(shè)置批量插入sql語句,寫入數(shù)據(jù)庫
           * @param dataSource
           * @return
           */
          @Bean
          public ItemWriter<BlogInfo> writer(DataSource dataSource){
              // 使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫中
              JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
              // 設(shè)置有參數(shù)的sql語句
              writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
              String sql = "insert into bloginfo "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
                      +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
              writer.setSql(sql);
              writer.setDataSource(dataSource);
              return writer;
          }

          簡單代碼解析:

          圖片

          同樣 對于數(shù)據(jù)讀取器 ItemWriter ,我們給它也安排了一個輸出監(jiān)聽器,創(chuàng)建 MyWriteListener.java

          import com.example.batchdemo.pojo.BlogInfo;
          import org.slf4j.Logger;
          import org.slf4j.LoggerFactory;
          import org.springframework.batch.core.ItemWriteListener;
          import java.util.List;
          import static java.lang.String.format;
           
          /**
           * @Author : JCccc
           * @Description :
           **/
          public class MyWriteListener implements ItemWriteListener<BlogInfo> {
              private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);
           
              @Override
              public void beforeWrite(List<? extends BlogInfo> items) {
              }
           
              @Override
              public void afterWrite(List<? extends BlogInfo> items) {
              }
           
              @Override
              public void onWriteError(Exception exception, List<? extends BlogInfo> items) {
                  try {
                      logger.info(format("%s%n", exception.getMessage()));
                      for (BlogInfo message : items) {
                          logger.info(format("Failed writing BlogInfo : %s", message.toString()));
                      }
           
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
           
              }
          }

          ItemReaderItemProcessor、ItemWriter,這三個小組件到這里,我們都實(shí)現(xiàn)了,那么接下來就是把這三個小組件跟我們的step去綁定起來。

          寫在MyBatchConfig類里

          /**
           * step定義:
           * 包括
           * ItemReader 讀取
           * ItemProcessor  處理
           * ItemWriter 輸出
           * @param stepBuilderFactory
           * @param reader
           * @param writer
           * @param processor
           * @return
           */

          @Bean
          public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> reader,
                           ItemWriter<BlogInfo> writer, ItemProcessor<BlogInfo, BlogInfo> processor){
              return stepBuilderFactory
                      .get("myStep")
                      .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作)
                      .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
                      .listener(new MyReadListener())
                      .processor(processor)
                      .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)
                      .listener(new MyWriteListener())
                      .build();
          }

          這個Step,稍作講解。

          前邊提到了,spring batch框架,提供了事務(wù)的控制,重啟,檢測跳過等等機(jī)制。

          那么,這些東西的實(shí)現(xiàn),很多都在于這個step環(huán)節(jié)的設(shè)置。

          首先看到我們代碼出現(xiàn)的第一個設(shè)置,chunk( 6500 ) ,Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作。

          沒錯,對于整個step環(huán)節(jié),就是數(shù)據(jù)的讀取,處理最后到輸出。

          這個chunk機(jī)制里,我們傳入的 6500,也就是是告訴它,讀取處理數(shù)據(jù),累計(jì)達(dá)到 6500條進(jìn)行一次批次處理,去執(zhí)行寫入操作。

          這個傳值,是根據(jù)具體業(yè)務(wù)而定,可以是500條一次,1000條一次,也可以是20條一次,50條一次。

          通過一張簡單的小圖來幫助理解:

          圖片

          在我們大量數(shù)據(jù)處理,不管是讀取或者說是寫入,都肯定會涉及到一些未知或者已知因素導(dǎo)致某條數(shù)據(jù)失敗了。

          那么如果說咱們啥也不設(shè)置,失敗一條數(shù)據(jù),那么我們就當(dāng)作整個失敗了?。顯然這個太不人性,所以spring batch 提供了 retry 和 skip 兩個設(shè)置(其實(shí)還有restart) ,通過這兩個設(shè)置來人性化地解決一些數(shù)據(jù)操作失敗場景。

          retryLimit(3).retry(Exception.class)  

          沒錯,這個就是設(shè)置重試,當(dāng)出現(xiàn)異常的時(shí)候,重試多少次。我們設(shè)置為3,也就是說當(dāng)一條數(shù)據(jù)操作失敗,那我們會對這條數(shù)據(jù)進(jìn)行重試3次,還是失敗就是 當(dāng)做失敗了, 那么我們?nèi)绻信渲胹kip(推薦配置使用),那么這個數(shù)據(jù)失敗記錄就會留到給 skip 來處理。

          skip(Exception.class).skipLimit(2)  

          skip,跳過,也就是說我們?nèi)绻O(shè)置3, 那么就是可以容忍 3條數(shù)據(jù)的失敗。只有達(dá)到失敗數(shù)據(jù)達(dá)到3次,我們才中斷這個step。

          對于失敗的數(shù)據(jù),我們做了相關(guān)的監(jiān)聽器以及異常信息記錄,供與后續(xù)手動補(bǔ)救。

          那么記下來我們開始去調(diào)用這個批處理job,我們通過接口去觸發(fā)這個批處理事件,新建一個Controller,TestController.java

          /**
           * @Author : JCccc
           * @Description :
           **/
          @RestController
          public class TestController {
              @Autowired
              SimpleJobLauncher jobLauncher;
           
              @Autowired
              Job myJob;
           
              @GetMapping("testJob")
              public  void testJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
               //    后置參數(shù):使用JobParameters中綁定參數(shù) addLong  addString 等方法
                  JobParameters jobParameters = new JobParametersBuilder().toJobParameters();
                  jobLauncher.run(myJob, jobParameters);
           
              }
          }

          對了,我準(zhǔn)備了一個csv文件 bloginfo.csv,里面大概8萬多條數(shù)據(jù),用來進(jìn)行批處理測試:

          圖片

          這個文件的路徑跟我們的數(shù)據(jù)讀取器里面讀取的路徑要一直,

          圖片

          目前我們數(shù)據(jù)庫是這個樣子,

          圖片

          接下來我們把我們的項(xiàng)目啟動起來,再看一眼數(shù)據(jù)庫,生成了一些batch用來跟蹤記錄job的一些數(shù)據(jù)表:

          圖片

          我們來調(diào)用一下testJob接口,

          圖片

          然后看下數(shù)據(jù)庫,可以看的數(shù)據(jù)全部都進(jìn)行了相關(guān)的邏輯處理并插入到了數(shù)據(jù)庫:

          圖片

          到這里,我們對Springboot 整合 spring batch 其實(shí)已經(jīng)操作完畢了,也實(shí)現(xiàn)了從csv文件讀取數(shù)據(jù)處理存儲的業(yè)務(wù)場景。

          從數(shù)據(jù)庫讀取數(shù)據(jù)

          ps:前排提示使用druid有坑。后面會講到。

          那么接下來實(shí)現(xiàn)場景,從數(shù)據(jù)庫表內(nèi)讀取數(shù)據(jù)進(jìn)行處理輸出到新的表里面。

          那么基于我們上邊的整合,我們已經(jīng)實(shí)現(xiàn)了

          JobRepository job的注冊/存儲器
          JobLauncher job的執(zhí)行器 
          Job job任務(wù),包含一個或多個Step
          Step 包含(ItemReader、ItemProcessor和ItemWriter) 
          ItemReader 數(shù)據(jù)讀取器 
          ItemProcessor 數(shù)據(jù)處理器
          ItemWriter 數(shù)據(jù)輸出器
          job 監(jiān)聽器
          reader 監(jiān)聽器
          writer 監(jiān)聽器
          process 數(shù)據(jù)校驗(yàn)器

          那么對于我們新寫一個job完成 一個新的場景,我們需要全部重寫么?

          顯然沒必要,當(dāng)然完全新寫一套也是可以的。

          那么該篇,對于一個新的也出場景,從csv文件讀取數(shù)據(jù)轉(zhuǎn)換到數(shù)據(jù)庫表讀取數(shù)據(jù),我們重新新建的有:

          1. 數(shù)據(jù)讀取器: 原先使用的是 FlatFileItemReader ,我們現(xiàn)在改為使用 MyBatisCursorItemReader
          2. 數(shù)據(jù)處理器: 新的場景,業(yè)務(wù)為了好擴(kuò)展,所以我們處理器最好也新建一個
          3. 數(shù)據(jù)輸出器:  新的場景,業(yè)務(wù)為了好擴(kuò)展,所以我們數(shù)據(jù)輸出器最好也新建一個
          4. step的綁定設(shè)置: 新的場景,業(yè)務(wù)為了好擴(kuò)展,所以我們step最好也新建一個
          5. Job: 當(dāng)然是要重新寫一個了

          其他我們照用原先的就行,JobRepository,JobLauncher以及各種監(jiān)聽器啥的,暫且不重新建了。

          新建MyItemProcessorNew.java

          import org.springframework.batch.item.validator.ValidatingItemProcessor;
          import org.springframework.batch.item.validator.ValidationException;
           
          /**
           * @Author : JCccc
           * @Description :
           **/
          public class MyItemProcessorNew extends ValidatingItemProcessor<BlogInfo> {
              @Override
              public BlogInfo process(BlogInfo item) throws ValidationException {
                  /**
                   * 需要執(zhí)行super.process(item)才會調(diào)用自定義校驗(yàn)器
                   */
                  super.process(item);
                  /**
                   * 對數(shù)據(jù)進(jìn)行簡單的處理
                   */
                  Integer authorId= Integer.valueOf(item.getBlogAuthor());
                  if (authorId<20000) {
                      item.setBlogTitle("這是都是小于20000的數(shù)據(jù)");
                  } else if (authorId>20000 && authorId<30000){
                      item.setBlogTitle("這是都是小于30000但是大于20000的數(shù)據(jù)");
                  }else {
                      item.setBlogTitle("舊書不厭百回讀");
                  }
                  return item;
              }
          }

          然后其他重新定義的小組件,寫在MyBatchConfig類里:

          /**
           * 定義job
           * @param jobs
           * @param stepNew
           * @return
           */
          @Bean
          public Job myJobNew(JobBuilderFactory jobs, Step stepNew){
              return jobs.get("myJobNew")
                      .incrementer(new RunIdIncrementer())
                      .flow(stepNew)
                      .end()
                      .listener(myJobListener())
                      .build();

          }


          @Bean
          public Step stepNew(StepBuilderFactory stepBuilderFactory, MyBatisCursorItemReader<BlogInfo> itemReaderNew,
                              ItemWriter<BlogInfo> writerNew, ItemProcessor<BlogInfo, BlogInfo> processorNew){
              return stepBuilderFactory
                      .get("stepNew")
                      .<BlogInfo, BlogInfo>chunk(65000) // Chunk的機(jī)制(即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作)
                      .reader(itemReaderNew).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(10)
                      .listener(new MyReadListener())
                      .processor(processorNew)
                      .writer(writerNew).faultTolerant().skip(Exception.class).skipLimit(2)
                      .listener(new MyWriteListener())
                      .build();

          }

          @Bean
          public ItemProcessor<BlogInfo, BlogInfo> processorNew(){
              MyItemProcessorNew csvItemProcessor = new MyItemProcessorNew();
              // 設(shè)置校驗(yàn)器
              csvItemProcessor.setValidator(myBeanValidator());
              return csvItemProcessor;
          }



          @Autowired
          private SqlSessionFactory sqlSessionFactory;

          @Bean
          @StepScope
          //Spring Batch提供了一個特殊的bean scope類(StepScope:作為一個自定義的Spring bean scope)。這個step scope的作用是連接batches的各個steps。這個機(jī)制允許配置在Spring的beans當(dāng)steps開始時(shí)才實(shí)例化并且允許你為這個step指定配置和參數(shù)。
          public MyBatisCursorItemReader<BlogInfo> itemReaderNew(@Value("#{jobParameters[authorId]}") String authorId) {

                  System.out.println("開始查詢數(shù)據(jù)庫");

                  MyBatisCursorItemReader<BlogInfo> reader = new MyBatisCursorItemReader<>();

                  reader.setQueryId("com.example.batchdemo.mapper.BlogMapper.queryInfoById");

                  reader.setSqlSessionFactory(sqlSessionFactory);
                   Map<String , Object> map = new HashMap<>();

                    map.put("authorId" , Integer.valueOf(authorId));
                   reader.setParameterValues(map);
                  return reader;
          }

          /**
           * ItemWriter定義:指定datasource,設(shè)置批量插入sql語句,寫入數(shù)據(jù)庫
           * @param dataSource
           * @return
           */
          @Bean
          public ItemWriter<BlogInfo> writerNew(DataSource dataSource){
              // 使用jdbcBcatchItemWrite寫數(shù)據(jù)到數(shù)據(jù)庫中
              JdbcBatchItemWriter<BlogInfo> writer = new JdbcBatchItemWriter<>();
              // 設(shè)置有參數(shù)的sql語句
              writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<BlogInfo>());
              String sql = "insert into bloginfonew "+" (blogAuthor,blogUrl,blogTitle,blogItem) "
                      +" values(:blogAuthor,:blogUrl,:blogTitle,:blogItem)";
              writer.setSql(sql);
              writer.setDataSource(dataSource);
              return writer;
          }

          代碼需要注意的點(diǎn)

          數(shù)據(jù)讀取器 MyBatisCursorItemReader

          圖片

          對應(yīng)的mapper方法:

          圖片

          數(shù)據(jù)處理器 MyItemProcessorNew:

          圖片

          數(shù)據(jù)輸出器,新插入到別的數(shù)據(jù)庫表去,特意這樣為了測試:

          圖片

          當(dāng)然我們的數(shù)據(jù)庫為了測試這個場景,也是新建了一張表,bloginfonew 表。

          圖片

          接下來,我們新寫一個接口來執(zhí)行新的這個job:

          圖片
          @Autowired
          SimpleJobLauncher jobLauncher;

          @Autowired
          Job myJobNew;

          @GetMapping("testJobNew")
          public  void testJobNew(@RequestParam("authorId") String authorId) throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

              JobParameters jobParametersNew = new JobParametersBuilder().addLong("timeNew", System.currentTimeMillis())
                      .addString("authorId",authorId)
                      .toJobParameters();
              jobLauncher.run(myJobNew,jobParametersNew);

          }

          ok,我們來調(diào)用一些這個接口:

          圖片

          看下控制臺:

          圖片

          沒錯,這就是失敗的,原因是因?yàn)楦鷇ruid有關(guān),報(bào)了一個數(shù)據(jù)庫功能不支持。這是在數(shù)據(jù)讀取的時(shí)候報(bào)的錯。

          我初步測試認(rèn)為是MyBatisCursorItemReader ,druid 數(shù)據(jù)庫連接池不支持。

          那么,我們只需要:

          1. 注釋掉druid連接池 jar依賴

          圖片
          1. yml里替換連接池配置

          圖片

          其實(shí)我們不配置其他連接池,springboot 2.X 版本已經(jīng)為我們整合了默認(rèn)的連接池 HikariCP 。

          在Springboot2.X版本,數(shù)據(jù)庫的連接池官方推薦使用HikariCP

          如果不是為了druid的那些后臺監(jiān)控?cái)?shù)據(jù),sql分析等等,完全是優(yōu)先使用HikariCP的。

          官方的原話:

          We preferHikariCPfor its performance and concurrency. If HikariCP is available, we always choose it.

          翻譯:

          我們更喜歡hikaricpf的性能和并發(fā)性。如果有HikariCP,我們總是選擇它。

          所以我們就啥連接池也不配了,使用默認(rèn)的HikariCP 連接池。

          當(dāng)然你想配,也是可以的:

          圖片

          所以我們剔除掉druid鏈接池后,我們再來調(diào)用一下新接口:

          圖片

          可以看到,從數(shù)據(jù)庫獲取數(shù)據(jù)并進(jìn)行批次處理寫入job是成功的:

          圖片

          新的表里面插入的數(shù)據(jù)都進(jìn)行了自己寫的邏輯處理:

          圖片

          好了,springboot 整合 spring batch 批處理框架, 就到此吧。

              

          程序汪接私活項(xiàng)目目錄,2023年總結(jié)

          Java項(xiàng)目分享  最新整理全集,找項(xiàng)目不累啦 07版

          程序汪10萬接的無線共享充電寶項(xiàng)目,開發(fā)周期3個月

          程序汪1萬接的企業(yè)官網(wǎng)項(xiàng)目,開發(fā)周期15天

          程序汪8萬接的共享口罩項(xiàng)目,開發(fā)周期1個月

          程序汪8萬塊的飲水機(jī)物聯(lián)網(wǎng)私活項(xiàng)目經(jīng)驗(yàn)分享

          程序汪接的4萬智慧餐飲項(xiàng)目

          程序汪8萬接的自助洗車小程序
          程序汪9萬接的無人自助洗寵物機(jī)項(xiàng)目,開發(fā)周期40天


          歡迎添加程序汪個人微信 itwang005  進(jìn)粉絲群或圍觀朋友圈

          瀏覽 348
          2點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          2點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  婷婷免费| 操鼻免费素材网站 | 亚洲无码观看视频 | 91精品国久久久久久无码一区二区三区 | 青青草三级片 |