<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink 解讀 | Apache Flink 1.16 功能解讀

          共 8329字,需瀏覽 17分鐘

           ·

          2023-03-04 05:38

          摘要:本文整理自阿里云高級(jí)開發(fā)工程師 Apache Flink Committer、Flink 1.16 Release Manager 黃興勃(斷塵),在 FFA 2022 核心技術(shù)專場(chǎng)的分享。本篇內(nèi)容主要分為四個(gè)部分:

          1. 綜述

          2. 持續(xù)領(lǐng)先的流處理

          3. 更穩(wěn)定易用高性能的批處理

          4. 蓬勃發(fā)展的生態(tài)


          Tips:點(diǎn)擊「閱讀原文」查看原文視頻&演講 ppt

          01

          綜述

           

          Flink 1.16 同 Flink 1.15 相比,在 Commits、Issues、Contributors 上,保持了較高的水準(zhǔn)。最大的不同是,我們?cè)?Flink 1.16 中大部分的功能和代碼,主要由中國(guó)開發(fā)者主導(dǎo)完成。

          非常感謝二百四十多位中國(guó) Contributors 對(duì) Flink 1.16 的貢獻(xiàn)。接下來,我們?cè)敿?xì)的看一下 Flink 1.16 在三個(gè)方面的改進(jìn)。

          02

          持續(xù)領(lǐng)先的流處理


          Flink 作為流式計(jì)算引擎的標(biāo)準(zhǔn),在 Flink 1.16 的流處理方面,依然做了許多的改進(jìn)和探索。

          State 是 Flink 中非常重要的概念。有了 State 的存在,才使得 Flink 在流處理上能夠保證端到端的 Exactly Once 的語義。State 經(jīng)過多年的持續(xù)發(fā)展,在 Flink 1.15 提出了 Changelog State Backend,為了解決 RocksDB 由于 TM 需要同時(shí)進(jìn)行 Compaction 和 Upload,導(dǎo)致它的 CPU 和帶寬出現(xiàn)周期性抖動(dòng)的問題。這部分在引入 Changelog 后得到了解決。

          它的基本原理是,當(dāng) TM 做 State 操作時(shí),會(huì)同時(shí)雙寫。一部分?jǐn)?shù)據(jù)會(huì)像原來一樣,寫到原來的本地 State Table 當(dāng)中。與此同時(shí),會(huì)將 State 數(shù)據(jù)以 Append Only 的形式,寫到本地 Changelog 中。Changelog 中的這部分信息,會(huì)周期性的上傳到遠(yuǎn)端 DFS。

          由于這部分 Changelog 信息相對(duì)固定,而且是一種周期定量的形式,所以使得在做 Checkpoint 時(shí),持久化的數(shù)據(jù)變少,加快了作業(yè) Failover 的速度。其次,因?yàn)檫@份數(shù)據(jù)相對(duì)較少,做 Checkpoint 時(shí)的速度會(huì)更快。

          除此之外,它還改善了 Checkpoint 的問題,解決了 CPU 和帶寬抖動(dòng)的問題。因?yàn)樗乃俣雀欤脱訒r(shí),使得端到端的數(shù)據(jù)新鮮度更好,保持在分鐘級(jí)別以內(nèi)。

          這部分功能在 Flink 1.16 實(shí)現(xiàn)了全面生產(chǎn)可用,整個(gè)集群變得更加穩(wěn)定。

          我們?cè)?Flink 1.16 的 RocksDB 上,也做了很多改進(jìn),我們引入了 Rescaling Benchmark。可以觀測(cè) RocksDB 的 Rescaling 耗時(shí),以及耗時(shí)的部分在哪里。

          除此之外,我們也在 RocksDB Rescaling 上做了一些改進(jìn),大幅提升了 RocksDB Rescaling 的性能。如上圖所示,我們能看到在左圖中,對(duì)于一個(gè) WordCount 作業(yè),提升了 3~4 倍的性能。

          除了 RocksDB Rescaling 的改進(jìn),我們對(duì)現(xiàn)有的 State Metrics 和 Monitor 也進(jìn)行了改進(jìn)。我們將原來 RocksDB 的一些 log 信息,重定向到了 Flink log 中。其次,我們將 RocksDB 基于 Database 級(jí)別的 Metric 信息引入到了 Flink 系統(tǒng)中。用戶可以通過 Flink Metric 系統(tǒng),查看 RocksDB 的情況。

          剛才講了我們?cè)?Flink 1.16 中,對(duì) State 和 RocksDB 的改進(jìn)。除此之外,我們?cè)?Checkpoint 上也做了很多改進(jìn)。

          在 Flink 1.11 中,我們引入了 Unaligned Checkpoint,并在 Flink 1.13 實(shí)現(xiàn)了生產(chǎn)可用。自此,很多公司開始在他們的生產(chǎn)環(huán)境中使用 Unaligned Checkpoint。但在使用過程中,也發(fā)現(xiàn)了一些問題。某公司的小伙伴在自己的生產(chǎn)環(huán)境中,使用了 Unaligned Checkpoint 后,發(fā)現(xiàn)了一些問題,并進(jìn)行了改進(jìn),回饋給了社區(qū)。

          我們簡(jiǎn)單看一下 Unaligned Checkpoint 做的一些改進(jìn)。

          第一個(gè)是支持透支 buffer。這個(gè)功能的引入是為了解決我們 Unaligned Checkpoint 由于 Flink 的執(zhí)行流程是基于 Mailbox 的處理流程帶來的可能的問題。我們知道,做 Checkpoint 的流程是在主線程中進(jìn)行的,而當(dāng)主線程在處理 Process Function 邏輯時(shí),Process Function 輸出的數(shù)據(jù)是需要往下游 buffer 輸出需要申請(qǐng)一些 output buffer 的。

          在上述過程當(dāng)中,可能出現(xiàn)在反壓很嚴(yán)重的情況時(shí),由于難以申請(qǐng)到 buffer,導(dǎo)致主線程卡在了 Process Function 邏輯中。在 Flink 1.16 之前,社區(qū)也做了相關(guān)的改進(jìn):當(dāng)主線程需要進(jìn)入 Process Function 邏輯前,需要預(yù)先申請(qǐng) output buffer 里的 buffer。有了這個(gè) buffer 后,它才會(huì)進(jìn)入 Process Function 的邏輯,從而避免卡在里面。

          上述方案解決了部分問題,但是仍沒有解決其他的一類 Case。如果輸入/輸出的數(shù)據(jù)太大,或者 Process Function 是一個(gè) Flatmap Function,需要輸出多條數(shù)據(jù)的情況,一個(gè) buffer 將無法滿足,主線程依然會(huì)卡死在 Process Function 里。

          在 Flink 1.16 中,引入了透支 buffer 的方式。如果 TM 上有額外的一些 buffer 的話,你就可以申請(qǐng)這部分內(nèi)存。然后,通過透支這部分的 buffer,讓主線程不會(huì)卡死在 Process Function 中,就能正常跳出 Process Function。主線程就能接收到 Unaligned Checkpoint Barrier。

          之前,Unaligned Checkpoint 引入了一個(gè) Timeout Aligned 機(jī)制。如果你的 Input Channel 接收到一個(gè) Checkpoint Barrier 時(shí),在指定時(shí)間段內(nèi)沒有實(shí)現(xiàn) Barrier 對(duì)齊的話,Task 將會(huì)切換到 Unaligned Checkpoint。但是如果 Barrier 卡在了 output buffer 里面,下游的 Task 依然是 Aligned Checkpoint。在 Flink 1.16 中,解決了 Barrier 卡在輸出隊(duì)列的情況。

          通過以上這兩個(gè)改進(jìn),Unaligned Checkpoint 得到了更大的提升,穩(wěn)定性也更高。

          我們?cè)?Flink 1.16 中,對(duì)維表部分的增強(qiáng)。


          1. 我們引入了一種緩存機(jī)制,提升了維表的查詢性能。

          2. 我們引入了一種異步查詢機(jī)制,提升了整個(gè)吞吐。

          3. 我們引入一種重試機(jī)制,主要為了解決維表查詢時(shí),遇到的外部系統(tǒng)更新過慢,導(dǎo)致結(jié)果不正確,以及穩(wěn)定性問題。

          通過上述改進(jìn),我們的維表查詢能力得到了極大提升。


          在 Flink 1.16 中,我們支持了更多的 DDL。比如 CREATE FUNCTION USING JAR,支持動(dòng)態(tài)加載用戶的 JAR,方便平臺(tái)用戶管理用戶的 UDF。

          其次,我們支持 CREATE TABLE AS SELECT,讓用戶便捷的通過已有 Table 創(chuàng)建一個(gè)新 Table。

          最后,ANALYZE TABLE 是 Table 的一種新語法。幫助用戶生成更高效的統(tǒng)計(jì)信息。這些統(tǒng)計(jì)信息會(huì)讓優(yōu)化器產(chǎn)生更好的執(zhí)行圖,提升整個(gè)作業(yè)的性能。

          除此之外,我們?cè)诹魃献隽撕芏鄡?yōu)化。這里只列舉了幾個(gè)比較重要的優(yōu)化。

          我們改進(jìn)了流處理系統(tǒng)中,非確定性問題。這部分非確定性的問題主要包含兩部分,一個(gè)是維表查詢上的非確定性問題,另一個(gè)是用戶的 UDF 是非確定性的 UDF。

          1. 我們?cè)?Flink 1.16 提供了一套非常完備的系統(tǒng)性解決方案。首先,我們能夠自動(dòng)檢測(cè) SQL 中,是否有一些非確定性的問題。其次,引擎幫用戶解決了維表查詢的非確定性問題。最后,提供了一些文檔,用戶能根據(jù)這些文檔,更好的發(fā)現(xiàn)和解決自己作業(yè)中非確定性的問題。
          2. 我們終于解決了我們 Protobuf Format 的支持。在 Flink 1.16 中,支持用戶使用 PB 格式數(shù)據(jù)。
          3. 我們引入了 RateLimitingStrategy。之前這部分的 Strategy 是定制化,不可配的。在 Flink 1.16 中,我們把它變成可配置。用戶能夠根據(jù)自己的網(wǎng)絡(luò)堵塞策略,實(shí)現(xiàn)自己的一套配置。

          03

          更穩(wěn)定易用高性能的批處理


          剛才我們聊的都是 Flink 在流處理方面的改進(jìn)。Flink 不僅是一個(gè)流式計(jì)算引擎,而且是一個(gè)流批一體的計(jì)算引擎。所以我們?cè)谂幚矸矫妫沧隽朔浅6嗟墓ぷ鳌link 1.16 的目標(biāo)是,使批處理計(jì)算夠達(dá)到更穩(wěn)定地應(yīng)用和高性能。

          在易用性方面,現(xiàn)有的批生態(tài)中,很多用戶的作業(yè)仍運(yùn)行在 Hive 生態(tài)。在 Flink 1.16 中,我們希望 Hive 的 SQL 能夠以非常低價(jià)的方式,遷移到 Flink 上。Flink 1.16 的 Hive 生態(tài)兼容度達(dá)到了 94%。如果扣除掉一些 acid 操作,Hive 生態(tài)兼容度達(dá)到了 97%。與此同時(shí),配合 Catalog,Hive SQL 在 Flink 引擎上,能夠運(yùn)行聯(lián)邦查詢的能力。

          在 Flink 1.16 中,我們還引入了一個(gè)非常重要的組件 SQL Gateway。通過 SQL Gateway,以及支持的 HiveServer2,使得 Hive 生態(tài)中主流的生態(tài)產(chǎn)品可以非常自然的接入 Flink 生態(tài)。

          在 Flink 1.16 SQL Gateway 中,我們支持多租戶,兼容 HiveServer2 協(xié)議,以及 HiveServer2 帶來的 Hive 生態(tài)。有了 HiveServer2 的配合,整個(gè) Hive 生態(tài)就能非常便捷的遷移到 Flink 生態(tài)上。

          接下來,看看 Flink 引擎本身為批做了哪些優(yōu)化。首先,講一講與調(diào)度相關(guān)的優(yōu)化。Flink 批作業(yè)經(jīng)常會(huì)遇到這樣的問題。由于一些熱點(diǎn)機(jī)器的 IO 繁忙或 CPU 高負(fù)載,導(dǎo)致機(jī)器上運(yùn)行的任務(wù)拖累 Flink 批作業(yè)端到端的執(zhí)行時(shí)間。

          在 Flink 1.16 中,為了解決這個(gè)問題,我們引入了 Speculative Execution,一種推測(cè)執(zhí)行的方式。它的基本原理是,在每個(gè)階段,如果我們檢測(cè)到某一個(gè)機(jī)器,它是一種熱點(diǎn)機(jī)器,它上面運(yùn)行的任務(wù)被稱為慢任務(wù)。所謂慢任務(wù)就是,它的慢任務(wù)執(zhí)行時(shí)間比同一階段其他任務(wù)的執(zhí)行時(shí)間要長(zhǎng)的多,從而把這臺(tái)機(jī)器定義為熱點(diǎn)機(jī)器。

          有了熱點(diǎn)機(jī)器之后,為了降低整個(gè)作業(yè)的執(zhí)行時(shí)間。我們希望把熱點(diǎn)機(jī)器上運(yùn)行的慢任務(wù),通過一個(gè)備份任務(wù),讓它能夠運(yùn)行在其他非熱點(diǎn)機(jī)器上。從而使得整個(gè)任務(wù)的總執(zhí)行時(shí)間縮短。

          接下來,我們簡(jiǎn)單看一下它的具體細(xì)節(jié)。首先,有一個(gè)叫 Slow Task Detector 的組件。這個(gè)組件會(huì)周期性的查看是否有一些慢任務(wù)以及慢任務(wù)對(duì)應(yīng)的熱點(diǎn)機(jī)器。

          收集到了這些信息后,它會(huì)匯報(bào)給 Speculative Scheduler。Scheduler 會(huì)把這些信息匯報(bào)給 Blocklist Handler。然后 Blocklist Handler 會(huì)把這些機(jī)器加黑。

          有了這些加黑機(jī)器之后,加黑機(jī)器上慢任務(wù)的備份任務(wù)會(huì)被調(diào)度到集群當(dāng)中其他非熱點(diǎn)的機(jī)器之上,讓這些慢任務(wù)和備份任務(wù)同時(shí)運(yùn)行。誰先完成就承認(rèn)哪個(gè)任務(wù)的結(jié)果。被承認(rèn)的那個(gè)實(shí)例,它的輸出也能作為下游算子的輸入。沒能完成的任務(wù)將會(huì)被 cancel 掉。

          在 Speculative Execution 中,我們也引入了一些 Rest 和 Web UI 的支持。如上左圖所示,可以觀察到哪些慢任務(wù)被取消,哪些是其備份任務(wù)。通過右圖,可以實(shí)時(shí)看到哪些 TM 機(jī)器被標(biāo)成了 black 機(jī)器。

          關(guān)于 Speculative Execution 的后續(xù)工作,我們當(dāng)前不支持在 Sink 上的 Speculative Execution。其次,我們現(xiàn)有的檢測(cè)策略比較粗糙。我們并沒有有效的考慮到,由于數(shù)據(jù)傾斜導(dǎo)致一些慢任務(wù)的檢測(cè)錯(cuò)誤。把一些本身是正常的機(jī)器,標(biāo)成了熱點(diǎn)機(jī)器。這都是我們?cè)?Flink 1.17 之后,需要做的工作。

          我們?cè)?Shuffle 上做的工作。眾所周知,F(xiàn)link 有兩個(gè) Shuffle 策略,一個(gè)是 Pipelined Shuffle,另一個(gè)是 Blocking Shuffle。

          流式 Pipelined Shuffle 的上下游 Task,能夠同時(shí)調(diào)度運(yùn)行。它的數(shù)據(jù)傳輸是一種空對(duì)空的傳輸,數(shù)據(jù)不落盤,性能更好。但它的缺點(diǎn)是,會(huì)占用更多的資源。因?yàn)樗枰舷掠蔚?Task,同時(shí)調(diào)度。在一些資源比較緊張的情況下,可能導(dǎo)致作業(yè)難以拉起,或者整個(gè)作業(yè)因?yàn)橘Y源索取,造成死鎖。

          在 Blocking Shuffle 方面,由于它在每個(gè)階段,Task 都會(huì)把它的結(jié)果寫到磁盤里。然后,下游的 Task 再通過磁盤,讀取它的數(shù)據(jù)。這樣是好處是,理論上只需要一個(gè) Slot,就能完成整個(gè)批作業(yè)的運(yùn)行。但它的缺點(diǎn)也顯而易見。因?yàn)槊總€(gè)階段都要把結(jié)果數(shù)據(jù)落盤,下一步還需要讀磁盤,所以它的性能較差。

          基于這種考慮,我們?cè)?Flink 1.16 提出了一種新的 Shuffle 策略,即 Hybrid Shuffle。其目是同時(shí)利用上述兩套 Shuffle 的優(yōu)點(diǎn)。在資源充足時(shí),我們利用 Pipelined Shuffle 的性能優(yōu)勢(shì)。在資源不足時(shí),我們利用 Blocking Shuffle 的資源優(yōu)勢(shì)。整套 Shuffle 策略是自適應(yīng)切換的,這是 Hybrid Shuffle 的基本思想。

          Hybrid Shuffle 在數(shù)據(jù)落盤方面,有兩套策略。一套是全落盤,另一套是選擇性落盤。

          選擇性落盤的好處是,我們寫更少的數(shù)據(jù)落盤,整個(gè)性能相對(duì)更高。而全落盤的好處是在 Failover 時(shí)的性能會(huì)更好。根據(jù)用戶場(chǎng)景是哪種不同,選擇到底用 Hybrid Shuffle 的哪種落盤策略。

          在性能方面,F(xiàn)link 1.16 的 Hybrid Shuffle 相比 Blocking Shufle,TPC-DS 執(zhí)行時(shí)間減少了 7.2%。如果加上廣播方面的優(yōu)化,優(yōu)化后的 TPC-DS 執(zhí)行時(shí)間比會(huì)比 Blocking Shuffle 減少 17%。

          關(guān)于 Hybrid Shuffle 的后續(xù)工作,一個(gè)是廣播方面的性能優(yōu)化。另一個(gè)是,與 Flink 1.15 提出的 Adaptive Batch Scheduler 適配,以及 Speculative Execution 的適配。

          如上圖所示,我們?cè)谂幚矸矫孢€有許多其他優(yōu)化。我們簡(jiǎn)單列了一些比較重要的優(yōu)化。

          1. 首先,我們支持了 Dynamic Partition Pruning,即動(dòng)態(tài)分支裁剪。在 1.16 以前的分支裁剪策略都是基于執(zhí)行圖上的靜態(tài)裁剪。但在批處理上,完全可以利用 Runtime 的一些統(tǒng)計(jì)信息,更加高效的進(jìn)行分支裁剪策略。這套實(shí)現(xiàn)讓 Flink 1.16 在 TPC-DS 上有 30%的性能提升。
          2. 我們引入了 Adaptive Hash Join,一種自適應(yīng)策略。我們利用 Runtime 的一些統(tǒng)計(jì)信息,自適應(yīng)的將 Hash Join 回退到 Sort Merge Join,提升 Join 的穩(wěn)定性。
          3. 我們?cè)谂幚碇暗?Blocking Shuffle 上做了一些改進(jìn)。我們引入了更多的壓縮算法(LZO 和 ZSTD)。新的壓縮算法是為了解決在數(shù)據(jù)大小以及 CPU 消耗上的平衡。
          4. 我們優(yōu)化了現(xiàn)有的 Blocking Shuffle 的實(shí)現(xiàn)。通過自適應(yīng)的 buffer 分配,IO 順序讀取,以及 Result Partition 共享,在 TPC-DS 上有 7%的性能提升。
          我們?cè)?Batch SQL 上,支持 Join Hints。Join Hints 讓用戶能手動(dòng)干預(yù) Join 策略。用戶將會(huì)知道生成更加高效的執(zhí)行計(jì)劃,提升整個(gè)作業(yè)的性能。


          04

          蓬勃發(fā)展的生態(tài)


          接下來,介紹一些蓬勃發(fā)展的生態(tài)產(chǎn)品。如上圖所示,PyFlink 是我們生態(tài)中非常重要的產(chǎn)品。PyFlink 經(jīng)過 Flink 1.9 的一路發(fā)展到了 Flink 1.16。

          1. Python API 的覆蓋度達(dá)到了 95%以上。一方面,我們優(yōu)化內(nèi)置 Window 的支持。在 Flink 1.16 以前,我們?cè)?Flink 1.15 支持了自定義 Window。但對(duì)于需要自定義的 Window,用戶的實(shí)現(xiàn)成本依然較高,難以使用。另一方面,我們?cè)?Flink 1.16 引入了 side output、broadcast state 等支持。
          2. PyFlink 支持支持所有的內(nèi)置 Connector&Format。擴(kuò)充了 PyFlink 對(duì)接各種系統(tǒng)的能力。
          3. PyFlink 支持 M1 和 Python 3.9。有了這兩部分能力,降低了用戶的上手成本。與此同時(shí),Deprecate Python 3.6,將在 Flink 1.17 里移除對(duì) Python3.6 的支持,引入 Python3.10 的支持。
          4. PyFlink 搭建了自己的用戶網(wǎng)站。提供了各種執(zhí)行環(huán)境下的安裝教程,可以在線運(yùn)行 QuickStart 例子。這些例子直接掛載在在線的 notebook 網(wǎng)站。與此同時(shí),我們總結(jié)了許多用戶常見的問題答疑,方便新用戶上手。同時(shí)我們整理了 PyFlink 端到端的場(chǎng)景案例。這些部分內(nèi)容本質(zhì)上是為了降低新用戶的門檻。

          在性能方面,我們?cè)?PyFlink 1.15 時(shí),引入了 Thread Mode。Thread Mode 相對(duì)于 Process Mode 最大的不同是,它解決了 Python 進(jìn)程和 Java 進(jìn)程間的通信問題。如果是進(jìn)程間通信,將會(huì)有一些序列化/反序列化的開銷,而 Thread Mode 將不再有這種問題。

          在 Flink 1.16,我們對(duì) Thread Mode 進(jìn)行了完整的支持。相對(duì)于 Process Mode,它的性能會(huì)更好,端到端的延遲會(huì)更低。

          如上圖所示,在 JSON 計(jì)算的場(chǎng)景下,Thread Mode 的端到端延遲只有 Process Mode 的 1/500。它的性能在通用的典型場(chǎng)景,以及數(shù)據(jù)比較常見的場(chǎng)景之下,Thread Mode 的性能基本追平了 Java。

          由此可見,在 Flink 1.16 中,PyFlink 在功能和性能上,已經(jīng)達(dá)到全面生產(chǎn)可用。除此之外,CEP 也是 Flink 生態(tài)中很重要的一部分。我們?cè)?Flink 1.16,對(duì) CEP 的功能進(jìn)行擴(kuò)充。

          1. 我們?cè)?Batch SQL 上,支持了 CEP 能力。
          2. 我們擴(kuò)充了原有只支持首尾時(shí)間間隔的功能,支持了定義事件、事件間隔。

          剛才講的是 Flink 內(nèi)部重要的生態(tài)產(chǎn)品,在 Flink 項(xiàng)目外,我們還有一些重要的生態(tài)項(xiàng)目。比如 Flink Table Store、Flink CDC、Flink ML、Feathub。

          1. Flink Table Store 配合 Changelog State Backend,實(shí)現(xiàn)端到端數(shù)據(jù)的新鮮度達(dá)到分鐘級(jí)別內(nèi)。
          2. 我們?cè)跀?shù)據(jù)的正確性問題上,做了一些改進(jìn)。使得 CDC 流進(jìn)來后做 Join 和聚合會(huì)更加流暢。
          3. 我們?cè)?DataStream 上支持了 Cache 功能,使 Flink ML 在實(shí)現(xiàn)內(nèi)置算子時(shí),能夠得到更高性能。
          4. 前一段剛剛開源的 Feathub 項(xiàng)目。Feathub 依賴 PyFlink 作為它的計(jì)算引擎底座。隨著 PyFlink 性能的提升,F(xiàn)eathub 使用 Python Function 的性能接近 Java Function 的性能,不再有劣勢(shì)。

          更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~

          往期精選


          ??

          ▼ 關(guān)注「Apache Flink」,獲取更多技術(shù)干貨 

             點(diǎn)擊「閱讀原文」,查看原文視頻&演講 PPT

          瀏覽 36
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黑人把逼操出水的视频了 | 天天操三级 | 国产精品久久 | 人人模人人插 | 蜜桃av成人网站 蜜桃无码久久久久 |