Flink SQL管理平臺(tái)flink-streaming-platform-web安裝搭建
公眾號(hào)文章都在個(gè)人博客網(wǎng)站:https://www.ikeguang.com/ 同步,歡迎訪問。
最近看到有人在用flink sql的頁面管理平臺(tái),大致看了下,嘗試安裝使用,比原生的flink sql界面確實(shí)好用多了,我們看下原生的,通過bin/sql-client.sh命令進(jìn)入那個(gè)黑框,一只松鼠,對(duì),就是那個(gè)界面。。。。
這個(gè)工具不是Flink官方出的,是一個(gè)國內(nèi)的小伙伴寫的,Github地址:
https://github.com/zhp8341/flink-streaming-platform-web
根據(jù)github上,作者的描述,flink-streaming-patform-web主要功能:
- [1] 任務(wù)支持單流 、雙流、 單流與維表等。
- [2] 支持本地模式、yarn-per模式、STANDALONE模式 Application模式
- [3] 支持catalog、hive。
- [4] 支持自定義udf、連接器等,完全兼容官方連接器。
- [5] 支持sql的在線開發(fā),語法提示,格式化。
- [6] 支持釘釘告警、自定義回調(diào)告警、自動(dòng)拉起任務(wù)。
- [7] 支持自定義Jar提交任務(wù)。
- [8] 支持多版本flink版本(需要用戶編譯對(duì)應(yīng)flink版本)。
- [9] 支持自動(dòng)、手動(dòng)savepoint備份,并且從savepoint恢復(fù)任務(wù)。
- [10] 支持批任務(wù)如:hive。
- [11] 連接器、udf等三jar管理
是不是覺得很強(qiáng)大,很多同學(xué)已經(jīng)摩拳擦掌想試試了。
安裝
這里只介紹flink on yarn模式的安裝,如果你的hadoop集群已經(jīng)安裝好了,大概半個(gè)小時(shí)就能好;否則,安裝hadoop集群可老費(fèi)事兒了。總體步驟如下:
第一步 hadoop集群
這里假設(shè)你的hadoop集群是好的,yarn是可以正常使用的,8088端口可以訪問,如下:

第二步 下載flink
flink on yarn,只需要下載一個(gè)flink安裝包即可使用,下載命令:
http://archive.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz
解壓
tar?-xvf?flink-1.13.5-bin-scala_2.11.tgz
關(guān)鍵:這里問題來了,我的flink怎么識(shí)別hadoop呢,需要配置一個(gè)環(huán)境變量,編輯 /etc/profile,鍵入內(nèi)容:
export?HADOOP_CONF_DIR=填你的hadoop配置文件目錄,比如我的是/usr/local/hadoop2.8/etc/hadoop/conf
export?HADOOP_CLASSPATH=`hadoop?classpath`
好了,這樣一個(gè)flink on yarn的環(huán)境就搭建好了。
第三步 安裝flink-streaming-patform-web
官方地址文章開頭已經(jīng)給出,在github找到下載地址:https://github.com/zhp8341/flink-streaming-platform-web/releases/,我下載的版本是這個(gè)。

為什么我下的是適配flink 1.14.3的,我前面安裝flink1.13.5,我也是下了一堆flink,經(jīng)過嘗試,才發(fā)現(xiàn)flink1.13.5這個(gè)版本,適配flink-streaming-platform-web tagV20220625。

解壓后,修改配置文件:application.properties,懂的人知道這個(gè)其實(shí)是個(gè)springboot的配置文件。
####?jdbc信息
server.port=9084
spring.datasource.url=jdbc:mysql://192.168.1.1:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=bigdata
spring.datasource.password=bigdata
這里配置了一個(gè)數(shù)據(jù)庫,需要自己新建一下,建表語句作者給出了:https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql,把這段sql執(zhí)行一下,在flink_web數(shù)據(jù)庫建相應(yīng)的一些整個(gè)系統(tǒng)運(yùn)行需要的表。
啟動(dòng)web服務(wù)
#?bin目錄下面的命令
啟動(dòng)?:?sh?deploy.sh??start
停止?:??sh?deploy.sh??stop
服務(wù)啟動(dòng)后,通過9084端口在瀏覽器訪問

第四步 配置flink web平臺(tái)
這一步很關(guān)鍵,頁面上點(diǎn)擊系統(tǒng)設(shè)置,進(jìn)入配置頁面:

這里的參數(shù)意義:
- Flink客戶端目錄:就是安裝的Flink目錄;
- Flink管理平臺(tái)目錄:就是下載的flink-streaming-platform-web放的目錄;
- yarn RM http地址:就是yarn.resourcemanager.webapp.address,通常是8088端口;
經(jīng)過測(cè)試,配置這3個(gè)參數(shù)即可使用。
第五步 運(yùn)行demo
這里以官方demo為例,[demo1 單流kafka寫入mysqld 參考](https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql_demo/demo_1.md),這是一個(gè)通過flink sql消費(fèi)kafka,聚合結(jié)果寫入mysql的例子。
- 在flink_web數(shù)據(jù)建表
CREATE?TABLE?sync_test_1?(
??`day_time`?varchar(64)?NOT?NULL,
??`total_gmv`?bigint(11)?DEFAULT?NULL,
??PRIMARY?KEY?(`day_time`)
)?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4;
- 下載依賴jar包
因?yàn)樯婕暗絢afka和mysql,需要對(duì)應(yīng)的connector依賴jar包,下圖中標(biāo)注出來了,放在Flink的lib目錄(/var/lib/hadoop-hdfs/flink-1.13.5/lib)下面:

wget?https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.11/1.13.5/flink-connector-jdbc_2.11-1.13.5.jar
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.13.5/flink-connector-kafka_2.11-1.13.5.jar
技巧:下載Flink的依賴jar包,有個(gè)地方下載很方便,地址是:
https://repo1.maven.org/maven2/org/apache/flink/
這樣依賴,一切都準(zhǔn)備好了。
在web頁面新建sql流任務(wù):

我建的一個(gè),任務(wù)屬性我是這樣填寫的:

sql腳本內(nèi)容:
create?table?flink_test_1?(?
????id?BIGINT,
????day_time?VARCHAR,
????amnount?BIGINT,
????proctime?AS?PROCTIME?()
)
with?(?
????'connector'?=?'kafka',
????'topic'?=?'flink_connector',
????'properties.bootstrap.servers'?=?'kafka-001:9092',?
????'properties.group.id'?=?'flink_gp_test1',
????'scan.startup.mode'?=?'earliest-offset',
????'format'?=?'json',
????'json.fail-on-missing-field'?=?'false',
????'json.ignore-parse-errors'?=?'true'
);
CREATE?TABLE?sync_test_1?(
????day_time?string,
????total_gmv?bigint,
????PRIMARY?KEY?(day_time)?NOT?ENFORCED
)?WITH?(
????'connector'?=?'jdbc',
????'url'?=?'jdbc:mysql://192.168.1.1:3306/flink_web?characterEncoding=UTF-8',
????'table-name'?=?'sync_test_1',
????'username'?=?'bigdata',
????'password'?=?'bigdata'
);
INSERT?INTO?sync_test_1?
SELECT?day_time,SUM(amnount)?AS?total_gmv
FROM?flink_test_1
GROUP?BY?day_time;
創(chuàng)建好任務(wù)后,啟動(dòng)任務(wù)吧。
啟動(dòng)后,可以在yarn的8088端口頁面看到起了一個(gè)application,名稱是新建任務(wù)填寫的名稱加flink@前綴:

這個(gè)任務(wù),我們點(diǎn)進(jìn)去看,通過管理平臺(tái)提交的sql任務(wù)確實(shí)跑起來了,這個(gè)頁面了解Flink的同學(xué)就很熟悉了:

其實(shí),這段sql腳本,我們可以在flink的bin/sql-client.sh進(jìn)入的那個(gè)小松鼠的黑框里面執(zhí)行的,你們可以試一下。

kafka控制臺(tái)往主題里面寫數(shù)據(jù),主題不存在會(huì)自動(dòng)創(chuàng)建:

我們?cè)倏纯磎ysql里面:

數(shù)據(jù)已經(jīng)進(jìn)來了。
與Flink SQL的比較
我們可以看到,flink-streaming-platform-web這個(gè)工具只是讓我們不需要在這個(gè)黑框里面寫sql了,而是在網(wǎng)頁上面寫sql,系統(tǒng)會(huì)把寫的sql進(jìn)行校驗(yàn)給flink去執(zhí)行,不管是flink-streaming-platform-web網(wǎng)頁也好,還是那個(gè)黑框sql控制臺(tái),都是客戶端,本質(zhì)上都是flink提供的一些table api去執(zhí)行任務(wù)。

