<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          貝殼基于Spark的HiveToHBase實踐

          共 10682字,需瀏覽 22分鐘

           ·

          2021-06-15 21:23

          導讀:本文詳細介紹了如何將Hive里的數(shù)據(jù)快速穩(wěn)定的寫進HBase中。由于數(shù)據(jù)量比較大,我們采取的是HBase官方提供的bulkload方式來避免HBase put api寫入壓力大的缺陷。團隊早期采用的是MapReduce進行計算生成HFile,然后使用bulkload進行數(shù)據(jù)導入的工作。

          因為結構性的因素,整體的性能不是很理想,對于部分業(yè)務方來說不能接受。其中最重要的因素就是建HBase表時預分區(qū)的規(guī)劃不合理,導致了后面很多任務運行時間太過漫長,很多都達到了4~5個小時才能成功。

          在重新審視和規(guī)劃時,自然的想到了從計算層面性能表現(xiàn)更佳的Spark。由它來接替MapReduce完成數(shù)據(jù)格式轉換,并生成HFile的核心工作。

          01
          HiveToHBase 全解析

          實際生產工作中因為工作涉及到了兩個數(shù)據(jù)端的交互,為了更好的理解整體的流程以及如何優(yōu)化,知道ETL流程中為什么需要一些看上去并不需要的步驟,我們首先需要簡單的了解HBase的架構。

          1. HBase結構簡單介紹

          Apache HBase是一個開源的非關系型分布式數(shù)據(jù)庫,運行于HDFS之上。它能夠基于HDFS提供實時計算服務主要是架構與底層數(shù)據(jù)結構決定的,即由 LSM-Tree (Log-Structured Merge-Tree) + HTable (Region分區(qū)) + Cache決定的:

          • LSM樹是目前最流行的不可變磁盤存儲結構之一,僅使用緩存和append file方式來實現(xiàn)順序寫操作。其中關鍵的點是:排序字符串表 Sorted-String-Table,這里我們不深入細節(jié),這種底層結構對于bulkload的要求很重要一點就是數(shù)據(jù)需要排序。而以HBase的存儲形式來看,就是KeyValue需要進行排序!

          • HTable的數(shù)據(jù)需要均勻的分散在各個Region中,訪問HBase時先去HBase系統(tǒng)表查找定位這條記錄屬于哪個Region ,然后定位到這個Region屬于哪個RegionServer,然后就到對應服務器里面查找對應Region中的數(shù)據(jù)。

          最后的bulkload過程都是相同的,差別只是在生成HFile的步驟。這也是下文重點描述的部分。

          2. 數(shù)據(jù)流轉通路

          數(shù)據(jù)從Hive到HBase的流程大致如下圖:

          整個流程真正需要我們cover的就是ETL ( Extract Transfer Load ) 部分,HBase底層文件HFile屬于列存文件,每一列都是以KeyValue的數(shù)據(jù)格式進行存儲。

          邏輯上真正需要我們做的工作很簡單:( 為了簡便、省去了timestamp 版本列 )、HBase一條數(shù)據(jù)在邏輯上的概念簡化如下:

          如果看到了這里,恭喜你已經基本明白本文的行文邏輯了。接下來就是代碼原理時間:

          02
          MapReduce工作流程

          Map/Reduce框架運轉在鍵值對上,也就是說框架把作業(yè)的輸入看為是一組鍵值對,同樣也產出一組鍵值對做為作業(yè)的輸出。在我們的場景中是這樣的:

          1. mapper:數(shù)據(jù)格式轉換

          mapper的目的就是將一行數(shù)據(jù),轉為rowkey:column family:qualifer:value的形式。關鍵的ETL代碼就是將map取得的value,轉成< ImmutableBytesWritable,Put>輸出、進而交給reducer進行處理。

          protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)        throws IOException, InterruptedException {    //由字符串切割成每一列的value數(shù)組    String[] values = value.toString().split("\\x01", -1);    String rowKeyStr = generateRowKey();    ImmutableBytesWritable hKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyStr));
          Put hPut = new Put(Bytes.toBytes(rowKeyStr)); for (int i = 0; i < columns.length; i++) { String columnStr = columns[i]; String cfNameStr = "cf1";        String cellValueStr = values[i].trim();         byte[] columbByte = Bytes.toBytes(columnStr); byte[] cfNameByte = Bytes.toBytes(cfNameStr); byte[] cellValueByte = Bytes.toBytes(cellValueStr); hPut.addColumn(cfNameByte, columbByte, cellValueByte);            }    context.write(hKey, hPut);}

          mapper寫完了,好像已經把數(shù)據(jù)格式轉完了,還需要reducer嗎?參考官方的資料里也沒有找到關于reducer的消息,我轉念一想 事情沒有這么簡單!研讀了提交Job的主流程代碼后發(fā)現(xiàn)除了輸出文件的格式設置與其他mr程序不一樣:

          job.setOutputFormatClass(HFileOutputFormat2.class);

          還有一個其他程序沒有的部分,那就是:

          HFileOutputFormat2.configureIncrementalLoad(job,htable)

          故名思義就是對job進行HFile相關配置。HFileOutputFormat2 工具包提供的,讓我們看看里面到底干了什么吧!

          2. job的配置

          挑選出比較相關核心的配置:

          • 根據(jù)mapper的輸出格式來自動設置reducer,意味著我們這個mr程序可以只寫mapper,不需要寫reducer了。

          • 獲取對應HBase表各個region的startKey,根據(jù)region的數(shù)量來設置reduce的數(shù)量,同時配置partitioner讓上一步mapper產生的數(shù)據(jù),分散到對應的partition ( reduce ) 中。

          reducer的自動設置

          // Based on the configured map output class, set the correct reducer to properly// sort the incoming values.// TODO it would be nice to pick one or the other of these formats.if (KeyValue.class.equals(job.getMapOutputValueClass())) {  job.setReducerClass(KeyValueSortReducer.class);} else if (Put.class.equals(job.getMapOutputValueClass())) {  job.setReducerClass(PutSortReducer.class);} else if (Text.class.equals(job.getMapOutputValueClass())) {  job.setReducerClass(TextSortReducer.class);} else {  LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());}

          實際上上面三種reducer底層都是會將數(shù)據(jù)轉為KeyValue形式,然后進行排序。需要注意的是KeyValue 的排序是全排序,并不是以單個rowkey進行排序就行的。所謂全排序,就是將整個對象進行比較!

          查看KeyValueSortRducer后會發(fā)現(xiàn)底層是一個叫做KeyValue.COMPARATOR的比較器,它是由Bytes.compareTo(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2, int length2)將兩個KeyValue對象的每一個字節(jié)從頭開始比較,這是上面說到的全排序形式。

          我們輸出的文件格式是HFileOutputFormat2,它在我們寫入數(shù)據(jù)的時候也會進行校驗check每次寫入的數(shù)據(jù)是否是按照KeyValue.COMPARATOR 定義的順序,要是沒有排序就會報錯退出!Added a key not lexically larger than previous。

          reduce數(shù)量以及partitioner設置

          為什么要根據(jù)HBase的region的情況來設置我們reduce的分區(qū)器以及數(shù)量呢?在上面的小節(jié)中有提到,region是HBase查詢的一個關鍵點。每個htable的region會有自己的【startKey、endKey】,分布在不同的region server中。

          這個key的范圍是與rowkey匹配的,以上面這張表為例,數(shù)據(jù)進入region時的邏輯場景如下:

          也正是因為這種管理結構,讓HBase的表的rowkey設計與region預分區(qū) ( 其實就是region數(shù)量與其 [starkey,endkey]的設置 ) 在日常的生產過程當中相當?shù)闹匾?。在大批量?shù)據(jù)導入的場景當然也是需要考慮的!

          HBase的文件在hdfs的路徑是:

          /HBase/data/<namespace>/<tbl_name>/<region_id>/<cf>/<hfile_id>

          通過并行處理Region來加快查詢的響應速度,所以會要求每個Region的數(shù)據(jù)量盡量均衡,否則大量的請求就會堆積在某個Region上,造成熱點問題、對于Region Server的壓力也會比較大。

          如何避免熱點問題以及良好的預分區(qū)以及rowkey設計并不是我們的重點,但這能夠解釋為什么在ETL的過程中需要根據(jù)region的startkey進行reduce的分區(qū)。都是為了貼合HBase原本的設計,讓后續(xù)的bulkload能夠簡單便捷,快速的將之前生成HFile直接導入到region中!

          這點是后續(xù)進行優(yōu)化的部分,讓HiveToHBase能夠盡量擺脫其他前置流程 ( 建htable ) 的干擾、更加的專注于ETL部分。其實bulkload并沒有強制的要求一個HFile中都是相同region的記錄!

          3. 執(zhí)行bulkload、完成的儀式感

          講到這里我們開頭講的需要cover的重點部分就已經完成并解析了底層原理,加上最后的job提交以及bulkload,給整個流程加上結尾。

          Job job = Job.getInstance(conf, "HFile Generator ... ");job.setJarByClass(MRMain.class);job.setMapperClass(MRMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(HFileOutputFormat2.class);
          HFileOutputFormat2.configureIncrementalLoad(job, htable);//等待mr運行完成job.waitForCompletion(true);
          LoadIncrementalHFiles loader = new LoadIncrementalHFiles(loadHBaseConf);loader.doBulkLoad(new Path(targetHtablePath), htable);

          4. 現(xiàn)狀分析

          講到這里HiveToHBase的MapReduce工作細節(jié)和流程都已經解析完成了,來看一下實際運行中的任務例子,總數(shù)據(jù)248903451條,60GB經過壓縮的ORC文件。

          痛點

          因為歷史的任務HBase建表時預分區(qū)沒有設置或者設置不合理,導致很多任務的region數(shù)量只有幾個。所以歷史的任務性能卡點基本都是在進行reduce生成HFile的時候,經查驗發(fā)現(xiàn)747個Map執(zhí)行了大約4分鐘,剩下兩個Reduce執(zhí)行了2小時22分鐘。

          而平臺整體HiveToHBase的HBase表region數(shù)量分布如下:

          可以看到大部分的HBase表 region數(shù)量都只有幾個,在這種情況下如果沿用之前的體系進行分區(qū)。那么整體的性能改變可以預想的不會太高!

          而且由于歷史原因HiveToHBase支持用戶寫sql完成Hive數(shù)據(jù)的處理,然后再導入HBase中。mr是不支持sql寫法的,之前是先使用tez引擎以insert overwrite directory + sql的方式產生臨時文件,然后將臨時文件再進行上述的加工。

          解決方案

          經過綜合的考量,決定采用Spark重寫HiveToHBase的流程?,F(xiàn)在官方已經有相應的工具包提供,也有樣例的scala代碼 ( Apache HBase ? Reference Guide、中文版:HBase and Spark-HBase中文參考指南 3.0 ),讓我們可以像寫MR一樣只寫mapper,不需要管分區(qū)和排序。

          但是這樣解決不了我們的痛點,所以決定不借助的官方工具箱,這也正是我們分析mr的job配置的最大原因,可以根據(jù)自己的需求進行定制開發(fā)。

          還記得上文中說過,其實bulkload并沒有強制的要求一個HFile中都是相同region的記錄 嗎?所以我們是可以不按照region數(shù)量切分partition的,擺脫htable region的影響。HBase bulkload的時候會check之前生成的HFile,查看數(shù)據(jù)應該被劃分到哪個Region中。

          如果是之前的方式提前將相同的前綴rowkey的數(shù)據(jù)聚合那么bulkload的速度就會很快,而如果不按照這種方式,各個region的數(shù)據(jù)混雜在一個HFile中,那么就會對bulkload的性能和負載產生一定的影響!這點需要根據(jù)實際情況進行評估。

          使用Spark的原因:

          • 考慮它直接支持sql連接hive,能夠優(yōu)化掉上面提到的步驟,整體流程會更簡便。

          • spark從架構上會比mr運行快得多。

          最后的預期以上述例子為示意 如下圖:

          03
          Spark工作流程

          核心流程代碼:與MR類似,不過它采用的是Spark 將RDD寫成磁盤文件的api。需要我們自己對數(shù)據(jù)進行排序工作。

          1. 排序

          構造一個KeyFamilyQualifier類,然后繼承Comparable進行類似完全排序的設計。實際驗證過程只需要rowkey:family:qualifier進行排序即可。

          public class KeyFamilyQualifier implements Comparable<KeyFamilyQualifier>, Serializable {
          private byte[] rowKey; private byte[] family; private byte[] qualifier;
          public KeyFamilyQualifier(byte[] rowKey, byte[] family, byte[] qualifier) { this.rowKey = rowKey; this.family = family; this.qualifier = qualifier; }
          @Override public int compareTo(KeyFamilyQualifier o) { int result = Bytes.compareTo(rowKey, o.getRowKey()); if (result == 0) { result = Bytes.compareTo(family, o.getFamily()); if (result == 0) { result = Bytes.compareTo(qualifier, o.getQualifier()); }        } return result; }}

          2. 核心處理流程

          spark中由于沒有可以自動配置的reducer,需要我們自己做更多的工作。下面是工作的流程:

          • 將spark的dataset轉為這部分是我們處理ETL的重點。

          • 按照KeyFamilyQualifier進行排序,滿足HBase底層需求,這一步使用 sortByKey(true) 升冪排列就行,因為Key是上面的KeyFamilyQualifier!

          • 將排好序的數(shù)據(jù)轉為,HFile接受的輸入數(shù)據(jù)格式。

          • 將構建完成的rdd數(shù)據(jù)集,轉成hfile格式的文件。

          SparkSession spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();Dataset<Row> rows = spark.sql(hql);
          JavaPairRDD javaPairRDD = rows.javaRDD() .flatMapToPair(row -> rowToKeyFamilyQualifierPairRdd(row).iterator()) .sortByKey(true) .mapToPair(combineKey -> { return new Tuple2(combineKey._1()._1(), combineKey._2());        });
          Job job = Job.getInstance(conf, HBaseConf.getName());job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(KeyValue.class);HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); //使用job的conf,而不使用job本身;完成后續(xù) compression,bloomType,blockSize,DataBlockSize的配置javaPairRDD.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());

          3. Spark:數(shù)據(jù)格式轉換

          row -> rowToKeyFamilyQualifierPairRdd(row).iterator()  這一part其實就是將row數(shù)據(jù)轉為< KeyFamilyQualifier, KeyValue>

          //獲取字段<value、type> 的tupleTuple2<String, String>[] dTypes = dataset.dtypes();return dataset.javaRDD().flatMapToPair(row -> {    List<Tuple2<KeyFamilyQualifier, KeyValue>> kvs = new ArrayList<>();    byte[] rowKey = generateRowKey();    // 如果rowKey 為null, 跳過    if (rowKey != null) {        for (Tuple2<String, String> dType : dTypes) {            Object obj = row.getAs(dType._1);            if (obj != null) {                kvs.add(new Tuple2<>(new KeyFamilyQualifier(rowkey,"cf1".getBytes(),Bytes.toBytes(dType._1)),getKV(param-x));            }        }    } else {        LOGGER.error("row key is null ,row = {}", row.toString());    }    return kvs.iterator();});

          這樣關于HiveToHBase的spark方式就完成了,關于partition的控制我們單獨設置了參數(shù)維護便于調整:

          // 如果任務的參數(shù) 傳入了 預定的分區(qū)數(shù)量if (partitionNum > 0) {    hiveData = hiveData.repartition(partitionNum);}

          分離了partition與sort的過程,因為repartition也是需要shuffle 有性能損耗,所以默認不開啟。就按照spark正常讀取的策略 一個hdfs block對應一個partition即可。如果有需要特殊維護的任務,例如加大并行度等,也可以通過參數(shù)控制。

          04
          二者對比

          上述例子的任務換成了新的方式運行,運行33分鐘完成。從146分鐘到33分鐘,性能整整提升了4倍有余。由于任務遷移和升級還需要很多前置性的工作,整體的數(shù)據(jù)未能在文章撰寫時產出,所以暫時以單個任務為例子進行對比性實驗。(因為任務的運行情況和集群的資源緊密掛鉤,只作為對照參考作用)

          可以看到策略變化對于bulkload的性能來說是幾乎沒有變化的,實際證明我們這種策略是行得通的:

          還有個任務是原有mr運行方式需要5.29小時,遷移到spark的方式 經過調優(yōu) ( 提高partition數(shù)量 ) 只需要11分鐘45秒。這種方式最重要的是可以手動進行調控,是可靈活維護的。本身離線任務的運行時長就是受到很多因素的制約,實驗雖然缺乏很強的說服力,但是基本還是能夠對比出提升的性能是非常多的。

          限于篇幅,有很多未能細講的點,例如加鹽讓數(shù)據(jù)均勻的分布在region中,partition的自動計算,spark生成hfile過程中導致的oom問題。文筆拙略,希望大家能有點收獲。

          最后感謝開發(fā)測試過程中給予筆者很多幫助的雨松和馮亮,還有同組同學的大力支持。

          參考文章:

          1. 20張圖帶你到HBase的世界遨游【轉】 - sunsky303 - 博客園

          https://www.cnblogs.com/sunsky303/p/14312350.html

          2. HBase原理-數(shù)據(jù)讀取流程解析

          http://HBasefly.com/2016/12/21/HBase-getorscan/?aixuds=6h5ds3

          3. Hive、Spark SQL任務參數(shù)調優(yōu)

          https://www.jianshu.com/p/2964bf816efc

          4. Spark On HBase的官方jar包編譯與使用

          https://juejin.cn/post/6844903961242124295

          5. Apache HBase ? Reference Guide

          https://hbase.apache.org/book.html#_bulk_load

          6. HBase and Spark-HBase中文參考指南 3.0

          https://www.cntofu.com/book/173/docs/17.md

          瀏覽 43
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  A片黄色视频免费 | 毛片基地免费看 | 亚洲第一页在线观看 | 成人美女视频 | 91 乱伦 |