貝殼基于Spark的HiveToHBase實踐
導讀:本文詳細介紹了如何將Hive里的數(shù)據(jù)快速穩(wěn)定的寫進HBase中。由于數(shù)據(jù)量比較大,我們采取的是HBase官方提供的bulkload方式來避免HBase put api寫入壓力大的缺陷。團隊早期采用的是MapReduce進行計算生成HFile,然后使用bulkload進行數(shù)據(jù)導入的工作。
因為結構性的因素,整體的性能不是很理想,對于部分業(yè)務方來說不能接受。其中最重要的因素就是建HBase表時預分區(qū)的規(guī)劃不合理,導致了后面很多任務運行時間太過漫長,很多都達到了4~5個小時才能成功。
在重新審視和規(guī)劃時,自然的想到了從計算層面性能表現(xiàn)更佳的Spark。由它來接替MapReduce完成數(shù)據(jù)格式轉換,并生成HFile的核心工作。
實際生產工作中因為工作涉及到了兩個數(shù)據(jù)端的交互,為了更好的理解整體的流程以及如何優(yōu)化,知道ETL流程中為什么需要一些看上去并不需要的步驟,我們首先需要簡單的了解HBase的架構。
1. HBase結構簡單介紹
Apache HBase是一個開源的非關系型分布式數(shù)據(jù)庫,運行于HDFS之上。它能夠基于HDFS提供實時計算服務主要是架構與底層數(shù)據(jù)結構決定的,即由 LSM-Tree (Log-Structured Merge-Tree) + HTable (Region分區(qū)) + Cache決定的:
LSM樹是目前最流行的不可變磁盤存儲結構之一,僅使用緩存和append file方式來實現(xiàn)順序寫操作。其中關鍵的點是:排序字符串表 Sorted-String-Table,這里我們不深入細節(jié),這種底層結構對于bulkload的要求很重要一點就是數(shù)據(jù)需要排序。而以HBase的存儲形式來看,就是KeyValue需要進行排序!
HTable的數(shù)據(jù)需要均勻的分散在各個Region中,訪問HBase時先去HBase系統(tǒng)表查找定位這條記錄屬于哪個Region ,然后定位到這個Region屬于哪個RegionServer,然后就到對應服務器里面查找對應Region中的數(shù)據(jù)。
最后的bulkload過程都是相同的,差別只是在生成HFile的步驟。這也是下文重點描述的部分。
2. 數(shù)據(jù)流轉通路
數(shù)據(jù)從Hive到HBase的流程大致如下圖:

整個流程真正需要我們cover的就是ETL ( Extract Transfer Load ) 部分,HBase底層文件HFile屬于列存文件,每一列都是以KeyValue的數(shù)據(jù)格式進行存儲。

邏輯上真正需要我們做的工作很簡單:( 為了簡便、省去了timestamp 版本列 )、HBase一條數(shù)據(jù)在邏輯上的概念簡化如下:

如果看到了這里,恭喜你已經基本明白本文的行文邏輯了。接下來就是代碼原理時間:
Map/Reduce框架運轉在
1. mapper:數(shù)據(jù)格式轉換
mapper的目的就是將一行數(shù)據(jù),轉為rowkey:column family:qualifer:value的形式。關鍵的ETL代碼就是將map取得的value,轉成< ImmutableBytesWritable,Put>輸出、進而交給reducer進行處理。
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)throws IOException, InterruptedException {//由字符串切割成每一列的value數(shù)組String[] values = value.toString().split("\\x01", -1);String rowKeyStr = generateRowKey();ImmutableBytesWritable hKey = new ImmutableBytesWritable(Bytes.toBytes(rowKeyStr));Put hPut = new Put(Bytes.toBytes(rowKeyStr));for (int i = 0; i < columns.length; i++) {String columnStr = columns[i];String cfNameStr = "cf1";String cellValueStr = values[i].trim();byte[] columbByte = Bytes.toBytes(columnStr);byte[] cfNameByte = Bytes.toBytes(cfNameStr);byte[] cellValueByte = Bytes.toBytes(cellValueStr);hPut.addColumn(cfNameByte, columbByte, cellValueByte);}context.write(hKey, hPut);}
mapper寫完了,好像已經把數(shù)據(jù)格式轉完了,還需要reducer嗎?參考官方的資料里也沒有找到關于reducer的消息,我轉念一想 事情沒有這么簡單!研讀了提交Job的主流程代碼后發(fā)現(xiàn)除了輸出文件的格式設置與其他mr程序不一樣:
job.setOutputFormatClass(HFileOutputFormat2.class);還有一個其他程序沒有的部分,那就是:
HFileOutputFormat2.configureIncrementalLoad(job,htable)故名思義就是對job進行HFile相關配置。HFileOutputFormat2 是
2. job的配置
挑選出比較相關核心的配置:
根據(jù)mapper的輸出格式來自動設置reducer,意味著我們這個mr程序可以只寫mapper,不需要寫reducer了。
獲取對應HBase表各個region的startKey,根據(jù)region的數(shù)量來設置reduce的數(shù)量,同時配置partitioner讓上一步mapper產生的數(shù)據(jù),分散到對應的partition ( reduce ) 中。
reducer的自動設置
// Based on the configured map output class, set the correct reducer to properly// sort the incoming values.// TODO it would be nice to pick one or the other of these formats.if (KeyValue.class.equals(job.getMapOutputValueClass())) {job.setReducerClass(KeyValueSortReducer.class);} else if (Put.class.equals(job.getMapOutputValueClass())) {job.setReducerClass(PutSortReducer.class);} else if (Text.class.equals(job.getMapOutputValueClass())) {job.setReducerClass(TextSortReducer.class);} else {LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());}
實際上上面三種reducer底層都是會將數(shù)據(jù)轉為KeyValue形式,然后進行排序。需要注意的是KeyValue 的排序是全排序,并不是以單個rowkey進行排序就行的。所謂全排序,就是將整個對象進行比較!
查看KeyValueSortRducer后會發(fā)現(xiàn)底層是一個叫做KeyValue.COMPARATOR的比較器,它是由Bytes.compareTo(byte[] buffer1, int offset1, int length1, byte[] buffer2, int offset2, int length2)將兩個KeyValue對象的每一個字節(jié)從頭開始比較,這是上面說到的全排序形式。
我們輸出的文件格式是HFileOutputFormat2,它在我們寫入數(shù)據(jù)的時候也會進行校驗check每次寫入的數(shù)據(jù)是否是按照KeyValue.COMPARATOR 定義的順序,要是沒有排序就會報錯退出!Added a key not lexically larger than previous。
reduce數(shù)量以及partitioner設置
為什么要根據(jù)HBase的region的情況來設置我們reduce的分區(qū)器以及數(shù)量呢?在上面的小節(jié)中有提到,region是HBase查詢的一個關鍵點。每個htable的region會有自己的【startKey、endKey】,分布在不同的region server中。

這個key的范圍是與rowkey匹配的,以上面這張表為例,數(shù)據(jù)進入region時的邏輯場景如下:

也正是因為這種管理結構,讓HBase的表的rowkey設計與region預分區(qū) ( 其實就是region數(shù)量與其 [starkey,endkey]的設置 ) 在日常的生產過程當中相當?shù)闹匾?。在大批量?shù)據(jù)導入的場景當然也是需要考慮的!
HBase的文件在hdfs的路徑是:
/HBase/data/<namespace>/<tbl_name>/<region_id>/<cf>/<hfile_id>通過并行處理Region來加快查詢的響應速度,所以會要求每個Region的數(shù)據(jù)量盡量均衡,否則大量的請求就會堆積在某個Region上,造成熱點問題、對于Region Server的壓力也會比較大。
如何避免熱點問題以及良好的預分區(qū)以及rowkey設計并不是我們的重點,但這能夠解釋為什么在ETL的過程中需要根據(jù)region的startkey進行reduce的分區(qū)。都是為了貼合HBase原本的設計,讓后續(xù)的bulkload能夠簡單便捷,快速的將之前生成HFile直接導入到region中!
這點是后續(xù)進行優(yōu)化的部分,讓HiveToHBase能夠盡量擺脫其他前置流程 ( 建htable ) 的干擾、更加的專注于ETL部分。其實bulkload并沒有強制的要求一個HFile中都是相同region的記錄!
3. 執(zhí)行bulkload、完成的儀式感
講到這里我們開頭講的需要cover的重點部分就已經完成并解析了底層原理,加上最后的job提交以及bulkload,給整個流程加上結尾。
Job job = Job.getInstance(conf, "HFile Generator ... ");job.setJarByClass(MRMain.class);job.setMapperClass(MRMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(HFileOutputFormat2.class);HFileOutputFormat2.configureIncrementalLoad(job, htable);//等待mr運行完成job.waitForCompletion(true);LoadIncrementalHFiles loader = new LoadIncrementalHFiles(loadHBaseConf);loader.doBulkLoad(new Path(targetHtablePath), htable);
4. 現(xiàn)狀分析
講到這里HiveToHBase的MapReduce工作細節(jié)和流程都已經解析完成了,來看一下實際運行中的任務例子,總數(shù)據(jù)248903451條,60GB經過壓縮的ORC文件。
痛點
因為歷史的任務HBase建表時預分區(qū)沒有設置或者設置不合理,導致很多任務的region數(shù)量只有幾個。所以歷史的任務性能卡點基本都是在進行reduce生成HFile的時候,經查驗發(fā)現(xiàn)747個Map執(zhí)行了大約4分鐘,剩下兩個Reduce執(zhí)行了2小時22分鐘。

而平臺整體HiveToHBase的HBase表region數(shù)量分布如下:

可以看到大部分的HBase表 region數(shù)量都只有幾個,在這種情況下如果沿用之前的體系進行分區(qū)。那么整體的性能改變可以預想的不會太高!
而且由于歷史原因HiveToHBase支持用戶寫sql完成Hive數(shù)據(jù)的處理,然后再導入HBase中。mr是不支持sql寫法的,之前是先使用tez引擎以insert overwrite directory + sql的方式產生臨時文件,然后將臨時文件再進行上述的加工。
解決方案
經過綜合的考量,決定采用Spark重寫HiveToHBase的流程?,F(xiàn)在官方已經有相應的工具包提供
但是這樣解決不了我們的痛點,所以決定不借助
還記得上文中說過,其實bulkload并沒有強制的要求一個HFile中都是相同region的記錄 嗎?所以我們是可以不按照region數(shù)量切分partition的,擺脫htable region的影響。HBase bulkload的時候會check之前生成的HFile,查看數(shù)據(jù)應該被劃分到哪個Region中。
如果是之前的方式提前將相同的前綴rowkey的數(shù)據(jù)聚合那么bulkload的速度就會很快,而如果不按照這種方式,各個region的數(shù)據(jù)混雜在一個HFile中,那么就會對bulkload的性能和負載產生一定的影響!這點需要根據(jù)實際情況進行評估。
使用Spark的原因:
考慮它直接支持sql連接hive,能夠優(yōu)化掉上面提到的步驟,整體流程會更簡便。
spark從架構上會比mr運行快得多。
最后的預期以上述例子為示意 如下圖:

核心流程代碼:與MR類似,不過它采用的是Spark 將RDD寫成磁盤文件的api。需要我們自己對數(shù)據(jù)進行排序工作。
1. 排序
構造一個KeyFamilyQualifier類,然后繼承Comparable進行類似完全排序的設計。實際驗證過程只需要rowkey:family:qualifier進行排序即可。
public class KeyFamilyQualifier implements Comparable<KeyFamilyQualifier>, Serializable {private byte[] rowKey;private byte[] family;private byte[] qualifier;public KeyFamilyQualifier(byte[] rowKey, byte[] family, byte[] qualifier) {this.rowKey = rowKey;this.family = family;this.qualifier = qualifier;}public int compareTo(KeyFamilyQualifier o) {int result = Bytes.compareTo(rowKey, o.getRowKey());if (result == 0) {result = Bytes.compareTo(family, o.getFamily());if (result == 0) {result = Bytes.compareTo(qualifier, o.getQualifier());}}return result;}}
2. 核心處理流程
spark中由于沒有可以自動配置的reducer,需要我們自己做更多的工作。下面是工作的流程:
將spark的dataset
轉為
這部分是我們處理ETL的重點。 將
按照KeyFamilyQualifier進行排序,滿足HBase底層需求,這一步使用 sortByKey(true) 升冪排列就行,因為Key是上面的KeyFamilyQualifier! 將排好序的數(shù)據(jù)轉為
,HFile接受的輸入數(shù)據(jù)格式。 將構建完成的rdd數(shù)據(jù)集,轉成hfile格式的文件。
SparkSession spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();Dataset<Row> rows = spark.sql(hql);JavaPairRDD javaPairRDD = rows.javaRDD().flatMapToPair(row -> rowToKeyFamilyQualifierPairRdd(row).iterator()).sortByKey(true).mapToPair(combineKey -> {return new Tuple2(combineKey._1()._1(), combineKey._2());});Job job = Job.getInstance(conf, HBaseConf.getName());job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(KeyValue.class);HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); //使用job的conf,而不使用job本身;完成后續(xù) compression,bloomType,blockSize,DataBlockSize的配置javaPairRDD.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
3. Spark:數(shù)據(jù)格式轉換
row -> rowToKeyFamilyQualifierPairRdd(row).iterator() 這一part其實就是將row數(shù)據(jù)轉為< KeyFamilyQualifier, KeyValue>
//獲取字段<value、type> 的tupleTuple2<String, String>[] dTypes = dataset.dtypes();return dataset.javaRDD().flatMapToPair(row -> {List<Tuple2<KeyFamilyQualifier, KeyValue>> kvs = new ArrayList<>();byte[] rowKey = generateRowKey();// 如果rowKey 為null, 跳過if (rowKey != null) {for (Tuple2<String, String> dType : dTypes) {Object obj = row.getAs(dType._1);if (obj != null) {kvs.add(new Tuple2<>(new KeyFamilyQualifier(rowkey,"cf1".getBytes(),Bytes.toBytes(dType._1)),getKV(param-x));}}} else {LOGGER.error("row key is null ,row = {}", row.toString());}return kvs.iterator();});
這樣關于HiveToHBase的spark方式就完成了,關于partition的控制我們單獨設置了參數(shù)維護便于調整:
// 如果任務的參數(shù) 傳入了 預定的分區(qū)數(shù)量if (partitionNum > 0) {hiveData = hiveData.repartition(partitionNum);}
分離了partition與sort的過程,因為repartition也是需要shuffle 有性能損耗,所以默認不開啟。就按照spark正常讀取的策略 一個hdfs block對應一個partition即可。如果有需要特殊維護的任務,例如加大并行度等,也可以通過參數(shù)控制。
上述例子的任務換成了新的方式運行,運行33分鐘完成。從146分鐘到33分鐘,性能整整提升了4倍有余。由于任務遷移和升級還需要很多前置性的工作,整體的數(shù)據(jù)未能在文章撰寫時產出,所以暫時以單個任務為例子進行對比性實驗。(因為任務的運行情況和集群的資源緊密掛鉤,只作為對照參考作用)
可以看到策略變化對于bulkload的性能來說是幾乎沒有變化的,實際證明我們這種策略是行得通的:

還有個任務是原有mr運行方式需要5.29小時,遷移到spark的方式 經過調優(yōu) ( 提高partition數(shù)量 ) 只需要11分鐘45秒。這種方式最重要的是可以手動進行調控,是可靈活維護的。本身離線任務的運行時長就是受到很多因素的制約,實驗雖然缺乏很強的說服力,但是基本還是能夠對比出提升的性能是非常多的。
限于篇幅,有很多未能細講的點,例如加鹽讓數(shù)據(jù)均勻的分布在region中,partition的自動計算,spark生成hfile過程中導致的oom問題。文筆拙略,希望大家能有點收獲。
最后感謝開發(fā)測試過程中給予筆者很多幫助的雨松和馮亮,還有同組同學的大力支持。
