指標(biāo)統(tǒng)計:基于流計算Oceanus(Flink) 實現(xiàn)實時UVPV統(tǒng)計

導(dǎo)語?|?最近梳理了一下如何用Flink來實現(xiàn)實時的UV、PV指標(biāo)的統(tǒng)計,并和公司內(nèi)微視部門的同事交流。然后針對該場景做了簡化,并發(fā)現(xiàn)使用Flink SQL來實現(xiàn)這些指標(biāo)的統(tǒng)計會更加便捷。
一、解決方案描述
(一)概述

(二)方案架構(gòu)及優(yōu)勢

本地數(shù)據(jù)中心(IDC)的自建Kafka集群
私有網(wǎng)絡(luò)VPC
專線接入/云聯(lián)網(wǎng)/VPN連接/對等連接
流計算Oceanus (Flink)
云數(shù)據(jù)庫Redis
二、前置準(zhǔn)備
(一)創(chuàng)建私有網(wǎng)絡(luò)VPC
(二)創(chuàng)建Oceanus集群
流計算Oceanus是大數(shù)據(jù)產(chǎn)品生態(tài)體系的實時化分析利器,是基于Apache Flink構(gòu)建的具備一站開發(fā)、無縫連接、亞秒延時、低廉成本、安全穩(wěn)定等特點的企業(yè)級實時大數(shù)據(jù)分析平臺。流計算Oceanus以實現(xiàn)企業(yè)數(shù)據(jù)價值最大化為目標(biāo),加速企業(yè)實時化數(shù)字化的建設(shè)進(jìn)程。
在Oceanus控制臺的【集群管理->【新建集群】頁面創(chuàng)建集群,選擇地域、可用區(qū)、VPC、日志、存儲,設(shè)置初始密碼等。VPC及子網(wǎng)使用剛剛創(chuàng)建好的網(wǎng)絡(luò)。創(chuàng)建完后Flink的集群如下:

(三)創(chuàng)建Redis集群
Redis控制臺:https://console.cloud.tencent.com/redis#/

(四)配置自建Kafka集群
修改自建Kafka集群配置
advertised.listeners=PLAINTEXT://10.1.0.10:9092advertised.host.name=PLAINTEXT://10.1.0.10:9092
修改后重啟Kafka集群。
注意:若在云上使用到自建的zookeeper地址,也需要將zk配置中的hostname修改ip地址形式。
模擬發(fā)送數(shù)據(jù)到topic
Kafka客戶端
./bin/kafka-console-producer.sh --broker-list 10.1.0.10:9092 --topic uvpv-demo{"record_type":0, "user_id": 2, "client_ip": "100.0.0.2", "product_id": 101, "create_time": "2021-09-08 16:20:00"}{"record_type":0, "user_id": 3, "client_ip": "100.0.0.3", "product_id": 101, "create_time": "2021-09-08 16:20:00"}{"record_type":1, "user_id": 2, "client_ip": "100.0.0.1", "product_id": 101, "create_time": "2021-09-08 16:20:00"}
使用腳本發(fā)送
(五)打通自建IDC集群到騰訊云網(wǎng)絡(luò)通信
專線接入
https://cloud.tencent.com/document/product/216適用于本地數(shù)據(jù)中心IDC與騰訊云網(wǎng)絡(luò)打通。
云聯(lián)網(wǎng)
https://cloud.tencent.com/document/product/877適用于本地數(shù)據(jù)中心IDC與騰訊云網(wǎng)絡(luò)打通,也可用于云上不同地域間私有網(wǎng)絡(luò)VPC打通。
VPN連接
https://cloud.tencent.com/document/product/554適用于本地數(shù)據(jù)中心IDC與騰訊云網(wǎng)絡(luò)打通。
對等連接+NAT網(wǎng)關(guān)
對等連接:
https://cloud.tencent.com/document/product/553
NAT網(wǎng)關(guān):
https://cloud.tencent.com/document/product/552適合云上不同地域間私有網(wǎng)絡(luò)VPC打通,不適合本地IDC到騰訊云網(wǎng)絡(luò)。

三、方案實現(xiàn)
(一)業(yè)務(wù)目標(biāo)
網(wǎng)站的獨立訪客數(shù)量UV。Oceanus處理后在Redis中通過set類型存儲獨立訪客數(shù)量,同時也達(dá)到了對同一訪客的數(shù)據(jù)去重的目的。
網(wǎng)站商品頁面的點擊量PV。Oceanus處理后在Redis中使用list類型存儲頁面點擊量。
轉(zhuǎn)化率(轉(zhuǎn)化率=成交次數(shù)/點擊量)。Oceanus處理后在Redis中用String存儲即可。
(二)源數(shù)據(jù)格式

Kafka內(nèi)部采用json格式存儲,數(shù)據(jù)格式如下:
# 瀏覽記錄{"record_type":0, # 0 表示瀏覽記錄"user_id": 6,"client_ip": "100.0.0.6","product_id": 101,"create_time": "2021-09-06 16:00:00"}# 購買記錄{"record_type":1, # 1 表示購買記錄"user_id": 6,"client_ip": "100.0.0.8","product_id": 101,"create_time": "2021-09-08 18:00:00"}
(三)編寫Flink SQL作業(yè)
定義Source
CREATE TABLE `input_web_record` (`record_type` INT,`user_id` INT,`client_ip` VARCHAR,`product_id` INT,`create_time` TIMESTAMP,`times` AS create_time,WATERMARK FOR times AS times - INTERVAL '10' MINUTE) WITH ('connector' = 'kafka', -- 可選 'kafka','kafka-0.11'. 注意選擇對應(yīng)的內(nèi)置 Connector'topic' = 'uvpv-demo','scan.startup.mode' = 'earliest-offset',--'properties.bootstrap.servers' = '82.157.27.147:9092','properties.bootstrap.servers' = '10.1.0.10:9092','properties.group.id' = 'WebRecordGroup', -- 必選參數(shù), 一定要指定 Group ID'format' = 'json','json.ignore-parse-errors' = 'true', -- 忽略 JSON 結(jié)構(gòu)解析異常'json.fail-on-missing-field' = 'false' -- 如果設(shè)置為 true, 則遇到缺失字段會報錯 設(shè)置為 false 則缺失字段設(shè)置為 null);
定義Sink
-- UV sinkCREATE TABLE `output_uv` (`userids` STRING,`user_id` STRING) WITH ('connector' = 'redis','command' = 'sadd', -- 使用集合保存uv(支持命令:set、lpush、sadd、hset、zadd)'nodes' = '192.28.28.217:6379', -- redis連接地址,集群模式多個節(jié)點使用'',''分隔。-- 'additional-key' = '', -- 用于指定hset和zadd的key。hset、zadd必須設(shè)置。 'password' = 'yourpassword');-- PV sinkCREATE TABLE `output_pv` (`pagevisits` STRING,`product_id` STRING,`hour_count` BIGINT) WITH ('connector' = 'redis','command' = 'lpush', -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes' = '192.28.28.217:6379', -- redis連接地址,集群模式多個節(jié)點使用'',''分隔。-- 'additional-key' = '', -- 用于指定hset和zadd的key。hset、zadd必須設(shè)置。 'password' = 'yourpassword');-- 轉(zhuǎn)化率 sinkCREATE TABLE `output_conversion_rate` (`conversion_rate` STRING,`rate` STRING) WITH ('connector' = 'redis','command' = 'set', -- 使用列表保存pv(支持命令:set、lpush、sadd、hset、zadd)'nodes' = '192.28.28.217:6379', -- redis連接地址,集群模式多個節(jié)點使用'',''分隔。-- 'additional-key' = '', -- 用于指定hset和zadd的key。hset、zadd必須設(shè)置。 'password' = 'yourpassword');
業(yè)務(wù)邏輯
-- 加工得到 UV 指標(biāo),統(tǒng)計所有時間內(nèi)的 UVINSERT INTO output_uvSELECT'userids' AS `userids`,CAST(user_id AS string) AS user_idFROM input_web_record ;-- 加工并得到 PV 指標(biāo),統(tǒng)計每 10 分鐘內(nèi)的 PVINSERT INTO output_pvSELECT'pagevisits' AS pagevisits,CAST(product_id AS string) AS product_id,SUM(product_id) AS hour_countFROM input_web_record WHERE record_type = 0GROUP BYHOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),product_id,user_id;-- 加工并得到轉(zhuǎn)化率指標(biāo),統(tǒng)計每 10 分鐘內(nèi)的轉(zhuǎn)化率INSERT INTO output_conversion_rateSELECT'conversion_rate' AS conversion_rate,CAST( (((SELECT COUNT(1) FROM input_web_record WHERE record_type=0)*1.0)/SUM(a.product_id)) as string)FROM (SELECT * FROM input_web_record where record_type = 1) AS aGROUP BYHOP(times, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE),product_id;
(四)結(jié)果驗證

userids: 存儲UV
pagevisits: 存儲PV
conversion_rate: 存儲轉(zhuǎn)化率,即購買商品次數(shù)/總頁面點擊量。
四、總結(jié)
流計算 Oceanus?限量秒殺專享活動火爆進(jìn)行中↓↓

??點擊下方「閱讀原文」,了解騰訊云流計算Oceanus更多信息~
