<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          flink sql 知其所以然(十四):維表 join 的性能優(yōu)化之路(上)附源碼

          共 7284字,需瀏覽 15分鐘

           ·

          2021-12-24 21:51

          看了那么多的技術(shù)文,你能明白作者想讓你在讀完文章后學(xué)到什么嗎?

          大數(shù)據(jù)羊說的文章會(huì)讓你明白

          1. 博主會(huì)闡明博主期望本文能給小伙伴們帶來什么幫助,讓小伙伴萌能直觀明白博主的心思

          2. 博主會(huì)以實(shí)際的應(yīng)用場(chǎng)景和案例入手,不只是知識(shí)點(diǎn)的簡(jiǎn)單堆砌

          3. 博主會(huì)把重要的知識(shí)點(diǎn)的原理進(jìn)行剖析,讓小伙伴萌做到深入淺出

          1.序篇

          源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 sql lookup join獲取。

          廢話不多說,咱們先直接上本文的目錄和結(jié)論,小伙伴可以先看結(jié)論快速了解博主期望本文能給小伙伴們帶來什么幫助:

          1. 背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲(chǔ)的 lookup join(與上節(jié)不同,上節(jié)說的是流與流的 join)。lookup join 可以簡(jiǎn)單理解為使用 flatmap 訪問外部存儲(chǔ)數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
          2. 來一個(gè)實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
          3. flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時(shí)間的,并且 lookup join 經(jīng)常會(huì)由于訪問外部存儲(chǔ)的 qps 過高而導(dǎo)致背壓,產(chǎn)出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用 local cache異步訪問維表,批量訪問維表三種方式去解決性能問題。
          4. 總結(jié)及展望:官方并沒有提供 批量訪問維表 的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請(qǐng)期待下篇文章。

          2.背景及應(yīng)用場(chǎng)景介紹

          維表作為 sql 任務(wù)中一種常見表的類型,其本質(zhì)就是關(guān)聯(lián)表數(shù)據(jù)的額外數(shù)據(jù)屬性,通常在 join 語句中進(jìn)行使用。比如源數(shù)據(jù)有人的 id,你現(xiàn)在想要得到人的性別、年齡,那么可以通過用戶 id 去關(guān)聯(lián)人的性別、年齡,就可以得到更全的數(shù)據(jù)。

          維表 join 在離線數(shù)倉中是最常見的一種數(shù)據(jù)處理方式了,在實(shí)時(shí)數(shù)倉的場(chǎng)景中,flink sql 目前也支持了維表的 join,即 lookup join,生產(chǎn)環(huán)境可以用 mysql,redis,hbase 來作為高速維表存儲(chǔ)引擎。

          Notes:

          在實(shí)時(shí)數(shù)倉中,常用實(shí)時(shí)維表有兩種更新頻率

          1. 實(shí)時(shí)的更新:維度信息是實(shí)時(shí)新建的,實(shí)時(shí)寫入到高速存儲(chǔ)引擎中。然后其他實(shí)時(shí)任務(wù)在做處理時(shí)實(shí)時(shí)的關(guān)聯(lián)這些維度信息。
          2. 周期性的更新:對(duì)于一些緩慢變化維度,比如年齡、性別的用戶畫像等,幾萬年都不變化一次的東西??,實(shí)時(shí)維表的更新可以是小時(shí)級(jí)別,天級(jí)別的。

          3.來一個(gè)實(shí)戰(zhàn)案例

          來看看在具體場(chǎng)景下,對(duì)應(yīng)輸入值的輸出值應(yīng)該長(zhǎng)啥樣。

          需求指標(biāo):使用曝光用戶日志流(show_log)關(guān)聯(lián)用戶畫像維表(user_profile)關(guān)聯(lián)到用戶的維度之后,提供給下游計(jì)算分性別,年齡段的曝光用戶數(shù)使用。此處我們只關(guān)心關(guān)聯(lián)維表這一部分的輸入輸出數(shù)據(jù)。

          來一波輸入數(shù)據(jù):

          曝光用戶日志流(show_log)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 kafka 中):

          log_idtimestampuser_id
          12021-11-01 00:01:03a
          22021-11-01 00:03:00b
          32021-11-01 00:05:00c
          42021-11-01 00:06:00b
          52021-11-01 00:07:00c

          用戶畫像維表(user_profile)數(shù)據(jù)(數(shù)據(jù)存儲(chǔ)在 redis 中):

          user_id(主鍵)agesex
          a12-18
          b18-24
          c18-24

          注意:redis 中的數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)是按照 key,value 去存儲(chǔ)的。其中 key 為 user_id,value 為 age,sex 的 json。如下圖所示:

          user_profile redis

          預(yù)期輸出數(shù)據(jù)如下:

          log_idtimestampuser_idagesex
          12021-11-01 00:01:03a12-18
          22021-11-01 00:03:00b18-24
          32021-11-01 00:05:00c18-24
          42021-11-01 00:06:00b18-24
          52021-11-01 00:07:00c18-24

          flink sql lookup join 登場(chǎng)。下面是官網(wǎng)的鏈接。

          https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join

          4.flink sql lookup join

          4.1.lookup join 定義

          以上述案例來說,lookup join 其實(shí)簡(jiǎn)單理解來,就是每來一條數(shù)據(jù)去 redis 里面摟一次數(shù)據(jù)。然后把關(guān)聯(lián)到的維度數(shù)據(jù)給拼接到當(dāng)前數(shù)據(jù)中。

          熟悉 DataStream api 的小伙伴萌,簡(jiǎn)單來理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中處理每一條來的數(shù)據(jù),針對(duì)每一條數(shù)據(jù)去訪問用戶畫像的 redis。(實(shí)際上,flink sql api 中也確實(shí)是這樣實(shí)現(xiàn)的!sql 生成的 lookup join 代碼就是繼承了 flatmap)

          4.2.上述案例解決方案

          來看看上述案例的 flink sql lookup join sql 怎么寫:

          CREATE?TABLE?show_log?(
          ????log_id?BIGINT,
          ????`timestamp`?as?cast(CURRENT_TIMESTAMP?as?timestamp(3)),
          ????user_id?STRING,
          ????proctime?AS?PROCTIME()
          )
          WITH?(
          ??'connector'?=?'datagen',
          ??'rows-per-second'?=?'10',
          ??'fields.user_id.length'?=?'1',
          ??'fields.log_id.min'?=?'1',
          ??'fields.log_id.max'?=?'10'
          );

          CREATE?TABLE?user_profile?(
          ????user_id?STRING,
          ????age?STRING,
          ????sex?STRING
          ????)?WITH?(
          ??'connector'?=?'redis',
          ??'hostname'?=?'127.0.0.1',
          ??'port'?=?'6379',
          ??'format'?=?'json',
          ??'lookup.cache.max-rows'?=?'500',
          ??'lookup.cache.ttl'?=?'3600',
          ??'lookup.max-retries'?=?'1'
          );

          CREATE?TABLE?sink_table?(
          ????log_id?BIGINT,
          ????`timestamp`?TIMESTAMP(3),
          ????user_id?STRING,
          ????proctime?TIMESTAMP(3),
          ????age?STRING,
          ????sex?STRING
          )?WITH?(
          ??'connector'?=?'print'
          );

          --?lookup?join?的?query?邏輯
          INSERT?INTO?sink_table
          SELECT?
          ????s.log_id?as?log_id
          ????,?s.`timestamp`?as?`timestamp`
          ????,?s.user_id?as?user_id
          ????,?s.proctime?as?proctime
          ????,?u.sex?as?sex
          ????,?u.age?as?age
          FROM?show_log?AS?s
          LEFT?JOIN?user_profile?FOR?SYSTEM_TIME?AS?OF?s.proctime?AS?u
          ON?s.user_id?=?u.user_id

          這里使用了 for SYSTEM_TIME as of 時(shí)態(tài)表的語法來作為維表關(guān)聯(lián)的標(biāo)識(shí)語法。

          Notes:

          實(shí)時(shí)的 lookup 維表關(guān)聯(lián)能使用處理時(shí)間去做關(guān)聯(lián)。

          運(yùn)行結(jié)果如下:

          log_idtimestampuser_idagesex
          12021-11-01 00:01:03a12-18
          22021-11-01 00:03:00b18-24
          32021-11-01 00:05:00c18-24
          42021-11-01 00:06:00b18-24
          52021-11-01 00:07:00c18-24

          flink web ui 算子圖如下:

          flink web ui

          但是?。?!但是?。?!但是?。?!

          flink 官方并沒有提供 redis 的維表 connector 實(shí)現(xiàn)。

          沒錯(cuò),博主自己實(shí)現(xiàn)了一套。關(guān)于 redis 維表的 connector 實(shí)現(xiàn),直接參考下面的文章。都是可以從 github 上找到源碼拿來用的!

          flink sql 知其所以然(二)| 自定義 redis 數(shù)據(jù)維表(附源碼)


          4.3.關(guān)于維表使用的一些注意事項(xiàng)

          1. 同一條數(shù)據(jù)關(guān)聯(lián)到的維度數(shù)據(jù)可能不同:實(shí)時(shí)數(shù)倉中常用的實(shí)時(shí)維表都是在不斷的變化中的,當(dāng)前流表數(shù)據(jù)關(guān)聯(lián)完維表數(shù)據(jù)后,如果同一個(gè) key 的維表的數(shù)據(jù)發(fā)生了變化,已關(guān)聯(lián)到的維表的結(jié)果數(shù)據(jù)不會(huì)再同步更新。舉個(gè)例子,維表中 user_id 為 1 的數(shù)據(jù)在 08:00 時(shí) age 由 12-18 變?yōu)榱?18-24,那么當(dāng)我們的任務(wù)在 08:01 failover 之后從 07:59 開始回溯數(shù)據(jù)時(shí),原本應(yīng)該關(guān)聯(lián)到 12-18 的數(shù)據(jù)會(huì)關(guān)聯(lián)到 18-24 的 age 數(shù)據(jù)。這是有可能會(huì)影響數(shù)據(jù)質(zhì)量的。所以小伙伴萌在評(píng)估你們的實(shí)時(shí)任務(wù)時(shí)要考慮到這一點(diǎn)。
          2. 會(huì)發(fā)生實(shí)時(shí)的新建及更新的維表博主建議小伙伴萌應(yīng)該建立起數(shù)據(jù)延遲的監(jiān)控機(jī)制,防止出現(xiàn)流表數(shù)據(jù)先于維表數(shù)據(jù)到達(dá),導(dǎo)致關(guān)聯(lián)不到維表數(shù)據(jù)

          4.4.再說說維表常見的性能問題及優(yōu)化思路

          所有的維表性能問題都可以總結(jié)為:高 qps 下訪問維表存儲(chǔ)引擎產(chǎn)生的任務(wù)背壓,數(shù)據(jù)產(chǎn)出延遲問題。

          舉個(gè)例子:

          • 在沒有使用維表的情況下:一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時(shí)延假如為 0.1 ms,那么并行度為 1 的任務(wù)的吞吐可以達(dá)到 1 query / 0.1 ms = 1w qps
          • 在使用維表之后:每條數(shù)據(jù)訪問維表的外部存儲(chǔ)的時(shí)長(zhǎng)為 2 ms,那么一條數(shù)據(jù)從輸入 flink 任務(wù)到輸出 flink 任務(wù)的時(shí)延就會(huì)變成 2.1 ms,那么同樣并行度為 1 的任務(wù)的吞吐只能達(dá)到 1 query / 2.1 ms = 476 qps。兩者的吞吐量相差 21 倍。

          這就是為什么維表 join 的算子會(huì)產(chǎn)生背壓,任務(wù)產(chǎn)出會(huì)延遲。

          那么當(dāng)然,解決方案也是有很多的。拋開 flink sql 想一下,如果我們使用 DataStream api,甚至是在做一個(gè)后端應(yīng)用,需要訪問外部存儲(chǔ)時(shí),常用的優(yōu)化方案有哪些?這里列舉一下:

          1. 按照 redis 維表的 key 分桶 + local cache:通過按照 key 分桶的方式,讓大多數(shù)據(jù)的維表關(guān)聯(lián)的數(shù)據(jù)訪問走之前訪問過得 local cache 即可。這樣就可以把訪問外部存儲(chǔ) 2.1 ms 處理一個(gè) query 變?yōu)樵L問內(nèi)存的 0.1 ms 處理一個(gè) query 的時(shí)長(zhǎng)。
          2. 異步訪問外存:DataStream api 有異步算子,可以利用線程池去同時(shí)多次請(qǐng)求維表外部存儲(chǔ)。這樣就可以把 2.1 ms 處理 1 個(gè) query 變?yōu)?2.1 ms 處理 10 個(gè) query。吞吐可變優(yōu)化到 10 / 2.1 ms = 4761 qps。
          3. 批量訪問外存:除了異步訪問之外,我們還可以批量訪問外部存儲(chǔ)。舉一個(gè)例子:在訪問 redis 維表的 1 query 占用 2.1 ms 時(shí)長(zhǎng)中,其中可能有 2 ms 都是在網(wǎng)絡(luò)請(qǐng)求上面的耗時(shí) ,其中只有 0.1 ms 是 redis server 處理請(qǐng)求的時(shí)長(zhǎng)。那么我們就可以使用 redis 提供的 pipeline 能力,在客戶端(也就是 flink 任務(wù) lookup join 算子中),攢一批數(shù)據(jù),使用 pipeline 去同時(shí)訪問 redis sever。這樣就可以把 2.1 ms 處理 1 個(gè) query 變?yōu)?7ms(2ms + 50 * 0.1ms) 處理 50 個(gè) query。吞吐可變?yōu)?50 query / 7 ms = 7143 qps。博主這里測(cè)試了下使用 redis pipeline 和未使用的時(shí)長(zhǎng)消耗對(duì)比。如下圖所示。

          redis pipeline

          博主認(rèn)為上述優(yōu)化效果中,最好用的是 1 + 3,2 相比 3 還是一條一條發(fā)請(qǐng)求,性能會(huì)差一些。

          既然 DataStream 可以這樣做,flink sql 必須必的也可以借鑒上面的這些優(yōu)化方案。具體怎么操作呢?看下文騷操作

          4.5.lookup join 的具體性能優(yōu)化方案

          1. 按照 redis 維表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做訪問 redis 處理,并且 udaf 產(chǎn)出的結(jié)果只能是一條,所以這種實(shí)現(xiàn)起來非常復(fù)雜。我們選擇不做 keyby 分桶。但是我們可以直接使用 local cache 去做本地緩存,雖然【直接緩存】的效果比【先按照 key 分桶再做緩存】的效果差,但是也能一定程度上減少訪問 redis 壓力。在博主實(shí)現(xiàn)的 redis connector 中,內(nèi)置了 local cache 的實(shí)現(xiàn),小伙伴萌可以參考下面這部篇文章進(jìn)行配置。
          2. 異步訪問外存:目前博主實(shí)現(xiàn)的 redis connector 不支持異步訪問,但是官方實(shí)現(xiàn)的 hbase connector 支持這個(gè)功能,參考下面鏈接文章的,點(diǎn)開之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/
          3. 批量訪問外存:這玩意官方必然沒有實(shí)現(xiàn)啊,但是,但是,但是,經(jīng)過博主周末兩天的瘋狂 debug,改了改源碼,搞定了基于 redis 的批量訪問外存優(yōu)化的功能。

          4.6.基于 redis connector 的批量訪問機(jī)制優(yōu)化

          先描述一下大概是個(gè)什么東西,具體怎么用。

          你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.modetrue,sql 不用做任何改動(dòng)的情況下,flink lookup join 算子會(huì)自動(dòng)優(yōu)化,優(yōu)化效果如下:

          lookup join 算子的每個(gè) task 上,每攢夠 30 條數(shù)據(jù) or 每隔五秒(處理時(shí)間) 去觸發(fā)一次批量訪問 redis 的請(qǐng)求,使用的是 jedis client 的 pipeline 功能訪問 redis server。實(shí)測(cè)性能有很大提升。

          關(guān)于這個(gè)批量訪問機(jī)制的優(yōu)化介紹和使用方式介紹,小伙伴們先別急,下篇文章會(huì)詳細(xì)介紹到。

          5.總結(jié)與展望

          源碼公眾號(hào)后臺(tái)回復(fù)1.13.2 sql lookup join獲取。

          本文主要介紹了 flink sql lookup join 的使用方式,并介紹了一些經(jīng)常出現(xiàn)的性能問題以及優(yōu)化思路,總結(jié)如下:

          1. 背景及應(yīng)用場(chǎng)景介紹:博主期望你能了解到,flink sql 提供了輕松訪問外部存儲(chǔ)的 lookup join(與上節(jié)不同,上節(jié)說的是流與流的 join)。lookup join 可以簡(jiǎn)單理解為使用 flatmap 訪問外部存儲(chǔ)數(shù)據(jù)然后將維度字段拼接到當(dāng)前這條數(shù)據(jù)上面
          2. 來一個(gè)實(shí)戰(zhàn)案例:博主以曝光用戶日志流關(guān)聯(lián)用戶畫像(年齡、性別)維表為例介紹 lookup join 應(yīng)該達(dá)到的關(guān)聯(lián)的預(yù)期效果。
          3. flink sql lookup join 的解決方案以及原理的介紹:主要介紹 lookup join 的在上述實(shí)戰(zhàn)案例的 sql 寫法,博主期望你能了解到,lookup join 是基于處理時(shí)間的,并且 lookup join 經(jīng)常會(huì)由于訪問外部存儲(chǔ)的 qps 過高而導(dǎo)致背壓,產(chǎn)出延遲等性能問題。我們可以借鑒在 DataStream api 中的維表 join 優(yōu)化思路在 flink sql 使用 local cache,異步訪問維表批量訪問維表三種方式去解決性能問題。
          4. 總結(jié)及展望:官方并沒有提供 批量訪問維表 的能力,因此博主自己實(shí)現(xiàn)了一套,具體使用方式和原理實(shí)現(xiàn)敬請(qǐng)期待下篇文章。

          往期推薦

          flink sql 知其所以然(十三):流 join 很難嘛???(下)

          flink sql 知其所以然(十二):流 join 很難嘛???(上)

          flink sql 知其所以然(十一):去重不僅僅有 count distinct 還有強(qiáng)大的 deduplication

          flink sql 知其所以然(十):大家都用 cumulate window 計(jì)算累計(jì)指標(biāo)啦

          flink sql 知其所以然(九):window tvf tumble window 的奇思妙解

          當(dāng)我們?cè)谧隽髋惑w時(shí),我們?cè)谧鍪裁矗?/p>


          --END--

          非常歡迎大家加我個(gè)人微信,有關(guān)大數(shù)據(jù)的問題我們一起討論

          長(zhǎng)按上方掃碼二維碼,加我微信



          動(dòng)動(dòng)小手,讓更多需要的人看到~

          瀏覽 102
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  天天干夜夜操www | 欧美性猛交XXX 乱大交3 欧美一級黃色A片免費看野花 | 操B视频动漫免费网站 | 亚洲高清人妻 | 日韩黄色操少妇视频 |