<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hudi 原理 | Apache Hudi 中自定義序列化和數(shù)據(jù)寫入邏輯

          共 8503字,需瀏覽 18分鐘

           ·

          2021-10-20 10:56



          1. 介紹

          在Apache Hudi中,Hudi的一條數(shù)據(jù)使用HoodieRecord這個(gè)類表示,其中包含了hoodie的主鍵,record的分區(qū)文件位置,還有今天本文的關(guān)鍵,payload。payload是一個(gè)條數(shù)據(jù)的內(nèi)容的抽象,決定了同一個(gè)主鍵的數(shù)據(jù)的增刪改查邏輯也決定了其序列化的方式。通過對(duì)payload的自定義,可以實(shí)現(xiàn)數(shù)據(jù)的靈活合并,數(shù)據(jù)的自定義編碼序列化等,豐富Hudi現(xiàn)有的語義,提升性能。

          2. 場(chǎng)景

          包括但不限于如下場(chǎng)景中,我們可以通過自定義payload來實(shí)現(xiàn)靈活的需求。

          ?實(shí)現(xiàn)同一個(gè)主鍵的數(shù)據(jù)非row level replace語義的合并,如mvcc語義等?實(shí)現(xiàn)同一個(gè)主鍵下多時(shí)間戳數(shù)據(jù)靈活排序的語義?實(shí)現(xiàn)輸出redo/undo log的效果?實(shí)現(xiàn)自定義序列化邏輯

          3. 作用方式

          首先我們回顧一下一條HoodieRecord在Spark環(huán)境中使用RDD API upsert寫入MOR表的生命周期。

          注意:在這個(gè)過程中,shuffle/寫入文件/磁盤spill的時(shí)候,都需要保證數(shù)據(jù)是已經(jīng)被序列化過的格式。

          4. 實(shí)現(xiàn)方式

          在Hudi中,默認(rèn)的payload實(shí)現(xiàn)是DefaultHoodieRecordPayload,它是OverwriteWithLatestAvroPayload子類。而OverwriteWithLatestAvroPayload這個(gè)類繼承了BaseAvroPayload并implements?HoodieRecordPayload這個(gè)接口。

          其中BaseAvroPayload決定了數(shù)據(jù)的序列化方式,而HoodieRecordPayload決定了數(shù)據(jù)的合并方式。后者是必須使用的,但是前者不是。下面來分別分析他們的實(shí)現(xiàn)。

          BaseAvroPayload

          /** * Base class for all AVRO record based payloads, that can be ordered based on a field. */public abstract class BaseAvroPayload implements Serializable {  /**   * Avro data extracted from the source converted to bytes.   */  public final byte[] recordBytes;  /**   * For purposes of preCombining.   */  public final Comparable orderingVal;  /**   * Instantiate {@link BaseAvroPayload}.   *   * @param record      Generic record for the payload.   * @param orderingVal {@link Comparable} to be used in pre combine.   */  public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {    this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) : new byte[0];    this.orderingVal = orderingVal;    if (orderingVal == null) {      throw new HoodieException("Ordering value is null for record: " + record);    }  }}

          首先BaseAvroPayload?implements了Serializable接口,標(biāo)志著這個(gè)類和它的子類都是為了序列化而設(shè)計(jì)的,大家在繼承的時(shí)候需要注意子類相關(guān)attribute的可序列化問題。

          構(gòu)造器傳入了GenericRecord和一個(gè)Comparable的變量。由于Hudi使用avro作為內(nèi)部的行存序列化格式,所以輸入的數(shù)據(jù)需要以GenericRecord的形式傳遞給payload。BaseAvroPayload會(huì)將數(shù)據(jù)直接序列化成binary待IO使用。這里的假設(shè)是我們只需要做row level操作,直接操作整行的二進(jìn)制數(shù)據(jù)毫無疑問是非常高效的,這里的orderingVal是因?yàn)榛谛屑?jí)別的record比較在RDBMS的CDC中是非常常見的,所以增加了這個(gè)字段。這樣處理之后,只需保證comparable的變量也是可序列化的,這個(gè)類的所有attribute都已經(jīng)是可序列化的格式了,使用任意序列化框架直接傳輸即可。

          HoodieRecordPayload

          /** * Every Hoodie table has an implementation of the HoodieRecordPayloadcode> This abstracts out callbacks which depend on record specific logic. */@PublicAPIClass(maturity = ApiMaturityLevel.STABLE)public interface HoodieRecordPayload<T extends HoodieRecordPayload> extends Serializable {  /**   * This method is deprecated. Please use this {@link #preCombine(HoodieRecordPayload, Properties)} method.   */  @Deprecated  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)  T preCombine(T oldValue);  /**   * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to insert/upsert by taking in a property map.   * Implementation can leverage the property to decide their business logic to do preCombine.   *   * @param oldValue   instance of the old {@link HoodieRecordPayload} to be combined with.   * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage.   *   * @return the combined value   */  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)  default T preCombine(T oldValue, Properties properties) {    return preCombine(oldValue);  }  /**   * This methods is deprecated. Please refer to {@link #combineAndGetUpdateValue(IndexedRecord, Schema, Properties)} for java docs.   */  @Deprecated  @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED)  Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException;  /**   * This methods lets you write custom merging/combining logic to produce new values as a function of current value on storage and whats contained   * in this object. Implementations can leverage properties if required.   * 

          * eg: * 1) You are updating counters, you may want to add counts to currentValue and write back updated counts * 2) You may be reading DB redo logs, and merge them with current image for a database row on storage * p> * * @param currentValue Current value in storage, to merge/combine this payload with * @param schema Schema used for record * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. * @return new combined/merged value to be written back to storage. EMPTY to skip writing this record. */ default Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException { return combineAndGetUpdateValue(currentValue, schema); } /** * This method is deprecated. Refer to {@link #getInsertValue(Schema, Properties)} for java docs. * @param schema Schema used for record * @return the {@link IndexedRecord} to be inserted. */ @Deprecated @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) Option<IndexedRecord> getInsertValue(Schema schema) throws IOException; /** * Generates an avro record out of the given HoodieRecordPayload, to be written out to storage. Called when writing a new value for the given * HoodieKey, wherein there is no existing record in storage to be combined against. (i.e insert) Return EMPTY to skip writing this record. * Implementations can leverage properties if required. * @param schema Schema used for record * @param properties Payload related properties. For example pass the ordering field(s) name to extract from value in storage. * @return the {@link IndexedRecord} to be inserted. */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException { return getInsertValue(schema); } /** * This method can be used to extract some metadata from HoodieRecordPayload. The metadata is passed to {@code WriteStatus.markSuccess()} and * {@code WriteStatus.markFailure()} in order to compute some aggregate metrics using the metadata in the context of a write success or failure. * @return the metadata in the form of Map<String, String> if any. */ @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) default Option<Map<String, String>> getMetadata() { return Option.empty(); }}

          這個(gè)類的注釋寫得非常清楚,其中每個(gè)方法都有定義兩個(gè)個(gè)不同接口,截止本文發(fā)出時(shí)候(0.9.0版本),部分內(nèi)部邏輯還在使用deprecated的舊版本,所以在使用時(shí)需要注意,邏輯最好放在舊接口里。

          簡(jiǎn)單來說,preCombine?這個(gè)方法定義了兩個(gè)payload合并的邏輯,在兩個(gè)場(chǎng)景下會(huì)被調(diào)用:

          1.當(dāng)deduplicated 開啟時(shí),寫入的數(shù)據(jù)兩兩合并時(shí)用到2.在MOR表發(fā)生compaction時(shí),兩條從log中讀取的payload合并時(shí)用到3.MOR表使用RT視圖讀取時(shí)

          combineAndGetUpdateValue?則定義了寫入數(shù)據(jù)和baseFile中的數(shù)據(jù)(這里已經(jīng)被轉(zhuǎn)化成avro的行存格式)的合并方式。通常情況下,這合并邏輯應(yīng)該和preCombine保持語義上的一致。

          最后getInsertValue則定義了如何將數(shù)據(jù)從payload形式轉(zhuǎn)化成GenericRecord。在Hoodie相關(guān)的WriteHandle中被大量使用。通常是被用在寫入Log/BaseFile時(shí)調(diào)用的。

          幾點(diǎn)額外注意的是:

          1.combineAndGetUpdateValue和getInsertValue返回的都是Option,在這里,如果返回Option.empty(),就是指數(shù)據(jù)刪除的意思。EmptyHoodieRecordPayload?正是這一邏輯的payload表達(dá),如果preCombine的返回結(jié)果是刪除,則可以返回這個(gè)類的實(shí)例。而hoodie中,在insert和upsert中通過添加_hoodie_is_deleted字段來實(shí)現(xiàn)刪除的原理,本質(zhì)上也是在payload中判斷到這個(gè)字段,就返回空來實(shí)現(xiàn)的。2.不論是否繼承BaseAvroPayload這個(gè)類/是否需要Comparable類型的orderingVal, 最好保留(GenericRecord, Comparable)這個(gè)構(gòu)造器,因?yàn)镠udi中存在反射調(diào)用創(chuàng)建對(duì)象,默認(rèn)尋找的構(gòu)造器就是這個(gè)。

          5. 使用場(chǎng)景

          5. 1 Column Level的數(shù)據(jù)合并

          有時(shí)候我們希望能夠?qū)崿F(xiàn)兩個(gè)數(shù)據(jù)合并時(shí),能夠按照每個(gè)列的實(shí)現(xiàn)不同的合并邏輯。這時(shí)候就可以在preCombinecombineAndGetUpdateValue方法中借助schema遍歷所有列,然后做不同的處理。如果需要在preCombine中使用Schema,可以在構(gòu)造器初始化的時(shí)候保留GenericRecord中schema的引用。如果發(fā)生序列化后的傳輸,同時(shí)又沒有使用schema可以序列化的版本(avro 1.8.2中 schema是不可序列化的對(duì)象),那么可以從方法中傳遞的properties中傳遞的信息構(gòu)建schema。

          public HoodieRecordPayload preCombine(HoodieRecordPayload oldValue, Properties properties) {    if (schema == null) {        this.schema = new Schema.Parser().parse(properties.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()));    }    initialSchema(properties);    GenericRecord thisRecord = getInsertValue(schema).get();    GenericRecord otherRecord = oldValue.getInsertValue(schema).get();    List<Schema.Field> fields = schema.getFields();    for (Schema.Field field : fields) {        // logic for each column    }    return new HoodieRecordPayload(thisRecord, orderingVal);}

          5.2 實(shí)現(xiàn)自定義的序列化方式

          在默認(rèn)的BaseAvroPayload中,一次upsert,一條數(shù)據(jù)通常最少要序列化/反序列化三次,第一次是創(chuàng)建payload的時(shí)候,第二次是在寫入時(shí)反序列化,第三次是寫入文件時(shí)序列化。如果數(shù)據(jù)非常復(fù)雜,序列化其實(shí)是非常耗時(shí)的。我們可以通過靈活定義payload來決定序列化的方式,減少觸發(fā)正反序列化的次數(shù)。這個(gè)技巧在Compaction的時(shí)候也可以獲得收益。如考慮如下場(chǎng)景:

          對(duì)于一條kakfa的數(shù)據(jù),我們可以把key和partition相關(guān)的內(nèi)容存在kafka的key/timestamp中。然后使用binary的方式獲取kafka的value。通過kafka的key來構(gòu)建HoodieRecordKey,然后將value直接以二進(jìn)制方式存在payload中的map/list中,這樣不會(huì)觸發(fā)任何關(guān)于數(shù)據(jù)的序列化,額外的開銷很低。而后將合并的邏輯放在getInsertValue方法中,在從payload轉(zhuǎn)換成GenericRecord時(shí),才將binary進(jìn)行同一個(gè)key的數(shù)據(jù)合并和數(shù)據(jù),這樣只需要一次avro的序列化操作就可以完成寫入過程。

          需要注意的是,這樣的設(shè)計(jì)方式毫無疑問增加了復(fù)雜度,使業(yè)務(wù)邏輯抽象方式變難,同時(shí)因?yàn)閍vro的序列化壓縮比例通常比較高,如果直接傳輸業(yè)務(wù)數(shù)據(jù),可能會(huì)有更大的IO和內(nèi)存占用,需要根據(jù)場(chǎng)景評(píng)估收益。

          6. 總結(jié)

          本篇文章中我們介紹了Apache Hudi的關(guān)鍵數(shù)據(jù)抽象payload邏輯,同時(shí)介紹了幾種關(guān)鍵payload的實(shí)現(xiàn),最后給出基于payload的幾種典型應(yīng)用場(chǎng)景。

          瀏覽 145
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  亚洲专区在线播放 | 伊人色色视频 | 狠狠婷婷| 亚洲18禁网站 | 色多多簧片 |