SpringBoot + SpringBatch + Quartz整合定時(shí)批量任務(wù)
點(diǎn)擊關(guān)注公眾號(hào),Java干貨及時(shí)送達(dá)??

來源:blog.csdn.net/zxd1435513775/article/
details/99677223
一、引言
最近一周,被借調(diào)到其他部門,趕一個(gè)緊急需求,需求內(nèi)容如下:
PC網(wǎng)頁觸發(fā)一條設(shè)備升級(jí)記錄(下圖),后臺(tái)要定時(shí)批量設(shè)備更新。這里定時(shí)要用到Quartz,批量數(shù)據(jù)處理要用到SpringBatch,二者結(jié)合,可以完成該需求。
由于之前,沒有用過SpringBatch,于是上網(wǎng)查了下資料,發(fā)現(xiàn)可參考的不是很多,于是只能去慢慢的翻看官方文檔。
https://docs.spring.io/spring-batch/4.1.x/reference/html/
遇到不少問題,就記錄一下吧。

二、代碼具體實(shí)現(xiàn)
1、pom文件
??<dependencies>
????<dependency>
??????<groupId>org.springframework.bootgroupId>
??????<artifactId>spring-boot-starter-webartifactId>
????dependency>
????<dependency>
??????<groupId>org.postgresqlgroupId>
??????<artifactId>postgresqlartifactId>
????dependency>
????<dependency>
??????<groupId>org.springframework.bootgroupId>
??????<artifactId>spring-boot-starter-jdbcartifactId>
????dependency>
????<dependency>
??????<groupId>org.springframework.bootgroupId>
??????<artifactId>spring-boot-starter-batchartifactId>
????dependency>
????<dependency>
??????<groupId>org.projectlombokgroupId>
??????<artifactId>lombokartifactId>
????dependency>
????<dependency>
??????<groupId>org.springframework.bootgroupId>
??????<artifactId>spring-boot-starter-batchartifactId>
????dependency>
???dependencies>
2、application.yaml文件
spring:
??datasource:
????username:?thinklink
????password:?thinklink
????url:?jdbc:postgresql://172.16.205.54:5432/thinklink
????driver-class-name:?org.postgresql.Driver
??batch:
????job:
??????enabled:?false
server:
??port:?8073
#upgrade-dispatch-base-url:?http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url:?http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
#?每次批量處理的數(shù)據(jù)量,默認(rèn)為5000
batch-size:?5000
3、Service實(shí)現(xiàn)類,觸發(fā)批處理任務(wù)的入口,執(zhí)行一個(gè)job
@Service("batchService")
public?class?BatchServiceImpl?implements?BatchService?{
?//?框架自動(dòng)注入
????@Autowired
????private?JobLauncher?jobLauncher;
????@Autowired
????private?Job?updateDeviceJob;
????/**
?????*?根據(jù)?taskId?創(chuàng)建一個(gè)Job
?????*?@param?taskId
?????*?@throws?Exception
?????*/
????@Override
????public?void?createBatchJob(String?taskId)?throws?Exception?{
????????JobParameters?jobParameters?=?new?JobParametersBuilder()
????????????????.addString("taskId",?taskId)
????????????????.addString("uuid",?UUID.randomUUID().toString().replace("-",""))
????????????????.toJobParameters();
????????//?傳入一個(gè)Job任務(wù)和任務(wù)需要的參數(shù)
????????jobLauncher.run(updateDeviceJob,?jobParameters);
????}
}
4、SpringBatch配置類,此部分最重要(☆☆☆☆☆)
@Configuration
public?class?BatchConfiguration?{
????private?static?final?Logger?log?=?LoggerFactory.getLogger(BatchConfiguration.class);
????@Value("${batch-size:5000}")
????private?int?batchSize;
?//?框架自動(dòng)注入
????@Autowired
????public?JobBuilderFactory?jobBuilderFactory;
?//?框架自動(dòng)注入
????@Autowired
????public?StepBuilderFactory?stepBuilderFactory;
?//?數(shù)據(jù)過濾器,對(duì)從數(shù)據(jù)庫讀出來的數(shù)據(jù),注意進(jìn)行操作
????@Autowired
????public?TaskItemProcessor?taskItemProcessor;
????//?接收job參數(shù)
????public?Map?parameters;
????public?Object?taskId;
????@Autowired
????private?JdbcTemplate?jdbcTemplate;
?//?讀取數(shù)據(jù)庫操作
????@Bean
????@StepScope
????public?JdbcCursorItemReader?itemReader(DataSource?dataSource)? {
????????String?querySql?=?"?SELECT?"?+
????????????????"?e.?ID?AS?taskId,?"?+
????????????????"?e.user_id?AS?userId,?"?+
????????????????"?e.timing_startup?AS?startTime,?"?+
????????????????"?u.device_id?AS?deviceId,?"?+
????????????????"?d.app_name?AS?appName,?"?+
????????????????"?d.compose_file?AS?composeFile,?"?+
????????????????"?e.failure_retry?AS?failureRetry,?"?+
????????????????"?e.tetry_times?AS?retryTimes,?"?+
????????????????"?e.device_managered?AS?deviceManagered?"?+
????????????????"?FROM?"?+
????????????????"?eiot_upgrade_task?e?"?+
????????????????"?LEFT?JOIN?eiot_upgrade_device?u?ON?e.?ID?=?u.upgrade_task_id?"?+
????????????????"?LEFT?JOIN?eiot_app_detail?d?ON?e.app_id?=?d.?ID?"?+
????????????????"?WHERE?"?+
????????????????"?(?"?+
????????????????"?u.device_upgrade_status?=?0?"?+
????????????????"?OR?u.device_upgrade_status?=?2"?+
????????????????"?)"?+
????????????????"?AND?e.tetry_times?>?u.retry_times?"?+
????????????????"?AND?e.?ID?=??";
????????return?new?JdbcCursorItemReaderBuilder()
????????????????.name("itemReader")
????????????????.sql(querySql)
????????????????.dataSource(dataSource)
????????????????.queryArguments(new?Object[]{parameters.get("taskId").getValue()})
????????????????.rowMapper(new?DispatchRequest.DispatchRequestRowMapper())
????????????????.build();
????}
?//?將結(jié)果寫回?cái)?shù)據(jù)庫
????@Bean
????@StepScope
????public?ItemWriter?itemWriter()? {
????????return?new?ItemWriter()?{
????????????private?int?updateTaskStatus(DispatchRequest?dispatchRequest,?int?status)?{
????????????????log.info("update?taskId:?{},?deviceId:?{}?to?status?{}",?dispatchRequest.getTaskId(),?dispatchRequest.getDeviceId(),?status);
????????????????Integer?retryTimes?=?jdbcTemplate.queryForObject(
????????????????????????"select?retry_times?from?eiot_upgrade_device?where?device_id?=???and?upgrade_task_id?=??",
????????????????????????new?Object[]{?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId()},?Integer.class
????????????????);
????????????????retryTimes?+=?1;
????????????????int?updateCount?=?jdbcTemplate.update("update?eiot_upgrade_device?set?device_upgrade_status?=??,?retry_times?=???"?+
????????????????????????"where?device_id?=???and?upgrade_task_id?=??",?status,?retryTimes,?dispatchRequest.getDeviceId(),?dispatchRequest.getTaskId());
????????????????if?(updateCount?<=?0)?{
????????????????????log.warn("no?task?updated");
????????????????}?else?{
????????????????????log.info("count?of?{}?task?updated",?updateCount);
????????????????}
????????????????//?最后一次重試
????????????????if?(status?==?STATUS_DISPATCH_FAILED?&&?retryTimes?==?dispatchRequest.getRetryTimes())?{
????????????????????log.info("the?last?retry?of?{}?failed,?inc?deviceManagered",?dispatchRequest.getTaskId());
????????????????????return?1;
????????????????}?else?{
????????????????????return?0;
????????????????}
????????????}
????????????@Override
????????????@Transactional
????????????public?void?write(List?extends?ProcessResult>?list)?throws?Exception?{
????????????????Map?taskMap?=?jdbcTemplate.queryForMap(
????????????????????????"select?device_managered,?device_count,?task_status?from?eiot_upgrade_task?where?id?=??",
????????????????????????list.get(0).getDispatchRequest().getTaskId()?//?我們認(rèn)定一個(gè)批量里面,taskId都是一樣的
????????????????????????);
????????????????int?deviceManagered?=?(int)taskMap.get("device_managered");
????????????????Integer?deviceCount?=?(Integer)?taskMap.get("device_count");
????????????????if?(deviceCount?==?null)?{
????????????????????log.warn("deviceCount?of?task?{}?is?null",?list.get(0).getDispatchRequest().getTaskId());
????????????????}
????????????????int?taskStatus?=?(int)taskMap.get("task_status");
????????????????for?(ProcessResult?result:?list)?{
????????????????????deviceManagered?+=?updateTaskStatus(result.getDispatchRequest(),?result.getStatus());
????????????????}
????????????????if?(deviceCount?!=?null?&&?deviceManagered?==?deviceCount)?{
????????????????????taskStatus?=?2;?//任務(wù)狀態(tài)?0:待升級(jí),1:升級(jí)中,2:已完成
????????????????}
????????????????jdbcTemplate.update("update?eiot_upgrade_task??set?device_managered?=??,?task_status?=???"?+
????????????????????????"where?id?=??",?deviceManagered,?taskStatus,?list.get(0).getDispatchRequest().getTaskId());
????????????}
????????};
????}
????/**
?????*?定義一個(gè)下發(fā)更新的?job
?????*?@return
?????*/
????@Bean
????public?Job?updateDeviceJob(Step?updateDeviceStep)?{
????????return?jobBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
????????????????.listener(new?JobListener())?//?設(shè)置Job的監(jiān)聽器
????????????????.flow(updateDeviceStep)//?執(zhí)行下發(fā)更新的Step
????????????????.end()
????????????????.build();
????}
????/**
?????*?定義一個(gè)下發(fā)更新的?step
?????*?@return
?????*/
????@Bean
????public?Step?updateDeviceStep(JdbcCursorItemReader?itemReader,ItemWriter?itemWriter) ?{
????????return?stepBuilderFactory.get(UUID.randomUUID().toString().replace("-",?""))
????????????????.?chunk(batchSize)
????????????????.reader(itemReader)?//根據(jù)taskId從數(shù)據(jù)庫讀取更新設(shè)備信息
????????????????.processor(taskItemProcessor)?//?每條更新信息,執(zhí)行下發(fā)更新接口
????????????????.writer(itemWriter)
????????????????.build();
????}
????//?job?監(jiān)聽器
????public?class?JobListener?implements?JobExecutionListener?{
????????@Override
????????public?void?beforeJob(JobExecution?jobExecution)?{
????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?before...?");
????????????parameters?=?jobExecution.getJobParameters().getParameters();
????????????taskId?=?parameters.get("taskId").getValue();
????????????log.info("job?param?taskId?:?"?+?parameters.get("taskId"));
????????}
????????@Override
????????public?void?afterJob(JobExecution?jobExecution)?{
????????????log.info(jobExecution.getJobInstance().getJobName()?+?"?after...?");
????????????//?當(dāng)所有job執(zhí)行完之后,查詢?cè)O(shè)備更新狀態(tài),如果有失敗,則要定時(shí)重新執(zhí)行job
????????????String?sql?=?"?SELECT?"?+
????????????????????"?count(*)?"?+
????????????????????"?FROM?"?+
????????????????????"?eiot_upgrade_device?d?"?+
????????????????????"?LEFT?JOIN?eiot_upgrade_task?u?ON?d.upgrade_task_id?=?u.?ID?"?+
????????????????????"?WHERE?"?+
????????????????????"?u.?ID?=???"?+
????????????????????"?AND?d.retry_times??+
????????????????????"?AND?(?"?+
????????????????????"?d.device_upgrade_status?=?0?"?+
????????????????????"?OR?d.device_upgrade_status?=?2?"?+
????????????????????"?)?";
????????????//?獲取更新失敗的設(shè)備個(gè)數(shù)
????????????Integer?count?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);
????????????log.info("update?device?failure?count?:?"?+?count);
????????????//?下面是使用Quartz觸發(fā)定時(shí)任務(wù)
????????????//?獲取任務(wù)時(shí)間,單位秒
//????????????String?time?=?jdbcTemplate.queryForObject(sql,?new?Object[]{taskId},?Integer.class);
????????????//?此處方便測(cè)試,應(yīng)該從數(shù)據(jù)庫中取taskId對(duì)應(yīng)的重試間隔,單位秒
????????????Integer?millSecond?=?10;
????????????if(count?!=?null?&&?count?>?0){
????????????????String?jobName?=?"UpgradeTask_"?+?taskId;
????????????????String?reTaskId?=?taskId.toString();
????????????????Map?params?=?new?HashMap<>();
????????????????params.put("jobName",jobName);
????????????????params.put("taskId",reTaskId);
????????????????if?(QuartzManager.checkNameNotExist(jobName))
????????????????{
????????????????????QuartzManager.scheduleRunOnceJob(jobName,?RunOnceJobLogic.class,params,millSecond);
????????????????}
????????????}
????????}
????}
}
5、Processor,處理每條數(shù)據(jù),可以在此對(duì)數(shù)據(jù)進(jìn)行過濾操作
@Component("taskItemProcessor")
public?class?TaskItemProcessor?implements?ItemProcessor<DispatchRequest,?ProcessResult>?{
????public?static?final?int?STATUS_DISPATCH_FAILED?=?2;
????public?static?final?int?STATUS_DISPATCH_SUCC?=?1;
????private?static?final?Logger?log?=?LoggerFactory.getLogger(TaskItemProcessor.class);
????@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
????private?String?dispatchUrl;
????@Autowired
????JdbcTemplate?jdbcTemplate;
????/**
?????*?在這里,執(zhí)行?下發(fā)更新指令?的操作
?????*?@param?dispatchRequest
?????*?@return
?????*?@throws?Exception
?????*/
????@Override
????public?ProcessResult?process(final?DispatchRequest?dispatchRequest)?{
????????//?調(diào)用接口,下發(fā)指令
????????String?url?=?dispatchUrl?+?dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
????????log.info("request?url:"?+?url);
????????RestTemplate?restTemplate?=?new?RestTemplate();
????????HttpHeaders?headers?=?new?HttpHeaders();
????????headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
????????MultiValueMap?params?=?new?LinkedMultiValueMap();
????????JSONObject?jsonOuter?=?new?JSONObject();
????????JSONObject?jsonInner?=?new?JSONObject();
????????try?{
????????????jsonInner.put("jobId",dispatchRequest.getTaskId());
????????????jsonInner.put("name",dispatchRequest.getName());
????????????jsonInner.put("composeFile",?Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
????????????jsonInner.put("policy",new?JSONObject().put("startTime",dispatchRequest.getPolicy()));
????????????jsonInner.put("timestamp",dispatchRequest.getTimestamp());
????????????jsonOuter.put("method","updateApp");
????????????jsonOuter.put("params",jsonInner);
????????}?catch?(JSONException?e)?{
????????????log.info("JSON?convert?Exception?:"?+?e);
????????}catch?(IOException?e)?{
????????????log.info("Base64Util?bytesToBase64Str?:"?+?e);
????????}
????????log.info("request?body?json?:"?+?jsonOuter);
????????HttpEntity?requestEntity?=?new?HttpEntity(jsonOuter.toString(),headers);
????????int?status;
????????try?{
????????????ResponseEntity?response?=?restTemplate.postForEntity(url,requestEntity,String.class);
????????????log.info("response?:"?+?response);
????????????if?(response.getStatusCode()?==?HttpStatus.OK)?{
????????????????status?=?STATUS_DISPATCH_SUCC;
????????????}?else?{
????????????????status?=?STATUS_DISPATCH_FAILED;
????????????}
????????}catch?(Exception?e){
????????????status?=?STATUS_DISPATCH_FAILED;
????????}
????????return?new?ProcessResult(dispatchRequest,?status);
????}
}
6、封裝數(shù)據(jù)庫返回?cái)?shù)據(jù)的實(shí)體Bean,注意靜態(tài)內(nèi)部類
public?class?DispatchRequest?{
????private?String?taskId;
????private?String?deviceId;
????private?String?userId;
????private?String?name;
????private?byte[]?composeFile;
????private?String?policy;
????private?String?timestamp;
????private?String?md5;
????private?int?failureRetry;
????private?int?retryTimes;
????private?int?deviceManagered;
???//?省略構(gòu)造函數(shù),setter/getter/tostring方法
???//......
???
????public?static?class?DispatchRequestRowMapper?implements?RowMapper<DispatchRequest>?{
????????@Override
????????public?DispatchRequest?mapRow(ResultSet?resultSet,?int?i)?throws?SQLException?{
????????????DispatchRequest?dispatchRequest?=?new?DispatchRequest();
????????????dispatchRequest.setTaskId(resultSet.getString("taskId"));
????????????dispatchRequest.setUserId(resultSet.getString("userId"));
????????????dispatchRequest.setPolicy(resultSet.getString("startTime"));
????????????dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
????????????dispatchRequest.setName(resultSet.getString("appName"));
????????????dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
????????????dispatchRequest.setTimestamp(DateUtil.DateToString(new?Date()));
????????????dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
????????????dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
????????????dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
????????????return?dispatchRequest;
????????}
????}
}
7、啟動(dòng)類上要加上注解
@SpringBootApplication
@EnableBatchProcessing
public?class?Application?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(Application.class,?args);
????}
}
三、小結(jié)
其實(shí)SpringBatch并沒有想象中那么好用,當(dāng)從數(shù)據(jù)庫中每次取5000條數(shù)據(jù)后,進(jìn)入processor中是逐條處理的,這個(gè)時(shí)候不能不行操作,等5000條數(shù)據(jù)處理完之后,再一次性執(zhí)行ItemWriter方法。
在使用的過程中,最坑的地方是ItemReader和ItemWriter這兩個(gè)地方,如何執(zhí)行自定義的Sql,參考文中代碼就行。
至于Quartz定時(shí)功能,很簡(jiǎn)單,只要定時(shí)創(chuàng)建SpringBatch里面的Job,讓這個(gè)job啟動(dòng)就好了,此處就不在給出了,貼的代碼太多了。由于公司一些原因,代碼不能放到GitHub上。
1.?公司架構(gòu)師常常提起的DNS負(fù)載均衡是個(gè)什么鬼?
2.?喜馬拉雅自研網(wǎng)關(guān)架構(gòu)實(shí)踐
最近面試BAT,整理一份面試資料《Java面試BATJ通關(guān)手冊(cè)》,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。
獲取方式:點(diǎn)“在看”,關(guān)注公眾號(hào)并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。
文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。
謝謝支持喲 (*^__^*)

