Flink: Setting Up a Java Development Environment
I. Goal
Set up a development environment for Flink stream-processing development.
II. Steps: Java development environment
[Reference, official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/java_api_quickstart.html]
1. Local environment
(1) The official documentation requires Maven 3.0.4 (or higher) and Java 8.x.
(2) Local environment: Maven 3.3.9, Java 1.8.
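Before creating the project it is worth confirming which Java version is actually in use: mvn -v prints the Java version Maven runs on, and java -version prints the default JVM. The sketch below does the same check from Java code. JavaVersionCheck and isJava8 are hypothetical names made up for this example, and it assumes that Java 8 runtimes report a java.version of the form 1.8.0_xxx while Java 9 and later report values such as 9 or 11.0.2.

```java
// Hypothetical helper: checks whether the running JVM reports a Java 8 version string.
// Assumption: Java 8 reports "1.8.0_..." in the "java.version" system property,
// while Java 9+ reports "9", "11.0.2", and so on.
class JavaVersionCheck {

    // True for "1.8" or any "1.8.x..." version string
    static boolean isJava8(String version) {
        return version.equals("1.8") || version.startsWith("1.8.");
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        System.out.println("java.version = " + version);
        System.out.println(isJava8(version)
                ? "OK: running on Java 8, as the Flink 1.7 quickstart expects"
                : "Warning: not Java 8; the Flink 1.7 docs ask for Java 8.x");
    }
}
```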
2. Create the Java project
(1) Go to the project directory and run the Maven command:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.7.2 \
-DarchetypeCatalog=local
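On the local machine (Windows cmd, where \ is not a line-continuation character, so the command is written on one line):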
E:\Tools\WorkspaceforMyeclipse\flink_project>mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2 -DarchetypeCatalog=local
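Note: the -DarchetypeCatalog=local parameter makes Maven use the local archetype catalog instead of downloading the remote one, which speeds up project creation.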
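(2) Enter the GAV (groupId, artifactId, version) values when prompted; pressing Enter at the package prompt accepts the default com.bd.flink.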
Define value for property 'groupId': com.bd.flink
Define value for property 'artifactId': flink-pro
Define value for property 'version' 1.0-SNAPSHOT: : 1.0
Define value for property 'package' com.bd.flink: :
Confirm properties configuration:
groupId: com.bd.flink
artifactId: flink-pro
version: 1.0
package: com.bd.flink
Y: : Y
(3) Check the generated project
E:\Tools\WorkspaceforMyeclipse\flink_project>tree
Folder PATH listing for volume Local Disk
Volume serial number is 0003-6793
E:.
└─flink-pro
    └─src
        └─main
            ├─java
            │   └─com
            │       └─bd
            │           └─flink
            └─resources
3. Import into IDEA
File -> Open -> select the project's pom.xml and open it as a project.
Then check the project structure.
4. Package the project
(1) Using the Maven command line
In the project root directory, run:
E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro>mvn clean package
Build output:
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ flink-pro ---
[INFO] Building jar: E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar
[INFO]
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flink-pro ---
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar with E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.005 s
[INFO] Finished at: 2019-11-30T16:24:20+08:00
[INFO] Final Memory: 25M/184M
[INFO] ------------------------------------------------------------------------
(2) Using IDEA's Maven tool window: View -> Tool Windows -> Maven Projects, then run clean followed by package.
5. Example: a WordCount program in Java
(1) Four steps
Step 1: set up the batch execution environment
Step 2: read the data
Step 3: implement the business logic (transform operations)
Step 4: execute the program
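To make the four-step structure concrete without a Flink dependency, here is a plain-JDK analogue written with java.util.stream. It only mirrors the shape of the Flink program (source, transformation, grouping and summing, trigger); it is not Flink code, and the class name FourStepWordCount is made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Illustration only: a single-JVM analogue of the four Flink steps using java.util.stream.
class FourStepWordCount {

    static Map<String, Integer> countWords(List<String> lines) {
        // Step 1 (environment): nothing to set up; the JVM itself plays this role.
        // Step 2 (read data): the input lines are passed in directly.
        // Step 3 (transform): lowercase, split on spaces, drop empty tokens,
        // then group by word and sum a count of 1 per occurrence.
        Map<String, Integer> counts = lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split(" ")))
                .filter(token -> token.length() > 0)
                .collect(Collectors.groupingBy(token -> token, Collectors.summingInt(token -> 1)));
        // Copy into a TreeMap so the printed result is sorted by word
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("flink hadoop storm", "Flink spark");
        // Step 4 (execute): the terminal collect() inside countWords is what runs the pipeline;
        // printing here is just the "sink".
        System.out.println(countWords(lines)); // {flink=2, hadoop=1, spark=1, storm=1}
    }
}
```

Both java.util.stream and the Flink DataSet API build the pipeline lazily, so in each case the "execute" step is whichever terminal call is made: collect() here, print() in the Flink program.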
(2) Code
package com.bd.flink._1130application;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by Administrator on 2019/11/30.
 * WordCount implemented in Java (DataSet batch API)
 */
public class BatchWCJava {
    public static void main(String[] args) throws Exception {
        // Input file, relative to the working directory (the flink-pro project root)
        String input = "data\\hello.txt";

        // Step 1: set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Step 2: read the data
        DataSource<String> text = env.readTextFile(input);

        // Step 3: business logic (transform operations):
        // split each line into lowercase words, emit (word, 1), group by the word and sum the counts
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();

        // Step 4: execute the program
        // In the DataSet API, execute(), count(), collect() and print() all trigger execution.
        // print() above has already run the job, so env.execute() is not needed; calling it here
        // would fail because no new data sinks have been defined since that execution.
        // env.execute("Flink Batch Java API Skeleton");
    }
}
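The token.length()>0 check in flatMap matters: String.split(" ") returns an empty string for a leading space and for every extra space between words, and without the check those empty strings would be counted as a "word". The sketch below reproduces the same tokenizing rule in plain Java to show this; the class SplitEdgeCases is a hypothetical name for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shows why BatchWCJava's flatMap skips empty tokens.
class SplitEdgeCases {

    // Same rule as the flatMap above: lowercase, split on " ", keep only non-empty tokens
    static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        for (String token : line.toLowerCase().split(" ")) {
            if (token.length() > 0) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String line = " Flink  Hadoop"; // a leading space and a double space
        System.out.println(Arrays.toString(line.split(" "))); // [, Flink, , Hadoop]
        System.out.println(tokens(line));                     // [flink, hadoop]
    }
}
```

Switching to split("\\s+") would merge repeated whitespace, but a line that starts with whitespace still yields an empty first element, so the length check stays useful either way.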
(3) Results
Contents of flink_project\flink-pro\data\hello.txt:
flink hadoop storm
flume spark streaming
is excellent
Output (the order of the pairs is not guaranteed and may differ between runs):
(is,1)
(streaming,1)
(excellent,1)
(hadoop,1)
(flink,1)
(flume,1)
(storm,1)
(spark,1)
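Because the printed order is not guaranteed, checking the output against the expected counts works better as a map comparison than as a line-by-line diff. The following sketch assumes the "(word,count)" text format shown above; PrintedTuples and parse are hypothetical names, not part of Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns printed lines like "(flink,1)" into a map so results
// can be compared regardless of the order in which they were printed.
class PrintedTuples {

    static Map<String, Integer> parse(List<String> printed) {
        Map<String, Integer> result = new HashMap<>();
        for (String line : printed) {
            String s = line.trim();
            // Use the last comma, so a word that itself contains a comma (e.g. "hello,") still parses
            int comma = s.lastIndexOf(',');
            String word = s.substring(1, comma);
            int count = Integer.parseInt(s.substring(comma + 1, s.length() - 1));
            result.put(word, count);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> printed = Arrays.asList("(is,1)", "(streaming,1)", "(excellent,1)",
                "(hadoop,1)", "(flink,1)", "(flume,1)", "(storm,1)", "(spark,1)");
        // Expected: every word of the sample file occurs exactly once
        Map<String, Integer> expected = new HashMap<>();
        for (String w : "flink hadoop storm flume spark streaming is excellent".split(" ")) {
            expected.put(w, 1);
        }
        System.out.println(parse(printed).equals(expected)); // true
    }
}
```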
