大數(shù)據(jù)實(shí)戰(zhàn) -- 基于 Flink 的短視頻生產(chǎn)消費(fèi)監(jiān)控
本文詳細(xì)介紹了實(shí)時監(jiān)控類指標(biāo)的數(shù)據(jù)流轉(zhuǎn)鏈路以及技術(shù)方案,大多數(shù)的實(shí)時監(jiān)控類指標(biāo)都可按照本文中的幾種方案實(shí)現(xiàn)。
短視頻生產(chǎn)消費(fèi)監(jiān)控
短視頻帶來了全新的傳播場域和節(jié)目形態(tài),小屏幕、快節(jié)奏成為行業(yè)潮流的同時,也催生了新的用戶消費(fèi)習(xí)慣,為創(chuàng)作者和商戶帶來收益。而多元化的短視頻也可以為品牌方提供營銷機(jī)遇。
其中對于垂類生態(tài)短視頻的生產(chǎn)消費(fèi)熱點(diǎn)的監(jiān)控分析目前成為了實(shí)時數(shù)據(jù)處理很常見的一個應(yīng)用場景,比如對某個圈定的垂類生態(tài)下的視頻生產(chǎn)或者視頻消費(fèi)進(jìn)行監(jiān)控,對熱點(diǎn)視頻生成對應(yīng)的優(yōu)化推薦策略,促進(jìn)熱點(diǎn)視頻的生產(chǎn)或者消費(fèi),構(gòu)建整個生產(chǎn)消費(fèi)數(shù)據(jù)鏈路的閉環(huán),從而提高創(chuàng)作者收益以及消費(fèi)者留存。
本文將完整分析垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)的整條鏈路流轉(zhuǎn)方式,并基于 Flink 提供幾種對于垂類視頻生產(chǎn)消費(fèi)監(jiān)控的方案設(shè)計。通過本文,你可以了解到:
-
垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路閉環(huán)
-
實(shí)時監(jiān)控短視頻生產(chǎn)消費(fèi)的方案設(shè)計
-
不同監(jiān)控量級場景下的代碼實(shí)現(xiàn)
-
flink 學(xué)習(xí)資料
項(xiàng)目簡介
垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路流轉(zhuǎn)架構(gòu)圖如下,此數(shù)據(jù)流轉(zhuǎn)圖也適用于其他場景:

在上述場景中,用戶生產(chǎn)和消費(fèi)短視頻,從而客戶端、服務(wù)端以及數(shù)據(jù)庫會產(chǎn)生相應(yīng)的行為操作日志,這些日志會通過日志抽取中間件抽取到消息隊(duì)列中,我們目前的場景中是使用 Kafka 作為消息隊(duì)列;然后使用 flink 對垂類生態(tài)中的視頻進(jìn)行生產(chǎn)或消費(fèi)監(jiān)控(內(nèi)容生產(chǎn)通常是圈定垂類作者 id 池,內(nèi)容消費(fèi)通常是圈定垂類視頻 id 池),最后將實(shí)時聚合數(shù)據(jù)產(chǎn)出到下游;下游可以以數(shù)據(jù)服務(wù),實(shí)時看板的方式展現(xiàn),運(yùn)營同學(xué)或者自動化工具最終會幫助我們分析當(dāng)前垂類下的生產(chǎn)或者消費(fèi)熱點(diǎn),從而生成推薦策略。
方案設(shè)計

其中數(shù)據(jù)源如下:
-
Kafka 為全量內(nèi)容生產(chǎn)和內(nèi)容消費(fèi)的日志。 -
Rpc/Http/Mysql/配置中心/Redis/HBase 為需要監(jiān)控的垂類生態(tài)內(nèi)容 id 池(內(nèi)容生產(chǎn)則為作者 id 池,內(nèi)容消費(fèi)則為視頻 id 池),其主要是提供給運(yùn)營同學(xué)動態(tài)配置需要監(jiān)控的 id 范圍,其可以在 flink 中進(jìn)行實(shí)時查詢,解析運(yùn)營同學(xué)想要的監(jiān)控指標(biāo)范圍,以及監(jiān)控的指標(biāo)和計算方式,然后加工數(shù)據(jù)產(chǎn)出,可以支持隨時配置,實(shí)時數(shù)據(jù)隨時計算產(chǎn)出。
其中數(shù)據(jù)匯為聚類好的內(nèi)容生產(chǎn)或者消費(fèi)熱點(diǎn)話題或者事件指標(biāo):
-
Redis/HBase 主要是以低延遲(Redis 5ms p99,HBase 100ms p99,不同公司的服務(wù)能力不同)并且高 QPS 提供數(shù)據(jù)服務(wù),給 Server 端或者線上用戶提供低延遲的數(shù)據(jù)查詢。 -
Druid/Mysql 可以做為 OLAP 引擎為 BI 分析提供靈活的上卷下鉆聚合分析能力,供運(yùn)營同學(xué)配置可視化圖表使用。 -
Kafka 可以以流式數(shù)據(jù)產(chǎn)出,從而提供給下游繼續(xù)消費(fèi)或者進(jìn)行特征提取。
廢話不多說,我們直接上方案和代碼,下述幾種方案按照監(jiān)控 id 范圍量級區(qū)分,不同的量級對應(yīng)著不同的方案,其中的代碼示例為 ProcessWindowFunction,也可以使用 AggregateFunction 代替,其中主要監(jiān)控邏輯都相同。
方案 1
適合監(jiān)控 id 數(shù)據(jù)量小的場景(幾千 id),其實(shí)現(xiàn)方式是在 flink 任務(wù)初始化時將需要監(jiān)控的 id 池或動態(tài)配置中心的 id 池加載到內(nèi)存當(dāng)中,之后只需要在內(nèi)存中判斷內(nèi)容生產(chǎn)或者消費(fèi)數(shù)據(jù)是否在這個監(jiān)控池當(dāng)中。
ProcessWindowFunction p = new ProcessWindowFunction
() {
// 配置中心動態(tài) id 池
private Config
> needMonitoredIdsConfig;
@Override
public void open(Configuration parameters) throws Exception {
this.needMonitoredIdsConfig = ConfigBuilder
.buildSet(
"needMonitoredIds", Long.class);
}
@Override
public void process(Long bucket, Context context, Iterable
iterable, Collector
collector)
throws Exception {
Set
needMonitoredIds = needMonitoredIdsConfig.get();
/**
* 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
*/
}
}
監(jiān)控的 id 池可以按照固定或者可配置從而分出兩種獲取方式:第一種是在 flink 任務(wù)開始時就全部加載進(jìn)內(nèi)存中,這種方式適合監(jiān)控 id 池不變的情況;第二種是使用動態(tài)配置中心,每次都從配置中心訪問到最新的監(jiān)控 id 池,其可以滿足動態(tài)配置或者更改 id 池的需求,并且這種實(shí)現(xiàn)方式通常可以實(shí)時感知到配置更改,幾乎無延遲。
方案 2
適合監(jiān)控 id 數(shù)據(jù)量適中(幾十萬 id),監(jiān)控數(shù)據(jù)范圍會不定時發(fā)生變動的場景。其實(shí)現(xiàn)方式是在 flink 算子中定時訪問接口獲取最新的監(jiān)控 id 池,以獲取最新監(jiān)控數(shù)據(jù)范圍。
ProcessWindowFunction p = new ProcessWindowFunction
() {
private
long lastRefreshTimestamp;
private Set
needMonitoredIds;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.refreshNeedMonitoredIds(System.currentTimeMillis());
}
@Override
public void process(Long bucket, Context context, Iterable
iterable, Collector
collector)
throws Exception {
long windowStart = context.window().getStart();
this.refreshNeedMonitoredIds(windowStart);
/**
* 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
*/
}
public void refreshNeedMonitoredIds(long windowStart) {
// 每隔 10 秒訪問一次
if (windowStart -
this.lastRefreshTimestamp >=
10000L) {
this.lastRefreshTimestamp = windowStart;
this.needMonitoredIds = Rpc.get(...)
}
}
}
根據(jù)上述代碼實(shí)現(xiàn)方式,按照時間間隔的方式刷新 id 池,其缺點(diǎn)在于不能實(shí)時感知監(jiān)控 id 池的變化,所以刷新時間可能會和需求場景強(qiáng)耦合(如果 id 池會頻繁更新,那么就需要縮小刷新時間間隔)。也可根據(jù)需求場景在每個窗口開始前刷新 id 池,這樣可保證每個窗口中的 id 池中的數(shù)據(jù)一直保持更新。
方案 3
方案 3 對方案 2 的一個優(yōu)化(幾十萬 id,我們生產(chǎn)環(huán)境中最常用的)。其實(shí)現(xiàn)方式是在 flink 中使用 broadcast 算子定時訪問監(jiān)控 id 池,并將 id 池以廣播的形式下發(fā)給下游參與計算的各個算子。其優(yōu)化點(diǎn)在于:比如任務(wù)的并行度為 500,每 1s 訪問一次,采用方案 2 則訪問監(jiān)控 id 池接口的 QPS 為 500,在使用 broadcast 算子之后,其訪問 QPS 可以減少到 1,可以大大減少對接口的訪問量,減輕接口壓力。
public class Example {
@Slf4j
static class NeedMonitorIdsSource implements SourceFunction<Map<Long, Set<Long>>> {
private volatile boolean isCancel;
@Override
public void run(SourceContext throws Exception {
while (!this.isCancel) {
try {
TimeUnit.SECONDS.sleep(1);
Set
needMonitorIds = Rpc.get(...);
// 可以和上一次訪問的數(shù)據(jù)做比較查看是否有變化,如果有變化,才發(fā)送出去
if (CollectionUtils.isNotEmpty(needMonitorIds)) {
sourceContext.collect(
new HashMap
>() {{
put(0L, needMonitorIds);
}});
}
}
catch (Throwable e) {
// 防止接口訪問失敗導(dǎo)致的錯誤導(dǎo)致 flink job 掛掉
log.error(
"need monitor ids error", e);
}
}
}
@Override
public void cancel() {
this.isCancel =
true;
}
}
public static void main(String[] args) {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
InputParams inputParams =
new InputParams(parameterTool);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
final MapStateDescriptor
> broadcastMapStateDescriptor =
new MapStateDescriptor<>(
"config-keywords",
BasicTypeInfo.LONG_TYPE_INFO,
TypeInformation.of(
new TypeHint
>() {
}));
/********************* kafka source *********************/
BroadcastStream
方案 4
適合于超大監(jiān)控范圍的數(shù)據(jù)(幾百萬,我們自己的生產(chǎn)實(shí)踐中使用擴(kuò)量到 500 萬)。其原理是將監(jiān)控范圍接口按照 id 按照一定規(guī)則分桶。flink 消費(fèi)到日志數(shù)據(jù)后將 id 按照 監(jiān)控范圍接口 id 相同的分桶方法進(jìn)行分桶 keyBy,這樣在下游算子中每個算子中就可以按照桶變量值,從接口中拿到對應(yīng)桶的監(jiān)控 id 數(shù)據(jù),這樣 flink 中并行的每個算子只需要獲取到自己對應(yīng)的桶的數(shù)據(jù),可以大大減少請求的壓力。
public class Example {
public static void main(String[] args) {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
InputParams inputParams = new InputParams(parameterTool);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
final MapStateDescriptor
> broadcastMapStateDescriptor =
new MapStateDescriptor<>(
"config-keywords",
BasicTypeInfo.LONG_TYPE_INFO,
TypeInformation.of(
new TypeHint
>() {
}));
/********************* kafka source *********************/
DataStream
logSourceDataStream = SourceFactory.getSourceDataStream(...);
/********************* dag *********************/
DataStream
resultDataStream = logSourceDataStream
.keyBy(KeySelectorFactory.getLongKeySelector(CommonModel::getKeyField))
.timeWindow(Time.seconds(inputParams.accTimeWindowSeconds))
.process(
new ProcessWindowFunction
() {
private
long lastRefreshTimestamp;
private Set
oneBucketNeedMonitoredIds;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void process(Long bucket, Context context, Iterable
iterable, Collector
collector)
throws Exception {
long windowStart = context.window().getStart();
this.refreshNeedMonitoredIds(windowStart, bucket);
/**
* 判斷 commonModel 中的 id 是否在 needMonitoredIds 池中
*/
}
public void refreshNeedMonitoredIds(long windowStart, long bucket) {
// 每隔 10 秒訪問一次
if (windowStart -
this.lastRefreshTimestamp >=
10000L) {
this.lastRefreshTimestamp = windowStart;
this.oneBucketNeedMonitoredIds = Rpc.get(bucket, ...)
}
}
});
/********************* kafka sink *********************/
SinkFactory.setSinkDataStream(...);
env.execute(inputParams.jobName);
}
}
總結(jié)
本文首先介紹了,在短視頻領(lǐng)域中,短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路的整個閉環(huán),并且其數(shù)據(jù)鏈路閉環(huán)一般情況下也適用于其他場景;以及對應(yīng)的實(shí)時監(jiān)控方案的設(shè)計和不同場景下的代碼實(shí)現(xiàn),包括:
-
垂類生態(tài)短視頻生產(chǎn)消費(fèi)數(shù)據(jù)鏈路閉環(huán):用戶操作行為日志的流轉(zhuǎn),日志上傳,實(shí)時計算,以及流轉(zhuǎn)到 BI,數(shù)據(jù)服務(wù),最后數(shù)據(jù)賦能的整個流程
-
實(shí)時監(jiān)控方案設(shè)計:監(jiān)控類實(shí)時計算流程中各類數(shù)據(jù)源,數(shù)據(jù)匯的選型
-
監(jiān)控 id 池在不同量級場景下具體代碼實(shí)現(xiàn)
學(xué)習(xí)資料
flink
-
https://github.com/flink-china/flink-training-course/blob/master/README.md -
https://ververica.cn/developers-resources/ -
https://space.bilibili.com/33807709
--end--
掃描下方二維碼
添加好友,備注【交流】 可私聊交流,也可進(jìn)資源豐富學(xué)習(xí)群
