Flink Getting Started 03. Introductory Examples
1 Preliminaries
1.1 API
Flink offers several layers of APIs for developers. The higher the layer, the more abstract and the more convenient it is to use; the lower the layer, the closer it is to the runtime and the harder it is to work with.


Note: Flink 1.12 unified stream and batch processing, and the DataSet API is no longer recommended. Apart from a few examples that still use DataSet, the remaining examples in this course will prefer the DataStream API, which supports both unbounded data processing (stream processing) and bounded data processing (batch processing). The Table & SQL API will be covered separately.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

1.2 Programming Model
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
A Flink application consists of three main parts, Source / Transformation / Sink, as shown in the figure below:


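To make that three-part shape concrete, here is a minimal DataStream sketch; the sample data and the uppercasing step are placeholders chosen only for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTransformSinkSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: where the data comes from
        DataStream<String> source = env.fromElements("flink", "spark", "hadoop");

        // Transformation: how the data is processed (return type String is non-generic, so no returns() is needed)
        DataStream<String> transformed = source.map(value -> value.toUpperCase());

        // Sink: where the result goes
        transformed.print();

        env.execute("source-transformation-sink skeleton");
    }
}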
2 Project Setup
2.1 pom file
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.2</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
2.2 log4j.properties
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
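This file uses the Log4j 1.x properties format. The dependencies listed above do not pull in a logging backend, so if no log output appears when running in the IDE, an SLF4J-to-Log4j binding may also be needed; a minimal sketch of the extra pom entries (the versions are assumptions):

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>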
3 A First Taste of Flink
3.1 Requirement
Implement WordCount with Flink
3.2 Coding Steps
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

Prepare the environment - env
Prepare the data - source
Process the data - transformation
Output the result - sink
Trigger execution - execute
The execution environment can be created in any of the following three ways (sketched in the code below):
getExecutionEnvironment() // recommended
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
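A minimal sketch of the three factory methods; the remote host, port, and jar path are placeholder assumptions, and getExecutionEnvironment() adapts to wherever the program runs, which is why it is the recommended choice:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvCreationSketch {
    public static void main(String[] args) {
        // Chooses a local or cluster environment depending on where the program runs (recommended)
        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();

        // Always starts a local mini-cluster inside the current JVM (handy for IDE debugging)
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createLocalEnvironment();

        // Submits to a specific remote JobManager; host, port, and jar path below are assumptions for illustration
        StreamExecutionEnvironment env3 = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/WordCount-1.0-SNAPSHOT.jar");
    }
}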

3.3 Code Implementation

3.3.1 DataSet-based
package com.song.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Requirement: implement WordCount with Flink - DataSet
 * Coding steps
 * 1. Prepare the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Output the result - sink
 * 5. Trigger execution - execute // with print(), DataSet does not need execute(); DataStream does
 */
public class WordCountWithDataSet {
    public static void main(String[] args) throws Exception {
        // The old batch API looks like this, but it is no longer recommended
        // 1. Prepare the environment - env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Prepare the data - source
        DataSet<String> lineDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");
        // 3. Process the data - transformation
        // 3.1 Split each line on spaces into individual words
        DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                // line is one line of input
                String[] words = line.split(" ");
                for (String word : words) {
                    // collect each split word and emit it downstream
                    out.collect(word);
                }
            }
        });
        // 3.2 Map each word to a count of 1: (word, 1)
        DataSet<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // 3.3 Group the data by the word key
        // 0 means group by the field at index 0 of the tuple, i.e. the key (the word)
        UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOneDS.groupBy(0);
        // 3.4 Aggregate (sum) the counts (value) within each group
        // 1 means aggregate the field at index 1 of the tuple, i.e. the count
        DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 3.5 Sort
        DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
        // 4. Output the result
        result.print();
        // 5. Trigger execution - execute
        // With print(), DataSet does not need execute(); DataStream does
        // env.execute(); // execute(), count(), collect(), print()
    }
}
The execution result is as follows:

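The in-memory source is only for demonstration; the same DataSet job can read its input from a file instead. A minimal sketch of just the source step, with a hypothetical placeholder path:

// Read one line per record from a text file; replace the path with a real input file
DataSet<String> lineDS = env.readTextFile("file:///path/to/words.txt");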
3.3.2 DataStream-based
package com.song.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author SongXitang
 * Desc
 * Requirement: implement WordCount with Flink - DataStream
 * Coding steps
 * 1. Prepare the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Output the result - sink
 * 5. Trigger execution - execute
 */
public class WordCountWithDataStream {
    public static void main(String[] args) throws Exception {
        // The new unified API supports both stream processing and batch processing
        // 1. Prepare the environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // 2. Prepare the data - source
        DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");
        // 3. Process the data - transformation
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        //KeyedStream<Tuple2<String, Integer>, Tuple> keyByDS = wordAndOneDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyByDS.sum(1);
        // 4. Output the result
        result.print();
        // 5. Trigger execution - execute
        // DataStream requires execute()
        env.execute();
    }
}
The execution result is as follows:

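Because the DataStream API also handles unbounded input, the same pipeline can consume a live stream simply by swapping the source. A minimal sketch of just the source step, assuming a local netcat session started with nc -lk 9999 (host and port are assumptions for illustration):

// Unbounded source: each line typed into the netcat session becomes a record
DataStream<String> linesDS = env.socketTextStream("localhost", 9999);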
3.3.3 Lambda Version
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions
package com.song.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.source
        DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");
        // 3.transformation
        // With lambdas the generic result type is erased, so it must be declared explicitly,
        // either via returns(...) or by passing a TypeInformation to the operator
        DataStream<String> wordsDS = linesDS.flatMap((String value, Collector<String> out) ->
                Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);
        //DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);
        // 4.sink
        result.print();
        // 5.execute
        env.execute();
    }
}
The execution result is as follows:

3.3.4 Running on YARN
Note
If writing to HDFS fails with a permission error,
apply the following setting:
hadoop fs -chmod -R 777 /
and add the following to the code:
System.setProperty("HADOOP_USER_NAME", "root")
Modify the code as follows:
package com.song.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountYarn {
    public static void main(String[] args) throws Exception {
        // Set the user that submits to YARN and writes to HDFS
        System.setProperty("HADOOP_USER_NAME", "song");
        // Parse command-line arguments
        ParameterTool params = ParameterTool.fromArgs(args);
        String output = null;
        if (params.has("output")) {
            output = params.get("output");
        } else {
            output = "hdfs://nameservice1/data/flink/wordcount/output_" + System.currentTimeMillis();
        }
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // source
        DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");
        // transformation
        DataStream<String> wordsDS = linesDS.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" "))
                .forEach(out::collect)).returns(Types.STRING);
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String word) -> Tuple2.of(word, 1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);
        // sink
        result.writeAsText(output).setParallelism(1);
        // execute
        env.execute();
    }
}
Package the jar
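Packaging can be a plain mvn package (a thin jar), since Flink's core classes are provided by the cluster at runtime. If connectors from the pom (e.g. Kafka or JDBC) had to be bundled into the submitted jar, a fat jar could be built with the maven-shade-plugin instead; a minimal sketch of the build section (the plugin version is an assumption):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- Bind the shade goal to the package phase so mvn package produces a fat jar -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>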

Check the packaged jar

Upload
[song@cdh68 jars]$ pwd
/home/song/data/jars
[song@cdh68 jars]$ ll
total 16
-rw-r--r-- 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
[song@cdh68 jars]$ chmod 755 WordCount-1.0-SNAPSHOT.jar
[song@cdh68 jars]$ ll
total 16
-rwxr-xr-x 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
Submit the job
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

[song@cdh68 ~]$ flink run-application -t yarn-application \
-Dexecution.runtime-mode=BATCH \
-yjm 4096 \
-ytm 16384 \
-ys 4 \
-c com.song.flink.WordCountYarn \
/home/song/data/jars/WordCount-1.0-SNAPSHOT.jar \
--output hdfs://nameservice1/data/flink/wordcount/output
The submitted program can be observed in the web UI.
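After the job finishes, the files written by writeAsText can be inspected directly on HDFS; a short sketch, assuming the --output path used above (with parallelism 1 the sink writes a single file at that path):

hadoop fs -ls hdfs://nameservice1/data/flink/wordcount/
hadoop fs -cat hdfs://nameservice1/data/flink/wordcount/output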


