<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink+hudi 構(gòu)架滄湖一體化解決方案

          共 26567字,需瀏覽 54分鐘

           ·

          2021-09-30 18:55

          本文是對(duì)前文《如何用Flink整合hudi,構(gòu)架滄湖一體化解決方案》的一些問(wèn)題更正,未添加新內(nèi)容,請(qǐng)讀者酌情閱讀。

          更新說(shuō)明:

          1. 修正章節(jié) 《版本搭配》中的使用建議
          2. 修改章節(jié) 《Flink on hudi》 內(nèi)的配置依賴(lài)描述,現(xiàn)在使用Flink cdc 2.0
          3. 對(duì)章節(jié)《添加依賴(lài)》添加配圖
          4. 更新文章排版

          最后,感謝社區(qū)大佬 玉兆 同學(xué)的細(xì)心指導(dǎo)

          在《如何利用 Flink CDC 實(shí)現(xiàn)數(shù)據(jù)增量備份到 Clickhouse》里,我們介紹了如何cdc到ck,今天我們依舊使用前文的案例,來(lái)sink到hudi,那么我們開(kāi)始吧。

          hudi

          簡(jiǎn)介

          Apache Hudi(發(fā)音為“Hoodie”)在DFS的數(shù)據(jù)集上提供以下流原語(yǔ)

          • 插入更新 (如何改變數(shù)據(jù)集?)
          • 增量拉取 (如何獲取變更的數(shù)據(jù)?)

          Hudi維護(hù)在數(shù)據(jù)集上執(zhí)行的所有操作的時(shí)間軸(timeline),以提供數(shù)據(jù)集的即時(shí)視圖。Hudi將數(shù)據(jù)集組織到與Hive表非常相似的基本路徑下的目錄結(jié)構(gòu)中。數(shù)據(jù)集分為多個(gè)分區(qū),文件夾包含該分區(qū)的文件。每個(gè)分區(qū)均由相對(duì)于基本路徑的分區(qū)路徑唯一標(biāo)識(shí)。

          分區(qū)記錄會(huì)被分配到多個(gè)文件。每個(gè)文件都有一個(gè)唯一的文件ID和生成該文件的提交(commit)。如果有更新,則多個(gè)文件共享相同的文件ID,但寫(xiě)入時(shí)的提交(commit)不同。

          存儲(chǔ)類(lèi)型–處理數(shù)據(jù)的存儲(chǔ)方式

          • 寫(xiě)時(shí)復(fù)制

          • 純列式

          • 創(chuàng)建新版本的文件

          • 讀時(shí)合并

          • 近實(shí)時(shí)

          視圖–處理數(shù)據(jù)的讀取方式

          讀取優(yōu)化視圖-輸入格式僅選擇壓縮的列式文件

          • parquet文件查詢(xún)性能

          • 500 GB的延遲時(shí)間約為30分鐘

          • 導(dǎo)入現(xiàn)有的Hive表

          近實(shí)時(shí)視圖

          • 混合、格式化數(shù)據(jù)

          • 約1-5分鐘的延遲

          • 提供近實(shí)時(shí)表

          增量視圖

          • 數(shù)據(jù)集的變更

          • 啟用增量拉取

          Hudi存儲(chǔ)層由三個(gè)不同的部分組成

          元數(shù)據(jù)–它以時(shí)間軸的形式維護(hù)了在數(shù)據(jù)集上執(zhí)行的所有操作的元數(shù)據(jù),該時(shí)間軸允許將數(shù)據(jù)集的即時(shí)視圖存儲(chǔ)在基本路徑的元數(shù)據(jù)目錄下。時(shí)間軸上的操作類(lèi)型包括

          • 提交(commit),一次提交表示將一批記錄原子寫(xiě)入數(shù)據(jù)集中的過(guò)程。單調(diào)遞增的時(shí)間戳,提交表示寫(xiě)操作的開(kāi)始。

          • 清理(clean),清理數(shù)據(jù)集中不再被查詢(xún)中使用的文件的較舊版本。

          • 壓縮(compaction),將行式文件轉(zhuǎn)化為列式文件的動(dòng)作。

          • 索引,將傳入的記錄鍵快速映射到文件(如果已存在記錄鍵)。索引實(shí)現(xiàn)是可插拔的,Bloom過(guò)濾器-由于不依賴(lài)任何外部系統(tǒng),因此它是默認(rèn)配置,索引和數(shù)據(jù)始終保持一致。Apache HBase-對(duì)少量key更高效。在索引標(biāo)記過(guò)程中可能會(huì)節(jié)省幾秒鐘。

          • 數(shù)據(jù),Hudi以?xún)煞N不同的存儲(chǔ)格式存儲(chǔ)數(shù)據(jù)。實(shí)際使用的格式是可插入的,但要求具有以下特征–讀優(yōu)化的列存儲(chǔ)格式(ROFormat),默認(rèn)值為Apache Parquet;寫(xiě)優(yōu)化的基于行的存儲(chǔ)格式(WOFormat),默認(rèn)值為Apache Avro。

          為什么Hudi對(duì)于大規(guī)模和近實(shí)時(shí)應(yīng)用很重要?

          Hudi解決了以下限制

          • HDFS的可伸縮性限制

          • 需要在Hadoop中更快地呈現(xiàn)數(shù)據(jù)

          • 沒(méi)有直接支持對(duì)現(xiàn)有數(shù)據(jù)的更新和刪除

          • 快速的ETL和建模

          • 要檢索所有更新的記錄,無(wú)論這些更新是添加到最近日期分區(qū)的新記錄還是對(duì)舊數(shù)據(jù)的更新,Hudi都允許用戶(hù)使用最后一個(gè)檢查點(diǎn)時(shí)間戳。此過(guò)程不用執(zhí)行掃描整個(gè)源表的查詢(xún)

          Hudi的優(yōu)勢(shì)

          • HDFS中的可伸縮性限制。
          • Hadoop中數(shù)據(jù)的快速呈現(xiàn)
          • 支持對(duì)于現(xiàn)有數(shù)據(jù)的更新和刪除
          • 快速的ETL和建模

          (以上內(nèi)容主要引用于:《Apache Hudi 詳解》)


          新架構(gòu)與湖倉(cāng)一體

          通過(guò)湖倉(cāng)一體、流批一體,準(zhǔn)實(shí)時(shí)場(chǎng)景下做到了:數(shù)據(jù)同源、同計(jì)算引擎、同存儲(chǔ)、同計(jì)算口徑。數(shù)據(jù)的時(shí)效性可以到分鐘級(jí),能很好的滿(mǎn)足業(yè)務(wù)準(zhǔn)實(shí)時(shí)數(shù)倉(cāng)的需求。下面是架構(gòu)圖:


          MySQL 數(shù)據(jù)通過(guò) Flink CDC 進(jìn)入到 Kafka。之所以數(shù)據(jù)先入 Kafka 而不是直接入 Hudi,是為了實(shí)現(xiàn)多個(gè)實(shí)時(shí)任務(wù)復(fù)用 MySQL 過(guò)來(lái)的數(shù)據(jù),避免多個(gè)任務(wù)通過(guò) Flink CDC 接 MySQL 表以及 Binlog,對(duì) MySQL 庫(kù)的性能造成影響。

          通過(guò) CDC 進(jìn)入到 Kafka 的數(shù)據(jù)除了落一份到離線(xiàn)數(shù)據(jù)倉(cāng)庫(kù)的 ODS 層之外,會(huì)同時(shí)按照實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的鏈路,從 ODS->DWD->DWS->OLAP 數(shù)據(jù)庫(kù),最后供報(bào)表等數(shù)據(jù)服務(wù)使用。實(shí)時(shí)數(shù)倉(cāng)的每一層結(jié)果數(shù)據(jù)會(huì)準(zhǔn)實(shí)時(shí)的落一份到離線(xiàn)數(shù)倉(cāng),通過(guò)這種方式做到程序一次開(kāi)發(fā)、指標(biāo)口徑統(tǒng)一,數(shù)據(jù)統(tǒng)一。

          從架構(gòu)圖上,可以看到有一步數(shù)據(jù)修正 (重跑歷史數(shù)據(jù)) 的動(dòng)作,之所以有這一步是考慮到:有可能存在由于口徑調(diào)整或者前一天的實(shí)時(shí)任務(wù)計(jì)算結(jié)果錯(cuò)誤,導(dǎo)致重跑歷史數(shù)據(jù)的情況。

          而存儲(chǔ)在 Kafka 的數(shù)據(jù)有失效時(shí)間,不會(huì)存太久的歷史數(shù)據(jù),重跑很久的歷史數(shù)據(jù)無(wú)法從 Kafka 中獲取歷史源數(shù)據(jù)。再者,如果把大量的歷史數(shù)據(jù)再一次推到 Kafka,走實(shí)時(shí)計(jì)算的鏈路來(lái)修正歷史數(shù)據(jù),可能會(huì)影響當(dāng)天的實(shí)時(shí)作業(yè)。所以針對(duì)重跑歷史數(shù)據(jù),會(huì)通過(guò)數(shù)據(jù)修正這一步來(lái)處理。

          總體上說(shuō),這個(gè)架構(gòu)屬于 Lambda 和 Kappa 混搭的架構(gòu)。流批一體數(shù)據(jù)倉(cāng)庫(kù)的各個(gè)數(shù)據(jù)鏈路有數(shù)據(jù)質(zhì)量校驗(yàn)的流程。第二天對(duì)前一天的數(shù)據(jù)進(jìn)行對(duì)賬,如果前一天實(shí)時(shí)計(jì)算的數(shù)據(jù)無(wú)異常,則不需要修正數(shù)據(jù),Kappa 架構(gòu)已經(jīng)足夠。

          (本節(jié)內(nèi)容,引用自:《37 手游基于 Flink CDC + Hudi 湖倉(cāng)一體方案實(shí)踐》)


          最佳實(shí)踐

          版本搭配

          版本選擇,這個(gè)問(wèn)題可能會(huì)成為困擾大家的第一個(gè)絆腳石,下面是hudi中文社區(qū)推薦的版本適配:

          flinkhudi
          1.12.20.9.0
          1.13.10.10.0

          建議用 hudi master +flink 1.13 這樣可以和 cdc connector 更好地適配。

          下載hudi

          https://mvnrepository.com/artifact/org.apache.hudi/hudi-flink-bundle

          目前maven中央倉(cāng)庫(kù),最新版本是0.9.0 ,如果需要下載0.10.0版本 , 可以加入社區(qū)群,在共享文件中下載,也可以下載源碼自行編譯。

          執(zhí)行

          如果將 hudi-flink-bundle_2.11-0.10.0.jar 放到了 flink/lib 下,則只需要如下執(zhí)行即可,否則會(huì)出現(xiàn)各種找不到類(lèi)的異常

          bin/sql-client.sh embedded


          Flink on hudi

          新建maven工程,修改pom如下

          <?xml version="1.0" encoding="UTF-8"?>
          <project xmlns="http://maven.apache.org/POM/4.0.0"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

              <modelVersion>4.0.0</modelVersion>

              <groupId>org.example</groupId>
              <artifactId>flink_hudi_test</artifactId>
              <version>1.0-SNAPSHOT</version>

              <properties>
                  <maven.compiler.source>8</maven.compiler.source>
                  <maven.compiler.target>8</maven.compiler.target>
                  <flink.version>1.13.1</flink.version>
                  <hudi.version>0.10.0</hudi.version>
                  <hadoop.version>2.10.1</hadoop.version>
              </properties>

              <dependencies>


                  <dependency>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-client</artifactId>
                      <version>${hadoop.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-hdfs</artifactId>
                      <version>${hadoop.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-common</artifactId>
                      <version>${hadoop.version}</version>
                  </dependency>


                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-core</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-streaming-java_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-connector-jdbc_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-java</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-clients_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-common</artifactId>
                      <version>${flink.version}</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-planner_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-planner-blink_2.11</artifactId>
                      <version>${flink.version}</version>
                  </dependency>
                  <dependency>
                      <groupId>org.apache.flink</groupId>
                      <artifactId>flink-table-planner-blink_2.11</artifactId>
                      <version>${flink.version}</version>
                      <type>test-jar</type>
                  </dependency>

                  <dependency>
                      <groupId>com.ververica</groupId>
                      <artifactId>flink-connector-mysql-cdc</artifactId>
                      <version>2.0.0</version>
                  </dependency>

                  <dependency>
                      <groupId>org.apache.hudi</groupId>
                      <artifactId>hudi-flink-bundle_2.11</artifactId>
                      <version>${hudi.version}</version>
                      <scope>system</scope>
                      <systemPath>${project.basedir}/libs/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar</systemPath>
                  </dependency>

                  <dependency>
                      <groupId>mysql</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <version>5.1.49</version>
                  </dependency>


              </dependencies>
          </project>

          我們通過(guò)構(gòu)建查詢(xún)insert into t2 select replace(uuid(),'-',''),id,name,description,now() from mysql_binlog 將創(chuàng)建的mysql表,插入到hudi里。

          package name.lijiaqi;

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.SqlDialect;
          import org.apache.flink.table.api.TableResult;
          import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

          public class MysqlToHudiExample {
              public static void main(String[] args) throws Exception {
                  EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                          .useBlinkPlanner()
                          .inStreamingMode()
                          .build();
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(1);
                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

                  tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

                  // 數(shù)據(jù)源表
                  String sourceDDL =
                          "CREATE TABLE mysql_binlog (\n" +
                                  " id INT NOT NULL,\n" +
                                  " name STRING,\n" +
                                  " description STRING\n" +
                                  ") WITH (\n" +
                                  " 'connector' = 'jdbc',\n" +
                                  " 'url' = 'jdbc:mysql://127.0.0.1:3306/test', \n"+
                                  " 'driver' = 'com.mysql.jdbc.Driver', \n"+
                                  " 'username' = 'root',\n" +
                                  " 'password' = 'dafei1288', \n" +
                                  " 'table-name' = 'test_cdc'\n" +
                                  ")";

                  // 輸出目標(biāo)表
                  String sinkDDL =
                          "CREATE TABLE t2(\n" +
                                  "\tuuid VARCHAR(20),\n"+
                                  "\tid INT NOT NULL,\n" +
                                  "\tname VARCHAR(40),\n" +
                                  "\tdescription VARCHAR(40),\n" +
                                  "\tts TIMESTAMP(3)\n"+
          //                        "\t`partition` VARCHAR(20)\n" +
                                  ")\n" +
          //                        "PARTITIONED BY (`partition`)\n" +
                                  "WITH (\n" +
                                  "\t'connector' = 'hudi',\n" +
                                  "\t'path' = 'hdfs://172.19.28.4:9000/hudi_t4/',\n" +
                                  "\t'table.type' = 'MERGE_ON_READ'\n" +
                                  ")" ;
                  // 簡(jiǎn)單的聚合處理
                  String transformSQL =
                          "insert into t2 select replace(uuid(),'-',''),id,name,description,now()  from mysql_binlog";

                  tableEnv.executeSql(sourceDDL);
                  tableEnv.executeSql(sinkDDL);
                  TableResult result = tableEnv.executeSql(transformSQL);
                  result.print();

                  env.execute("mysql-to-hudi");
              }
          }

          查詢(xún)hudi

          package name.lijiaqi;

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.SqlDialect;
          import org.apache.flink.table.api.TableResult;
          import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

          public class ReadHudi {
              public static void main(String[] args) throws Exception {
                  EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                          .useBlinkPlanner()
                          .inStreamingMode()
                          .build();
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setParallelism(1);
                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

                  tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

                  String sourceDDL =
                          "CREATE TABLE t2(\n" +
                                  "\tuuid VARCHAR(20),\n"+
                                  "\tid INT NOT NULL,\n" +
                                  "\tname VARCHAR(40),\n" +
                                  "\tdescription VARCHAR(40),\n" +
                                  "\tts TIMESTAMP(3)\n"+
          //                        "\t`partition` VARCHAR(20)\n" +
                                  ")\n" +
          //                        "PARTITIONED BY (`partition`)\n" +
                                  "WITH (\n" +
                                  "\t'connector' = 'hudi',\n" +
                                  "\t'path' = 'hdfs://172.19.28.4:9000/hudi_t4/',\n" +
                                  "\t'table.type' = 'MERGE_ON_READ'\n" +
                                  ")" ;
                  tableEnv.executeSql(sourceDDL);
                  TableResult result2 = tableEnv.executeSql("select * from t2");
                  result2.print();

                  env.execute("read_hudi");
              }
          }

          展示結(jié)果



          Flink CDC 2.0 on Hudi

          上一章節(jié),我們使用代碼形式構(gòu)建實(shí)驗(yàn),在本章節(jié)里,我們直接使用官網(wǎng)下載的flink包來(lái)構(gòu)建實(shí)驗(yàn)環(huán)境。

          添加依賴(lài)

          添加如下依賴(lài)到$FLINK_HOME/lib下

          • hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (修改 Master 分支的 Hudi Flink 版本為 1.13.2 然后構(gòu)建)
          • hadoop-mapreduce-client-core-2.7.3.jar (解決 Hudi ClassNotFoundException)
          • flink-sql-connector-mysql-cdc-2.0.0.jar
          • flink-format-changelog-json-2.0.0.jar
          • flink-sql-connector-kafka_2.11-1.13.2.jar

          注意,在尋找jar的時(shí)候,cdc 2.0 更新過(guò)group id ,不再試 com.alibaba.ververica 而是改成了 com.ververica 


          flink sql cdc on hudi

          創(chuàng)建mysql cdc表

          CREATE  TABLE mysql_users (
           id BIGINT PRIMARY KEY NOT ENFORCED ,
           name STRING,
           birthday TIMESTAMP(3),
           ts TIMESTAMP(3)
          WITH (
           'connector' = 'mysql-cdc',
           'hostname' = 'localhost',
           'port' = '3306',
           'username' = 'root',
           'password' = 'dafei1288',
           'server-time-zone' = 'Asia/Shanghai',
           'database-name' = 'test',
           'table-name' = 'users'   
          );

          創(chuàng)建hudi表

          CREATE TABLE hudi_users5(
           id BIGINT PRIMARY KEY NOT ENFORCED,
              name STRING,
              birthday TIMESTAMP(3),
              ts TIMESTAMP(3),
              `partition` VARCHAR(20)
          ) PARTITIONED BY (`partition`WITH (
              'connector' = 'hudi',
              'table.type' = 'MERGE_ON_READ',
              'path' = 'hdfs://localhost:9009/hudi/hudi_users5'
          );

          修改配置,讓查詢(xún)模式輸出為表,設(shè)置checkpoint

          set execution.result-mode=tableau;

          set execution.checkpointing.interval=10sec;

          進(jìn)行輸入導(dǎo)入

          INSERT INTO hudi_users5(id,name,birthday,ts, partition) SELECT id,name,birthday,ts,DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;

          查詢(xún)數(shù)據(jù)

          select * from hudi_users5;

          執(zhí)行結(jié)果

          卡執(zhí)行計(jì)劃

          這個(gè)問(wèn)題坑了我好幾天,一度都打算放棄hudi了,表面上很正常,日志也沒(méi)有任何報(bào)錯(cuò),也可以看出來(lái)cdc起作用了,有數(shù)據(jù)寫(xiě)入,但是就是卡在 hoodie_stream_write 上一動(dòng)不動(dòng)了,沒(méi)有數(shù)據(jù)下發(fā)了。感謝社區(qū)大佬 Danny Chan 的提點(diǎn),可能是 checkpoint的問(wèn)題,于是做了設(shè)置

          set execution.checkpointing.interval=10sec;

          終于正常了

          致此,flink + hudi 倉(cāng)湖一體化方案的原型構(gòu)建完成,感謝大家看到這里,如果對(duì)你有點(diǎn)點(diǎn)幫助的話(huà),希望點(diǎn)個(gè)關(guān)注,轉(zhuǎn)發(fā)。您的舉手之勞,會(huì)對(duì)我非常有幫助,非常感謝。

          參考鏈接

          https://blog.csdn.net/weixin_49218925/article/details/115511022

          https://blog.csdn.net/qq_37095882/article/details/103714548

          https://mp.weixin.qq.com/s/xoucbJxzO2Zkq_b2_WDUbA


          瀏覽 61
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产毛片AV一区二区三区牛牛影视 | 爽爽网站 | 欧美轻轻操 | 日本一级一片免费视频 | 大香蕉网青青 |