<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          SpringBoot + SpringBatch + Quartz整合定時批量任務(wù)

          共 4953字,需瀏覽 10分鐘

           ·

          2022-01-10 21:45

          點擊上方藍色字體,選擇“標(biāo)星公眾號”

          優(yōu)質(zhì)文章,第一時間送達


          一、引言

          最近一周,被借調(diào)到其他部門,趕一個緊急需求,需求內(nèi)容如下:

          PC網(wǎng)頁觸發(fā)一條設(shè)備升級記錄(下圖),后臺要定時批量設(shè)備更新。這里定時要用到Quartz,批量數(shù)據(jù)處理要用到SpringBatch,二者結(jié)合,可以完成該需求。

          由于之前,沒有用過SpringBatch,于是上網(wǎng)查了下資料,發(fā)現(xiàn)可參考的不是很多,于是只能去慢慢的翻看官方文檔。

          https://docs.spring.io/spring-batch/4.1.x/reference/html/

          遇到不少問題,就記錄一下吧。

          推薦下自己做的 Spring Boot 的實戰(zhàn)項目:

          https://github.com/YunaiV/ruoyi-vue-pro

          二、代碼具體實現(xiàn)

          1、pom文件

          ??<dependencies>
          ????<dependency>
          ??????<groupId>org.springframework.bootgroupId>
          ??????<artifactId>spring-boot-starter-webartifactId>
          ????dependency>
          ????<dependency>
          ??????<groupId>org.postgresqlgroupId>
          ??????<artifactId>postgresqlartifactId>
          ????dependency>
          ????<dependency>
          ??????<groupId>org.springframework.bootgroupId>
          ??????<artifactId>spring-boot-starter-jdbcartifactId>
          ????dependency>
          ????<dependency>
          ??????<groupId>org.springframework.bootgroupId>
          ??????<artifactId>spring-boot-starter-batchartifactId>
          ????dependency>
          ????<dependency>
          ??????<groupId>org.projectlombokgroupId>
          ??????<artifactId>lombokartifactId>
          ????dependency>
          ????<dependency>
          ??????<groupId>org.springframework.bootgroupId>
          ??????<artifactId>spring-boot-starter-batchartifactId>
          ????dependency>
          ???dependencies>

          2、application.yaml文件

          spring:
          ??datasource:
          ????username:?thinklink
          ????password:?thinklink
          ????url:?jdbc:postgresql://172.16.205.54:5432/thinklink
          ????driver-class-name:?org.postgresql.Driver
          ??batch:
          ????job:
          ??????enabled:?false
          server:
          ??port:?8073

          #upgrade-dispatch-base-url:?http://172.16.205.125:8080/api/rpc/dispatch/command/
          upgrade-dispatch-base-url:?http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/


          >?推薦下自己做的?Spring?Cloud?的實戰(zhàn)項目:
          >
          >?

          #?每次批量處理的數(shù)據(jù)量,默認為5000
          batch-size:?5000

          3、Service實現(xiàn)類,觸發(fā)批處理任務(wù)的入口,執(zhí)行一個job

          @Service("batchService")
          public?class?BatchServiceImpl?implements?BatchService?{

          ?//?框架自動注入
          ????@Autowired
          ????private?JobLauncher?jobLauncher;
          ????@Autowired
          ????private?Job?updateDeviceJob;
          ????/**
          ?????*?根據(jù)?taskId?創(chuàng)建一個Job
          ?????*?@param?taskId
          ?????*?@throws?Exception
          ?????*/

          ????@Override
          ????public?void?createBatchJob(String?taskId)?throws?Exception?{
          ????????JobParameters?jobParameters?=?new?JobParametersBuilder()
          ????????????????.addString("taskId",?taskId)
          ????????????????.addString("uuid",?UUID.randomUUID().toString().replace("-",""))
          ????????????????.toJobParameters();
          ????????//?傳入一個Job任務(wù)和任務(wù)需要的參數(shù)
          ????????jobLauncher.run(updateDeviceJob,?jobParameters);
          ????}
          }

          4、SpringBatch配置類,此部分最重要(☆☆☆☆☆)

          @Configuration
          public?class?BatchConfiguration?{

          ????private?static?final?Logger?log?=?LoggerFactory.getLogger(BatchConfiguration.class);

          ????@Value("${batch-size:5000}")
          ????private?int?batchSize;

          ?//?框架自動注入
          ????@Autowired
          ????public?JobBuilderFactory?jobBuilderFactory;

          ?//?框架自動注入
          ????@Autowired
          ????public?StepBuilderFactory?stepBuilderFactory;

          ?//?數(shù)據(jù)過濾器,對從數(shù)據(jù)庫讀出來的數(shù)據(jù),注意進行操作
          ????@Autowired
          ????public?TaskItemProcessor?taskItemProcessor;

          ????//?接收job參數(shù)
          ????public?Map?parameters;

          ????public?Object?taskId;

          ????@Autowired
          ????private?JdbcTemplate?jdbcTemplate;

          ?//?讀取數(shù)據(jù)庫操作
          ????@Bean
          ????@StepScope
          ????public?JdbcCursorItemReader?itemReader(DataSource?dataSource)?{

          ????????String?querySql?=?"?SELECT?"?+
          ????????????????"?e.?ID?AS?taskId,?"?+
          ????????????????"?e.user_id?AS?userId,?"?+
          ????????????????"?e.timing_startup?AS?startTime,?"?+
          ????????????????"?u.device_id?AS?deviceId,?"?+
          ????????????????"?d.app_name?AS?appName,?"?+
          ????????????????"?d.compose_file?AS?composeFile,?"?+
          ????????????????"?e.failure_retry?AS?failureRetry,?"?+
          ????????????????"?e.tetry_times?AS?retryTimes,?"?+
          ????????????????"?e.device_managered?AS?deviceManagered?"?+
          ????????????????"?FROM?"?+
          ????????????????"?eiot_upgrade_task?e?"?+
          ????????????????"?LEFT?JOIN?eiot_upgrade_device?u?ON?e.?ID?=?u.upgrade_task_id?"?+
          ????????????????"?LEFT?JOIN?eiot_app_detail?d?ON?e.app_id?=?d.?ID?"?+
          ????????????????"?WHERE?"?+
          ????????????????"?(?"?+
          ????????????????"?u.device_upgrade_status?=?0?"?+
          ????????????????"?OR?u.device_upgrade_status?=?2"?+
          ????????????????"?)"?+
          ????????????????"?AND?e.tetry_times?>?u.retry_times?"?+
          ????????????????"?AND?e.?ID?=??";

          ????????return?new?JdbcCursorItemReaderBuilder()
          ????????????????.name("itemReader")
          ????????????????.sql(querySql)
          ????????????????.dataSource(dataSource)
          ????????????????.queryArguments(new?Object[]{parameters.get("taskId").getValue()})
          ????????????????.rowMapper(new?DispatchRequest.DispatchRequestRowMapper())
          ????????????????.build();
          ????}

          ?//?將結(jié)果寫回數(shù)據(jù)庫
          ????@Bean
          ????@StepScope
          ????public?ItemWriter?itemWriter()?{
          ????????return?new?ItemWriter()?{

          ????????????private?int?updateTaskStatus(DispatchRequest?dispatchRequest,?int?status)?{
          ????????????????log.info("update?taskId:?{},?deviceId:?{}?to?status?{}",?dispatchRequest.getTaskId(),?dispatchRequest.getDeviceId(),?status);

          ????????????????Integer?retryTimes?=?jdbcTemplate.queryForObject(
          ????????????????????????"select?retry_times?from?eiot_upgrade_device?where?device_id?=???and?upgrade_task_id?=??",
          ????????????????????????new?Object[]{?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId()},?Integer.class
          ????????????????)
          ;
          ????????????????retryTimes?+=?1;
          ????????????????int?updateCount?=?jdbcTemplate.update("update?eiot_upgrade_device?set?device_upgrade_status?=??,?retry_times?=???"?+
          ????????????????????????"where?device_id?=???and?upgrade_task_id?=??",?status,?retryTimes,?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId());
          ????????????????if?(updateCount?<=?0)?{
          ????????????????????log.warn("no?task?updated");
          ????????????????}?else?{
          ????????????????????log.info("count?of?{}?task?updated",?updateCount);
          ????????????????}

          ????????????????//?最后一次重試
          ????????????????if?(status?==?STATUS_DISPATCH_FAILED?&&?retryTimes?==?dispatchRequest.getRetryTimes())?{
          ????????????????????log.info("the?last?retry?of?{}?failed,?inc?deviceManagered",?dispatchRequest.getTaskId());
          ????????????????????return?1;
          ????????????????}?else?{
          ????????????????????return?0;
          ????????????????}
          ????????????}

          ????????????@Override
          ????????????@Transactional
          ????????????public?void?write(List?list)?throws?Exception?{
          ????????????????Map?taskMap?=?jdbcTemplate.queryForMap(
          ????????????????????????"select?device_managered,?device_count,?task_status?from?eiot_upgrade_task?where?id?=??",
          ????????????????????????list.get(0).getDispatchRequest().getTaskId()?//?我們認定一個批量里面,taskId都是一樣的
          ????????????????????????);
          ????????????????int?deviceManagered?=?(int)taskMap.get("device_managered");
          ????????????????Integer?deviceCount?=?(Integer)?taskMap.get("device_count");
          ????????????????if?(deviceCount?==?null)?{
          ????????????????????log.warn("deviceCount?of?task?{}?is?null",?list.get(0).getDispatchRequest().getTaskId());
          ????????????????}
          ????????????????int?taskStatus?=?(int)taskMap.get("task_status");
          ????????????????for?(ProcessResult?result:?list)?{
          ????????????????????deviceManagered?+=?updateTaskStatus(result.getDispatchRequest(),?result.getStatus());
          ????????????????}
          ????????????????if?(deviceCount?!=?null?&&?deviceManagered?==?deviceCount)?{
          ????????????????????taskStatus?=?2;?//任務(wù)狀態(tài)?0:待升級,1:升級中,2:已完成
          ????????????????}
          ????????????????jdbcTemplate.update("update?eiot_upgrade_task??set?device_managered?=??,?task_status?=???"?+
          ????????????????????????"where?id?=??",?deviceManagered,?taskStatus,?list.get(0).getDispatchRequest().getTaskId());
          ????????????}
          ????????};
          ????}

          ????/**
          ?????*?定義一個下發(fā)更新的?job
          ?????*?@return
          ?????*/

          ????@Bean
          ????public?Job?updateDeviceJob(Step?updateDeviceStep)?{
          ????????return?jobBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
          ????????????????.listener(new?JobListener())?//?設(shè)置Job的監(jiān)聽器
          ????????????????.flow(updateDeviceStep)//?執(zhí)行下發(fā)更新的Step
          ????????????????.end()
          ????????????????.build();
          ????}

          ????/**
          ?????*?定義一個下發(fā)更新的?step
          ?????*?@return
          ?????*/

          ????@Bean
          ????public?Step?updateDeviceStep(JdbcCursorItemReader?itemReader,ItemWriter?itemWriter)?{
          ????????return?stepBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
          ????????????????.?chunk(batchSize)
          ????????????????.reader(itemReader)?//根據(jù)taskId從數(shù)據(jù)庫讀取更新設(shè)備信息
          ????????????????.processor(taskItemProcessor)?//?每條更新信息,執(zhí)行下發(fā)更新接口
          ????????????????.writer(itemWriter)
          ????????????????.build();
          ????}

          ????//?job?監(jiān)聽器
          ????public?class?JobListener?implements?JobExecutionListener?{

          ????????@Override
          ????????public?void?beforeJob(JobExecution?jobExecution)?{
          ????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?before...?");
          ????????????parameters?=?jobExecution.getJobParameters().getParameters();
          ????????????taskId?=?parameters.get("taskId").getValue();
          ????????????log.info("job?param?taskId?:?"?+?parameters.get("taskId"));
          ????????}

          ????????@Override
          ????????public?void?afterJob(JobExecution?jobExecution)?{

          ????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?after...?");
          ????????????//?當(dāng)所有job執(zhí)行完之后,查詢設(shè)備更新狀態(tài),如果有失敗,則要定時重新執(zhí)行job
          ????????????String?sql?=?"?SELECT?"?+
          ????????????????????"?count(*)?"?+
          ????????????????????"?FROM?"?+
          ????????????????????"?eiot_upgrade_device?d?"?+
          ????????????????????"?LEFT?JOIN?eiot_upgrade_task?u?ON?d.upgrade_task_id?=?u.?ID?"?+
          ????????????????????"?WHERE?"?+
          ????????????????????"?u.?ID?=???"?+
          ????????????????????"?AND?d.retry_times??+
          ????????????????????"?AND?(?"?+
          ????????????????????"?d.device_upgrade_status?=?0?"?+
          ????????????????????"?OR?d.device_upgrade_status?=?2?"?+
          ????????????????????"?)?";

          ????????????//?獲取更新失敗的設(shè)備個數(shù)
          ????????????Integer?count?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);

          ????????????log.info("update?device?failure?count?:?"?+?count);

          ????????????//?下面是使用Quartz觸發(fā)定時任務(wù)
          ????????????//?獲取任務(wù)時間,單位秒
          //????????????String?time?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);
          ????????????//?此處方便測試,應(yīng)該從數(shù)據(jù)庫中取taskId對應(yīng)的重試間隔,單位秒
          ????????????Integer?millSecond?=?10;

          ????????????if(count?!=?null?&&?count?>?0){
          ????????????????String?jobName?=?"UpgradeTask_"?+?taskId;
          ????????????????String?reTaskId?=?taskId.toString();
          ????????????????Map?params?=?new?HashMap<>();
          ????????????????params.put("jobName",jobName);
          ????????????????params.put("taskId",reTaskId);
          ????????????????if?(QuartzManager.checkNameNotExist(jobName))
          ????????????????{
          ????????????????????QuartzManager.scheduleRunOnceJob(jobName,?RunOnceJobLogic.class,params,millSecond);
          ????????????????}
          ????????????}

          ????????}
          ????}
          }

          5、Processor,處理每條數(shù)據(jù),可以在此對數(shù)據(jù)進行過濾操作

          @Component("taskItemProcessor")
          public?class?TaskItemProcessor?implements?ItemProcessor<DispatchRequest,?ProcessResult>?{

          ????public?static?final?int?STATUS_DISPATCH_FAILED?=?2;
          ????public?static?final?int?STATUS_DISPATCH_SUCC?=?1;

          ????private?static?final?Logger?log?=?LoggerFactory.getLogger(TaskItemProcessor.class);

          ????@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
          ????private?String?dispatchUrl;

          ????@Autowired
          ????JdbcTemplate?jdbcTemplate;

          ????/**
          ?????*?在這里,執(zhí)行?下發(fā)更新指令?的操作
          ?????*?@param?dispatchRequest
          ?????*?@return
          ?????*?@throws?Exception
          ?????*/

          ????@Override
          ????public?ProcessResult?process(final?DispatchRequest?dispatchRequest)?{
          ????????//?調(diào)用接口,下發(fā)指令
          ????????String?url?=?dispatchUrl?+?dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();

          ????????log.info("request?url:"?+?url);
          ????????RestTemplate?restTemplate?=?new?RestTemplate();
          ????????HttpHeaders?headers?=?new?HttpHeaders();
          ????????headers.setContentType(MediaType.APPLICATION_JSON_UTF8);

          ????????MultiValueMap?params?=?new?LinkedMultiValueMap();

          ????????JSONObject?jsonOuter?=?new?JSONObject();
          ????????JSONObject?jsonInner?=?new?JSONObject();
          ????????try?{
          ????????????jsonInner.put("jobId",dispatchRequest.getTaskId());
          ????????????jsonInner.put("name",dispatchRequest.getName());
          ????????????jsonInner.put("composeFile",?Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
          ????????????jsonInner.put("policy",new?JSONObject().put("startTime",dispatchRequest.getPolicy()));
          ????????????jsonInner.put("timestamp",dispatchRequest.getTimestamp());

          ????????????jsonOuter.put("method","updateApp");
          ????????????jsonOuter.put("params",jsonInner);
          ????????}?catch?(JSONException?e)?{
          ????????????log.info("JSON?convert?Exception?:"?+?e);
          ????????}catch?(IOException?e)?{
          ????????????log.info("Base64Util?bytesToBase64Str?:"?+?e);
          ????????}

          ????????log.info("request?body?json?:"?+?jsonOuter);
          ????????HttpEntity?requestEntity?=?new?HttpEntity(jsonOuter.toString(),headers);
          ????????int?status;
          ????????try?{
          ????????????ResponseEntity?response?=?restTemplate.postForEntity(url,requestEntity,String.class);
          ????????????log.info("response?:"?+?response);
          ????????????if?(response.getStatusCode()?==?HttpStatus.OK)?{
          ????????????????status?=?STATUS_DISPATCH_SUCC;
          ????????????}?else?{
          ????????????????status?=?STATUS_DISPATCH_FAILED;
          ????????????}

          ????????}catch?(Exception?e){
          ????????????status?=?STATUS_DISPATCH_FAILED;
          ????????}

          ????????return?new?ProcessResult(dispatchRequest,?status);
          ????}
          }

          6、封裝數(shù)據(jù)庫返回數(shù)據(jù)的實體Bean,注意靜態(tài)內(nèi)部類

          public?class?DispatchRequest?{

          ????private?String?taskId;
          ????private?String?deviceId;
          ????private?String?userId;
          ????private?String?name;
          ????private?byte[]?composeFile;
          ????private?String?policy;
          ????private?String?timestamp;
          ????private?String?md5;
          ????private?int?failureRetry;
          ????private?int?retryTimes;
          ????private?int?deviceManagered;

          ???//?省略構(gòu)造函數(shù),setter/getter/tostring方法
          ???//......
          ???
          ????public?static?class?DispatchRequestRowMapper?implements?RowMapper<DispatchRequest>?{
          ????????@Override
          ????????public?DispatchRequest?mapRow(ResultSet?resultSet,?int?i)?throws?SQLException?{

          ????????????DispatchRequest?dispatchRequest?=?new?DispatchRequest();
          ????????????dispatchRequest.setTaskId(resultSet.getString("taskId"));
          ????????????dispatchRequest.setUserId(resultSet.getString("userId"));
          ????????????dispatchRequest.setPolicy(resultSet.getString("startTime"));
          ????????????dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
          ????????????dispatchRequest.setName(resultSet.getString("appName"));
          ????????????dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
          ????????????dispatchRequest.setTimestamp(DateUtil.DateToString(new?Date()));
          ????????????dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
          ????????????dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
          ????????????dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
          ????????????return?dispatchRequest;
          ????????}
          ????}
          }

          7、啟動類上要加上注解

          @SpringBootApplication
          @EnableBatchProcessing
          public?class?Application?{

          ????public?static?void?main(String[]?args)?{
          ????????SpringApplication.run(Application.class,?args);
          ????}
          }

          三、小結(jié)

          其實SpringBatch并沒有想象中那么好用,當(dāng)從數(shù)據(jù)庫中每次取5000條數(shù)據(jù)后,進入processor中是逐條處理的,這個時候不能不行操作,等5000條數(shù)據(jù)處理完之后,再一次性執(zhí)行ItemWriter方法。

          在使用的過程中,最坑的地方是ItemReader和ItemWriter這兩個地方,如何執(zhí)行自定義的Sql,參考文中代碼就行。

          至于Quartz定時功能,很簡單,只要定時創(chuàng)建SpringBatch里面的Job,讓這個job啟動就好了,此處就不在給出了,貼的代碼太多了。由于公司一些原因,代碼不能放到GitHub上。



          來源:blog.csdn.net/zxd1435513775/
          article/details/99677223/


          瀏覽 21
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线免费观看日韩一级 | 人人看人人爱人人搞 | 国产美女高潮 | 先锋AV资源网 | 亚洲成人在线影视 |