
          Flink Getting Started 03: Introductory Examples


          2021-09-04 23:47

          1   Preliminaries

          1.1   API

          Flink offers APIs at several levels of abstraction. The higher the level, the more abstract and convenient the API is to use; the lower the level, the closer it sits to the runtime and the harder it is to use.

          Note: since Flink 1.12 stream and batch processing are unified, and the DataSet API is no longer recommended. Apart from a few examples that use DataSet, all later examples will prefer the DataStream API, which handles both unbounded data (stream processing) and bounded data (batch processing). The Table & SQL API will be covered separately.

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/

          https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

          1.2   Programming Model

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

          A Flink application is structured around three main parts: Source, Transformation, and Sink (the original figure is omitted here). A minimal sketch of this shape follows.
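
          As an illustration of that structure, here is a minimal sketch (the class name and sample data are made up for the illustration, not taken from the original post); every program in section 3 follows the same shape:

          package com.song.flink;

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

          // Minimal sketch of the Source -> Transformation -> Sink structure.
          public class StructureSketch {
              public static void main(String[] args) throws Exception {
                  // env: the entry point of every Flink program
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.fromElements("a", "b", "a")   // Source: where the data comes from
                     .map(String::toUpperCase)      // Transformation: how the data is processed
                     .print();                      // Sink: where the results go
                  env.execute("structure-sketch");  // nothing runs until execute() is called
              }
          }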

          2   Project Setup

          2.1   pom.xml

          <properties>
              <maven.compiler.source>8</maven.compiler.source>
              <maven.compiler.target>8</maven.compiler.target>
              <flink.version>1.13.2</flink.version>
          </properties>

          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java_2.11</artifactId>
                  <version>${flink.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients_2.11</artifactId>
                  <version>${flink.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.projectlombok</groupId>
                  <artifactId>lombok</artifactId>
                  <version>1.18.20</version>
              </dependency>
              <dependency>
                  <groupId>mysql</groupId>
                  <artifactId>mysql-connector-java</artifactId>
                  <version>8.0.25</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-jdbc_2.11</artifactId>
                  <version>${flink.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-kafka_2.11</artifactId>
                  <version>${flink.version}</version>
              </dependency>
          </dependencies>

          2.2   log4j.properties

          log4j.rootLogger=WARN, console
          log4j.appender.console=org.apache.log4j.ConsoleAppender
          log4j.appender.console.layout=org.apache.log4j.PatternLayout
          log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
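
          With this pattern, a console line looks roughly like the following (an illustrative sample, not captured output: timestamp, level padded to 5, logger category padded to 60 characters, then the message):

          23:47:01,123 WARN  com.song.flink.WordCountWithDataStream                       - example warning message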

          3   First Taste of Flink

          3.1   Requirement

          Implement WordCount with Flink.

          3.2   Coding Steps

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

          1. Prepare the environment - env

          2. Prepare the data - source

          3. Process the data - transformation

          4. Output the results - sink

          5. Trigger execution - execute

          The execution environment can be created in any of the following three ways (a short usage sketch follows the list):

          getExecutionEnvironment() // recommended
          createLocalEnvironment()
          createRemoteEnvironment(String host, int port, String... jarFiles)
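
          A brief sketch of the three factory methods (the host, port, parallelism, and jar path in the local/remote examples are placeholders, not values from the original post):

          package com.song.flink;

          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

          // Hypothetical sketch of the three ways to create an execution environment.
          public class EnvSketch {
              public static void main(String[] args) {
                  // 1. Recommended: picks a local environment inside the IDE and the
                  //    cluster environment when the job is submitted with `flink run`.
                  StreamExecutionEnvironment auto = StreamExecutionEnvironment.getExecutionEnvironment();

                  // 2. Always local; useful for debugging with an explicit parallelism.
                  StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment(2);

                  // 3. Always remote: submits against a specific JobManager.
                  //    "jobmanager-host", 8081 and the jar path are placeholders.
                  StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
                          "jobmanager-host", 8081, "/path/to/job.jar");
              }
          }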

          3.3   Code

          3.3.1   DataSet Version

          package com.song.flink;

          import org.apache.flink.api.common.functions.FlatMapFunction;
          import org.apache.flink.api.common.functions.MapFunction;
          import org.apache.flink.api.common.operators.Order;
          import org.apache.flink.api.java.DataSet;
          import org.apache.flink.api.java.ExecutionEnvironment;
          import org.apache.flink.api.java.operators.UnsortedGrouping;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.util.Collector;

          /**
           * Requirement: implement WordCount with Flink - DataSet version
           * Coding steps
           * 1. Prepare the environment - env
           * 2. Prepare the data - source
           * 3. Process the data - transformation
           * 4. Output the results - sink
           * 5. Trigger execution - execute
           *    (with print(), DataSet does not need execute(); DataStream does)
           */

          public class WordCountWithDataSet {
              public static void main(String[] args) throws Exception {
                  // The legacy batch API is used below; it is no longer recommended.
                  // 1. Prepare the environment - env
                  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                  // 2. Prepare the data - source
                  DataSet<String> lineDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                  // 3. Process the data - transformation
                  // 3.1 Split each line on spaces into individual words
                  DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
                      @Override
                      public void flatMap(String line, Collector<String> out) throws Exception {
                          // line is one line of input
                          String[] words = line.split(" ");
                          for (String word : words) {
                              // collect each split-off word and emit it downstream
                              out.collect(word);
                          }
                      }
                  });

                  // 3.2 Map each word to a count of one: (word, 1)
                  DataSet<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> map(String word) throws Exception {
                          return Tuple2.of(word, 1);
                      }
                  });

                  // 3.3 Group the data by the word key
                  // 0 means group by the field at tuple index 0, i.e. the word
                  UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOneDS.groupBy(0);

                  // 3.4 Aggregate each group by summing the counts
                  // 1 means sum the field at tuple index 1, i.e. the count
                  DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

                  // 3.5 Sort by count, descending
                  DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);

                  // 4. Output the results
                  result.print();

                  // 5. Trigger execution - execute
                  // With print(), DataSet does not need execute(); DataStream does.
                  // env.execute(); // execute(), count(), collect() and print() all trigger execution
              }
          }

          Running the program prints the counts in descending order, (Hello,3) followed by the three words counted once (the original screenshot is omitted).

          3.3.2   DataStream Version

          package com.song.flink;

          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.api.common.functions.FlatMapFunction;
          import org.apache.flink.api.common.functions.MapFunction;
          import org.apache.flink.api.java.tuple.Tuple;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.KeyedStream;
          import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.util.Collector;

          /**
           * Author SongXitang
           * Desc
           * Requirement: implement WordCount with Flink - DataStream version
           * Coding steps
           * 1. Prepare the environment - env
           * 2. Prepare the data - source
           * 3. Process the data - transformation
           * 4. Output the results - sink
           * 5. Trigger execution - execute
           */

          public class WordCountWithDataStream {
              public static void main(String[] args) throws Exception {
                  // The new unified API supports both stream processing and batch processing.
                  // 1. Prepare the environment - env
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
                  //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
                  //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

                  // 2. Prepare the data - source
                  DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                  // 3. Process the data - transformation
                  DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
                      @Override
                      public void flatMap(String line, Collector<String> collector) throws Exception {
                          String[] words = line.split(" ");
                          for (String word : words) {
                              collector.collect(word);
                          }
                      }
                  });
                  DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
                      @Override
                      public Tuple2<String, Integer> map(String word) throws Exception {
                          return Tuple2.of(word, 1);
                      }
                  });
                  // Index-based keyBy is deprecated; a key selector is preferred:
                  //KeyedStream<Tuple2<String, Integer>, Tuple> keyByDS = wordAndOneDS.keyBy(0);
                  KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                  SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyByDS.sum(1);

                  // 4. Output the results
                  result.print();

                  // 5. Trigger execution - execute
                  // DataStream programs must call execute()
                  env.execute();
              }
          }

          Running the program prints the per-word counts, each line prefixed with the index of the subtask that produced it, for example 3> (Hello,3) (the original screenshot is omitted).

          3.3.3   Lambda Version

          https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions

          Note that Java erases the generic type parameters of lambda expressions, so Flink cannot infer generic result types such as Tuple2<String, Integer> on its own; they must be declared explicitly via returns(...) or the TypeInformation/TypeHint overload, as the code below does.

          package com.song.flink;

          import org.apache.flink.api.common.RuntimeExecutionMode;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;
          import org.apache.flink.api.common.typeinfo.Types;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.datastream.KeyedStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.util.Collector;

          import java.util.Arrays;

          public class WordCountLambda {
              public static void main(String[] args) throws Exception {
                  // 1.env
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                  // 2.source
                  DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                  // 3.transformation
                  DataStream<String> wordsDS = linesDS.flatMap((String value, Collector<String> out) ->
                          Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);
                  //DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
                  DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1),
                          TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
                  KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                  DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

                  // 4.sink
                  result.print();

                  // 5.execute
                  env.execute();
              }
          }

          Running the program produces the same counts as the previous version (the original screenshot is omitted).
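
          To make the type-erasure point above concrete, here is a minimal, self-contained sketch (a hypothetical class, not from the original post; the failing line is left commented out):

          package com.song.flink;

          import org.apache.flink.api.common.typeinfo.Types;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

          // Hypothetical demo of why lambdas need explicit type information.
          public class TypeErasureDemo {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  DataStream<String> words = env.fromElements("Hello", "Flink");

                  // The compiler erases Tuple2<String, Integer> to Tuple2, so the following
                  // line without returns(...) makes Flink raise an InvalidTypesException
                  // once the stream is consumed:
                  // DataStream<Tuple2<String, Integer>> bad = words.map(w -> Tuple2.of(w, 1));

                  // Declaring the result type explicitly fixes it:
                  DataStream<Tuple2<String, Integer>> good = words
                          .map(w -> Tuple2.of(w, 1))
                          .returns(Types.TUPLE(Types.STRING, Types.INT));

                  good.print();
                  env.execute("type-erasure-demo");
              }
          }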

          3.3.4   Running on YARN

          Note

          If writing to HDFS fails with a permission error:

          apply the following setting:

          hadoop fs -chmod -R 777 /

          and add the following to the code:

          System.setProperty("HADOOP_USER_NAME", "root");
          1. Modify the code

            package com.song.flink;

            import org.apache.flink.api.common.RuntimeExecutionMode;
            import org.apache.flink.api.common.typeinfo.TypeHint;
            import org.apache.flink.api.common.typeinfo.TypeInformation;
            import org.apache.flink.api.common.typeinfo.Types;
            import org.apache.flink.api.java.tuple.Tuple2;
            import org.apache.flink.api.java.utils.ParameterTool;
            import org.apache.flink.streaming.api.datastream.DataStream;
            import org.apache.flink.streaming.api.datastream.KeyedStream;
            import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
            import org.apache.flink.util.Collector;

            import java.util.Arrays;

            public class WordCountYarn {
                public static void main(String[] args) throws Exception {
                    // Set the user that submits to YARN
                    System.setProperty("HADOOP_USER_NAME", "song");

                    // Read the program arguments
                    ParameterTool params = ParameterTool.fromArgs(args);
                    String output = null;
                    if (params.has("output")) {
                        output = params.get("output");
                    } else {
                        output = "hdfs://nameservice1/data/flink/wordcount/output_" + System.currentTimeMillis();
                    }

                    // env
                    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

                    // source
                    DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

                    // transformation
                    DataStream<String> wordsDS = linesDS.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" "))
                            .forEach(out::collect)).returns(Types.STRING);
                    DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String word) -> Tuple2.of(word, 1),
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
                    KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
                    DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

                    // sink
                    result.writeAsText(output).setParallelism(1);

                    // execute
                    env.execute();
                }
            }
          2. Package the jar (e.g. with mvn clean package)

          3. Inspect the packaged jar

          4. Upload it to the server

            [song@cdh68 jars]$ pwd
            /home/song/data/jars
            [song@cdh68 jars]$ ll
            total 16
            -rw-r--r-- 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
            [song@cdh68 jars]$ chmod 755 WordCount-1.0-SNAPSHOT.jar 
            [song@cdh68 jars]$ ll
            total 16
            -rwxr-xr-x 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
          5. Submit the job

            https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

            Note: in application mode, JobManager/TaskManager resources are passed as -D configuration options (the -yjm/-ytm/-ys shorthand flags only work with the legacy flink run -m yarn-cluster submission).

            [song@cdh68 ~]$ flink run-application -t yarn-application \
            -Dexecution.runtime-mode=BATCH \
            -Djobmanager.memory.process.size=4096m \
            -Dtaskmanager.memory.process.size=16384m \
            -Dtaskmanager.numberOfTaskSlots=4 \
            -c com.song.flink.WordCountYarn \
            /home/song/data/jars/WordCount-1.0-SNAPSHOT.jar \
            --output hdfs://nameservice1/data/flink/wordcount/output
          6. The submitted job can then be observed in the web UI (screenshot omitted).

