
          Flink Dimension Table Joins in Practice | With Source Code for Four Approaches


          2021-05-02 00:02

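          There are four common ways to join a stream against a dimension table:

          • Preloading the dimension table
          • Hot-storage dimension table
          • Broadcast dimension table
          • Temporal table function join

          The sections below implement the same join requirement with each of these four approaches. The main stream carries user records with two fields, user name and city ID. The dimension table holds city data with two fields, city ID and city name. The user stream must be joined with the city table to produce: user name, city ID, city name.

          The user table has the following structure: user name (string), city ID (integer).

          The city dimension table has the following structure: city ID (integer), city name (string).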

          1. Preloading the dimension table

          Define a class that extends RichMapFunction, load the dimension data into memory in open(), and join each record of the probe stream against it in map().

          Loading the dimension data into memory in RichMapFunction's open() method has these characteristics:

          • Pros: simple to implement.
          • Cons: because the data lives in memory, this only suits small dimension tables that are updated infrequently. A timer can be set up in open() to reload the table periodically, but updates can still arrive late.

          Here is an example:

          package join;

          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import java.util.HashMap;
          import java.util.Map;

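          /**
           * Create By 鳴宇淳 on 2020/6/1
           * This example reads a stream of (user name, city ID) records from a socket; the dimension table maps city ID to city name.
           * The main stream is joined with the dimension table to produce user name, city ID, and city name.
           * The dimension data is loaded into memory in the open() method of a RichMapFunction.
           **/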

          public class JoinDemo1 {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                          .map(p -> {
                              //Input format: user,1000 (user name and city ID)
                              String[] list = p.split(",");
                              return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                          })
                          .returns(new TypeHint<Tuple2<String, Integer>>() {
                          });
                  DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
                  result.print();
                  env.execute("joinDemo1");
              }

              static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
                  //Holds the dimension data in memory
                  Map<Integer, String> dim;

                  @Override
                  public void open(Configuration parameters) throws Exception {
                      //Load the dimension data in open(); it could come from a database, a file, an API, and so on
                      dim = new HashMap<>();
                      dim.put(1001, "beijing");
                      dim.put(1002, "shanghai");
                      dim.put(1003, "wuhan");
                      dim.put(1004, "changsha");
                  }

                  @Override
                  public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
                      //Join the main stream with the dimension table in map()
                      String cityName = "";
                      if (dim.containsKey(value.f1)) {
                          cityName = dim.get(value.f1);
                      }
                      return new Tuple3<>(value.f0, value.f1, cityName);
                  }
              }
          }
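
The periodic refresh mentioned in the cons above can be sketched in plain Java, without Flink. This is a minimal sketch under stated assumptions: `RefreshingDimTable`, its `loader` supplier, and the refresh period are hypothetical names introduced here for illustration, not part of Flink or of the original example. In a real job the constructor logic would sit in open() and close() would be called from the function's close().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: keeps an in-memory copy of a dimension table
// and swaps in a freshly loaded copy at a fixed interval.
final class RefreshingDimTable implements AutoCloseable {
    private final Supplier<Map<Integer, String>> loader;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "dim-refresh");
        t.setDaemon(true);
        return t;
    });
    // volatile so the lookup thread always sees the most recently published snapshot
    private volatile Map<Integer, String> snapshot;

    RefreshingDimTable(Supplier<Map<Integer, String>> loader, long period, TimeUnit unit) {
        this.loader = loader;
        this.snapshot = new HashMap<>(loader.get());   // initial load, as open() would do
        timer.scheduleAtFixedRate(this::reload, period, period, unit);
    }

    // Load a full new copy and publish it in one assignment; a failed load keeps the old snapshot
    void reload() {
        try {
            snapshot = new HashMap<>(loader.get());
        } catch (RuntimeException e) {
            // keep serving the previous snapshot
        }
    }

    // Returns the city name, or an empty string when the ID is unknown
    String lookup(int cityId) {
        return snapshot.getOrDefault(cityId, "");
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Between two reloads the lookup still serves the old snapshot, which is exactly the staleness the cons describe: a shorter period narrows the window but never removes it.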

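          2. Hot-storage dimension table

          This approach keeps the dimension data in an external store such as Redis, HBase, or MySQL, and the stream queries that store at join time. Its characteristics:

          • Pros: the dimension data is not limited by memory, so the table can be very large.
          • Cons: because the dimension data lives in an external store, lookup speed is bounded by that store's read speed, and synchronizing the dimension table into the store also introduces delay.

          (1) Using a cache to reduce lookup pressure

          A cache can hold the most frequently accessed dimension rows to cut the number of calls to the external system, for example Guava Cache.

          Here is an example: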

          package join;

          import com.google.common.cache.*;
          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

          import java.util.HashMap;
          import java.util.Map;
          import java.util.concurrent.TimeUnit;

          /**
           * Create By 鳴宇淳 on 2020/6/1
           **/

          public class JoinDemo2 {
              public static void main(String[] args) throws Exception {

                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                          .map(p -> {
                              //Input format: user,1000 (user name and city ID)
                              String[] list = p.split(",");
                              return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                          })
                          .returns(new TypeHint<Tuple2<String, Integer>>() {
                          });

                  DataStream<Tuple3<String, Integer, String>> result = textStream.map(new MapJoinDemo1());
                  result.print();
                  env.execute("joinDemo1");
              }

              static class MapJoinDemo1 extends RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
                  LoadingCache<Integer, String> dim;

                  @Override
                  public void open(Configuration parameters) throws Exception {
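                      //Use Guava's LoadingCache as the cache
                      dim = CacheBuilder.newBuilder()
                              //Maximum number of entries; beyond this, entries are evicted in roughly least-recently-used order
                              .maximumSize(1000)
                              //Expire an entry a fixed time after it was written
                              .expireAfterWrite(10, TimeUnit.MINUTES)
                              //Listener notified when an entry is removed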
                              .removalListener(new RemovalListener<Integer, String>() {
                                  @Override
                                  public void onRemoval(RemovalNotification<Integer, String> removalNotification) {
                                      System.out.println(removalNotification.getKey() + " was removed, value: " + removalNotification.getValue());
                                  }
                              })
                              .build(
                                      //Logic that loads a value on a cache miss
                                      new CacheLoader<Integer, String>() {
                                          @Override
                                          public String load(Integer cityId) throws Exception {
                                              String cityName = readFromHbase(cityId);
                                              return cityName;
                                          }
                                      }
                              );

                  }

                  private String readFromHbase(Integer cityId) {
                      //Read from HBase
                      //Hard-coded here to simulate an HBase lookup
                      Map<Integer, String> temp = new HashMap<>();
                      temp.put(1001, "beijing");
                      temp.put(1002, "shanghai");
                      temp.put(1003, "wuhan");
                      temp.put(1004, "changsha");
                      String cityName = "";
                      if (temp.containsKey(cityId)) {
                          cityName = temp.get(cityId);
                      }

                      return cityName;
                  }

                  @Override
                  public Tuple3<String, Integer, String> map(Tuple2<String, Integer> value) throws Exception {
                      //Join the main stream with the dimension table in map()
                      //LoadingCache.get() loads the value on a miss; the loader above returns "" for unknown IDs, so the result is never null
                      String cityName = dim.get(value.f1);
                      return new Tuple3<>(value.f0, value.f1, cityName);
                  }
              }
          }
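
To make the size-based eviction in the cache above concrete, here is a minimal plain-Java sketch of a bounded cache with least-recently-used eviction and load-on-miss. `LruLoadingCache` is a hypothetical class written for this illustration on top of `java.util.LinkedHashMap`; it is not Guava's implementation, it is not thread-safe, and it leaves out the time-based expiry that `expireAfterWrite` provides.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a loading cache: bounded size, least-recently-used eviction,
// and a loader that is called only on a cache miss. Not thread-safe.
final class LruLoadingCache<K, V> {
    private final Function<K, V> loader;
    private final Map<K, V> entries;
    private int loads = 0;

    LruLoadingCache(int maxSize, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true keeps the least recently accessed entry first
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns the cached value, calling the loader (the "external store") only on a miss
    V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            loads++;
            entries.put(key, value);
        }
        return value;
    }

    // Number of times the loader was called, i.e. the number of external lookups
    int loadCount() {
        return loads;
    }

    boolean isCached(K key) {
        return entries.containsKey(key);
    }
}
```

The load counter shows the point of the cache: repeated lookups of a hot city ID reach the external store once, while a cold ID that has been evicted costs another external lookup.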

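          (2) Using async I/O to increase lookup throughput

          Flink can read from and write to an external store synchronously: send a request, wait for the external system to respond, then send the next request. Throughput is low in this mode. Raising the parallelism increases throughput, but higher parallelism also means more processes and a large resource footprint.

          Flink can instead use async I/O to access the external system. This requires a client for that system that supports asynchronous requests, which many systems now provide. Using async I/O raises three issues:

          • Timeouts: a query that times out is treated as a failed read or write and must be handled as a failure.
          • Concurrency: when too many requests are in flight, Flink's backpressure mechanism is triggered to throttle upstream writes.
          • Out-of-order results: how to handle reordering depends on the use case. Flink supports two modes: unordered and ordered.

          The following example uses async I/O to look up the dimension table: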

          package join;

          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.AsyncDataStream;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.async.ResultFuture;
          import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
          import java.sql.DriverManager;
          import java.sql.PreparedStatement;
          import java.sql.ResultSet;
          import java.util.ArrayList;
          import java.util.List;
          import java.util.concurrent.TimeUnit;

          /**
           * Create By 鳴宇淳 on 2020/6/1
           **/

          public class JoinDemo3 {
              public static void main(String[] args) throws Exception {

                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                          .map(p -> {
                              //Input format: user,1000 (user name and city ID)
                              String[] list = p.split(",");
                              return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                          })
                          .returns(new TypeHint<Tuple2<String, Integer>>() {
                          });


                  DataStream<Tuple3<String,Integer, String>> orderedResult = AsyncDataStream
                          //Ordered: results are emitted in input order; timeout 1 second, capacity 2 in-flight requests, backpressure when the capacity is exceeded
                          .orderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                          .setParallelism(1);

                  DataStream<Tuple3<String,Integer, String>> unorderedResult = AsyncDataStream
                          //Unordered: results may be emitted out of order; timeout 1 second, capacity 2 in-flight requests, backpressure when the capacity is exceeded
                          .unorderedWait(textStream, new JoinDemo3AyncFunction(), 1000L, TimeUnit.MILLISECONDS, 2)
                          .setParallelism(1);

                  orderedResult.print();
                  unorderedResult.print();
                  env.execute("joinDemo");
              }

              //A class extending RichAsyncFunction that looks up the dimension table stored in MySQL
              //Input: user name and city ID; output: Tuple3<user name, city ID, city name>
              static class JoinDemo3AyncFunction extends RichAsyncFunction<Tuple2<String, Integer>, Tuple3<String, Integer, String>> {
                  //Connection settings
                  private static String jdbcUrl = "jdbc:mysql://192.168.145.1:3306?useSSL=false";
                  private static String username = "root";
                  private static String password = "123";
                  private static String driverName = "com.mysql.jdbc.Driver";
                  java.sql.Connection conn;
                  PreparedStatement ps;

                  @Override
                  public void open(Configuration parameters) throws Exception {
                      super.open(parameters);

                      Class.forName(driverName);
                      conn = DriverManager.getConnection(jdbcUrl, username, password);
                      ps = conn.prepareStatement("select city_name from tmp.city_info where id = ?");
                  }

                  @Override
                  public void close() throws Exception {
                      super.close();
                      conn.close();
                  }

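                  //Async lookup method
                  //Note: JDBC calls block, so this demo still occupies the calling thread while the query runs;
                  //a truly asynchronous client (or a dedicated thread pool) is needed to benefit from async I/O
                  @Override
                  public void asyncInvoke(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
                      //Look up by city ID
                      ps.setInt(1, input.f1);
                      //Default to an empty name, as in the other examples, so the tuple never carries a null field
                      String cityName = "";
                      try (ResultSet rs = ps.executeQuery()) {
                          if (rs.next()) {
                              cityName = rs.getString(1);
                          }
                      }
                      List<Tuple3<String, Integer, String>> list = new ArrayList<>();
                      list.add(new Tuple3<>(input.f0, input.f1, cityName));
                      resultFuture.complete(list);
                  }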
                  }

                  //Timeout handling: emit the record with an empty city name
                  @Override
                  public void timeout(Tuple2<String, Integer> input, ResultFuture<Tuple3<String,Integer, String>> resultFuture) throws Exception {
                      List<Tuple3<String, Integer, String>> list = new ArrayList<>();
                      list.add(new Tuple3<>(input.f0, input.f1, ""));
                      resultFuture.complete(list);
                  }
              }
          }
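
The difference between the ordered and unordered modes can be illustrated outside Flink with plain JDK executors. This is a rough sketch, not Flink's implementation: `AsyncOrderingSketch`, the simulated `lookup` task, and the delays are all hypothetical and exist only to show how a slow request affects the order in which results are emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of ordered vs. unordered result emission; plain JDK, no Flink.
final class AsyncOrderingSketch {

    // Simulated dimension lookup that takes delayMs to answer
    private static Callable<String> lookup(int cityId, long delayMs) {
        return () -> {
            Thread.sleep(delayMs);
            return "city-" + cityId;
        };
    }

    // Ordered: results are emitted in request order, so a slow request holds back faster ones behind it
    static List<String> ordered(int[] cityIds, long[] delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(cityIds.length);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < cityIds.length; i++) {
                pending.add(pool.submit(lookup(cityIds[i], delaysMs[i])));
            }
            List<String> out = new ArrayList<>();
            for (Future<String> f : pending) {
                out.add(f.get());
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Unordered: each result is emitted as soon as its request completes
    static List<String> unordered(int[] cityIds, long[] delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(cityIds.length);
        try {
            ExecutorCompletionService<String> done = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < cityIds.length; i++) {
                done.submit(lookup(cityIds[i], delaysMs[i]));
            }
            List<String> out = new ArrayList<>();
            for (int i = 0; i < cityIds.length; i++) {
                out.add(done.take().get());
            }
            return out;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With delays that decrease from the first request to the last, `ordered` still returns results in request order, while `unordered` returns the fastest lookup first. Unordered mode therefore gives lower latency when downstream logic does not depend on input order.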

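          3. Broadcast dimension table

          Use Flink's Broadcast State to broadcast the dimension stream to the downstream operator, which performs the join. Its characteristics:

          • Pros: changes to the dimension data are reflected in the results immediately.
          • Cons: the data is held in memory, so only fairly small dimension tables are supported.

          Here is an example: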

          package join;

          import org.apache.flink.api.common.functions.RichMapFunction;
          import org.apache.flink.api.common.state.BroadcastState;
          import org.apache.flink.api.common.state.MapStateDescriptor;
          import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.configuration.Configuration;
          import org.apache.flink.streaming.api.datastream.BroadcastStream;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
          import org.apache.flink.util.Collector;

          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.List;
          import java.util.Map;

          /**
           * Create By 鳴宇淳 on 2020/6/1
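           * This example reads a stream of (user name, city ID) records from a socket; the dimension table maps city ID to city name.
           * The main stream is joined with the dimension table to produce user name, city ID, and city name.
           * The dimension data is supplied through a Flink broadcast stream.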
           **/

          public class JoinDemo4 {

              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  //Define the main stream
                  DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                          .map(p -> {
                              //Input format: user,1000 (user name and city ID)
                              String[] list = p.split(",");
                              return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                          })
                          .returns(new TypeHint<Tuple2<String, Integer>>() {
                          });
                  
                  //Define the city stream
                  DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                          .map(p -> {
                              //Input format: city ID,city name
                              String[] list = p.split(",");
                              return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                          })
                          .returns(new TypeHint<Tuple2<Integer, String>>() {
                          });

                  //Turn the city stream into a broadcast stream
                  final MapStateDescriptor<Integer, String> broadcastDesc = new MapStateDescriptor<>("broad1", Integer.class, String.class);
                  BroadcastStream<Tuple2<Integer, String>> broadcastStream = cityStream.broadcast(broadcastDesc);

                  DataStream<Tuple3<String, Integer, String>> result = textStream.connect(broadcastStream)
                          .process(new BroadcastProcessFunction<Tuple2<String, Integer>, Tuple2<Integer, String>, Tuple3<String, Integer, String>>() {
                              //Process the non-broadcast stream: join against the dimension data
                              @Override
                              public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                                  ReadOnlyBroadcastState<Integer, String> state = ctx.getBroadcastState(broadcastDesc);
                                  String cityName = "";
                                  if (state.contains(value.f1)) {
                                      cityName = state.get(value.f1);
                                  }
                                  out.collect(new Tuple3<>(value.f0, value.f1, cityName));
                              }

                              @Override
                              public void processBroadcastElement(Tuple2<Integer, String> value, Context ctx, Collector<Tuple3<String, Integer, String>> out) throws Exception {
                                  System.out.println("Received broadcast record: " + value);
                                  ctx.getBroadcastState(broadcastDesc).put(value.f0, value.f1);
                              }
                          });


                  result.print();
                  env.execute("joinDemo");
              }
          }
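
The idea behind the broadcast join can be modeled in a few lines of plain Java: every dimension record is delivered to all parallel instances, while each main-stream record is handled by exactly one instance and joined against that instance's local copy. This is a simplified sketch with hypothetical names (`BroadcastJoinSketch`, `onBroadcast`, `onElement`); it ignores checkpointing and everything else Flink's Broadcast State actually provides.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a broadcast join across several parallel instances; no Flink involved.
final class BroadcastJoinSketch {
    // One map per simulated parallel instance; each plays the role of that instance's broadcast state
    private final List<Map<Integer, String>> stateByInstance = new ArrayList<>();

    BroadcastJoinSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            stateByInstance.add(new HashMap<>());
        }
    }

    // A dimension record is delivered to every instance, so all local copies stay identical
    void onBroadcast(int cityId, String cityName) {
        for (Map<Integer, String> state : stateByInstance) {
            state.put(cityId, cityName);
        }
    }

    // A main-stream record goes to exactly one instance, which joins against its local copy
    String onElement(int instance, String userName, int cityId) {
        String cityName = stateByInstance.get(instance).getOrDefault(cityId, "");
        return userName + "," + cityId + "," + cityName;
    }
}
```

The sketch also shows both listed trade-offs: an update is visible to the very next record on every instance, and every instance holds the full table in memory. A record that arrives before its dimension row has been broadcast is joined with an empty city name, as in the example above.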

          4. Temporal table function join

          A temporal table is a view of a continuously changing table at a specific point in time. A temporal table function is a table function that takes a time argument and returns the temporal table's view at that time.

          The dimension stream can be mapped to a temporal table and the main stream joined against it, so that each record is matched with a specific version of the dimension data (its state at some past moment).

          Temporal table function joins have these characteristics:

          • Pros: the dimension table can be large, dimension updates are picked up promptly, no external store is needed, and different versions of the dimension data can be joined.
          • Cons: only supported in the Flink SQL API.

          (1) A processing-time example

          package join;

          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.java.tuple.Tuple2;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;


          /**
           * Create By 鳴宇淳 on 2020/6/1
           **/

          public class JoinDemo5 {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

                  //Define the main stream
                  DataStream<Tuple2<String, Integer>> textStream = env.socketTextStream("localhost", 9000, "\n")
                          .map(p -> {
                              //Input format: user,1000 (user name and city ID)
                              String[] list = p.split(",");
                              return new Tuple2<String, Integer>(list[0], Integer.valueOf(list[1]));
                          })
                          .returns(new TypeHint<Tuple2<String, Integer>>() {
                          });

                  //Define the city stream
                  DataStream<Tuple2<Integer, String>> cityStream = env.socketTextStream("localhost", 9001, "\n")
                          .map(p -> {
                              //Input format: city ID,city name
                              String[] list = p.split(",");
                              return new Tuple2<Integer, String>(Integer.valueOf(list[0]), list[1]);
                          })
                          .returns(new TypeHint<Tuple2<Integer, String>>() {
                          });

                  //Convert to Table, appending a processing-time attribute named ps
                  Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ps.proctime");
                  Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ps.proctime");

                  //Define a TemporalTableFunction with ps as the time attribute and city_id as the key
                  TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ps", "city_id");
                  //Register the table function
                  tableEnv.registerFunction("dimCity", dimCity);

                  //Join query
                  Table result = tableEnv
                          .sqlQuery("select u.user_name,u.city_id,d.city_name from " + userTable + " as u " +
                                  ", Lateral table (dimCity(u.ps)) d " +
                                  "where u.city_id=d.city_id");
                  
                  //Print the result
                  DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
                  resultDs.print();
                  env.execute("joinDemo");
              }
          }


          (2) An event-time example

          package join;

          import org.apache.flink.api.java.tuple.Tuple3;
          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;

          import java.sql.Timestamp;
          import java.util.ArrayList;
          import java.util.List;

          /**
           * Create By 鳴宇淳 on 2020/6/1
           **/

          public class JoinDemo9 {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  //Use event time
                  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                  EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
                  env.setParallelism(1);

                  //Main (user) stream, fields: user_name, city_id, ts
                  List<Tuple3<String, Integer, Long>> list1 = new ArrayList<>();
                  list1.add(new Tuple3<>("user1", 1001, 1L));
                  list1.add(new Tuple3<>("user1", 1001, 10L));
                  list1.add(new Tuple3<>("user2", 1002, 2L));
                  list1.add(new Tuple3<>("user2", 1002, 15L));
                  DataStream<Tuple3<String, Integer, Long>> textStream = env.fromCollection(list1)
                          .assignTimestampsAndWatermarks(
                                  //Assign timestamps and watermarks
                                  new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Integer, Long>>(Time.seconds(10)) {
                                      @Override
                                      public long extractTimestamp(Tuple3<String, Integer, Long> element) {
                                          return element.f2;
                                      }
                                  }
                          );

                  //Define the city stream, fields: city_id, city_name, ts
                  List<Tuple3<Integer, String, Long>> list2 = new ArrayList<>();
                  list2.add(new Tuple3<>(1001, "beijing", 1L));
                  list2.add(new Tuple3<>(1001, "beijing2", 10L));
                  list2.add(new Tuple3<>(1002, "shanghai", 1L));
                  list2.add(new Tuple3<>(1002, "shanghai2", 5L));

                  DataStream<Tuple3<Integer, String, Long>> cityStream = env.fromCollection(list2)
                          .assignTimestampsAndWatermarks(
                                  //Assign timestamps and watermarks
                                  new BoundedOutOfOrdernessTimestampExtractor<Tuple3<Integer, String, Long>>(Time.seconds(10)) {
                                      @Override
                                      public long extractTimestamp(Tuple3<Integer, String, Long> element) {
                                          return element.f2;
                                      }
                                  });

                  //Convert to Table, using ts as the event-time attribute
                  Table userTable = tableEnv.fromDataStream(textStream, "user_name,city_id,ts.rowtime");
                  Table cityTable = tableEnv.fromDataStream(cityStream, "city_id,city_name,ts.rowtime");

                  tableEnv.createTemporaryView("userTable", userTable);
                  tableEnv.createTemporaryView("cityTable", cityTable);

                  //Define a TemporalTableFunction with ts as the time attribute and city_id as the key
                  TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "city_id");
                  //Register the table function
                  tableEnv.registerFunction("dimCity", dimCity);

                  //Join query
                  Table result = tableEnv
                          .sqlQuery("select u.user_name,u.city_id,d.city_name,u.ts from userTable as u " +
                                  ", Lateral table (dimCity(u.ts)) d " +
                                  "where u.city_id=d.city_id");

                  //Print the result
                  DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
                  resultDs.print();
                  env.execute("joinDemo");
              }
          }


          The output is:

          user1,1001,beijing,1970-01-01T00:00:00.001
          user1,1001,beijing2,1970-01-01T00:00:00.010
          user2,1002,shanghai,1970-01-01T00:00:00.002
          user2,1002,shanghai2,1970-01-01T00:00:00.015

          As the output shows, each record in the main stream is joined with the version of the dimension data that was valid at that record's event time.
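
The version selection seen in this output can be reproduced with a small in-memory model. The sketch below is only an illustration of the lookup rule, not how Flink implements temporal joins: `VersionedDimTable`, `upsert`, and `lookupAsOf` are hypothetical names, and watermarks and state cleanup are ignored entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned dimension table: for each key the versions are
// kept sorted by the time they became valid, and a lookup returns the latest version
// whose timestamp is at or before the probe time.
final class VersionedDimTable {
    private final Map<Integer, TreeMap<Long, String>> versions = new HashMap<>();

    // Record that cityId maps to cityName from time validFrom onwards
    void upsert(int cityId, String cityName, long validFrom) {
        versions.computeIfAbsent(cityId, k -> new TreeMap<>()).put(validFrom, cityName);
    }

    // Returns the name valid at time ts, or null if the key had no version yet at that time
    String lookupAsOf(int cityId, long ts) {
        TreeMap<Long, String> history = versions.get(cityId);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, String> entry = history.floorEntry(ts);
        return entry == null ? null : entry.getValue();
    }
}
```

Loading the four city records from the example and probing with the four user records gives the same pairs as the output above: (1001, ts 1) matches beijing, (1001, ts 10) matches beijing2, (1002, ts 2) matches shanghai, and (1002, ts 15) matches shanghai2.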

          (3) An event-time example with a Kafka source

          package join.temporaltablefunctionjoin;

          import lombok.Data;
          import org.apache.flink.streaming.api.TimeCharacteristic;
          import org.apache.flink.streaming.api.datastream.DataStream;
          import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
          import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
          import org.apache.flink.streaming.api.windowing.time.Time;
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
          import org.apache.flink.table.api.EnvironmentSettings;
          import org.apache.flink.table.api.Table;
          import org.apache.flink.table.api.java.StreamTableEnvironment;
          import org.apache.flink.table.functions.TemporalTableFunction;
          import org.apache.flink.types.Row;

          import java.io.Serializable;
          import java.util.Properties;

          /**
           * Create By 鳴宇淳 on 2020/6/1
           **/

          public class JoinDemo10 {
              public static void main(String[] args) throws Exception {
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  //Use event time
                  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                  EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
                  StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
                  env.setParallelism(1);

                  //Kafka broker addresses and consumer settings
                  String kafkaIPs = "192.168.***.**1:9092,192.168.***.**2:9092,192.168.***.**3:9092";
                  Properties props = new Properties();
                  props.setProperty("bootstrap.servers", kafkaIPs);
                  props.setProperty("group.id", "group.cyb.2");

                  //Read user records from Kafka
                  FlinkKafkaConsumer<UserInfo> userConsumer = new FlinkKafkaConsumer<UserInfo>("user", new UserInfoSchema(), props);
                  userConsumer.setStartFromEarliest();
                  userConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserInfo>(Time.seconds(0)) {
                      @Override
                      public long extractTimestamp(UserInfo userInfo) {
                          return userInfo.getTs();
                      }
                  });

                  //Read city dimension records from Kafka
                  FlinkKafkaConsumer<CityInfo> cityConsumer = new FlinkKafkaConsumer<CityInfo>("city", new CityInfoSchema(), props);
                  cityConsumer.setStartFromEarliest();
                  cityConsumer.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<CityInfo>(Time.seconds(0)) {
                      @Override
                      public long extractTimestamp(CityInfo cityInfo) {
                          return cityInfo.getTs();
                      }
                  });

                  //Main (user) stream, fields: user_name, city_id, ts
                  Table userTable = tableEnv.fromDataStream(env.addSource(userConsumer), "userName,cityId,ts.rowtime");
                  //City dimension stream, fields: city_id, city_name, ts
                  Table cityTable = tableEnv.fromDataStream(env.addSource(cityConsumer), "cityId,cityName,ts.rowtime");
                  tableEnv.createTemporaryView("userTable", userTable);
                  tableEnv.createTemporaryView("cityTable", cityTable);

                  // Define a TemporalTableFunction with ts as the time attribute and cityId as the primary key
                  TemporalTableFunction dimCity = cityTable.createTemporalTableFunction("ts", "cityId");
                  // Register the table function
                  tableEnv.registerFunction("dimCity", dimCity);

                  // Print every user row as it arrives
                  Table u = tableEnv.sqlQuery("select * from userTable");
                  u.printSchema();
                  tableEnv.toAppendStream(u, Row.class).print("User stream received:");

                  // Print every city row as it arrives
                  Table c = tableEnv.sqlQuery("select * from cityTable");
                  c.printSchema();
                  tableEnv.toAppendStream(c, Row.class).print("City stream received:");

                  // Join query: look up the city version that is valid at each user row's ts
                  Table result = tableEnv
                          .sqlQuery("select u.userName,u.cityId,d.cityName,u.ts " +
                                  "from userTable as u " +
                                  ", Lateral table  (dimCity(u.ts)) d " +
                                  "where u.cityId=d.cityId");

                  // Print the join result
                  DataStream<Row> resultDs = tableEnv.toAppendStream(result, Row.class);
                  resultDs.print("\t\tJoin output:");
                  env.execute("joinDemo");
              }
          }
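
The lookup that `dimCity(u.ts)` performs for a single `cityId` can be pictured with plain Java: among all versions of the city row, pick the one with the greatest `ts` that is not later than the user row's `ts`. The sketch below is only an illustration under that assumption and is not Flink code; `TemporalLookupSketch` and `cityNameAt` are hypothetical names, and the versions are the city rows used in this article's test data.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of a versioned (temporal) lookup for one cityId.
// TemporalLookupSketch and cityNameAt are hypothetical names, not Flink API.
class TemporalLookupSketch {

    // Returns the city name valid at eventTime: the version with the greatest
    // timestamp <= eventTime, or null if no version exists at that time.
    static String cityNameAt(TreeMap<Long, String> versions, long eventTime) {
        Map.Entry<Long, String> entry = versions.floorEntry(eventTime);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        // Versions of cityId = 1, keyed by ts (taken from the article's test data).
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "beijing");
        versions.put(5L, "shanghai");
        versions.put(7L, "shanghai");
        versions.put(10L, "wuhan");
        versions.put(15L, "nanjing");

        // User row timestamps from the same test data.
        for (long ts : new long[] {0L, 1L, 4L, 5L, 7L, 9L, 11L}) {
            System.out.println("ts=" + ts + " -> " + cityNameAt(versions, ts));
        }
    }
}
```

A sorted map keeps the sketch short because `floorEntry` already returns the latest entry at or before the given key. Flink keeps the versions in keyed state instead, so this shows the semantics only.
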
          package join.temporaltablefunctionjoin;

          import lombok.Data;

          import java.io.Serializable;

          /**
           * Create By 鳴宇淳 on 2020/6/4
           * One record of the user stream.
           **/
          @Data
          public class UserInfo implements Serializable {
              private String userName;
              private Integer cityId;
              private Long ts;
          }
          package join.temporaltablefunctionjoin;

          import lombok.Data;

          import java.io.Serializable;

          /**
           * Create By 鳴宇淳 on 2020/6/4
           * One record of the city dimension stream.
           **/
          @Data
          public class CityInfo implements Serializable {
              private Integer cityId;
              private String cityName;
              private Long ts;
          }
          package join.temporaltablefunctionjoin;

          import com.alibaba.fastjson.JSON;
          import com.alibaba.fastjson.TypeReference;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;
          import org.apache.flink.api.common.serialization.DeserializationSchema;

          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          /**
           * Create By 鳴宇淳 on 2020/6/4
           * Deserializes a JSON Kafka message into a UserInfo.
           **/

          public class UserInfoSchema implements DeserializationSchema<UserInfo> {

              @Override
              public UserInfo deserialize(byte[] message) throws IOException {
                  String jsonStr = new String(message, StandardCharsets.UTF_8);
                  UserInfo data = JSON.parseObject(jsonStr, new TypeReference<UserInfo>() {});
                  return data;
              }

              @Override
              public boolean isEndOfStream(UserInfo nextElement) {
                  return false;
              }

              @Override
              public TypeInformation<UserInfo> getProducedType() {
                  return TypeInformation.of(new TypeHint<UserInfo>() {
                  });
              }
          }

          package join.temporaltablefunctionjoin;

          import com.alibaba.fastjson.JSON;
          import com.alibaba.fastjson.TypeReference;
          import org.apache.flink.api.common.serialization.DeserializationSchema;
          import org.apache.flink.api.common.typeinfo.TypeHint;
          import org.apache.flink.api.common.typeinfo.TypeInformation;

          import java.io.IOException;
          import java.nio.charset.StandardCharsets;

          /**
           * Create By 鳴宇淳 on 2020/6/4
           * Deserializes a JSON Kafka message into a CityInfo.
           **/

          public class CityInfoSchema implements DeserializationSchema<CityInfo> {


              @Override
              public CityInfo deserialize(byte[] message) throws IOException {
                  String jsonStr = new String(message, StandardCharsets.UTF_8);
                  CityInfo data = JSON.parseObject(jsonStr, new TypeReference<CityInfo>() {});
                  return data;
              }

              @Override
              public boolean isEndOfStream(CityInfo nextElement) {
                  return false;
              }

              @Override
              public TypeInformation<CityInfo> getProducedType() {
                  return TypeInformation.of(new TypeHint<CityInfo>() {
                  });
              }
          }


          Write records to the user and city topics in turn.

          User record format:

          {"userName":"user1","cityId":1,"ts":11}

          City dimension record format:

          {"cityId":1,"cityName":"nanjing","ts":15}

          The test produces the following output:

          City stream received:> 1,beijing,1970-01-01T00:00
          User stream received:> user1,1,1970-01-01T00:00
            Join output:> user1,1,beijing,1970-01-01T00:00
          City stream received:> 1,shanghai,1970-01-01T00:00:00.005
          User stream received:> user1,1,1970-01-01T00:00:00.001
            Join output:> user1,1,beijing,1970-01-01T00:00:00.001
          User stream received:> user1,1,1970-01-01T00:00:00.004
            Join output:> user1,1,beijing,1970-01-01T00:00:00.004
          User stream received:> user1,1,1970-01-01T00:00:00.005
            Join output:> user1,1,shanghai,1970-01-01T00:00:00.005
          User stream received:> user1,1,1970-01-01T00:00:00.007
          User stream received:> user1,1,1970-01-01T00:00:00.009
          City stream received:> 1,shanghai,1970-01-01T00:00:00.007
            Join output:> user1,1,shanghai,1970-01-01T00:00:00.007
          City stream received:> 1,wuhan,1970-01-01T00:00:00.010
            Join output:> user1,1,shanghai,1970-01-01T00:00:00.009
          User stream received:> user1,1,1970-01-01T00:00:00.011
          City stream received:> 1,nanjing,1970-01-01T00:00:00.015
            Join output:> user1,1,wuhan,1970-01-01T00:00:00.011
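
In this output the user rows with ts 7 and 9 are joined only after city rows with ts 7 and 10 have arrived. That ordering is consistent with an event-time join that holds a user row until the watermarks of both inputs have reached its timestamp. The sketch below simulates this gating in plain Java. It is a simplification and not Flink's operator: `WatermarkGateSketch`, `onUser`, and `onCity` are hypothetical names, and the largest timestamp seen on each input is assumed to be that input's watermark (zero allowed out-of-orderness, as configured in the job above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified simulation of watermark-gated emission in an event-time join.
// All names are hypothetical; the watermark of each input is assumed to be
// the largest timestamp seen on that input so far.
class WatermarkGateSketch {
    private long userWatermark = Long.MIN_VALUE;
    private long cityWatermark = Long.MIN_VALUE;
    // Buffered user row timestamps, smallest first.
    private final PriorityQueue<Long> pendingUserTs = new PriorityQueue<>();

    // A user row arrives: buffer it and advance the user-side watermark.
    List<Long> onUser(long ts) {
        pendingUserTs.add(ts);
        userWatermark = Math.max(userWatermark, ts);
        return release();
    }

    // A city row arrives: only the city-side watermark advances.
    List<Long> onCity(long ts) {
        cityWatermark = Math.max(cityWatermark, ts);
        return release();
    }

    // Release buffered user rows whose ts is covered by both watermarks.
    private List<Long> release() {
        long watermark = Math.min(userWatermark, cityWatermark);
        List<Long> ready = new ArrayList<>();
        while (!pendingUserTs.isEmpty() && pendingUserTs.peek() <= watermark) {
            ready.add(pendingUserTs.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkGateSketch gate = new WatermarkGateSketch();
        gate.onCity(5);                       // city watermark is now 5
        System.out.println(gate.onUser(5));   // [5]: both watermarks reached 5
        System.out.println(gate.onUser(7));   // []: city watermark is still 5
        System.out.println(gate.onUser(9));   // []
        System.out.println(gate.onCity(7));   // [7]
        System.out.println(gate.onCity(10));  // [9]
    }
}
```

Replaying the arrival order of the log through `onUser` and `onCity` releases the user rows in the same order as the join output lines above. A real job generates watermarks periodically, so the exact timing can differ.
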


          5. Comparison of the four dimension-table join approaches

          Reposted from: https://blog.csdn.net/chybin500/article/details/106482620

