<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Apache Doris實(shí)時(shí)數(shù)據(jù)分析保姆級(jí)使用教程

          共 20908字,需瀏覽 42分鐘

           ·

          2022-06-21 09:43

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
          回復(fù)"面試"獲取更多驚喜


          《大數(shù)據(jù)面試提升私教訓(xùn)練營第6期低調(diào)報(bào)名開啟~》

          Doris安裝

          集群部署

          官網(wǎng)下載地址:

          https://doris.apache.org/zh-CN/downloads/downloads.html

          選擇二進(jìn)制下載,源碼下載需要自己編譯。解壓doris文件:

          tar -zxvf apache-doris-1.0.0-incubating-bin.tar.gz -C /opt/module/

          集群規(guī)劃

          FE部署

          修改配置文件vim conf/fe.conf

          meta_dir = /opt/module/doris-meta

          集群中分發(fā)存儲(chǔ)路徑和FE配置文件,啟動(dòng)FE。

          # 創(chuàng)建meta文件夾存儲(chǔ)路徑
          mkdir /opt/module/doris-meta
          # 三臺(tái)機(jī)器都要執(zhí)行
          sh bin/start_fe.sh --daemon

          BE部署

          修改配置文件vim conf/be.conf

          # storage_root_path配置存儲(chǔ)目錄,可以用;來指定多個(gè)目錄,每個(gè)目錄后可以跟逗號(hào),指定大小默認(rèn)GB
          storage_root_path = /opt/module/doris_storage1,10;/opt/module/doris_storage2

          集群中分發(fā)存儲(chǔ)路徑和BE配置文件,啟動(dòng)BE

          # 創(chuàng)建storage_root_path存儲(chǔ)路徑
          mkdir /opt/module/doris_storage1
          mkdir /opt/module/doris_storage2
          # 三臺(tái)機(jī)器都要執(zhí)行
          sh bin/start_be.sh --daemon

          訪問Doris PE節(jié)點(diǎn)

          doris可以使用mysql客戶端訪問,如果未安裝,則需要安裝mysql-client。

          # 第一次訪問不需要密碼,可以自行設(shè)置密碼
          mysql -hdoris1 -P 9030 -uroot
          # 修改密碼
          set password for 'root' = password('root');

          添加BE節(jié)點(diǎn)

          通過mysql客戶端登入后,添加be節(jié)點(diǎn),port為be上的heartbeat_service_port端口,默認(rèn)9050

          mysql> ALTER SYSTEM ADD BACKEND "hadoop102:9050";
          mysql> ALTER SYSTEM ADD BACKEND "hadoop103:9050";
          mysql> ALTER SYSTEM ADD BACKEND "hadoop104:9050";

          通過mysql客戶端,檢測(cè)be節(jié)點(diǎn)狀態(tài),alive必須為true

          mysql> SHOW PROC '/backends';

          BROKER部署

          可選,非必須部署,啟動(dòng)BROKER

          # 三臺(tái)集群都要啟動(dòng)
          sh bin/start_broker.sh --daemon

          使用mysql客戶端訪問pe,添加broker節(jié)點(diǎn)

          mysql> ALTER SYSTEM ADD BROKER broker_name "hadoop102:8000","hadoop103:8000","hadoop104:8000";查看broker狀態(tài)

          mysql> SHOW PROC "/brokers";

          擴(kuò)容縮容

          Doris可以很方便的擴(kuò)容和縮容FE、BE、Broker實(shí)例。通過頁面訪問進(jìn)行監(jiān)控,訪問8030,賬戶為root,密碼默認(rèn)為空不用填寫,除非上述設(shè)置了密碼使用密碼登錄http://hadoop102:8030

          FE 擴(kuò)容和縮容

          FE 節(jié)點(diǎn)的擴(kuò)容和縮容過程,不影響當(dāng)前系統(tǒng)運(yùn)行。

          使用mysql登錄客戶端后,可以使用sql命令查看FE狀態(tài),目前就一臺(tái)FE。

          mysql> SHOW PROC '/frontends';

          增加FE節(jié)點(diǎn),F(xiàn)E分為Leader,Follower和Observer三種角色。默認(rèn)一個(gè)集群只能有一個(gè)Leader,可以有多個(gè)Follower和Observer.其中Leader和Follower組成一個(gè)Paxos選擇組,如果Leader宕機(jī),則剩下的Follower會(huì)成為Leader,保證HA。Observer是負(fù)責(zé)同步Leader數(shù)據(jù)的不參與選舉。如果只部署一個(gè)FE,則FE默認(rèn)就是Leader

          第一個(gè)啟動(dòng)的FE自動(dòng)成為Leader。在此基礎(chǔ)上,可以添加若干Follower和Observer。添加Follower或Observer。使用mysql-client連接到已啟動(dòng)的FE,并執(zhí)行:在doris2部署Follower,doris3上部署Observer

          # 執(zhí)行其中的一個(gè)即可,注解如下
          # follower/observer_host IP節(jié)點(diǎn)位置
          # edit_log_port fe.conf配置文件中可以查詢到

          # ALTER SYSTEM ADD FOLLOWER "follower_host:edit_log_port";
          ALTER SYSTEM ADD FOLLOWER "hadoop103:9010";
          # ALTER SYSTEM ADD OBSERVER "observer_host:edit_log_port";
          ALTER SYSTEM ADD OBSERVER "hadoop104:9010";

          需要重啟配置節(jié)點(diǎn)的FE,并添加如下參數(shù)啟動(dòng)

          # --helper參數(shù)指定leader地址和端口號(hào)
          sh bin/start_fe.sh --helper hadoop102:9010 --daemon
          sh bin/start_fe.sh --helper hadoop102:9010 --daemon

          全部啟動(dòng)完畢后,再通過mysql客戶端,查看FE狀況

          mysql> SHOW PROC '/frontends';

          使用以下命令刪除對(duì)應(yīng)的FE節(jié)點(diǎn)ALTER SYSTEM DROP FOLLOWER[OBSERVER] "fe_host:edit_log_port";刪除Follower FE時(shí),確保最終剩余的Follower(包括 Leader)節(jié)點(diǎn)為奇數(shù)

          ALTER SYSTEM DROP FOLLOWER "hadoop103:9010";
          ALTER SYSTEM DROP OBSERVER "hadoop104:9010"

          BE 擴(kuò)容和縮容

          增加BE節(jié)點(diǎn),就像上面安裝一樣在mysql客戶端,使用ALTER SYSTEM ADD BACKEND

          刪除BE節(jié)點(diǎn),使用ALTER SYSTEM DROP BACKEND "be_host:be_heartbeat_service_port";

          具體文檔請(qǐng)查看官網(wǎng)。

          Doris操作手冊(cè)

          創(chuàng)建用戶

          # 連接doris
          mysql -hhadoop102 -P 9030 -uroot
          # 創(chuàng)建用戶
          mysql> create user 'test' identified by 'test';
          # 退出使用test即可登錄
          mysql> exit;
          mysql -hhadoop102 -P 9030 -utest -ptest

          表操作

          # 創(chuàng)建數(shù)據(jù)庫
          mysql> create database test_db;
          # 賦予test用戶test庫權(quán)限
          mysql> grant all  on test_dn to test;
          # 使用數(shù)據(jù)庫
          mysql> use test_db;

          分區(qū)表

          分區(qū)表分為單分區(qū)和復(fù)合分區(qū)

          單分區(qū)表,建立一張student表。分桶列為id,桶數(shù)為10,副本數(shù)為1

          CREATE TABLE student
          (
          id INT,
          name VARCHAR(50),
          age INT,
          count  BIGINT SUM DEFAULT '0'
          )
          AGGREGATE KEY (id,name,age)
          DISTRIBUTED BY HASH(id) buckets 10
          PROPERTIES("replication_num" = "1");

          復(fù)合分區(qū)表,第一級(jí)稱為Partition,即分區(qū)。用戶指定某一維度列做為分區(qū)列(當(dāng)前只支持整型和時(shí)間類型的列),并指定每個(gè)分區(qū)的取值范圍。第二級(jí)稱為Distribution,即分桶。用戶可以指定一個(gè)或多個(gè)維度列以及桶數(shù)進(jìn)行HASH分布

          #創(chuàng)建student2表,使用dt字段作為分區(qū)列,并且創(chuàng)建3個(gè)分區(qū)發(fā),分別是:
          #P202007 范圍值是是小于2020-08-01的數(shù)據(jù)
          #P202008 范圍值是2020-08-01到2020-08-31的數(shù)據(jù)
          #P202009 范圍值是2020-09-01到2020-09-30的數(shù)據(jù)
          CREATE TABLE student2
          (
          dt DATE,
          id INT,
          name VARCHAR(50),
          age INT,
          count  BIGINT SUM DEFAULT '0'
          )
          AGGREGATE KEY (dt,id,name,age)
          PARTITION BY RANGE(dt)
          (
            PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
            PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
            PARTITION p202009 VALUES LESS THAN ('2020-10-01')
          )
          DISTRIBUTED BY HASH(id) buckets 10
          PROPERTIES("replication_num" = "1");

          復(fù)合分區(qū)表,第一級(jí)稱為Partition,即分區(qū)。用戶指定某一維度列做為分區(qū)列(當(dāng)前只支持整型和時(shí)間類型的列),并指定每個(gè)分區(qū)的取值范圍。第二級(jí)稱為Distribution,即分桶。用戶可以指定一個(gè)或多個(gè)維度列以及桶數(shù)進(jìn)行HASH分布.

          #創(chuàng)建student2表,使用dt字段作為分區(qū)列,并且創(chuàng)建3個(gè)分區(qū)發(fā),分別是:
          #P202007 范圍值是是小于2020-08-01的數(shù)據(jù)
          #P202008 范圍值是2020-08-01到2020-08-31的數(shù)據(jù)
          #P202009 范圍值是2020-09-01到2020-09-30的數(shù)據(jù)
          CREATE TABLE student2
          (
          dt DATE,
          id INT,
          name VARCHAR(50),
          age INT,
          count  BIGINT SUM DEFAULT '0'
          )
          AGGREGATE KEY (dt,id,name,age)
          PARTITION BY RANGE(dt)
          (
            PARTITION p202007 VALUES LESS THAN ('2020-08-01'),
            PARTITION p202008 VALUES LESS THAN ('2020-09-01'),
            PARTITION p202009 VALUES LESS THAN ('2020-10-01')
          )
          DISTRIBUTED BY HASH(id) buckets 10
          PROPERTIES("replication_num" = "1");

          數(shù)據(jù)模型

          AGGREGATE KEY

          AGGREGATE KEY相同時(shí),新舊記錄將會(huì)進(jìn)行聚合操作

          AGGREGATE KEY模型可以提前聚合數(shù)據(jù),適合報(bào)表和多維度業(yè)務(wù)

          UNIQUE KEY

          UNIQUE KEY相同時(shí),新記錄覆蓋舊記錄。目前UNIQUE KEY和AGGREGATE KEY的REPLACE聚合方法一致。適用于有更新需求的業(yè)務(wù)。

          DUPLICATE KEY

          只指定排序列,相同的行并不會(huì)合并。適用于數(shù)據(jù)無需提前聚合的分析業(yè)務(wù)

          數(shù)據(jù)導(dǎo)入

          為適配不同的數(shù)據(jù)導(dǎo)入需求,Doris系統(tǒng)提供5種不同的導(dǎo)入方式。每種導(dǎo)入方式支持不同的數(shù)據(jù)源,存在不同的方式(異步、同步)

          Broker load

          Broker load是一個(gè)導(dǎo)入的異步方式,支持的數(shù)據(jù)源取決于Broker進(jìn)程支持的數(shù)據(jù)源

          基本原理:用戶在提交導(dǎo)入任務(wù)后,F(xiàn)E(Doris系統(tǒng)的元數(shù)據(jù)和調(diào)度節(jié)點(diǎn))會(huì)生成相應(yīng)的PLAN(導(dǎo)入執(zhí)行計(jì)劃,BE會(huì)執(zhí)行導(dǎo)入計(jì)劃將輸入導(dǎo)入Doris中)并根據(jù)BE(Doris系統(tǒng)的計(jì)算和存儲(chǔ)節(jié)點(diǎn))的個(gè)數(shù)和文件的大小,將Plan分給多個(gè)BE執(zhí)行,每個(gè)BE導(dǎo)入一部分?jǐn)?shù)據(jù)。BE在執(zhí)行過程中會(huì)從Broker拉取數(shù)據(jù),在對(duì)數(shù)據(jù)轉(zhuǎn)換之后導(dǎo)入系統(tǒng)。所有BE均完成導(dǎo)入,由FE最終決定是否導(dǎo)入是否成功。

          測(cè)試導(dǎo)入HDFS數(shù)據(jù)到Doris

          編寫測(cè)試文件,上傳到HDFS.

          創(chuàng)建doris表,測(cè)試導(dǎo)入

          CREATE TABLE student
          (
          id INT,
          name VARCHAR(50),
          age INT,
          count  BIGINT SUM DEFAULT '0'
          )
          AGGREGATE KEY (id,name,age)
          DISTRIBUTED BY HASH(id) buckets 10
          PROPERTIES("replication_num" = "1");

          編寫diros導(dǎo)入sql,更多參數(shù)請(qǐng)看官網(wǎng)

          LOAD LABEL test_db.label1
          (
              DATA INFILE("hdfs://bigdata:8020/student")
              INTO TABLE student
              COLUMNS TERMINATED BY ","
              (id,name,age,count)
              SET
              (
                  id=id,
                  name=name,
                  age=age,
                  count=count
              )
          )
          WITH BROKER broker_name
          (
              "username"="root"
          )
          PROPERTIES
          (
              "timeout" = "3600"
          );

          查看doris導(dǎo)入狀態(tài)

          use test_db;
          show load;

          查看數(shù)據(jù)導(dǎo)入是否成功

          Routine Load

          例行導(dǎo)入(Routine Load)功能為用戶提供了一種自動(dòng)從指定數(shù)據(jù)源進(jìn)行數(shù)據(jù)導(dǎo)入的功能

          從Kafka導(dǎo)入數(shù)據(jù)到Doris

          創(chuàng)建kafka主題

          kafka-topics.sh --zookeeper bigdata:2181 --create --replication-factor 1 --partitions 1 --topic test

          啟動(dòng)kafka生產(chǎn)者生產(chǎn)數(shù)據(jù)

          kafka-console-producer.sh --broker-list bigdata:9092 --topic test

          # 數(shù)據(jù)格式
          {"id":"4","name":"czsqhh","age":"18","count":"50"}

          在doris中創(chuàng)建對(duì)應(yīng)表

          CREATE TABLE kafka_student
          (
          id INT,
          name VARCHAR(50),
          age INT,
          count  BIGINT SUM DEFAULT '0'
          )
          AGGREGATE KEY (id,name,age)
          DISTRIBUTED BY HASH(id) buckets 10
          PROPERTIES("replication_num" = "1");

          創(chuàng)建導(dǎo)入作業(yè),desired_concurrent_number指定并行度

          CREATE ROUTINE LOAD test_db.job1 on kafka_student
          PROPERTIES
          (
              "desired_concurrent_number"="1",
              "strict_mode"="false",
              "format"="json"
          )
          FROM KAFKA
          (
              "kafka_broker_list""bigdata:9092",
              "kafka_topic" = "test",
              "property.group.id" = "test"
          );

          查看作業(yè)狀態(tài)

          SHOW ROUTINE LOAD;

          控制作業(yè)

          STOP ROUTINE LOAD For jobxxx :停止作業(yè)
          PAUSE ROUTINE LOAD For jobxxx:暫停作業(yè)
          RESUME ROUTINE LOAD For jobxxx:重啟作業(yè)

          數(shù)據(jù)導(dǎo)出

          Drois導(dǎo)出數(shù)據(jù)到HDFS

          其他參數(shù)詳見官網(wǎng)

          EXPORT TABLE test_db.student
          PARTITION (student)
          TO "hdfs://bigdata:8020/doris/student/" 
          WITH BROKER broker_name
          (
              "username" = "root"
          );

          Doris代碼操作

          Spark

          引入依賴

          <dependencies>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-core_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-yarn_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-sql_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>

              <dependency>
                  <groupId>mysql</groupId>
                  <artifactId>mysql-connector-java</artifactId>
                  <version>5.1.27</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-hive_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.hive</groupId>
                  <artifactId>hive-exec</artifactId>
                  <version>1.2.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.spark</groupId>
                  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
                  <version>3.0.0</version>
              </dependency>
              <dependency>
                  <groupId>com.fasterxml.jackson.core</groupId>
                  <artifactId>jackson-core</artifactId>
                  <version>2.10.1</version>
              </dependency>
              <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
              <dependency>
                  <groupId>com.alibaba</groupId>
                  <artifactId>druid</artifactId>
                  <version>1.1.10</version>
              </dependency>
          </dependencies>

          讀取doris數(shù)據(jù)

          object ReadDoris {
            def main(args: Array[String]): Unit = {
              val sparkConf = new SparkConf().setAppName("testReadDoris").setMaster("local[*]")
              val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

              val df = sparkSession.read.format("jdbc")
                .option("url""jdbc:mysql://bigdata:9030/test_db")
                .option("user""root")
                .option("password""root")
                .option("dbtable""student")
                .load()

              df.show()

              sparkSession.close();
            }

          }

          Flink

          引入依賴

          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <version>1.18.16</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.bahir</groupId>
                  <artifactId>flink-connector-redis_2.12</artifactId>
                  <version>1.1-SNAPSHOT</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-planner_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-common</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-csv</artifactId>
                  <version>1.14.3</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-jdbc_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>
              <dependency>
                  <groupId>mysql</groupId>
                  <artifactId>mysql-connector-java</artifactId>
                  <version>8.0.23</version>
              </dependency>

              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka_2.12</artifactId>
                  <version>1.14.3</version>
              </dependency>

          </dependencies>

          讀取數(shù)據(jù)

          public static void main(String[] args) {
              EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
              TableEnvironment tEnv = TableEnvironment.create(settings);

              String sourceSql = "CREATE TABLE student (\n" +
                      "`id` Integer,\n" +
                      "`name` STRING,\n" +
                      "`age` Integer\n" +
                      ")WITH (\n" +
                      "'connector'='jdbc',\n" +
                      "'url' = 'jdbc:mysql://bigdata:9030/test_db',\n" +
                      "'username'='root',\n" +
                      "'password'='root',\n" +
                      "'table-name'='student'\n" +
                      ")";
              tEnv.executeSql(sourceSql);

              Table table = tEnv.sqlQuery("select * from student");
              table.execute().print();
          }

          如果這個(gè)文章對(duì)你有幫助,不要忘記 「在看」 「點(diǎn)贊」 「收藏」 三連啊喂!

          2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇)
          互聯(lián)網(wǎng)最壞的時(shí)代可能真的來了
          我在B站讀大學(xué),大數(shù)據(jù)專業(yè)
          我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么?
          193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下
          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
          我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么?
          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
          數(shù)據(jù)治理方法論和實(shí)踐小百科全書
          標(biāo)簽體系下的用戶畫像建設(shè)小指南
          4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
          【面試&個(gè)人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談
          大數(shù)據(jù)方向另一個(gè)十年開啟 |《硬剛系列》第一版完結(jié)
          我寫過的關(guān)于成長/面試/職場(chǎng)進(jìn)階的文章
          當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
          瀏覽 126
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  粉嫩小泬无遮挡BBBB | www..91av | 毛片中文字幕 | 婷婷五月丁香激情免费视频 | 国产亚洲视频免费观看 |