<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Flink重點難點:維表關聯(lián)理論和Join實戰(zhàn)

          共 32062字,需瀏覽 65分鐘

           ·

          2021-09-07 18:54

          點擊上方藍色字體,選擇“設為星標”

          回復”面試“獲取更多驚喜

          在閱讀本文之前,你應該閱讀過的系列:

          Flink官方文檔中公開的信息

          1 Join 的概念

          在閱讀之前請一定要先了解:

          數(shù)據(jù)流操作的另一個常見需求是對兩條數(shù)據(jù)流中的事件進行聯(lián)結(connect)或Join。Flink DataStream API中內(nèi)置有兩個可以根據(jù)時間條件對數(shù)據(jù)流進行Join的算子:基于間隔的Join和基于窗口的Join。本節(jié)我們會對它們進行介紹。

          如果Flink內(nèi)置的Join算子無法表達所需的Join語義,那么你可以通過CoProcessFunction、BroadcastProcessFunction或KeyedBroadcastProcessFunction實現(xiàn)自定義的Join邏輯。

          注意,你要設計的Join算子需要具備高效的狀態(tài)訪問模式及有效的狀態(tài)清理策略。

          1.1 基于間隔的Join

          基于間隔的Join會對兩條流中擁有相同鍵值以及彼此之間時間戳不超過某一指定間隔的事件進行Join。

          下圖展示了兩條流(A和B)上基于間隔的Join,如果B中事件的時間戳相較于A中事件的時間戳不早于1小時且不晚于15分鐘,則會將兩個事件Join起來。Join間隔具有對稱性,因此上面的條件也可以表示為A中事件的時間戳相較B中事件的時間戳不早于15分鐘且不晚于1小時。

          基于間隔的Join目前只支持事件時間以及INNER JOIN語義(無法發(fā)出未匹配成功的事件)。下面的例子定義了一個基于間隔的Join。

          input1
          .intervalJoin(input2)
          .between(<lower-bound>, <upper-bound>) // 相對于input1的上下界
          .process(ProcessJoinFunction) // 處理匹配的事件對

          Join成功的事件對會發(fā)送給ProcessJoinFunction。下界和上界分別由負時間間隔和正時間間隔來定義,例如between(Time.hour(-1), Time.minute(15))。在滿足下界值小于上界值的前提下,你可以任意對它們賦值。例如,允許出現(xiàn)B中事件的時間戳相較A中事件的時間戳早1~2小時這樣的條件。

          基于間隔的Join需要同時對雙流的記錄進行緩沖。對第一個輸入而言,所有時間戳大于當前水位線減去間隔上界的數(shù)據(jù)都會被緩沖起來;對第二個輸入而言,所有時間戳大于當前水位線加上間隔下界的數(shù)據(jù)都會被緩沖起來。注意,兩側邊界值都有可能為負。上圖中的Join需要存儲數(shù)據(jù)流A中所有時間戳大于當前水位線減去15分鐘的記錄,以及數(shù)據(jù)流B中所有時間戳大于當前水位線減去1小時的記錄。不難想象,如果兩條流的事件時間不同步,那么Join所需的存儲就會顯著增加,因為水位線總是由“較慢”的那條流來決定。

          案例你可以參考:《Flink重點難點:時間、窗口和流Join》

          1.2 基于窗口的Join

          顧名思義,基于窗口的Join需要用到Flink中的窗口機制。其原理是將兩條輸入流中的元素分配到公共窗口中并在窗口完成時進行Join(或Cogroup)。

          下面的例子展示了如何定義基于窗口的Join。

          input1.join(input2)
          .where(...) // 為input1指定鍵值屬性
          .equalTo(...) // 為input2指定鍵值屬性
          .window(...) // 指定WindowAssigner
          [.trigger(...)] // 選擇性的指定Trigger
          [.evictor(...)] // 選擇性的指定Evictor
          .apply(...) // 指定JoinFunction

          下圖展示了DataStream API中基于窗口的Join是如何工作的。

          兩條輸入流都會根據(jù)各自的鍵值屬性進行分區(qū),公共窗口分配器會將二者的事件映射到公共窗口內(nèi)(其中同時存儲了兩條流中的數(shù)據(jù))。當窗口的計時器觸發(fā)時,算子會遍歷兩個輸入中元素的每個組合(叉乘積)去調(diào)用JoinFunction。同時你也可以自定義觸發(fā)器或移除器。由于兩條流中的事件會被映射到同一個窗口中,因此該過程中的觸發(fā)器和移除器與常規(guī)窗口算子中的完全相同。

          除了對窗口中的兩條流進行Join,你還可以對它們進行Cogroup,只需將算子定義開始位置的join改為coGroup()即可。Join和Cogroup的總體邏輯相同,二者的唯一區(qū)別是:Join會為兩側輸入中的每個事件對調(diào)用JoinFunction;而Cogroup中用到的CoGroupFunction會以兩個輸入的元素遍歷器為參數(shù),只在每個窗口中被調(diào)用一次。

          注意,對劃分窗口后的數(shù)據(jù)流進行Join可能會產(chǎn)生意想不到的語義。例如,假設你為執(zhí)行Join操作的算子配置了1小時的滾動窗口,那么一旦來自兩個輸入的元素沒有被劃分到同一窗口,它們就無法Join在一起,即使二者彼此僅相差1秒鐘。

          案例你可以參考:《Flink重點難點:時間、窗口和流Join》

          2 Streaming SQL Join

          3 Flink DataStream Join

          4 Flink 案例實戰(zhàn)演練

          Flink維表Join實踐

          常見的維表Join方式有四種:

          • 預加載維表

          • 熱存儲維表

          • 廣播維表

          • Temporal table function join

          下面分別使用這四種方式來實現(xiàn)一個join的需求,這個需求是:一個主流中數(shù)據(jù)是用戶信息,字段包括用戶姓名、城市id;維表是城市數(shù)據(jù),字段包括城市ID、城市名稱。要求用戶表與城市表關聯(lián),輸出為:用戶名稱、城市ID、城市名稱。

          用戶表表結構如下:

          城市維表表結構如下:

          1、 預加載維表

          通過定義一個類實現(xiàn)RichMapFunction,在open()中讀取維表數(shù)據(jù)加載到內(nèi)存中,在probe流map()方法中與維表數(shù)據(jù)進行關聯(lián)。RichMapFunction中open方法里加載維表數(shù)據(jù)到內(nèi)存的方式特點如下:

          優(yōu)點:實現(xiàn)簡單. 缺點:因為數(shù)據(jù)存于內(nèi)存,所以只適合小數(shù)據(jù)量并且維表數(shù)據(jù)更新頻率不高的情況下。雖然可以在open中定義一個定時器定時更新維表,但是還是存在維表更新不及時的情況。下面是一個例子:

          package join;

          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import java.util.HashMap;
          import java.util.Map;

          /**
          * 這個例子是從socket中讀取的流,數(shù)據(jù)為用戶名稱和城市id,維表是城市id、城市名稱,
          * 主流和維表關聯(lián),得到用戶名稱、城市id、城市名稱
          * 這個例子采用在RichMapfunction類的open方法中將維表數(shù)據(jù)加載到內(nèi)存
          **/
          public class JoinDemo1 {
          public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
          .map(p -> {
          //輸入格式為:user,1000,分別是用戶名稱和城市編號
          String[] list = p.split(",");
          return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
          })
          .returns(new TypeHint<Tuple2<String, Integer>>() {
          });
          DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
          result.print();
          env.execute("joinDemo1");
          }

          static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          //定義一個變量,用于保存維表數(shù)據(jù)在內(nèi)存
          Map<Integer, String> dim;

          @Override
          public void open(Configuration parameters) throws Exception {
          //在open方法中讀取維表數(shù)據(jù),可以從數(shù)據(jù)中讀取、文件中讀取、接口中讀取等等。
          dim = new HashMap<>();
          dim.put(1001, "beijing");
          dim.put(1002, "shanghai");
          dim.put(1003, "wuhan");
          dim.put(1004, "changsha");
          }

          @Override
          public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
          //在map方法中進行主流和維表的關聯(lián)
          String cityName = "";
          if (dim.containsKey(value.f1)) {
          cityName = dim.get(value.f1);
          }
          return new Tuple3<>(value.f0, value.f1, cityName);
          }
          }
          }

          2、 熱存儲維表

          這種方式是將維表數(shù)據(jù)存儲在Redis、HBase、MySQL等外部存儲中,實時流在關聯(lián)維表數(shù)據(jù)的時候?qū)崟r去外部存儲中查詢,這種方式特點如下:

          優(yōu)點:維度數(shù)據(jù)量不受內(nèi)存限制,可以存儲很大的數(shù)據(jù)量。缺點:因為維表數(shù)據(jù)在外部存儲中,讀取速度受制于外部存儲的讀取速度;另外維表的同步也有延遲。

          (1) 使用cache來減輕訪問壓力

          可以使用緩存來存儲一部分常訪問的維表數(shù)據(jù),以減少訪問外部系統(tǒng)的次數(shù),比如使用guava Cache。

          下面是一個例子:

          package join;

          import com.google.common.cache.*;
          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

          import java.util.HashMap;
          import java.util.Map;
          import java.util.concurrent.TimeUnit;

          public class JoinDemo2 {
          public static void main(String[] args) throws Exception {

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
          .map(p -> {
          //輸入格式為:user,1000,分別是用戶名稱和城市編號
          String[] list = p.split(",");
          return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
          })
          .returns(new TypeHint<Tuple2<String, Integer>>() {
          });

          DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
          result.print();
          env.execute("joinDemo1");
          }

          static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          LoadingCache<Integer, String> dim;

          @Override
          public void open(Configuration parameters) throws Exception {
          //使用google LoadingCache來進行緩存
          dim = CacheBuilder.newBuilder()
          //最多緩存?zhèn)€數(shù),超過了就根據(jù)最近最少使用算法來移除緩存
          .maximumSize(1000)
          //在更新后的指定時間后就回收
          .expireAfterWrite(10, TimeUnit.MINUTES)
          //指定移除通知
          .removalListener(new RemovalListener<Integer, String>() {
          @Override
          public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
          System.out.println(removalNotification.getKey() + "被移除了,值為:" + removalNotification.getValue());
          }
          })
          .build(
          //指定加載緩存的邏輯
          new CacheLoader<Integer, String>() {
          @Override
          public String load(Integer cityId) throws Exception {
          String cityName = readFromHbase(cityId);
          return cityName;
          }
          }
          );

          }

          private String readFromHbase(Integer cityId) {
          //讀取hbase
          //這里寫死,模擬從hbase讀取數(shù)據(jù)
          Map<Integer, String> temp = new HashMap<>();
          temp.put(1001, "beijing");
          temp.put(1002, "shanghai");
          temp.put(1003, "wuhan");
          temp.put(1004, "changsha");
          String cityName = "";
          if (temp.containsKey(cityId)) {
          cityName = temp.get(cityId);
          }

          return cityName;
          }

          @Override
          public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
          //在map方法中進行主流和維表的關聯(lián)
          String cityName = "";
          if (dim.get(value.f1) != null) {
          cityName = dim.get(value.f1);
          }
          return new Tuple3<>(value.f0, value.f1, cityName);
          }
          }
          }

          (2) 使用異步IO來提高訪問吞吐量

          Flink與外部存儲系統(tǒng)進行讀寫操作的時候可以使用同步方式,也就是發(fā)送一個請求后等待外部系統(tǒng)響應,然后再發(fā)送第二個讀寫請求,這樣的方式吞吐量比較低,可以用提高并行度的方式來提高吞吐量,但是并行度多了也就導致了進程數(shù)量多了,占用了大量的資源。

          Flink中可以使用異步IO來讀寫外部系統(tǒng),這要求外部系統(tǒng)客戶端支持異步IO,不過目前很多系統(tǒng)都支持異步IO客戶端。但是如果使用異步就要涉及到三個問題:

          • 超時:如果查詢超時那么就認為是讀寫失敗,需要按失敗處理;

          • 并發(fā)數(shù)量:如果并發(fā)數(shù)量太多,就要觸發(fā)Flink的反壓機制來抑制上游的寫入;

          • 返回順序錯亂:順序錯亂了要根據(jù)實際情況來處理,F(xiàn)link支持兩種方式:允許亂序、保證順序。

          下面是一個實例,演示了試用異步IO來訪問維表:

          package join;

          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.AsyncDataStream;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.async.ResultFuture;
          import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
          import java.sql.DriverManager;
          import java.sql.PreparedStatement;
          import java.sql.ResultSet;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.TimeUnit;

          public class JoinDemo3 {
          public static void main(String[] args) throws Exception {

          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
          .map(p -> {
          //輸入格式為:user,1000,分別是用戶名稱和城市編號
          String[] list = p.split(",");
          return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
          })
          .returns(new TypeHint<Tuple2<String, Integer>>() {
          });


          DataStream<Tuple3<String,Integer, String>> orderedResult = AsyncDataStream
          //保證順序:異步返回的結果保證順序,超時時間1秒,最大容量2,超出容量觸發(fā)反壓
          .orderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
          .setParallelism(1);

          DataStream<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream
          //允許亂序:異步返回的結果允許亂序,超時時間1秒,最大容量2,超出容量觸發(fā)反壓
          .unorderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
          .setParallelism(1);

          orderedResult.print();
          unorderedResult.print();
          env.execute("joinDemo");
          }

          //定義個類,繼承RichAsyncFunction,實現(xiàn)異步查詢存儲在mysql里的維表
          //輸入用戶名、城市ID,返回 Tuple3<用戶名、城市ID,城市名稱>
          static class JoinDemo3AyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
          // 鏈接
          private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
          private static String username = "root";
          private static String password = "123";
          private static String driverName = "com.mysql.jdbc.Driver";
          java.sql.Connection conn;
          PreparedStatement ps;

          @Override
          public void open(Configuration parameters) throws Exception {
          super.open(parameters);

          Class.forName(driverName);
          conn = DriverManager.getConnection(jdbcUrl, username, password);
          ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
          }

          @Override
          public void close() throws Exception {
          super.close();
          conn.close();
          }

          //異步查詢方法
          @Override
          public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
          // 使用 city id 查詢
          ps.setInt(1, input.f1);
          ResultSet rs = ps.executeQuery();
          String cityName = null;
          if (rs.next()) {
          cityName = rs.getString(1);
          }
          List list = new ArrayList<Tuple2<Integer, String>>();
          list.add(new Tuple3<>(input.f0,input.f1, cityName));
          resultFuture.complete(list);
          }

          //超時處理
          @Override
          public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
          List list = new ArrayList<Tuple2<Integer, String>>();
          list.add(new Tuple3<>(input.f0,input.f1, ""));
          resultFuture.complete(list);
          }
          }
          }

          3、 廣播維表

          利用Flink的Broadcast State將維度數(shù)據(jù)流廣播到下游做join操作。特點如下:

          優(yōu)點:維度數(shù)據(jù)變更后可以即時更新到結果中。缺點:數(shù)據(jù)保存在內(nèi)存中,支持的維度數(shù)據(jù)量比較小。

          下面是一個實例:

          package join;

          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.state.BroadcastState;
          import org.apache.flink.api.common.state.MapStateDescriptor;
          import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.BroadcastStream;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
          import org.apache.flink.util.Collector;

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          /**
          * 這個例子是從socket中讀取的流,數(shù)據(jù)為用戶名稱和城市id,維表是城市id、城市名稱,
          * 主流和維表關聯(lián),得到用戶名稱、城市id、城市名稱
          * 這個例子采用 Flink 廣播流的方式來做為維度
          **/
          public class JoinDemo4 {

          public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //定義主流
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
          .map(p -> {
          //輸入格式為:user,1000,分別是用戶名稱和城市編號
          String[] list = p.split(",");
          return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
          })
          .returns(new TypeHint<Tuple2<String, Integer>>() {
          });

          //定義城市流
          DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
          .map(p -> {
          //輸入格式為:城市ID,城市名稱
          String[] list = p.split(",");
          return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
          })
          .returns(new TypeHint<Tuple2<Integer, String>>() {
          });

          //將城市流定義為廣播流
          final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor("broad1", Integer.class, String.class);
          BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

          DataStream result = textStream.connect(broadcastStream)
          .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
          //處理非廣播流,關聯(lián)維度
          @Override
          public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
          ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
          String cityName = "";
          if (state.contains(value.f1)) {
          cityName = state.get(value.f1);
          }
          out.collect(new Tuple3<>(value.f0, value.f1, cityName));
          }

          @Override
          public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
          System.out.println("收到廣播數(shù)據(jù):" + value);
          ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
          }
          });


          result.print();
          env.execute("joinDemo");
          }
          }

          4、 Temporal table function join

          Temporal table是持續(xù)變化表上某一時刻的視圖,Temporal table function是一個表函數(shù),傳遞一個時間參數(shù),返回Temporal table這一指定時刻的視圖。

          可以將維度數(shù)據(jù)流映射為Temporal table,主流與這個Temporal table進行關聯(lián),可以關聯(lián)到某一個版本(歷史上某一個時刻)的維度數(shù)據(jù)。

          Temporal table function join的特點如下:

          優(yōu)點:維度數(shù)據(jù)量可以很大,維度數(shù)據(jù)更新及時,不依賴外部存儲,可以關聯(lián)不同版本的維度數(shù)據(jù)。缺點:只支持在Flink SQL API中使用。

          (1) ProcessingTime的一個實例

          package join;

          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;

          public class JoinDemo5 {
          public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

          //定義主流
          DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
          .map(p -> {
          //輸入格式為:user,1000,分別是用戶名稱和城市編號
          String[] list = p.split(",");
          return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
          })
          .returns(new TypeHint<Tuple2<String, Integer>>() {
          });

          //定義城市流
          DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
          .map(p -> {
          //輸入格式為:城市ID,城市名稱
          String[] list = p.split(",");
          return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
          })
          .returns(new TypeHint<Tuple2<Integer, String>>() {
          });

          //轉(zhuǎn)變?yōu)門able
          Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
          Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

          //定義一個TemporalTableFunction
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
          //注冊表函數(shù)
          tableEnv.registerFunction("dimCity", dimCity);

          //關聯(lián)查詢
          Table result = tableEnv
          .sqlQuery("select u.user_name,u.city_id,d.city_name from " + userTable + " as u " +
          ", Lateral table (dimCity(u.ps)) d " +
          "where u.city_id=d.city_id");

          //打印輸出
          DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print();
          env.execute("joinDemo");
          }
          }

          (2) EventTime的一個實例

          package join;

          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;

          import java.sql.Timestamp;
          import java.util.ArrayList;
          import java.util.List;

          public class JoinDemo9 {
          public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //指定是EventTime
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
          env.setParallelism(1);

          //主流,用戶流, 格式為:user_name、city_id、ts
          List<Tuple3<String, Integer, Long>> list1 = new ArrayList<>();
          list1.add(new Tuple3<>("user1", 1001, 1L));
          list1.add(new Tuple3<>("user1", 1001, 10L));
          list1.add(new Tuple3<>("user2", 1002, 2L));
          list1.add(new Tuple3<>("user2", 1002, 15L));
          DataStream<Tuple3<String, Integer, Long>> textStream = env.fromCollection(list1)
          .assignTimestampsAndWatermarks(
          //指定水位線、時間戳
          new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.seconds(10)) {
          @Override
          public long extractTimestamp(Tuple3<String, Integer, Long> element) {
          return element.f2;
          }
          }
          );

          //定義城市流,格式為:city_id、city_name、ts
          List<Tuple3<Integer, String, Long>> list2 = new ArrayList<>();
          list2.add(new Tuple3<>(1001, "beijing", 1L));
          list2.add(new Tuple3<>(1001, "beijing2", 10L));
          list2.add(new Tuple3<>(1002, "shanghai", 1L));
          list2.add(new Tuple3<>(1002, "shanghai2", 5L));

          DataStream<Tuple3<Integer, String, Long>> cityStream = env.fromCollection(list2)
          .assignTimestampsAndWatermarks(
          //指定水位線、時間戳
          new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, String, Long>>(Time.seconds(10)) {
          @Override
          public long extractTimestamp(Tuple3<Integer, String, Long> element) {
          return element.f2;
          }
          });

          //轉(zhuǎn)變?yōu)門able
          Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ts.rowtime");
          Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ts.rowtime");

          tableEnv.createTemporaryView("userTable", userTable);
          tableEnv.createTemporaryView("cityTable", cityTable);

          //定義一個TemporalTableFunction
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "city_id");
          //注冊表函數(shù)
          tableEnv.registerFunction("dimCity", dimCity);

          //關聯(lián)查詢
          Table result = tableEnv
          .sqlQuery("select u.user_name,u.city_id,d.city_name,u.ts from userTable as u " +
          ", Lateral table (dimCity(u.ts)) d " +
          "where u.city_id=d.city_id");

          //打印輸出
          DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print();
          env.execute("joinDemo");
          }
          }

          結果輸出為:

          user1,1001,beijing,1970-01-01T00:00:00.001
          user1,1001,beijing2,1970-01-01T00:00:00.010
          user2,1002,shanghai,1970-01-01T00:00:00.002
          user2,1002,shanghai2,1970-01-01T00:00:00.015

          通過結果可以看到,根據(jù)主流中的EventTime的時間,去維表流中取響應時間版本的數(shù)據(jù)。

          (3) Kafka Source的EventTime實例

          package join.temporaltablefunctionjoin;

          import lombok.Data;
          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;

          import java.io.Serializable;
          import java.util.Properties;

          public class JoinDemo10 {
          public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          //指定是EventTime
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
          EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
          env.setParallelism(1);

          //Kafka的ip和要消費的topic,//Kafka設置
          String kafkaIPs = "192.168.***.**1:9092,192.168.***.**2:9092,192.168.***.**3:9092";
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", kafkaIPs);
          props.setProperty("group.id", "group.cyb.2");

          //讀取用戶信息Kafka
          FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
          userConsumer.setStartFromEarliest();
          userConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserInfo>(Time.seconds(0)) {
          @Override
          public long extractTimestamp(UserInfo userInfo) {
          return userInfo.getTs();
          }
          });

          //讀取城市維度信息Kafka
          FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
          cityConsumer.setStartFromEarliest();
          cityConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CityInfo>(Time.seconds(0)) {
          @Override
          public long extractTimestamp(CityInfo cityInfo) {
          return cityInfo.getTs();
          }
          });

          //主流,用戶流, 格式為:user_name、city_id、ts
          Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer),"userName,cityId,ts.rowtime" );
          //定義城市維度流,格式為:city_id、city_name、ts
          Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer),"cityId,cityName,ts.rowtime");
          tableEnv.createTemporaryView("userTable", userTable);
          tableEnv.createTemporaryView("cityTable", cityTable);

          //定義一個TemporalTableFunction
          TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "cityId");
          //注冊表函數(shù)
          tableEnv.registerFunction("dimCity", dimCity);

          Table u = tableEnv.sqlQuery("select * from userTable");
          u.printSchema();
          tableEnv.toAppendStream(u, Row.class).print("用戶流接收到:");

          Table c = tableEnv.sqlQuery("select * from cityTable");
          c.printSchema();
          tableEnv.toAppendStream(c, Row.class).print("城市流接收到:");

          //關聯(lián)查詢
          Table result = tableEnv
          .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
          "from userTable as u " +
          ", Lateral table (dimCity(u.ts)) d " +
          "where u.cityId=d.cityId");

          //打印輸出
          DataStream resultDs = tableEnv.toAppendStream(result, Row.class);
          resultDs.print("\t\t關聯(lián)輸出:");
          env.execute("joinDemo");
          }
          }
          package join.temporaltablefunctionjoin;
          import java.io.Serializable;

          @Data
          public class UserInfo implements Serializable {
          private String userName;
          private Integer cityId;
          private Long ts;
          }
          package join.temporaltablefunctionjoin;
          import java.io.Serializable;

          @Data
          public class CityInfo implements Serializable {
          private Integer cityId;
          private String cityName;
          private Long ts;
          }
          package join.temporaltablefunctionjoin;

          import com.alibaba.fastjson.JSON;
          import com.alibaba.fastjson.TypeReference;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;
          import org.apache.flink.api.common.serialization.DeserializationSchema;

          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          public class UserInfoSchema implements DeserializationSchema<UserInfo> {

          @Override
          public UserInfo deserialize(byte[] message) throws IOException {
          String jsonStr = new String(message, StandardCharsets.UTF_8);
          UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
          return data;
          }

          @Override
          public boolean isEndOfStream(UserInfo nextElement) {
          return false;
          }

          @Override
          public TypeInformation<UserInfo> getProducedType() {
          return TypeInformation.of(new TypeHint<UserInfo>() {
          });
          }
          }
          package join.temporaltablefunctionjoin;

          import com.alibaba.fastjson.JSON;
          import com.alibaba.fastjson.TypeReference;
          import org.apache.flink.api.common.serialization.DeserializationSchema;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;

          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          public class CityInfoSchema implements DeserializationSchema<CityInfo> {

          @Override
          public CityInfo deserialize(byte[] message) throws IOException {
          String jsonStr = new String(message, StandardCharsets.UTF_8);
          CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
          return data;
          }

          @Override
          public boolean isEndOfStream(CityInfo nextElement) {
          return false;
          }

          @Override
          public TypeInformation<CityInfo> getProducedType() {
          return TypeInformation.of(new TypeHint<CityInfo>() {
          });
          }
          }

          依次向user和city兩個topic中寫入數(shù)據(jù):

          用戶信息格式:{“userName”:“user1”,“cityId”:1,“ts”:11}
          城市維度格式:{“cityId”:1,“cityName”:“nanjing”,“ts”:15}

          測試得到的輸出如下:

          城市流接收到:> 1,beijing,1970-01-01T00:00
          用戶流接收到:> user1,1,1970-01-01T00:00
          關聯(lián)輸出:> user1,1,beijing,1970-01-01T00:00
          城市流接收到:> 1,shanghai,1970-01-01T00:00:00.005
          用戶流接收到:> user1,1,1970-01-01T00:00:00.001
          關聯(lián)輸出:> user1,1,beijing,1970-01-01T00:00:00.001
          用戶流接收到:> user1,1,1970-01-01T00:00:00.004
          關聯(lián)輸出:> user1,1,beijing,1970-01-01T00:00:00.004
          用戶流接收到:> user1,1,1970-01-01T00:00:00.005
          關聯(lián)輸出:> user1,1,shanghai,1970-01-01T00:00:00.005
          用戶流接收到:> user1,1,1970-01-01T00:00:00.007
          用戶流接收到:> user1,1,1970-01-01T00:00:00.009
          城市流接收到:> 1,shanghai,1970-01-01T00:00:00.007
          關聯(lián)輸出:> user1,1,shanghai,1970-01-01T00:00:00.007
          城市流接收到:> 1,wuhan,1970-01-01T00:00:00.010
          關聯(lián)輸出:> user1,1,shanghai,1970-01-01T00:00:00.009
          用戶流接收到:> user1,1,1970-01-01T00:00:00.011
          城市流接收到:> 1,nanjing,1970-01-01T00:00:00.015
          關聯(lián)輸出:> user1,1,wuhan,1970-01-01T00:00:00.011

          5、四種維表關聯(lián)方式總結


          八千里路云和月 | 從零到大數(shù)據(jù)專家學習路徑指南

          我們在學習Flink的時候,到底在學習什么?

          193篇文章暴揍Flink,這個合集你需要關注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點

          我們在學習Spark的時候,到底在學習什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強!

          硬剛Hive | 4萬字基礎調(diào)優(yōu)面試小總結

          數(shù)據(jù)治理方法論和實踐小百科全書

          標簽體系下的用戶畫像建設小指南

          4萬字長文 | ClickHouse基礎&實踐&調(diào)優(yōu)全視角解析

          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗之談

          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結

          我寫過的關于成長/面試/職場進階的文章

          當我們在學習Hive的時候在學習什么?「硬剛Hive續(xù)集」


          你好,我是王知無,一個大數(shù)據(jù)領域的硬核原創(chuàng)作者。

          做過后端架構、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構、算法工程化。

          專注大數(shù)據(jù)領域?qū)崟r動態(tài)&技術提升&個人成長&職場進階,歡迎關注。

          瀏覽 119
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品久久草 | 爱福利一区二区三区 | 亚洲无码男人的天堂 | 天天曰天天操 | 少妇后入在线观看 |