基于Flink快速開(kāi)發(fā)實(shí)時(shí)TopN程序最簡(jiǎn)單的思路
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”資源“獲取更多資源

TopN 是統(tǒng)計(jì)報(bào)表和大屏非常常見(jiàn)的功能,主要用來(lái)實(shí)時(shí)計(jì)算排行榜。流式的TopN可以使業(yè)務(wù)方在內(nèi)存中按照某個(gè)統(tǒng)計(jì)指標(biāo)(如出現(xiàn)次數(shù))計(jì)算排名并快速出發(fā)出更新后的排行榜。
我們以統(tǒng)計(jì)詞頻為例展示一下如何快速開(kāi)發(fā)一個(gè)計(jì)算TopN的flink程序。
Flink支持各種各樣的流數(shù)據(jù)接口作為數(shù)據(jù)的數(shù)據(jù)源,本次demo我們采用內(nèi)置的socketTextStream作為數(shù)據(jù)數(shù)據(jù)源。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時(shí)間語(yǔ)義
DataStream text = env.socketTextStream(hostName, port); //監(jiān)聽(tīng)指定socket端口作為輸入
與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個(gè)一個(gè)單詞,然后按照單詞為key實(shí)現(xiàn)累加。
DataStream> ds = text
.flatMap(new LineSplitter()); //將輸入語(yǔ)句split成一個(gè)一個(gè)單詞并初始化count值為1的Tuple2類型
private static final class LineSplitter implements
FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
DataStream> wcount = ds
.keyBy(0) //按照Tuple2的第一個(gè)元素為key,也就是單詞
.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
//key之后的元素進(jìn)入一個(gè)總時(shí)間長(zhǎng)度為600s,每20s向后滑動(dòng)一次的滑動(dòng)窗口
.sum(1);// 將相同的key的元素第二個(gè)count值相加
全局TopN
數(shù)據(jù)流經(jīng)過(guò)前面的處理后會(huì)每20s計(jì)算一次各個(gè)單詞的count值并發(fā)送到下游窗口。
DataStream> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
//所有key元素進(jìn)入一個(gè)20s長(zhǎng)的窗口(選20秒是因?yàn)樯嫌未翱诿?0s計(jì)算一輪數(shù)據(jù),topN窗口一次計(jì)算只統(tǒng)計(jì)一個(gè)窗口時(shí)間內(nèi)的變化)
.process(new TopNAllFunction(5));//計(jì)算該窗口TopN
windowAll是一個(gè)全局并發(fā)為1的特殊操作,也就是所有元素都會(huì)進(jìn)入到一個(gè)窗口內(nèi)進(jìn)行計(jì)算。
private static class TopNAllFunction
extends
ProcessAllWindowFunction, Tuple2, TimeWindow> {
private int topSize = 10;
public TopNAllFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
ProcessAllWindowFunction, Tuple2, TimeWindow>.Context arg0,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub
TreeMap> treemap = new TreeMap>(
new Comparator() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
}); //treemap按照key降序排列,相同count值不覆蓋
for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN個(gè)元素
treemap.pollLastEntry();
}
}
for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
分組TopN
在部分場(chǎng)景下,用戶希望根據(jù)不同的分組進(jìn)行排序,計(jì)算出每個(gè)分組的一個(gè)排行榜。
wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組
.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統(tǒng)計(jì)上游數(shù)據(jù)
.process(new TopNFunction(5)) //分組TopN統(tǒng)計(jì)
private static class TupleKeySelectorByStart implements
KeySelector, String> {
@Override
public String getKey(Tuple2 value) throws Exception {
// TODO Auto-generated method stub
return value.f0.substring(0, 1); //取首字母做key
}
}
/**
*
*針對(duì)keyby window的TopN函數(shù),繼承自ProcessWindowFunction
*
*/
private static class TopNFunction
extends
ProcessWindowFunction, Tuple2, String, TimeWindow> {
private int topSize = 10;
public TopNFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
String arg0,
ProcessWindowFunction, Tuple2, String, TimeWindow>.Context arg1,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub
TreeMap> treemap = new TreeMap>(
new Comparator() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
});
for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) {
treemap.pollLastEntry();
}
}
for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
上面的代碼實(shí)現(xiàn)了按照首字母分組,取每組元素count最高的TopN方法。
嵌套TopN
全局topN的缺陷是,由于windowall是一個(gè)全局并發(fā)為1的操作,所有的數(shù)據(jù)只能匯集到一個(gè)節(jié)點(diǎn)進(jìn)行 TopN 的計(jì)算,那么計(jì)算能力就會(huì)受限于單臺(tái)機(jī)器,容易產(chǎn)生數(shù)據(jù)熱點(diǎn)問(wèn)題。
解決思路就是使用嵌套 TopN,或者說(shuō)兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用于分散熱點(diǎn)。例如可以先加一層分組 TopN,第一層會(huì)計(jì)算出每一組的 TopN,而后在第二層中進(jìn)行合并匯總,得到最終的全網(wǎng)TopN。第二層雖然仍是單點(diǎn),但是大量的計(jì)算量由第一層分擔(dān)了,而第一層是可以水平擴(kuò)展的。


版權(quán)聲明:
本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨(dú)家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責(zé)任。編輯|冷眼丶微信公眾號(hào)|import_bigdata文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!??


