SpringBoot+ThreadPoolTaskExecutor 批量插入百萬(wàn)級(jí)數(shù)據(jù)實(shí)測(cè)
前言
開(kāi)發(fā)目的: 提高百萬(wàn)級(jí)數(shù)據(jù)插入效率。 采取方案: 利用 ThreadPoolTaskExecutor多線(xiàn)程批量插入。采用技術(shù): springboot2.1.1+mybatisPlus3.0.6+swagger2.5.0+Lombok1.18.4+postgresql+ThreadPoolTaskExecutor等。
具體實(shí)現(xiàn)細(xì)節(jié)
application-dev.properties添加線(xiàn)程池配置信息
#?異步線(xiàn)程配置
#?配置核心線(xiàn)程數(shù)
async.executor.thread.core_pool_size?=?30
#?配置最大線(xiàn)程數(shù)
async.executor.thread.max_pool_size?=?30
#?配置隊(duì)列大小
async.executor.thread.queue_capacity?=?99988
#?配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴
async.executor.thread.name.prefix?=?async-importDB-
spring容器注入線(xiàn)程池bean對(duì)象
@Configuration
?
@EnableAsync
?
@Slf4j
?
public?class?ExecutorConfig?{
????@Value("${async.executor.thread.core_pool_size}")
????private?int?corePoolSize;
????@Value("${async.executor.thread.max_pool_size}")
????private?int?maxPoolSize;
????@Value("${async.executor.thread.queue_capacity}")
????private?int?queueCapacity;
????@Value("${async.executor.thread.name.prefix}")
????private?String?namePrefix;
?
????@Bean(name?=?"asyncServiceExecutor")
????public?Executor?asyncServiceExecutor()?{
????????log.warn("start?asyncServiceExecutor");
????????//在這里修改
????????ThreadPoolTaskExecutor?executor?=?new?VisiableThreadPoolTaskExecutor();
????????//配置核心線(xiàn)程數(shù)
????????executor.setCorePoolSize(corePoolSize);
????????//配置最大線(xiàn)程數(shù)
????????executor.setMaxPoolSize(maxPoolSize);
????????//配置隊(duì)列大小
????????executor.setQueueCapacity(queueCapacity);
????????//配置線(xiàn)程池中的線(xiàn)程的名稱(chēng)前綴
????????executor.setThreadNamePrefix(namePrefix);
????????// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
????????// CALLER_RUNS:不在新線(xiàn)程中執(zhí)行任務(wù),而是有調(diào)用者所在的線(xiàn)程來(lái)執(zhí)行
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????//執(zhí)行初始化
????????executor.initialize();
????????return?executor;
????}
}
創(chuàng)建異步線(xiàn)程 業(yè)務(wù)類(lèi)
@Service
@Slf4j
public?class?AsyncServiceImpl?implements?AsyncService?{
@Override
????@Async("asyncServiceExecutor")
????public?void?executeAsync(List?logOutputResults,?LogOutputResultMapper?logOutputResultMapper,?CountDownLatch?countDownLatch) ?{
????????try{
????????????log.warn("start?executeAsync");
????????????//異步線(xiàn)程要做的事情
????????????logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
????????????log.warn("end?executeAsync");
????????}finally?{
????????????countDownLatch.countDown();//?很關(guān)鍵,?無(wú)論上面程序是否異常必須執(zhí)行countDown,否則await無(wú)法釋放
????????}
????}
}
創(chuàng)建多線(xiàn)程批量插入具體業(yè)務(wù)方法
@Override
????public?int?testMultiThread()?{
????????List?logOutputResults?=?getTestData();
????????//測(cè)試每100條數(shù)據(jù)插入開(kāi)一個(gè)線(xiàn)程
????????List>?lists?=?ConvertHandler.splitList(logOutputResults,?100);
????????CountDownLatch?countDownLatch?=?new?CountDownLatch(lists.size());
????????for?(List?listSub:lists)?{
????????????asyncService.executeAsync(listSub,?logOutputResultMapper,countDownLatch);
????????}
????????try?{
????????????countDownLatch.await();?//保證之前的所有的線(xiàn)程都執(zhí)行完成,才會(huì)走下面的;
????????????//?這樣就可以在下面拿到所有線(xiàn)程執(zhí)行完的集合結(jié)果
????????}?catch?(Exception?e)?{
????????????log.error("阻塞異常:"+e.getMessage());
????????}
????????return?logOutputResults.size();
????}
模擬2000003 條數(shù)據(jù)進(jìn)行測(cè)試

多線(xiàn)程 測(cè)試 2000003 ?耗時(shí)如下:耗時(shí)1.67分鐘


本次開(kāi)啟30個(gè)線(xiàn)程,截圖如下:

單線(xiàn)程測(cè)試2000003 ?耗時(shí)如下:耗時(shí)5.75分鐘


檢查多線(xiàn)程入庫(kù)的數(shù)據(jù),檢查是否存在重復(fù)入庫(kù)的問(wèn)題:
根據(jù)id分組,查看是否有id重復(fù)的數(shù)據(jù),通過(guò)sql語(yǔ)句檢查,沒(méi)有發(fā)現(xiàn)重復(fù)入庫(kù)的問(wèn)題

檢查數(shù)據(jù)完整性:通過(guò)sql語(yǔ)句查詢(xún),多線(xiàn)程錄入數(shù)據(jù)完整

測(cè)試結(jié)果
不同線(xiàn)程數(shù)測(cè)試:


總結(jié)
通過(guò)以上測(cè)試案列,同樣是導(dǎo)入2000003 ?條數(shù)據(jù),多線(xiàn)程耗時(shí)1.67分鐘,單線(xiàn)程耗時(shí)5.75分鐘。通過(guò)對(duì)不同線(xiàn)程數(shù)的測(cè)試,發(fā)現(xiàn)不是線(xiàn)程數(shù)越多越好,具體多少合適,網(wǎng)上有一個(gè)不成文的算法:
CPU核心數(shù)量*2 +2 個(gè)線(xiàn)程。
附:測(cè)試電腦配置
來(lái)源:azdebug.blog.csdn.net/article/details/103697108
-End-
最近有一些小伙伴,讓我?guī)兔φ乙恍?面試題?資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來(lái),可以說(shuō)是程序員面試必備!所有資料都整理到網(wǎng)盤(pán)了,歡迎下載!

面試題】即可獲取評(píng)論
圖片
表情
