Spring Batch 批處理框架,真心強(qiáng)啊??!
點擊關(guān)注公眾號,Java干貨及時送達(dá)
牛逼!又發(fā)現(xiàn)了一款面試題庫,太全了??!
(點擊查看)
spring batch簡介
無需用戶交互即可最有效地處理大量信息的自動化,復(fù)雜處理。這些操作通常包括基于時間的事件(例如月末計算,通知或通信)。 在非常大的數(shù)據(jù)集中重復(fù)處理復(fù)雜業(yè)務(wù)規(guī)則的定期應(yīng)用(例如,保險利益確定或費(fèi)率調(diào)整)。 集成從內(nèi)部和外部系統(tǒng)接收的信息,這些信息通常需要以事務(wù)方式格式化,驗證和處理到記錄系統(tǒng)中。批處理用于每天為企業(yè)處理數(shù)十億的交易。
「Spring Batch架構(gòu)介紹」
一個典型的批處理應(yīng)用程序大致如下:
從數(shù)據(jù)庫,文件或隊列中讀取大量記錄。 以某種方式處理數(shù)據(jù)。 以修改之后的形式寫回數(shù)據(jù)。
其對應(yīng)的示意圖如下:

spring batch的一個總體的架構(gòu)如下:

「Spring Batch核心概念介紹」
下面是一些概念是Spring batch框架中的核心概念。
什么是Job
Job和Step是spring batch執(zhí)行批處理任務(wù)最為核心的兩個概念。
/**
* Batch domain object representing a job. Job is an explicit abstraction
* representing the configuration of a job specified by a developer. It should
* be noted that restart policy is applied to the job as a whole and not to a
* step.
*/
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
這個配置的意思是:首先給這個job起了一個名字叫footballJob,接著指定了這個job的三個step,他們分別由方法,playerLoad,gameLoad, playerSummarization實現(xiàn)。
什么是JobInstance
我們在上文已經(jīng)提到了JobInstance,他是Job的更加底層的一個抽象,他的定義如下:
public interface JobInstance {
/**
* Get unique id for this JobInstance.
* @return instance id
*/
public long getInstanceId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
}
他的方法很簡單,一個是返回Job的id,另一個是返回Job的名字。
JobInstance指的是job運(yùn)行當(dāng)中,作業(yè)執(zhí)行過程當(dāng)中的概念。Instance本就是實例的意思。
什么是JobParameters

因此,我么可以通過Jobparameter來操作正確的JobInstance
什么是JobExecution
JobExecution指的是單次嘗試運(yùn)行一個我們定義好的Job的代碼層面的概念。job的一次執(zhí)行可能以失敗也可能成功。只有當(dāng)執(zhí)行成功完成時,給定的與執(zhí)行相對應(yīng)的JobInstance才也被視為完成。
還是以前面描述的EndOfDay的job作為示例,假設(shè)第一次運(yùn)行01-01-2019的JobInstance結(jié)果是失敗。那么此時如果使用與第一次運(yùn)行相同的Jobparameter參數(shù)(即01-01-2019)作業(yè)參數(shù)再次運(yùn)行,那么就會創(chuàng)建一個對應(yīng)于之前jobInstance的一個新的JobExecution實例,JobInstance仍然只有一個。
JobExecution的接口定義如下:
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
每一個方法的注釋已經(jīng)解釋的很清楚,這里不再多做解釋。只提一下BatchStatus,JobExecution當(dāng)中提供了一個方法getBatchStatus用于獲取一個job某一次特地執(zhí)行的一個狀態(tài)。BatchStatus是一個代表job狀態(tài)的枚舉類,其定義如下:
public enum BatchStatus {STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED }
這些屬性對于一個job的執(zhí)行來說是非常關(guān)鍵的信息,并且spring batch會將他們持久到數(shù)據(jù)庫當(dāng)中. 在使用Spring batch的過程當(dāng)中spring batch會自動創(chuàng)建一些表用于存儲一些job相關(guān)的信息,用于存儲JobExecution的表為batch_job_execution,下面是一個從數(shù)據(jù)庫當(dāng)中截圖的實例:

什么是Step
每一個Step對象都封裝了批處理作業(yè)的一個獨(dú)立的階段。事實上,每一個Job本質(zhì)上都是由一個或多個步驟組成。每一個step包含定義和控制實際批處理所需的所有信息。任何特定的內(nèi)容都由編寫Job的開發(fā)人員自行決定。
一個step可以非常簡單也可以非常復(fù)雜。例如,一個step的功能是將文件中的數(shù)據(jù)加載到數(shù)據(jù)庫中,那么基于現(xiàn)在spring batch的支持則幾乎不需要寫代碼。更復(fù)雜的step可能具有復(fù)雜的業(yè)務(wù)邏輯,這些邏輯作為處理的一部分。
與Job一樣,Step具有與JobExecution類似的StepExecution,如下圖所示:

什么是StepExecution
StepExecution表示一次執(zhí)行Step, 每次運(yùn)行一個Step時都會創(chuàng)建一個新的StepExecution,類似于JobExecution。但是,某個步驟可能由于其之前的步驟失敗而無法執(zhí)行。且僅當(dāng)Step實際啟動時才會創(chuàng)建StepExecution。
一次step執(zhí)行的實例由StepExecution類的對象表示。每個StepExecution都包含對其相應(yīng)步驟的引用以及JobExecution和事務(wù)相關(guān)的數(shù)據(jù),例如提交和回滾計數(shù)以及開始和結(jié)束時間。
此外,每個步驟執(zhí)行都包含一個ExecutionContext,其中包含開發(fā)人員需要在批處理運(yùn)行中保留的任何數(shù)據(jù),例如重新啟動所需的統(tǒng)計信息或狀態(tài)信息。下面是一個從數(shù)據(jù)庫當(dāng)中截圖的實例:

什么是ExecutionContext
ExecutionContext即每一個StepExecution 的執(zhí)行環(huán)境。它包含一系列的鍵值對。我們可以用如下代碼獲取ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
什么是JobRepository
JobRepository是一個用于將上述job,step等概念進(jìn)行持久化的一個類。它同時給Job和Step以及下文會提到的JobLauncher實現(xiàn)提供CRUD操作。
首次啟動Job時,將從repository中獲取JobExecution,并且在執(zhí)行批處理的過程中,StepExecution和JobExecution將被存儲到repository當(dāng)中。
@EnableBatchProcessing注解可以為JobRepository提供自動配置。
關(guān)注公眾號,學(xué)習(xí)更多 Java 干貨!
什么是JobLauncher
JobLauncher這個接口的功能非常簡單,它是用于啟動指定了JobParameters的Job,為什么這里要強(qiáng)調(diào)指定了JobParameter,原因其實我們在前面已經(jīng)提到了,jobparameter和job一起才能組成一次job的執(zhí)行。下面是代碼實例:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面run方法實現(xiàn)的功能是根據(jù)傳入的job以及jobparamaters從JobRepository獲取一個JobExecution并執(zhí)行Job。
什么是Item Reader
ItemReader是一個讀數(shù)據(jù)的抽象,它的功能是為每一個Step提供數(shù)據(jù)輸入。當(dāng)ItemReader以及讀完所有數(shù)據(jù)時,它會返回null來告訴后續(xù)操作數(shù)據(jù)已經(jīng)讀完。Spring Batch為ItemReader提供了非常多的有用的實現(xiàn)類,比如JdbcPagingItemReader,JdbcCursorItemReader等等。
ItemReader支持的讀入的數(shù)據(jù)源也是非常豐富的,包括各種類型的數(shù)據(jù)庫,文件,數(shù)據(jù)流,等等。幾乎涵蓋了我們的所有場景。
下面是一個JdbcPagingItemReader的例子代碼:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
JdbcPagingItemReader必須指定一個PagingQueryProvider,負(fù)責(zé)提供SQL查詢語句來按分頁返回數(shù)據(jù)。
下面是一個JdbcCursorItemReader的例子代碼:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
String tenant) {
JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("sql here");
itemReader.setRowMapper(new RowMapper());
return itemReader;
}
什么是Item Writer
既然ItemReader是讀數(shù)據(jù)的一個抽象,那么ItemWriter自然就是一個寫數(shù)據(jù)的抽象,它是為每一個step提供數(shù)據(jù)寫出的功能。寫的單位是可以配置的,我們可以一次寫一條數(shù)據(jù),也可以一次寫一個chunk的數(shù)據(jù),關(guān)于chunk下文會有專門的介紹。ItemWriter對于讀入的數(shù)據(jù)是不能做任何操作的。
Spring Batch為ItemWriter也提供了非常多的有用的實現(xiàn)類,當(dāng)然我們也可以去實現(xiàn)自己的writer功能。
什么是Item Processor
ItemProcessor對項目的業(yè)務(wù)邏輯處理的一個抽象, 當(dāng)ItemReader讀取到一條記錄之后,ItemWriter還未寫入這條記錄之前,I我們可以借助temProcessor提供一個處理業(yè)務(wù)邏輯的功能,并對數(shù)據(jù)進(jìn)行相應(yīng)操作。如果我們在ItemProcessor發(fā)現(xiàn)一條數(shù)據(jù)不應(yīng)該被寫入,可以通過返回null來表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的結(jié)合在一起工作,他們之間的數(shù)據(jù)傳輸也非常方便。我們直接使用即可。
chunk 處理流程
spring batch提供了讓我們按照chunk處理數(shù)據(jù)的能力,一個chunk的示意圖如下:

它的意思就和圖示的一樣,由于我們一次batch的任務(wù)可能會有很多的數(shù)據(jù)讀寫操作,因此一條一條的處理并向數(shù)據(jù)庫提交的話效率不會很高,因此spring batch提供了chunk這個概念,我們可以設(shè)定一個chunk size,spring batch 將一條一條處理數(shù)據(jù),但不提交到數(shù)據(jù)庫,只有當(dāng)處理的數(shù)據(jù)數(shù)量達(dá)到chunk size設(shè)定的值得時候,才一起去commit.
java的實例定義代碼如下:

在上面這個step里面,chunk size被設(shè)為了10,當(dāng)ItemReader讀的數(shù)據(jù)數(shù)量達(dá)到10的時候,這一批次的數(shù)據(jù)就一起被傳到itemWriter,同時transaction被提交。
skip策略和失敗處理
一個batch的job的step,可能會處理非常大數(shù)量的數(shù)據(jù),難免會遇到出錯的情況,出錯的情況雖出現(xiàn)的概率較小,但是我們不得不考慮這些情況,因為我們做數(shù)據(jù)遷移最重要的是要保證數(shù)據(jù)的最終一致性。spring batch當(dāng)然也考慮到了這種情況,并且為我們提供了相關(guān)的技術(shù)支持,請看如下bean的配置:

我們需要留意這三個方法,分別是skipLimit(),skip(),noSkip(),
skip方法我們可以指定我們可以跳過的異常,因為有些異常的出現(xiàn),我們是可以忽略的。
那么對于這個step來說,F(xiàn)ileNotFoundException就是一個fatal的exception,拋出這個exception的時候step就會直接fail
「批處理操作指南」
本部分是一些使用spring batch時的值得注意的點
批處理原則
在構(gòu)建批處理解決方案時,應(yīng)考慮以下關(guān)鍵原則和注意事項。
最大限度地減少系統(tǒng)資源的使用,尤其是I / O. 在internal memory中執(zhí)行盡可能多的操作。
查看應(yīng)用程序I / O(分析SQL語句)以確保避免不必要的物理I / O. 特別是,需要尋找以下四個常見缺陷:
當(dāng)數(shù)據(jù)可以被讀取一次并緩存或保存在工作存儲中時,讀取每個事務(wù)的數(shù)據(jù)。 重新讀取先前在同一事務(wù)中讀取數(shù)據(jù)的事務(wù)的數(shù)據(jù)。 導(dǎo)致不必要的表或索引掃描。 未在SQL語句的WHERE子句中指定鍵值。 在批處理運(yùn)行中不要做兩次一樣的事情。例如,如果需要數(shù)據(jù)匯總以用于報告目的,則應(yīng)該(如果可能)在最初處理數(shù)據(jù)時遞增存儲的總計,因此您的報告應(yīng)用程序不必重新處理相同的數(shù)據(jù)。
在批處理應(yīng)用程序開始時分配足夠的內(nèi)存,以避免在此過程中進(jìn)行耗時的重新分配。
總是假設(shè)數(shù)據(jù)完整性最差。插入適當(dāng)?shù)臋z查和記錄驗證以維護(hù)數(shù)據(jù)完整性。
盡可能實施校驗和以進(jìn)行內(nèi)部驗證。例如,對于一個文件里的數(shù)據(jù)應(yīng)該有一個數(shù)據(jù)條數(shù)紀(jì)錄,告訴文件中的記錄總數(shù)以及關(guān)鍵字段的匯總。
在具有真實數(shù)據(jù)量的類似生產(chǎn)環(huán)境中盡早計劃和執(zhí)行壓力測試。
在大批量系統(tǒng)中,數(shù)據(jù)備份可能具有挑戰(zhàn)性,特別是如果系統(tǒng)以24-7在線的情況運(yùn)行。數(shù)據(jù)庫備份通常在在線設(shè)計中得到很好的處理,但文件備份應(yīng)該被視為同樣重要。如果系統(tǒng)依賴于文件,則文件備份過程不僅應(yīng)該到位并記錄在案,還應(yīng)定期進(jìn)行測試。
「如何默認(rèn)不啟動job」
在使用java config使用spring batch的job時,如果不做任何配置,項目在啟動時就會默認(rèn)去跑我們定義好的批處理job。那么如何讓項目在啟動時不自動去跑job呢?
spring batch的job會在項目啟動時自動run,如果我們不想讓他在啟動時run的話,可以在application.properties中添加如下屬性:
spring.batch.job.enabled=false
在讀數(shù)據(jù)時內(nèi)存不夠
在使用spring batch做數(shù)據(jù)遷移時,發(fā)現(xiàn)在job啟動后,執(zhí)行到一定時間點時就卡在一個地方不動了,且log也不再打印,等待一段時間之后,得到如下錯誤:

紅字的信息為:Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻譯過來的意思就是項目發(fā)出了一個資源耗盡的事件,告訴我們java虛擬機(jī)無法再為堆分配內(nèi)存。
造成這個錯誤的原因是: 這個項目里的batch job的reader是一次性拿回了數(shù)據(jù)庫里的所有數(shù)據(jù),并沒有進(jìn)行分頁,當(dāng)這個數(shù)據(jù)量太大時,就會導(dǎo)致內(nèi)存不夠用。解決的辦法有兩個:
調(diào)整reader讀數(shù)據(jù)邏輯,按分頁讀取,但實現(xiàn)上會麻煩一些,且運(yùn)行效率會下降 增大service內(nèi)存
來源:blog.csdn.net/topdeveloperr/article/details/84337956
如有文章對你有幫助,
歡迎關(guān)注??、點贊??、轉(zhuǎn)發(fā)??!
推薦, Java面試題庫,詳情點擊: 牛逼!又發(fā)現(xiàn)了一款牛逼的Java面試題庫,史上最強(qiáng)! 點擊文末“閱讀原文”可直達(dá)


