【凱哥】第三屆數(shù)據(jù)庫(kù)大賽創(chuàng)新上云性能挑戰(zhàn)賽-ADB賽道參賽總結(jié)
前言:經(jīng)過(guò)三個(gè)多月的角逐,成績(jī)最終以 34.380 的成績(jī)定格在第四名,大佬太多,膜拜。文章末尾會(huì)附上 git 倉(cāng)庫(kù)地址,如果有疑問(wèn)的地方歡迎到 github 上與我交流。
隊(duì)伍排名
賽題簡(jiǎn)介
硬件資源:8C8G
存儲(chǔ)介質(zhì):英特爾? 傲騰? PMem 非易失性持久化內(nèi)存(APP Direct Mode)
數(shù)據(jù)總量:兩張表,每張表兩列,每列 10 億條數(shù)據(jù),總量為 32 億個(gè) Long
數(shù)據(jù)特征:均勻分布
因?yàn)閿?shù)據(jù)均勻分布,考慮使用從高位截取若干位分桶的方式對(duì)數(shù)據(jù)做索引。
更多賽題信息這里不再贅述,請(qǐng)至 https://tianchi.aliyun.com/competition/entrance/531895/information 查看
整體方案
Load 階段:整體采用純異步(四組線程)的方式,每組線程負(fù)責(zé)消費(fèi)加工上游線程組的產(chǎn)品,并將產(chǎn)出物交付給下游線程組,同時(shí)根據(jù)每組線程特性及負(fù)載調(diào)整該組線程數(shù)量、獨(dú)享或共享,線程組有共享或獨(dú)享能力后可以精細(xì)化調(diào)整線程數(shù)量(因?yàn)槊拷M線程耗時(shí)不均勻,需要精細(xì)調(diào)整線程數(shù)量來(lái)彌補(bǔ)“木桶短板”),解析分桶分離后分桶線程 CPU Cache 更易處理,各組線程間通訊均使用 ConcurrentLinkedQueue + Thread.sleep,分桶數(shù)量為 2048,每桶一個(gè)文件。
查詢階段:采用多線程+桶排序+快速選擇方式。

讀取
讀取采用雙文件并行處理,每個(gè)文件配置 3 個(gè)讀取線程,首先將每個(gè)文件分成三份,第三份略小(因?yàn)榻Y(jié)尾可能不足一個(gè)讀取塊),線程內(nèi) MMAP 讀取,每塊讀取大小為 8MB + 64 字節(jié),多讀取的 64 字節(jié)用于行尾溢出處理,讀取完成后將數(shù)據(jù)塊投遞到解析 Long 線程組。

解析 Long
解析 Long 線程組為共享線程組,數(shù)量為 3,解析 Long 時(shí)先倒序找到逗號(hào)或換行符,然后我們可以得到一個(gè) Long 數(shù)字的起始位置,有了起始位置那么我們可以對(duì)解析 Long 這個(gè)定長(zhǎng)循環(huán)做循環(huán)展開(kāi)。
1private?static?long?read_long_19(long?array)?{??
2????long?v0?=?(unsafe.getByte(array?+?0)?-?'0')?*?1000000000000000000L;??
3????long?v1?=?(unsafe.getByte(array?+?1)?-?'0')?*?100000000000000000L;??
4????long?v2?=?(unsafe.getByte(array?+?2)?-?'0')?*?10000000000000000L;??
5????long?v3?=?(unsafe.getByte(array?+?3)?-?'0')?*?1000000000000000L;??
6????long?v4?=?(unsafe.getByte(array?+?4)?-?'0')?*?100000000000000L;??
7????long?v5?=?(unsafe.getByte(array?+?5)?-?'0')?*?10000000000000L;??
8????long?v6?=?(unsafe.getByte(array?+?6)?-?'0')?*?1000000000000L;??
9????long?v7?=?(unsafe.getByte(array?+?7)?-?'0')?*?100000000000L;??
10????long?v8?=?(unsafe.getByte(array?+?8)?-?'0')?*?10000000000L;??
11????long?v9?=?(unsafe.getByte(array?+?9)?-?'0')?*?1000000000L;??
12????long?v10?=?(unsafe.getByte(array?+?10)?-?'0')?*?100000000L;??
13????long?v11?=?(unsafe.getByte(array?+?11)?-?'0')?*?10000000L;??
14????long?v12?=?(unsafe.getByte(array?+?12)?-?'0')?*?1000000L;??
15????long?v13?=?(unsafe.getByte(array?+?13)?-?'0')?*?100000L;??
16????long?v14?=?(unsafe.getByte(array?+?14)?-?'0')?*?10000L;??
17????long?v15?=?(unsafe.getByte(array?+?15)?-?'0')?*?1000L;??
18????long?v16?=?(unsafe.getByte(array?+?16)?-?'0')?*?100L;??
19????long?v17?=?(unsafe.getByte(array?+?17)?-?'0')?*?10L;??
20????long?v18?=?(unsafe.getByte(array?+?18)?-?'0')?*?1L;??
21????return?v0?+?v1?+?v2?+?v3?+?v4?+?v5?+?v6?+?v7?+?v8?+?v9?+?v10?+?v11?+?v12?+?v13?+?v14?+?v15?+?v16?+?v17?+?v18;??
22}
分桶
分桶線程為獨(dú)享線程,每個(gè)文件配置一個(gè)分桶線程,線程內(nèi)分兩階段對(duì)兩列進(jìn)行分桶,為加快分桶速度,我們從 CPU Cache 緩存命中和幫助 CPU 分支預(yù)測(cè)兩個(gè)方面做優(yōu)化,同時(shí)為了加快落盤(pán)速度,我們可以對(duì)數(shù)據(jù)做些簡(jiǎn)單壓縮。
數(shù)據(jù)壓縮:因?yàn)槲覀兎滞俺^(guò)了 256,所以在存儲(chǔ)數(shù)據(jù)時(shí)可以舍棄掉高位的一個(gè)字節(jié)(服務(wù)器字節(jié)序?yàn)樾《四J?,可以剛好通過(guò)覆蓋的方式舍棄掉高位一個(gè)字節(jié)),僅存儲(chǔ)剩余的 7 個(gè)字節(jié)。
CPU Cache:首先我們申請(qǐng)一塊與 CPU L2 緩存大小匹配的內(nèi)存作為臨時(shí)緩存,經(jīng)查詢 L2 的緩存大小為 1M, 我的分桶數(shù)量為 2048, 那么每桶的數(shù)量應(yīng)為 1M / 8 /2048 = 64, 因?yàn)閷?duì)數(shù)據(jù)做了壓縮,所以實(shí)際申請(qǐng)的內(nèi)存數(shù)量為 7 * 64 * 2048 = 917504 可以被 CPU Cache 覆蓋,分桶時(shí)先將數(shù)據(jù)寫(xiě)到臨時(shí)緩存,等臨時(shí)緩存滿時(shí)再一次性將該桶的數(shù)據(jù)復(fù)制到寫(xiě)緩存,寫(xiě)緩存滿時(shí)再將寫(xiě)緩存投遞到落盤(pán)線程。
CPU 分支預(yù)測(cè):因?yàn)閿?shù)據(jù)大致是均勻的,因此當(dāng)一個(gè)桶的數(shù)據(jù)滿時(shí),其他桶的數(shù)據(jù)也接近滿了,這時(shí)我們可以直接將所有桶的數(shù)據(jù)都復(fù)制到寫(xiě)緩存,這樣基本可以消除判斷臨時(shí)緩存是否已滿帶來(lái)的損耗。

分桶代碼:
1private?void?store_order_values(??
2????????ByteBuf?long_buf,?long?lv_1_buf,?int[]?pos_array)?throws?InterruptedException?{??
3????long?buf_offset?=?long_buf.offset;??
4????long?buf_write_index?=?long_buf.write_index;??
5????int?size?=?(int)?(buf_write_index?-?buf_offset)?>>>?3;??
6????long?value;??
7????for?(int?i?=?0;?i? 8????????value?=?unsafe.getLong(buf_offset?+?((long)?i?<3));??
9????????int?index?=?index(value);??
10????????int?put_pos?=?pos_array[index]++;??
11????????if?(((put_pos)?&?CP_LV_1_COUNT_MASK)?==?CP_LV_1_COUNT_MASK)?{??
12????????????long?put_address?=?lv_1_buf?+?put_pos?*?7L;??
13????????????for?(int?j?=?0;?j?7;?j++)?{??
14????????????????unsafe.putByte(put_address++,?(byte)?value);??
15????????????????value?>>=?8;??
16????????????}??
17????????????do_flush(lv_1_buf,?pos_array,?order_size_array,?order_byte_buf_array,?table.order_key_bucket_array);??
18????????}?else?{??
19????????????unsafe.putLong(lv_1_buf?+?put_pos?*?7L,?value);??
20????????}??
21????}??
22}
分桶線程雖然只給了兩個(gè),但是分桶依舊快于其他線程(四組線程耗時(shí)分別是:23.5+,23+,22+,22.5+),于是在分桶線程等待獲取 Long 數(shù)組時(shí),讓分桶線程分擔(dān)一部分寫(xiě)操作。
1parse_task?=?work_parse_tasks.poll();??
2if?(parse_task?==?null)?{??
3????WriteTask?write_task?=?WriteThread.work_write_tasks.poll();??
4????if?(write_task?==?null)?{??
5????????Thread.sleep(1);??
6????}?else?{??
7????????write_task.write();??
8????????WriteThread.free_write_tasks.offer(write_task);??
9????}??
10????continue;??
11}
落盤(pán)
落盤(pán)線程為共享線程,數(shù)量為 5 個(gè),落盤(pán)時(shí)拉取到寫(xiě) Task 后使用 FileChannel 寫(xiě)入到對(duì)應(yīng)的文件,落盤(pán)結(jié)束后將每個(gè)桶的大小記錄到 Meta 文件。
查詢
查詢階段 load 時(shí)將桶信息從 Meta 文件中恢復(fù)到內(nèi)存。
查詢采用 N*3 個(gè)線程并行查詢,即每次查詢請(qǐng)求附帶兩個(gè)輔助查詢線程配合主查詢線程并行查詢,線程內(nèi)先對(duì)讀取的數(shù)據(jù)進(jìn)行桶排序,桶排序完成后在主線程使用快速選擇定位查詢結(jié)果,線程間通訊采用阻塞隊(duì)列進(jìn)行通信。經(jīng)過(guò)多線程處理和桶排序后,快速選擇起到的提升作用不明顯,和直接 Arrays.sort 區(qū)別不大。
可探索的改進(jìn)
一桶一文件的劣勢(shì)非常明顯,需要初始化大量文件(2048 * 4),需要寫(xiě)大量小文件,查詢時(shí)相比于多桶一文件的 4096 分桶需要多讀取一倍的數(shù)據(jù),改成多桶一個(gè)文件可以充分利用傲騰強(qiáng)大的隨機(jī)讀取能力,將查詢性能提升一倍。
總結(jié)
拆分解析 Long 與分桶線程,以減少分桶線程數(shù)提高緩存命中,同時(shí)留給其他線程組更多線程使線程資源分配更合理。
解析 Long 時(shí)循環(huán)展開(kāi)以減少分支指令帶來(lái)的開(kāi)銷
使用臨時(shí)緩存以增加分桶速度提升緩存命中
緩存復(fù)制時(shí)根據(jù)數(shù)據(jù)均勻特性幫助分支預(yù)測(cè)
分桶線程分擔(dān)寫(xiě)任務(wù)以更大發(fā)揮 PMem 多線程寫(xiě)能力
數(shù)據(jù)壓縮提升寫(xiě)入速度
多線程查詢提升 IO 利用率
源碼地址:https://github.com/wangkaish/ali_race2021_r2_adb

