性能爆表:SpringBoot利用ThreadPoolTaskExecutor批量插入百萬級數(shù)...
程序員的成長之路互聯(lián)網(wǎng)/程序員/技術/資料共享?
關注
閱讀本文大概需要 2.8 分鐘。
來自:azdebug.blog.csdn.net/article/details/103697108
前言
開發(fā)目的: 提高百萬級數(shù)據(jù)插入效率。采取方案: 利用ThreadPoolTaskExecutor多線程批量插入。采用技術:
- springboot2.1.1
- mybatisPlus3.0.6
- swagger2.5.0
- Lombok1.18.4
- postgresql
- ThreadPoolTaskExecutor
具體實現(xiàn)細節(jié)
application-dev.properties添加線程池配置信息
#?異步線程配置
#?配置核心線程數(shù)
async.executor.thread.core_pool_size?=?30
#?配置最大線程數(shù)
async.executor.thread.max_pool_size?=?30
#?配置隊列大小
async.executor.thread.queue_capacity?=?99988
#?配置線程池中的線程的名稱前綴
async.executor.thread.name.prefix?=?async-importDB-
@Configuration
@EnableAsync
@Slf4j
public?class?ExecutorConfig?{
????@Value("${async.executor.thread.core_pool_size}")
????private?int?corePoolSize;
????@Value("${async.executor.thread.max_pool_size}")
????private?int?maxPoolSize;
????@Value("${async.executor.thread.queue_capacity}")
????private?int?queueCapacity;
????@Value("${async.executor.thread.name.prefix}")
????private?String?namePrefix;
?
????@Bean(name?=?"asyncServiceExecutor")
????public?Executor?asyncServiceExecutor()?{
????????log.warn("start?asyncServiceExecutor");
????????//在這里修改
????????ThreadPoolTaskExecutor?executor?=?new?VisiableThreadPoolTaskExecutor();
????????//配置核心線程數(shù)
????????executor.setCorePoolSize(corePoolSize);
????????//配置最大線程數(shù)
????????executor.setMaxPoolSize(maxPoolSize);
????????//配置隊列大小
????????executor.setQueueCapacity(queueCapacity);
????????//配置線程池中的線程的名稱前綴
????????executor.setThreadNamePrefix(namePrefix);
????????// rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務
????????// CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行
????????executor.setRejectedExecutionHandler(new?ThreadPoolExecutor.CallerRunsPolicy());
????????//執(zhí)行初始化
????????executor.initialize();
????????return?executor;
????}
}
@Service
@Slf4j
public?class?AsyncServiceImpl?implements?AsyncService?{
@Override
????@Async("asyncServiceExecutor")
????public?void?executeAsync(List<LogOutputResult>?logOutputResults,?LogOutputResultMapper?logOutputResultMapper,?CountDownLatch?countDownLatch)?{
????????try{
????????????log.warn("start?executeAsync");
????????????//異步線程要做的事情
????????????logOutputResultMapper.addLogOutputResultBatch(logOutputResults);
????????????log.warn("end?executeAsync");
????????}finally?{
????????????countDownLatch.countDown();//?很關鍵,?無論上面程序是否異常必須執(zhí)行countDown,否則await無法釋放
????????}
????}
}
@Override
public?int?testMultiThread()?{
????List<LogOutputResult>?logOutputResults?=?getTestData();
????//測試每100條數(shù)據(jù)插入開一個線程
????List<List<LogOutputResult>>?lists?=?ConvertHandler.splitList(logOutputResults,?100);
????CountDownLatch?countDownLatch?=?new?CountDownLatch(lists.size());
????for?(List<LogOutputResult>?listSub:lists)?{
????????asyncService.executeAsync(listSub,?logOutputResultMapper,countDownLatch);
????}
????try?{
??????? countDownLatch.await();?//保證之前的所有的線程都執(zhí)行完成,才會走下面的;
????????//?這樣就可以在下面拿到所有線程執(zhí)行完的集合結(jié)果
????}?catch?(Exception?e)?{
????????log.error("阻塞異常:"+e.getMessage());
????}
????return?logOutputResults.size();
}
圖片多線程 測試 2000003 ?耗時如下:耗時1.67分鐘
圖片
圖片本次開啟30個線程,截圖如下:
圖片單線程測試2000003 ?耗時如下:耗時5.75分鐘
圖片
圖片檢查多線程入庫的數(shù)據(jù),檢查是否存在重復入庫的問題:
根據(jù)id分組,查看是否有id重復的數(shù)據(jù),通過sql語句檢查,沒有發(fā)現(xiàn)重復入庫的問題
圖片檢查數(shù)據(jù)完整性:
通過sql語句查詢,多線程錄入數(shù)據(jù)完整
圖片測試結(jié)果
不同線程數(shù)測試:
圖片
圖片總結(jié)
通過以上測試案列,同樣是導入2000003 ?條數(shù)據(jù),多線程耗時1.67分鐘,單線程耗時5.75分鐘。通過對不同線程數(shù)的測試,發(fā)現(xiàn)不是線程數(shù)越多越好,具體多少合適,網(wǎng)上有一個不成文的算法:CPU核心數(shù)量*2 +2 個線程。
附:測試電腦配置
圖片<END>
推薦閱讀:
互聯(lián)網(wǎng)初中高級大廠面試題(9個G)
內(nèi)容包含Java基礎、JavaWeb、MySQL性能優(yōu)化、JVM、鎖、百萬并發(fā)、消息隊列、高性能緩存、反射、Spring全家桶原理、微服務、Zookeeper......等技術棧!
?戳閱讀原文領取!
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??朕已閱?
![]()
