Hive計(jì)算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎
點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”面試“獲取更多驚喜

前言
Hive從2008年始于FaceBook工程師之手,經(jīng)過10幾年的發(fā)展至今保持強(qiáng)大的生命力。截止目前Hive已經(jīng)更新至3.1.x版本,Hive從最開始的為人詬病的速度慢迅速發(fā)展,開始支持更多的計(jì)算引擎,計(jì)算速度大大提升。
本文我們將從原理、應(yīng)用、調(diào)優(yōu)分別講解Hive所支持的MapReduce、Tez、Spark引擎。
MapReduce引擎
我們在之前的文章中:
《硬剛Hive|4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)》 《當(dāng)我們在學(xué)習(xí)Hive的時候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」》
對Hive的MapReduce引擎已經(jīng)做過非常詳細(xì)的講解了。
本文首發(fā)自公眾號:
《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。
在Hive2.x版本中,HiveSQL會被轉(zhuǎn)化為MR任務(wù),這也是我們經(jīng)常說的HiveSQL的執(zhí)行原理。
我們先來看下 Hive 的底層執(zhí)行架構(gòu)圖, Hive 的主要組件與 Hadoop 交互的過程:

在 Hive 這一側(cè),總共有五個組件:
UI:用戶界面。可看作我們提交SQL語句的命令行界面。 DRIVER:驅(qū)動程序。接收查詢的組件。該組件實(shí)現(xiàn)了會話句柄的概念。 COMPILER:編譯器。負(fù)責(zé)將 SQL 轉(zhuǎn)化為平臺可執(zhí)行的執(zhí)行計(jì)劃。對不同的查詢塊和查詢表達(dá)式進(jìn)行語義分析,并最終借助表和從 metastore 查找的分區(qū)元數(shù)據(jù)來生成執(zhí)行計(jì)劃。 METASTORE:元數(shù)據(jù)庫。存儲 Hive 中各種表和分區(qū)的所有結(jié)構(gòu)信息。 EXECUTION ENGINE:執(zhí)行引擎。負(fù)責(zé)提交 COMPILER 階段編譯好的執(zhí)行計(jì)劃到不同的平臺上。
上圖的基本流程是:
步驟1:UI 調(diào)用 DRIVER 的接口;
步驟2:DRIVER 為查詢創(chuàng)建會話句柄,并將查詢發(fā)送到 COMPILER(編譯器)生成執(zhí)行計(jì)劃;
步驟3和4:編譯器從元數(shù)據(jù)存儲中獲取本次查詢所需要的元數(shù)據(jù),該元數(shù)據(jù)用于對查詢樹中的表達(dá)式進(jìn)行類型檢查,以及基于查詢謂詞修建分區(qū);
步驟5:編譯器生成的計(jì)劃是分階段的DAG,每個階段要么是 map/reduce 作業(yè),要么是一個元數(shù)據(jù)或者HDFS上的操作。將生成的計(jì)劃發(fā)給 DRIVER。
如果是 map/reduce 作業(yè),該計(jì)劃包括 map operator trees 和一個 ?reduce operator tree,執(zhí)行引擎將會把這些作業(yè)發(fā)送給 MapReduce :
步驟6、6.1、6.2和6.3:執(zhí)行引擎將這些階段提交給適當(dāng)?shù)慕M件。在每個 task(mapper/reducer) 中,從HDFS文件中讀取與表或中間輸出相關(guān)聯(lián)的數(shù)據(jù),并通過相關(guān)算子樹傳遞這些數(shù)據(jù)。最終這些數(shù)據(jù)通過序列化器寫入到一個臨時HDFS文件中(如果不需要 reduce 階段,則在 map 中操作)。臨時文件用于向計(jì)劃中后面的 map/reduce 階段提供數(shù)據(jù)。
步驟7、8和9:最終的臨時文件將移動到表的位置,確保不讀取臟數(shù)據(jù)(文件重命名在HDFS中是原子操作)。對于用戶的查詢,臨時文件的內(nèi)容由執(zhí)行引擎直接從HDFS讀取,然后通過Driver發(fā)送到UI。
Hive SQL 編譯成 MapReduce 過程
美團(tuán)博客中有一篇非常詳細(xì)的博客講解《Hive SQL的編譯過程》。
你可以參考:
https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html
編譯 SQL 的任務(wù)是在上節(jié)中介紹的 COMPILER(編譯器組件)中完成的。Hive將SQL轉(zhuǎn)化為MapReduce任務(wù),整個編譯過程分為六個階段:

詞法、語法解析: Antlr 定義 SQL 的語法規(guī)則,完成 SQL 詞法,語法解析,將 SQL 轉(zhuǎn)化為抽象語法樹 AST Tree;
Antlr是一種語言識別的工具,可以用來構(gòu)造領(lǐng)域語言。使用Antlr構(gòu)造特定的語言只需要編寫一個語法文件,定義詞法和語法替換規(guī)則即可,Antlr完成了詞法分析、語法分析、語義分析、中間代碼生成的過程。
語義解析: 遍歷 AST Tree,抽象出查詢的基本組成單元 QueryBlock;
生成邏輯執(zhí)行計(jì)劃: 遍歷 QueryBlock,翻譯為執(zhí)行操作樹 OperatorTree;
優(yōu)化邏輯執(zhí)行計(jì)劃: 邏輯層優(yōu)化器進(jìn)行 OperatorTree 變換,合并 Operator,達(dá)到減少 MapReduce Job,減少數(shù)據(jù)傳輸及 shuffle 數(shù)據(jù)量;
生成物理執(zhí)行計(jì)劃: 遍歷 OperatorTree,翻譯為 MapReduce 任務(wù);
優(yōu)化物理執(zhí)行計(jì)劃: 物理層優(yōu)化器進(jìn)行 MapReduce 任務(wù)的變換,生成最終的執(zhí)行計(jì)劃。
下面對這六個階段詳細(xì)解析:
為便于理解,我們拿一個簡單的查詢語句進(jìn)行展示,對5月23號的地區(qū)維表進(jìn)行查詢:
select?*?from?dim.dim_region?where?dt?=?'2021-05-23';
階段一:詞法、語法解析
根據(jù)Antlr定義的sql語法規(guī)則,將相關(guān)sql進(jìn)行詞法、語法解析,轉(zhuǎn)化為抽象語法樹AST Tree:
ABSTRACT?SYNTAX?TREE:
TOK_QUERY
????TOK_FROM?
????TOK_TABREF
???????????TOK_TABNAME
???????????????dim
?????????????????dim_region
????TOK_INSERT
??????TOK_DESTINATION
??????????TOK_DIR
??????????????TOK_TMP_FILE
????????TOK_SELECT
??????????TOK_SELEXPR
??????????????TOK_ALLCOLREF
????????TOK_WHERE
??????????=
??????????????TOK_TABLE_OR_COL
??????????????????dt
????????????????????'2021-05-23'
階段二:語義解析
遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock:
AST Tree生成后由于其復(fù)雜度依舊較高,不便于翻譯為mapreduce程序,需要進(jìn)行進(jìn)一步抽象和結(jié)構(gòu)化,形成QueryBlock。
QueryBlock是一條SQL最基本的組成單元,包括三個部分:輸入源,計(jì)算過程,輸出。簡單來講一個QueryBlock就是一個子查詢。
QueryBlock的生成過程為一個遞歸過程,先序遍歷 AST Tree ,遇到不同的 Token 節(jié)點(diǎn)(理解為特殊標(biāo)記),保存到相應(yīng)的屬性中。
階段三:生成邏輯執(zhí)行計(jì)劃
遍歷QueryBlock,翻譯為執(zhí)行操作樹OperatorTree:
Hive最終生成的MapReduce任務(wù),Map階段和Reduce階段均由OperatorTree組成。
基本的操作符包括:
TableScanOperator
SelectOperator
FilterOperator
JoinOperator
GroupByOperator
ReduceSinkOperator`
Operator在Map Reduce階段之間的數(shù)據(jù)傳遞都是一個流式的過程。每一個Operator對一行數(shù)據(jù)完成操作后之后將數(shù)據(jù)傳遞給childOperator計(jì)算。
由于Join/GroupBy/OrderBy均需要在Reduce階段完成,所以在生成相應(yīng)操作的Operator之前都會先生成一個ReduceSinkOperator,將字段組合并序列化為Reduce Key/value, Partition Key。
階段四:優(yōu)化邏輯執(zhí)行計(jì)劃
Hive中的邏輯查詢優(yōu)化可以大致分為以下幾類:
投影修剪 推導(dǎo)傳遞謂詞 謂詞下推 將Select-Select,F(xiàn)ilter-Filter合并為單個操作 多路 Join 查詢重寫以適應(yīng)某些列值的Join傾斜
階段五:生成物理執(zhí)行計(jì)劃
生成物理執(zhí)行計(jì)劃即是將邏輯執(zhí)行計(jì)劃生成的OperatorTree轉(zhuǎn)化為MapReduce Job的過程,主要分為下面幾個階段:
對輸出表生成MoveTask 從OperatorTree的其中一個根節(jié)點(diǎn)向下深度優(yōu)先遍歷 ReduceSinkOperator標(biāo)示Map/Reduce的界限,多個Job間的界限 遍歷其他根節(jié)點(diǎn),遇過碰到JoinOperator合并MapReduceTask 生成StatTask更新元數(shù)據(jù) 剪斷Map與Reduce間的Operator的關(guān)系
階段六:優(yōu)化物理執(zhí)行計(jì)劃
Hive中的物理優(yōu)化可以大致分為以下幾類:
分區(qū)修剪(Partition Pruning) 基于分區(qū)和桶的掃描修剪(Scan pruning) 如果查詢基于抽樣,則掃描修剪 在某些情況下,在 map 端應(yīng)用 Group By 在 mapper 上執(zhí)行 Join 優(yōu)化 Union,使Union只在 map 端執(zhí)行 在多路 Join 中,根據(jù)用戶提示決定最后流哪個表 刪除不必要的 ReduceSinkOperators 對于帶有Limit子句的查詢,減少需要為該表掃描的文件數(shù) 對于帶有Limit子句的查詢,通過限制 ReduceSinkOperator 生成的內(nèi)容來限制來自 mapper 的輸出 減少用戶提交的SQL查詢所需的Tez作業(yè)數(shù)量 如果是簡單的提取查詢,避免使用MapReduce作業(yè) 對于帶有聚合的簡單獲取查詢,執(zhí)行不帶 MapReduce 任務(wù)的聚合 重寫 Group By 查詢使用索引表代替原來的表 當(dāng)表掃描之上的謂詞是相等謂詞且謂詞中的列具有索引時,使用索引掃描
經(jīng)過以上六個階段,SQL 就被解析映射成了集群上的 MapReduce 任務(wù)。
Explain語法
Hive Explain 語句類似Mysql 的Explain 語句,提供了對應(yīng)查詢的執(zhí)行計(jì)劃,對于我們在理解Hive底層邏輯、Hive調(diào)優(yōu)、Hive SQL書寫等方面提供了一個參照,在我們的生產(chǎn)工作了是一個很有意義的工具。
Hive Explain語法
EXPLAIN [EXTENDED|CBO|AST|DEPENDENCY|AUTHORIZATION|LOCKS|VECTORIZATION|ANALYZE] query
Hive Explain的語法規(guī)則如上,后面將按照對應(yīng)的子句進(jìn)行探討。
EXTENDED 語句會在執(zhí)行計(jì)劃中產(chǎn)生關(guān)于算子(Operator)的額外信息,這些信息都是典型的物理信息,如文件名稱等。
在執(zhí)行Explain QUERY 之后,一個查詢會被轉(zhuǎn)化為包含多個Stage的語句(看起來更像一個DAG)。這些Stages要么是map/reduce Stage,要么是做些元數(shù)據(jù)或文件系統(tǒng)操作的Stage (如 move 、rename等)。Explain的輸出包含2個部分:
執(zhí)行計(jì)劃不同Stage之間的以來關(guān)系(Dependency) 每個Stage的執(zhí)行描述信息(Description)
以下將通過一個簡單的例子進(jìn)行解釋。
執(zhí)行Explain 語句
EXPLAIN?
SELECT?SUM(id)?FROM?test1;
Explain輸出結(jié)果解析
依賴圖
STAGE?DEPENDENCIES:
??Stage-1?is?a?root?stage
??Stage-0?depends?on?stages:?Stage-1
STAGE?PLANS:
??Stage:?Stage-1
????Map?Reduce
??????Map?Operator?Tree:
??????????TableScan
????????????alias:?test1
????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????Select?Operator
??????????????expressions:?id?(type:?int)
??????????????outputColumnNames:?id
??????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????Group?By?Operator
????????????????aggregations:?sum(id)
????????????????mode:?hash
????????????????outputColumnNames:?_col0
????????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????Reduce?Output?Operator
??????????????????sort?order:
??????????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????????value?expressions:?_col0?(type:?bigint)
??????Reduce?Operator?Tree:
????????Group?By?Operator
??????????aggregations:?sum(VALUE._col0)
??????????mode:?mergepartial
??????????outputColumnNames:?_col0
??????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????File?Output?Operator
????????????compressed:?false
????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????table:
????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
??Stage:?Stage-0
????Fetch?Operator
??????limit:?-1
??????Processor?Tree:
????????ListSink
一個HIVE查詢被轉(zhuǎn)換為一個由一個或多個stage組成的序列(有向無環(huán)圖DAG)。這些stage可以是MapReduce stage,也可以是負(fù)責(zé)元數(shù)據(jù)存儲的stage,也可以是負(fù)責(zé)文件系統(tǒng)的操作(比如移動和重命名)的stage。
我們將上述結(jié)果拆分看,先從最外層開始,包含兩個大的部分:
stage dependencies:各個stage之間的依賴性 stage plan:各個stage的執(zhí)行計(jì)劃
先看第一部分 stage dependencies ,包含兩個 stage,Stage-1 是根stage,說明這是開始的stage,Stage-0 依賴 Stage-1,Stage-1執(zhí)行完成后執(zhí)行Stage-0。
再看第二部分 stage plan,里面有一個 Map Reduce,一個MR的執(zhí)行計(jì)劃分為兩個部分
Map Operator Tree:MAP端的執(zhí)行計(jì)劃樹 Reduce Operator Tree:Reduce端的執(zhí)行計(jì)劃樹
這兩個執(zhí)行計(jì)劃樹里面包含這條sql語句的 operator
TableScan:表掃描操作,map端第一個操作肯定是加載表,所以就是表掃描操作,常見的屬性:
alias:表名稱
Statistics:表統(tǒng)計(jì)信息,包含表中數(shù)據(jù)條數(shù),數(shù)據(jù)大小等
Select Operator:選取操作,常見的屬性 :
expressions:需要的字段名稱及字段類型
outputColumnNames:輸出的列名稱
Statistics:表統(tǒng)計(jì)信息,包含表中數(shù)據(jù)條數(shù),數(shù)據(jù)大小等
Group By Operator:分組聚合操作,常見的屬性:
aggregations:顯示聚合函數(shù)信息.
mode:聚合模式,值有?hash:隨機(jī)聚合,就是hash?partition;partial:局部聚合;final:最終聚合.
keys:分組的字段,如果沒有分組,則沒有此字段.
outputColumnNames:聚合之后輸出列名.
Statistics:表統(tǒng)計(jì)信息,包含分組聚合之后的數(shù)據(jù)條數(shù),數(shù)據(jù)大小等.
Reduce Output Operator:輸出到reduce操作,常見屬性:
sort order:值為空?不排序;值為?+?正序排序,值為?-?倒序排序;值為?±?排序的列為兩列,第一列為正序,第二列為倒序.
Filter Operator:過濾操作,常見的屬性:
predicate:過濾條件,如sql語句中的where?id>=1,則此處顯示(id?>=?1).
Map Join Operator:join 操作,常見的屬性:
condition map:join方式?,如Inner Join 0 to 1 Left Outer Join0 to 2
keys:?join?的條件字段
outputColumnNames:join 完成之后輸出的字段
Statistics:join 完成之后生成的數(shù)據(jù)條數(shù),大小等
File Output Operator:文件輸出操作,常見的屬性:
compressed:是否壓縮
table:表的信息,包含輸入輸出文件格式化方式,序列化方式等
Fetch Operator 客戶端獲取數(shù)據(jù)操作,常見的屬性:
limit,值為?-1?表示不限制條數(shù),其他值為限制的條數(shù)
Explain使用場景
那么Explain能夠?yàn)槲覀冊谏a(chǎn)實(shí)踐中帶來哪些便利及解決我們哪些迷惑呢?
本文首發(fā)自公眾號:
《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。
join 語句會過濾 Null 的值嗎?
現(xiàn)在,我們在hive cli 輸入以下查詢計(jì)劃語句
select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id;
然后執(zhí)行:
explain?select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id;
我們來看結(jié)果:
TableScan
?alias:?a
?Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
?Filter?Operator
????predicate:?id?is?not?null?(type:?boolean)
????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????Select?Operator
????????expressions:?id?(type:?int)
????????outputColumnNames:?_col0
????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????HashTable?Sink?Operator
???????????keys:
?????????????0?_col0?(type:?int)
?????????????1?_col0?(type:?int)
?...
從上述結(jié)果可以看到 predicate: id is not null 這樣一行,說明 join 時會自動過濾掉關(guān)聯(lián)字段為 null 值的情況,但 left join 或 full join 是不會自動過濾null值的,大家可以自行嘗試下。
group by 分組語句會進(jìn)行排序嗎?
select?id,max(user_name)?from?test1?group?by?id;
直接來看 explain 之后結(jié)果:
TableScan
????alias:?test1
????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
????Select?Operator
????????expressions:?id?(type:?int),?user_name?(type:?string)
????????outputColumnNames:?id,?user_name
????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
????????Group?By?Operator
???????????aggregations:?max(user_name)
???????????keys:?id?(type:?int)
???????????mode:?hash
???????????outputColumnNames:?_col0,?_col1
???????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
???????????Reduce?Output?Operator
?????????????key?expressions:?_col0?(type:?int)
?????????????sort?order:?+
?????????????Map-reduce?partition?columns:?_col0?(type:?int)
?????????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
?????????????value?expressions:?_col1?(type:?string)
?...
我們看 Group By Operator,里面有 keys: id (type: int) 說明按照 id 進(jìn)行分組的,再往下看還有 sort order: + ,說明是按照 id 字段進(jìn)行正序排序的。
哪條sql執(zhí)行效率高
觀察如下兩條sql:
SELECT
?a.id,
?b.user_name
FROM
?test1?a
JOIN?test2?b?ON?a.id?=?b.id
WHERE
?a.id?>?2;
SELECT
?a.id,
?b.user_name
FROM
?(SELECT?*?FROM?test1?WHERE?id?>?2)?a
JOIN?test2?b?ON?a.id?=?b.id;
這兩條sql語句輸出的結(jié)果是一樣的,但是哪條sql執(zhí)行效率高呢?
有人說第一條sql執(zhí)行效率高,因?yàn)榈诙lsql有子查詢,子查詢會影響性能;有人說第二條sql執(zhí)行效率高,因?yàn)橄冗^濾之后,在進(jìn)行join時的條數(shù)減少了,所以執(zhí)行效率就高了。到底哪條sql效率高呢,我們直接在sql語句前面加上 explain,看下執(zhí)行計(jì)劃不就知道了嘛!
在第一條sql語句前加上 explain,得到如下結(jié)果:
hive?(default)>?explain?select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id?where?a.id?>2;
OK
Explain
STAGE?DEPENDENCIES:
??Stage-4?is?a?root?stage
??Stage-3?depends?on?stages:?Stage-4
??Stage-0?depends?on?stages:?Stage-3
STAGE?PLANS:
??Stage:?Stage-4
????Map?Reduce?Local?Work
??????Alias?->?Map?Local?Tables:
????????$hdt$_0:a
??????????Fetch?Operator
????????????limit:?-1
??????Alias?->?Map?Local?Operator?Tree:
????????$hdt$_0:a
??????????TableScan
????????????alias:?a
????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????Filter?Operator
??????????????predicate:?(id?>?2)?(type:?boolean)
??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????Select?Operator
????????????????expressions:?id?(type:?int)
????????????????outputColumnNames:?_col0
????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????HashTable?Sink?Operator
??????????????????keys:
????????????????????0?_col0?(type:?int)
????????????????????1?_col0?(type:?int)
??Stage:?Stage-3
????Map?Reduce
??????Map?Operator?Tree:
??????????TableScan
????????????alias:?b
????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????Filter?Operator
??????????????predicate:?(id?>?2)?(type:?boolean)
??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????Select?Operator
????????????????expressions:?id?(type:?int),?user_name?(type:?string)
????????????????outputColumnNames:?_col0,?_col1
????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????Map?Join?Operator
??????????????????condition?map:
???????????????????????Inner?Join?0?to?1
??????????????????keys:
????????????????????0?_col0?(type:?int)
????????????????????1?_col0?(type:?int)
??????????????????outputColumnNames:?_col0,?_col2
??????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????????Select?Operator
????????????????????expressions:?_col0?(type:?int),?_col2?(type:?string)
????????????????????outputColumnNames:?_col0,?_col1
????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????????File?Output?Operator
??????????????????????compressed:?false
??????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????????????table:
??????????????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
??????????????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
??????????????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
??????Local?Work:
????????Map?Reduce?Local?Work
??Stage:?Stage-0
????Fetch?Operator
??????limit:?-1
??????Processor?Tree:
????????ListSink
在第二條sql語句前加上 explain,得到如下結(jié)果:
hive?(default)>?explain?select?a.id,b.user_name?from(select?*?from??test1?where?id>2?)?a?join?test2?b?on?a.id=b.id;
OK
Explain
STAGE?DEPENDENCIES:
??Stage-4?is?a?root?stage
??Stage-3?depends?on?stages:?Stage-4
??Stage-0?depends?on?stages:?Stage-3
STAGE?PLANS:
??Stage:?Stage-4
????Map?Reduce?Local?Work
??????Alias?->?Map?Local?Tables:
????????$hdt$_0:test1
??????????Fetch?Operator
????????????limit:?-1
??????Alias?->?Map?Local?Operator?Tree:
????????$hdt$_0:test1
??????????TableScan
????????????alias:?test1
????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????Filter?Operator
??????????????predicate:?(id?>?2)?(type:?boolean)
??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????Select?Operator
????????????????expressions:?id?(type:?int)
????????????????outputColumnNames:?_col0
????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????HashTable?Sink?Operator
??????????????????keys:
????????????????????0?_col0?(type:?int)
????????????????????1?_col0?(type:?int)
??Stage:?Stage-3
????Map?Reduce
??????Map?Operator?Tree:
??????????TableScan
????????????alias:?b
????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????Filter?Operator
??????????????predicate:?(id?>?2)?(type:?boolean)
??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????Select?Operator
????????????????expressions:?id?(type:?int),?user_name?(type:?string)
????????????????outputColumnNames:?_col0,?_col1
????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????Map?Join?Operator
??????????????????condition?map:
???????????????????????Inner?Join?0?to?1
??????????????????keys:
????????????????????0?_col0?(type:?int)
????????????????????1?_col0?(type:?int)
??????????????????outputColumnNames:?_col0,?_col2
??????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????????Select?Operator
????????????????????expressions:?_col0?(type:?int),?_col2?(type:?string)
????????????????????outputColumnNames:?_col0,?_col1
????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
????????????????????File?Output?Operator
??????????????????????compressed:?false
??????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
??????????????????????table:
??????????????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
??????????????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
??????????????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
??????Local?Work:
????????Map?Reduce?Local?Work
??Stage:?Stage-0
????Fetch?Operator
??????limit:?-1
??????Processor?Tree:
????????ListSink
大家有什么發(fā)現(xiàn),除了表別名不一樣,其他的執(zhí)行計(jì)劃完全一樣,都是先進(jìn)行 where 條件過濾,在進(jìn)行 join 條件關(guān)聯(lián)。說明 hive 底層會自動幫我們進(jìn)行優(yōu)化,所以這兩條sql語句執(zhí)行效率是一樣的。 以上僅列舉了3個我們生產(chǎn)中既熟悉又有點(diǎn)迷糊的例子,explain 還有很多其他的用途,如查看stage的依賴情況、排查數(shù)據(jù)傾斜、hive 調(diào)優(yōu)等,小伙伴們可以自行嘗試。
explain dependency的用法
explain dependency用于描述一段SQL需要的數(shù)據(jù)來源,輸出是一個json格式的數(shù)據(jù),里面包含以下兩個部分的內(nèi)容:
input_partitions:描述一段SQL依賴的數(shù)據(jù)來源表分區(qū),里面存儲的是分區(qū)名的列表,如果整段SQL包含的所有表都是非分區(qū)表,則顯示為空。 input_tables:描述一段SQL依賴的數(shù)據(jù)來源表,里面存儲的是Hive表名的列表。
使用explain dependency查看SQL查詢非分區(qū)普通表,在 hive cli 中輸入以下命令:
explain?dependency?select?s_age,count(1)?num?from?student_orc;
得到如下結(jié)果:
{"input_partitions":[],"input_tables":[{"tablename":"default@student_tb?_orc","tabletype":"MANAGED_TABLE"}]}
使用explain dependency查看SQL查詢分區(qū)表,在 hive cli 中輸入以下命令:
explain?dependency?select?s_age,count(1)?num?from?student_orc_partition;
得到結(jié)果:
{"input_partitions":[{"partitionName":"default@student_orc_partition@?part=0"},?
{"partitionName":"default@student_orc_partition@part=1"},?
{"partitionName":"default@student_orc_partition@part=2"},?
{"partitionName":"default@student_orc_partition@part=3"},
{"partitionName":"default@student_orc_partition@part=4"},?
{"partitionName":"default@student_orc_partition@part=5"},
{"partitionName":"default@student_orc_partition@part=6"},
{"partitionName":"default@student_orc_partition@part=7"},
{"partitionName":"default@student_orc_partition@part=8"},
{"partitionName":"default@student_orc_partition@part=9"}],?
"input_tables":[{"tablename":"default@student_orc_partition",?"tabletype":"MANAGED_TABLE"}]
explain dependency的使用場景有兩個:
場景一:快速排除。快速排除因?yàn)樽x取不到相應(yīng)分區(qū)的數(shù)據(jù)而導(dǎo)致任務(wù)數(shù)據(jù)輸出異常。例如,在一個以天分區(qū)的任務(wù)中,上游任務(wù)因?yàn)樯a(chǎn)過程不可控因素出現(xiàn)異常或者空跑,導(dǎo)致下游任務(wù)引發(fā)異常。通過這種方式,可以快速查看SQL讀取的分區(qū)是否出現(xiàn)異常。 場景二:理清表的輸入,幫助理解程序的運(yùn)行,特別是有助于理解有多重子查詢,多表連接的依賴輸入。
下面通過兩個案例來看explain dependency的實(shí)際運(yùn)用:
識別看似等價(jià)的代碼
有如下兩條看似相等的sql:
代碼一:
select?
a.s_no?
from?student_orc_partition?a?
inner?join?
student_orc_partition_only?b?
on?a.s_no=b.s_no?and?a.part=b.part?and?a.part>=1?and?a.part<=2;
代碼二:
select?
a.s_no?
from?student_orc_partition?a?
inner?join?
student_orc_partition_only?b?
on?a.s_no=b.s_no?and?a.part=b.part?
where?a.part>=1?and?a.part<=2;
我們看下上述兩段代碼explain dependency的輸出結(jié)果:
代碼1的explain dependency結(jié)果:
{"input_partitions":?
[{"partitionName":"default@student_orc_partition@part=0"},?
{"partitionName":"default@student_orc_partition@part=1"},?
{"partitionName":"default@student_orc_partition@part=2"},
{"partitionName":"default@student_orc_partition_only@part=1"},?
{"partitionName":"default@student_orc_partition_only@part=2"}],?
"input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
代碼2的explain dependency結(jié)果:
{"input_partitions":?
[{"partitionName":"default@student_orc_partition@part=1"},?
{"partitionName"?:?"default@student_orc_partition@part=2"},
{"partitionName"?:"default@student_orc_partition_only@part=1"},
{"partitionName":"default@student_orc_partition_only@part=2"}],?
"input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
通過上面的輸出結(jié)果可以看到,其實(shí)上述的兩個SQL并不等價(jià),代碼1在內(nèi)連接(inner join)中的連接條件(on)中加入非等值的過濾條件后,并沒有將內(nèi)連接的左右兩個表按照過濾條件進(jìn)行過濾,內(nèi)連接在執(zhí)行時會多讀取part=0的分區(qū)數(shù)據(jù)。而在代碼2中,會過濾掉不符合條件的分區(qū)。
識別SQL讀取數(shù)據(jù)范圍的差別
有如下兩段代碼:
代碼一:
explain?dependency
select
a.s_no?
from?student_orc_partition?a?
left?join?
student_orc_partition_only?b?
on?a.s_no=b.s_no?and?a.part=b.part?and?b.part>=1?and?b.part<=2;
代碼二:
explain?dependency?
select?
a.s_no?
from?student_orc_partition?a?
left?join?
student_orc_partition_only?b?
on?a.s_no=b.s_no?and?a.part=b.part?and?a.part>=1?and?a.part<=2;
以上兩個代碼的數(shù)據(jù)讀取范圍是一樣的嗎?答案是不一樣,我們通過explain dependency來看下:
代碼1的explain dependency結(jié)果:
{"input_partitions":?
[{"partitionName":?"default@student_orc_partition@part=0"},?
{"partitionName":"default@student_orc_partition@part=1"},?…中間省略7個分區(qū)
{"partitionName":"default@student_orc_partition@part=9"},?
{"partitionName":"default@student_orc_partition_only@part=1"},?
{"partitionName":"default@student_orc_partition_only@part=2"}],?
"input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
代碼2的explain dependency結(jié)果:
{"input_partitions":?
[{"partitionName":"default@student_orc_partition@part=0"},?
{"partitionName":"default@student_orc_partition@part=1"},?…中間省略7個分區(qū)?
{"partitionName":"default@student_orc_partition@part=9"},?
{"partitionName":"default@student_orc_partition_only@part=0"},?
{"partitionName":"default@student_orc_partition_only@part=1"},?…中間省略7個分區(qū)?
{"partitionName":"default@student_orc_partition_only@part=9"}],
"input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}
可以看到,對左外連接在連接條件中加入非等值過濾的條件,如果過濾條件是作用于右表(b表)有起到過濾的效果,則右表只要掃描兩個分區(qū)即可,但是左表(a表)會進(jìn)行全表掃描。如果過濾條件是針對左表,則完全沒有起到過濾的作用,那么兩個表將進(jìn)行全表掃描。這時的情況就如同全外連接一樣都需要對兩個數(shù)據(jù)進(jìn)行全表掃描。
在使用過程中,容易認(rèn)為代碼片段2可以像代碼片段1一樣進(jìn)行數(shù)據(jù)過濾,通過查看explain dependency的輸出結(jié)果,可以知道不是如此。
explain authorization 的用法
通過explain authorization可以知道當(dāng)前SQL訪問的數(shù)據(jù)來源(INPUTS) 和數(shù)據(jù)輸出(OUTPUTS),以及當(dāng)前Hive的訪問用戶 (CURRENT_USER)和操作(OPERATION)。
在 hive cli 中輸入以下命令:
explain?authorization?
select?variance(s_score)?from?student_tb_orc;
結(jié)果如下:
INPUTS:?
??default@student_tb_orc?
OUTPUTS:?
??hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-?90f1475a3ed5/-mr-10000?
CURRENT_USER:?
??hdfs?
OPERATION:?
??QUERY?
AUTHORIZATION_FAILURES:?
??No?privilege?'Select'?found?for?inputs?{?database:default,?table:student_?tb_orc,?columnName:s_score}
從上面的信息可知:
上面案例的數(shù)據(jù)來源是defalut數(shù)據(jù)庫中的 student_tb_orc表; 數(shù)據(jù)的輸出路徑是hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000; 當(dāng)前的操作用戶是hdfs,操作是查詢; 觀察上面的信息我們還會看到AUTHORIZATION_FAILURES信息,提示對當(dāng)前的輸入沒有查詢權(quán)限,但如果運(yùn)行上面的SQL的話也能夠正常運(yùn)行。為什么會出現(xiàn)這種情況?Hive在默認(rèn)不配置權(quán)限管理的情況下不進(jìn)行權(quán)限驗(yàn)證,所有的用戶在Hive里面都是超級管理員,即使不對特定的用戶進(jìn)行賦權(quán),也能夠正常查詢。
Tez引擎
Tez是Apache開源的支持DAG作業(yè)的計(jì)算框架,是支持HADOOP2.x的重要引擎。它源于MapReduce框架,核心思想是將Map和Reduce兩個操作進(jìn)一步拆分,分解后的元操作可以任意靈活組合,產(chǎn)生新的操作,這些操作經(jīng)過一些控制程序組裝后,可形成一個大的DAG作業(yè)。

Tez將Map task和Reduce task進(jìn)一步拆分為如下圖所示:

Tez的task由Input、processor、output階段組成,可以表達(dá)所有復(fù)雜的map、reduce操作,如下圖:

本文首發(fā)自公眾號:
《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。
Tez的實(shí)現(xiàn)
Tez對外提供了6種可編程組件,分別是:
1)Input:對輸入數(shù)據(jù)源的抽象,它解析輸入數(shù)據(jù)格式,并吐出一個個Key/value
2)Output:對輸出數(shù)據(jù)源的抽象,它將用戶程序產(chǎn)生的Key/value寫入文件系統(tǒng)
3)Paritioner:對數(shù)據(jù)進(jìn)行分片,類似于MR中的Partitioner
4)Processor:對計(jì)算的抽象,它從一個Input中獲取數(shù)據(jù),經(jīng)處理后,通過Output輸出
5)Task:對任務(wù)的抽象,每個Task由一個Input、Ouput和Processor組成
6)Maser:管理各個Task的依賴關(guān)系,并按順依賴關(guān)系執(zhí)行他們
除了以上6種組件,Tez還提供了兩種算子,分別是Sort(排序)和Shuffle(混洗),為了用戶使用方便,它還提供了多種Input、Output、Task和Sort的實(shí)現(xiàn),具體如下:
1)Input實(shí)現(xiàn):LocalMergedInput(文件本地合并后作為輸入),ShuffledMergedInput(遠(yuǎn)程拷貝數(shù)據(jù)且合并后作為輸入)
2)Output實(shí)現(xiàn):InMemorySortedOutput(內(nèi)存排序后輸出),LocalOnFileSorterOutput(本地磁盤排序后輸出),OnFileSortedOutput(磁盤排序后輸出)
3)Task實(shí)現(xiàn):RunTimeTask(非常簡單的Task,基本沒做什么事)
4)Sort實(shí)現(xiàn):DefaultSorter(本地?cái)?shù)據(jù)排序),InMemoryShuffleSorter(遠(yuǎn)程拷貝數(shù)據(jù)并排序)
為了展示Tez的使用方法和驗(yàn)證Tez框架的可用性,Apache在YARN MRAppMaster基礎(chǔ)上使用Tez編程接口重新設(shè)計(jì)了MapReduce框架,使之可運(yùn)行在YARN中。為此,Tez提供了以下幾個組件:
1)Input:SimpleInput(直接使用MR InputFormat獲取數(shù)據(jù))
2)Output:SimpleOutput(直接使用MR OutputFormat獲取數(shù)據(jù))
3)Partition:MRPartitioner(直接使用MR Partitioner獲取數(shù)據(jù))
4)Processor:MapProcessor(執(zhí)行Map Task),ReduceProcessor(執(zhí)行Reduce Task)
5)Task:FinalTask,InitialTask,initialTaskWithInMemSort,InitialTaskWithLocalSort ,IntermediateTask,LocalFinalTask,MapOnlyTask。
對于MapReduce作業(yè)而言,如果只有Map Task,則使用MapOnlyTask,否則,Map Task使用InitialTaskWithInMemSort而Reduce Task用FinalTask。當(dāng)然,如果你想編寫其他類型的作業(yè),可使用以上任何幾種Task進(jìn)行組合,比如”InitialTaskWithInMemSort –> FinalTask”是MapReduce作業(yè)。
為了減少Tez開發(fā)工作量,并讓Tez能夠運(yùn)行在YARN之上,Tez重用了大部分YARN中MRAppMater的代碼,包括客戶端、資源申請、任務(wù)推測執(zhí)行、任務(wù)啟動等。
Tez和MapReduce作業(yè)的比較:
Tez繞過了MapReduce很多不必要的中間的數(shù)據(jù)存儲和讀取的過程,直接在一個作業(yè)中表達(dá)了MapReduce需要多個作業(yè)共同協(xié)作才能完成的事情。
Tez和MapReduce一樣都運(yùn)行使用YARN作為資源調(diào)度和管理。但與MapReduce on YARN不同,Tez on YARN并不是將作業(yè)提交到ResourceManager,而是提交到AMPoolServer的服務(wù)上,AMPoolServer存放著若干已經(jīng)預(yù)先啟動ApplicationMaster的服務(wù)。
當(dāng)用戶提交一個作業(yè)上來后,AMPoolServer從中選擇一個ApplicationMaster用于管理用戶提交上來的作業(yè),這樣既可以節(jié)省ResourceManager創(chuàng)建ApplicationMaster的時間,而又能夠重用每個ApplicationMaster的資源,節(jié)省了資源釋放和創(chuàng)建時間。
Tez相比于MapReduce有幾點(diǎn)重大改進(jìn):
當(dāng)查詢需要有多個reduce邏輯時,Hive的MapReduce引擎會將計(jì)劃分解,每個Redcue提交一個MR作業(yè)。這個鏈中的所有MR作業(yè)都需要逐個調(diào)度,每個作業(yè)都必須從HDFS中重新讀取上一個作業(yè)的輸出并重新洗牌。而在Tez中,幾個reduce接收器可以直接連接,數(shù)據(jù)可以流水線傳輸,而不需要臨時HDFS文件,這種模式稱為MRR(Map-reduce-reduce*)。
Tez還允許一次發(fā)送整個查詢計(jì)劃,實(shí)現(xiàn)應(yīng)用程序動態(tài)規(guī)劃,從而使框架能夠更智能地分配資源,并通過各個階段流水線傳輸數(shù)據(jù)。對于更復(fù)雜的查詢來說,這是一個巨大的改進(jìn),因?yàn)樗薎O/sync障礙和各個階段之間的調(diào)度開銷。
在MapReduce計(jì)算引擎中,無論數(shù)據(jù)大小,在Shuffle階段都以相同的方式執(zhí)行,將數(shù)據(jù)序列化到磁盤,再由下游的程序去拉取,并反序列化。Tez可以允許小數(shù)據(jù)集完全在內(nèi)存中處理,而MapReduce中沒有這樣的優(yōu)化。倉庫查詢經(jīng)常需要在處理完大量的數(shù)據(jù)后對小型數(shù)據(jù)集進(jìn)行排序或聚合,Tez的優(yōu)化也能極大地提升效率。
給 Hive 換上 Tez 非常簡單,只需給 hive-site.xml 中設(shè)置:
????hive.execution.engine
????tez
設(shè)置hive.execution.engine為 tez 后進(jìn)入到 Hive 執(zhí)行 SQL:
hive>?select?count(*)?as?c?from?userinfo;
Query?ID?=?zhenqin_20161104150743_4155afab-4bfa-4e8a-acb0-90c8c50ecfb5
Total?jobs?=?1
Launching?Job?1?out?of?1
?
?
Status:?Running?(Executing?on?YARN?cluster?with?App?id?application_1478229439699_0007)
?
--------------------------------------------------------------------------------
????????VERTICES??????STATUS??TOTAL??COMPLETED??RUNNING??PENDING??FAILED??KILLED
--------------------------------------------------------------------------------
Map?1?..........???SUCCEEDED??????2??????????2????????0????????0???????0???????0
Reducer?2?......???SUCCEEDED??????1??????????1????????0????????0???????0???????0
--------------------------------------------------------------------------------
VERTICES:?02/02??[==========================>>]?100%??ELAPSED?TIME:?6.19?s?????
--------------------------------------------------------------------------------
OK
1000000
Time?taken:?6.611?seconds,?Fetched:?1?row(s)
可以看到,我的 userinfo 中有 100W 條記錄,執(zhí)行一遍 count 需要 6.19s。現(xiàn)在把 engine 換為 mr
set?hive.execution.engine=mr;
再次執(zhí)行 count userinfo:
hive>?select?count(*)?as?c?from?userinfo;
Query?ID?=?zhenqin_20161104152022_c7e6c5bd-d456-4ec7-b895-c81a369aab27
Total?jobs?=?1
Launching?Job?1?out?of?1
Starting?Job?=?job_1478229439699_0010,?Tracking?URL?=?http://localhost:8088/proxy/application_1478229439699_0010/
Kill?Command?=?/Users/zhenqin/software/hadoop/bin/hadoop?job??-kill?job_1478229439699_0010
Hadoop?job?information?for?Stage-1:?number?of?mappers:?1;?number?of?reducers:?1
2016-11-04?15:20:28,323?Stage-1?map?=?0%,??reduce?=?0%
2016-11-04?15:20:34,587?Stage-1?map?=?100%,??reduce?=?0%
2016-11-04?15:20:40,796?Stage-1?map?=?100%,??reduce?=?100%
Ended?Job?=?job_1478229439699_0010
MapReduce?Jobs?Launched:?
Stage-Stage-1:?Map:?1??Reduce:?1???HDFS?Read:?215?HDFS?Write:?0?SUCCESS
Total?MapReduce?CPU?Time?Spent:?0?msec
OK
1000000
Time?taken:?19.46?seconds,?Fetched:?1?row(s)
hive>?
可以看到,使用 Tez 效率比 MapReduce 有近3倍的提升。而且,Hive 在使用 Tez 引擎執(zhí)行時,有 ==>> 動態(tài)的進(jìn)度指示。而在使用 mr 時,只有日志輸出 map and reduce 的進(jìn)度百分比。使用 tez,輸出的日志也清爽很多。
在我測試的很多復(fù)雜的 SQL,Tez 的都比 MapReduce 快很多,快慢取決于 SQL 的復(fù)雜度。執(zhí)行簡單的 select 等并不能體現(xiàn) tez 的優(yōu)勢。Tez 內(nèi)部翻譯 SQL 能任意的 Map,Reduce,Reduce 組合,而 MR 只能 Map->Reduce->Map->Reduce,因此在執(zhí)行復(fù)雜 SQL 時, Tez 的優(yōu)勢明顯。
Tez 參數(shù)優(yōu)化
優(yōu)化參參數(shù)(在同樣條件下,使用了tez從300s+降到200s+)
set?hive.execution.engine=tez;
set?mapred.job.name=recommend_user_profile_$idate;
set?mapred.reduce.tasks=-1;
set?hive.exec.reducers.max=160;
set?hive.auto.convert.join=true;
set?hive.exec.parallel=true;
set?hive.exec.parallel.thread.number=16;?
set?hive.optimize.skewjoin=true;
set?hive.exec.reducers.bytes.per.reducer=100000000;
set?mapred.max.split.size=200000000;
set?mapred.min.split.size.per.node=100000000;
set?mapred.min.split.size.per.rack=100000000;
set?hive.hadoop.supports.splittable.combineinputformat=true;
set?hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Tez內(nèi)存優(yōu)化
1. AM、Container大小設(shè)置
tez.am.resource.memory.mb
參數(shù)說明:Set tez.am.resource.memory.mb tobe the same as yarn.scheduler.minimum-allocation-mb the YARNminimum container size.
hive.tez.container.size
參數(shù)說明:Set hive.tez.container.size to be the same as or a small multiple(1 or 2 times that) of YARN container size yarn.scheduler.minimum-allocation-mb but NEVER more than yarn.scheduler.maximum-allocation-mb.
2. AM、Container JVM參數(shù)設(shè)置
tez.am.launch.cmd-opts?
默認(rèn)值:80% * tez.am.resource.memory.mb,一般不需要調(diào)整
hive.tez.java.ops
默認(rèn)值:80% * hive.tez.container.size 參數(shù)說明:Hortonworks建議"–server –Djava.net.preferIPv4Stack=true–XX:NewRatio=8 –XX:+UseNUMA –XX:UseG1G"
tez.container.max.java.heap.fraction
默認(rèn)值:0.8,參數(shù)說明:task/AM占用JVM Xmx的比例,該參數(shù)建議調(diào)整,需根據(jù)具體業(yè)務(wù)情況修改;
3. Hive內(nèi)存Map Join參數(shù)設(shè)置
tez.runtime.io.sort.mb
默認(rèn)值:100,參數(shù)說明:輸出排序需要的內(nèi)存大小。建議值:40% * hive.tez.container.size,一般不超過2G.
hive.auto.convert.join.noconditionaltask
默認(rèn)值:true,參數(shù)說明:是否將多個mapjoin合并為一個,使用默認(rèn)值
hive.auto.convert.join.noconditionaltask.size
默認(rèn)值為10MB,參數(shù)說明:多個mapjoin轉(zhuǎn)換為1個時,所有小表的文件大小總和的最大值,這個值只是限制輸入的表文件的大小,并不代表實(shí)際mapjoin時hashtable的大小。建議值:1/3 * hive.tez.container.size
tez.runtime.unordered.output.buffer.size-mb
默認(rèn)值:100,參數(shù)說明:Size of the buffer to use if not writing directly to disk。建議值: 10% * hive.tez.container.size.
4. Container重用設(shè)置
tez.am.container.reuse.enabled
默認(rèn)值:true,參數(shù)說明:Container重用開關(guān)
Spark引擎
Hive社區(qū)于2014年推出了Hive on Spark項(xiàng)目(HIVE-7292),將Spark作為繼MapReduce和Tez之后Hive的第三個計(jì)算引擎。該項(xiàng)目由Cloudera、Intel和MapR等幾家公司共同開發(fā),并受到了來自Hive和Spark兩個社區(qū)的共同關(guān)注。通過該項(xiàng)目,可以提高Hive查詢的性能,同時為已經(jīng)部署了Hive或者Spark的用戶提供了更加靈活的選擇,從而進(jìn)一步提高Hive和Spark的普及率。

本文首發(fā)自公眾號:
《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。
總體設(shè)計(jì)
Hive on Spark總體的設(shè)計(jì)思路是,盡可能重用Hive邏輯層面的功能;從生成物理計(jì)劃開始,提供一整套針對Spark的實(shí)現(xiàn),比如 SparkCompiler、SparkTask等,這樣Hive的查詢就可以作為Spark的任務(wù)來執(zhí)行了。以下是幾點(diǎn)主要的設(shè)計(jì)原則。
盡可能減少對Hive原有代碼的修改。這是和之前的Shark設(shè)計(jì)思路最大的不同。Shark對Hive的改動太大以至于無法被Hive社區(qū)接受,Hive on Spark盡可能少改動Hive的代碼,從而不影響Hive目前對MapReduce和Tez的支持。同時,Hive on Spark保證對現(xiàn)有的MapReduce和Tez模式在功能和性能方面不會有任何影響。 對于選擇Spark的用戶,應(yīng)使其能夠自動的獲取Hive現(xiàn)有的和未來新增的功能。 盡可能降低維護(hù)成本,保持對Spark依賴的松耦合。
基于以上思路和原則,具體的一些設(shè)計(jì)架構(gòu)如下。
Hive 的用戶可以通過hive.execution.engine來設(shè)置計(jì)算引擎,目前該參數(shù)可選的值為mr和tez。為了實(shí)現(xiàn)Hive on Spark,我們將spark作為該參數(shù)的第三個選項(xiàng)。要開啟Hive on Spark模式,用戶僅需將這個參數(shù)設(shè)置為spark即可。
在hive中使用以下語句開啟:
hive>?set?hive.execution.engine=spark;
總體設(shè)計(jì)
Spark 以分布式可靠數(shù)據(jù)集(Resilient Distributed Dataset,RDD)作為其數(shù)據(jù)抽象,因此我們需要將Hive的表轉(zhuǎn)化為RDD以便Spark處理。本質(zhì)上,Hive的表和Spark的 HadoopRDD都是HDFS上的一組文件,通過InputFormat和RecordReader讀取其中的數(shù)據(jù),因此這個轉(zhuǎn)化是自然而然的。
Spark為RDD提供了一系列的轉(zhuǎn)換(Transformation),其中有些轉(zhuǎn)換也是面向SQL 的,如groupByKey、join等。但如果使用這些轉(zhuǎn)換(就如Shark所做的那樣),就意味著我們要重新實(shí)現(xiàn)一些Hive已有的功能;而且當(dāng) Hive增加新的功能時,我們需要相應(yīng)地修改Hive on Spark模式。有鑒于此,我們選擇將Hive的操作符包裝為Function,然后應(yīng)用到RDD上。這樣,我們只需要依賴較少的幾種RDD的轉(zhuǎn)換,而主要的計(jì)算邏輯仍由Hive提供。
由于使用了Hive的原語,因此我們需要顯式地調(diào)用一些Transformation來實(shí)現(xiàn)Shuffle的功能。下表中列舉了Hive on Spark使用的所有轉(zhuǎn)換。

對repartitionAndSortWithinPartitions 簡單說明一下,這個功能由SPARK-2978引入,目的是提供一種MapReduce風(fēng)格的Shuffle。雖然sortByKey也提供了排序的功 能,但某些情況下我們并不需要全局有序,另外其使用的Range Partitioner對于某些Hive的查詢并不適用。
物理執(zhí)行計(jì)劃
通過SparkCompiler將Operator Tree轉(zhuǎn)換為Task Tree,其中需要提交給Spark執(zhí)行的任務(wù)即為SparkTask。不同于MapReduce中Map+Reduce的兩階段執(zhí)行模式,Spark采用DAG執(zhí)行模式,因此一個SparkTask包含了一個表示RDD轉(zhuǎn)換的DAG,我們將這個DAG包裝為SparkWork。執(zhí)行SparkTask 時,就根據(jù)SparkWork所表示的DAG計(jì)算出最終的RDD,然后通過RDD的foreachAsync來觸發(fā)運(yùn)算。使用foreachAsync是因?yàn)槲覀兪褂昧薍ive原語,因此不需要RDD返回結(jié)果;此外foreachAsync異步提交任務(wù)便于我們對任務(wù)進(jìn)行監(jiān)控。
SparkContext生命周期
SparkContext 是用戶與Spark集群進(jìn)行交互的接口,Hive on Spark應(yīng)該為每個用戶的會話創(chuàng)建一個SparkContext。但是Spark目前的使用方式假設(shè)SparkContext的生命周期是Spark應(yīng) 用級別的,而且目前在同一個JVM中不能創(chuàng)建多個SparkContext。這明顯無法滿足HiveServer2的應(yīng)用場景,因?yàn)槎鄠€客戶端需要通過同一個HiveServer2來提供服務(wù)。鑒于此,我們需要在單獨(dú)的JVM中啟動SparkContext,并通過RPC與遠(yuǎn)程的SparkContext進(jìn)行通信。
任務(wù)監(jiān)控與統(tǒng)計(jì)信息收集
Spark提供了SparkListener接口來監(jiān)聽任務(wù)執(zhí)行期間的各種事件,因此我們可以實(shí)現(xiàn)一個Listener來監(jiān)控任務(wù)執(zhí)行進(jìn)度以及收集任務(wù)級別的統(tǒng)計(jì)信 息(目前任務(wù)級別的統(tǒng)計(jì)由SparkListener采集,任務(wù)進(jìn)度則由Spark提供的專門的API來監(jiān)控)。另外Hive還提供了Operator級 別的統(tǒng)計(jì)數(shù)據(jù)信息,比如讀取的行數(shù)等。在MapReduce模式下,這些信息通過Hadoop Counter收集。我們可以使用Spark提供的Accumulator來實(shí)現(xiàn)該功能。
細(xì)節(jié)實(shí)現(xiàn)
Hive on Spark解析SQL的過程

SQL語句在分析執(zhí)行過程中會經(jīng)歷下圖所示的幾個步驟
語法解析 操作綁定 優(yōu)化執(zhí)行策略 交付執(zhí)行
語法解析
語法解析之后,會形成一棵語法樹,如下圖所示。樹中的每個節(jié)點(diǎn)是執(zhí)行的rule,整棵樹稱之為執(zhí)行策略。

策略優(yōu)化
形成上述的執(zhí)行策略樹還只是第一步,因?yàn)檫@個執(zhí)行策略可以進(jìn)行優(yōu)化,所謂的優(yōu)化就是對樹中節(jié)點(diǎn)進(jìn)行合并或是進(jìn)行順序上的調(diào)整。
以大家熟悉的join操作為例,下圖給出一個join優(yōu)化的示例。A JOIN B等同于B JOIN A,但是順序的調(diào)整可能給執(zhí)行的性能帶來極大的影響,下圖就是調(diào)整前后的對比圖。

在Hash Join中,首先被訪問的表稱之為“內(nèi)部構(gòu)建表”,第二個表為“探針輸入”。創(chuàng)建內(nèi)部表時,會將數(shù)據(jù)移動到數(shù)據(jù)倉庫指向的路徑;創(chuàng)建外部表,僅記錄數(shù)據(jù)所在的路徑。
再舉一例,一般來說盡可能的先實(shí)施聚合操作(Aggregate)然后再join

這種優(yōu)化自動完成,在調(diào)優(yōu)時不需要考慮。
SQL到Spark作業(yè)的轉(zhuǎn)換過程
native command的執(zhí)行流程
由于native command是一些非耗時的操作,直接使用Hive中原有的exeucte engine來執(zhí)行即可。這些command的執(zhí)行示意圖如下:

SparkTask的生成和執(zhí)行
我們通過一個例子來看一下一個簡單的兩表JOIN查詢?nèi)绾伪晦D(zhuǎn)換為SparkTask并被執(zhí)行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉(zhuǎn)化成SparkTask;右半部分展示了該SparkTask執(zhí)行時如何得到最終的RDD并通過foreachAsync提交Spark任務(wù)。

SparkCompiler遍歷Operator Tree,將其劃分為不同的MapWork和ReduceWork。
MapWork為根節(jié)點(diǎn),總是由TableScanOperator(Hive中對表進(jìn)行掃描的操作符)開始;后續(xù)的Work均為ReduceWork。ReduceSinkOperator(Hive中進(jìn)行Shuffle輸出的操作符)用來標(biāo)記兩個Work之間的界線,出現(xiàn)ReduceSinkOperator表示當(dāng)前Work到下一個Work之間的數(shù)據(jù)需要進(jìn)行Shuffle。因此,當(dāng)我們發(fā)現(xiàn)ReduceSinkOperator時,就會創(chuàng)建一個新的ReduceWork并作為當(dāng)前Work的子節(jié)點(diǎn)。包含了FileSinkOperator(Hive中將結(jié)果輸出到文件的操作符)的Work為葉子節(jié)點(diǎn)。
與MapReduce最大的不同在于,我們并不要求ReduceWork一定是葉子節(jié)點(diǎn),即ReduceWork之后可以鏈接更多的ReduceWork,并在同一個SparkTask中執(zhí)行。
從該圖可以看出,這個查詢的Operator Tree被轉(zhuǎn)化成了兩個MapWork和一個ReduceWork。
執(zhí)行SparkTask步驟:
根據(jù)MapWork來生成最底層的HadoopRDD, 將各個MapWork和ReduceWork包裝成Function應(yīng)用到RDD上。 在有依賴的Work之間,需要顯式地調(diào)用Shuffle轉(zhuǎn)換,具體選用哪種Shuffle則要根據(jù)查詢的類型來確定。另外,由于這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進(jìn)行Union。 經(jīng)過這一系列轉(zhuǎn)換后,得到最終的RDD,并通過foreachAsync提交到Spark集群上進(jìn)行計(jì)算。
在logicalPlan到physicalPlan的轉(zhuǎn)換過程中,toRdd最關(guān)鍵的元素
override?lazy?val?toRdd:?RDD[Row]?=
??????analyzed?match?{
????????case?NativeCommand(cmd)?=>
??????????val?output?=?runSqlHive(cmd)
??????????if?(output.size?==?0)?{
????????????emptyResult
??????????}?else?{
????????????val?asRows?=?output.map(r?=>?new?GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
????????????sparkContext.parallelize(asRows,?1)
??????????}
????????case?_?=>
??????????executedPlan.execute().map(_.copy())
??????}
SparkTask的生成和執(zhí)行
我們通過一個例子來看一下一個簡單的兩表JOIN查詢?nèi)绾伪晦D(zhuǎn)換為SparkTask并被執(zhí)行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉(zhuǎn)化成SparkTask;右半部分展示了該SparkTask執(zhí)行時如何得到最終的RDD并通過foreachAsync提交Spark任務(wù)。

SparkCompiler遍歷Operator Tree,將其劃分為不同的MapWork和ReduceWork。MapWork為根節(jié)點(diǎn),總是由TableScanOperator(Hive中對表 進(jìn)行掃描的操作符)開始;后續(xù)的Work均為ReduceWork。ReduceSinkOperator(Hive中進(jìn)行Shuffle輸出的操作符) 用來標(biāo)記兩個Work之間的界線,出現(xiàn)ReduceSinkOperator表示當(dāng)前Work到下一個Work之間的數(shù)據(jù)需要進(jìn)行Shuffle。因此, 當(dāng)我們發(fā)現(xiàn)ReduceSinkOperator時,就會創(chuàng)建一個新的ReduceWork并作為當(dāng)前Work的子節(jié)點(diǎn)。包含了 FileSinkOperator(Hive中將結(jié)果輸出到文件的操作符)的Work為葉子節(jié)點(diǎn)。與MapReduce最大的不同在于,我們并不要求 ReduceWork一定是葉子節(jié)點(diǎn),即ReduceWork之后可以鏈接更多的ReduceWork,并在同一個SparkTask中執(zhí)行。
從該圖可以看出,這個查詢的Operator Tree被轉(zhuǎn)化成了兩個MapWork和一個ReduceWork。在執(zhí)行SparkTask時,首先根據(jù)MapWork來生成最底層的 HadoopRDD,然后將各個MapWork和ReduceWork包裝成Function應(yīng)用到RDD上。在有依賴的Work之間,需要顯式地調(diào)用 Shuffle轉(zhuǎn)換,具體選用哪種Shuffle則要根據(jù)查詢的類型來確定。另外,由于這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進(jìn)行 Union。經(jīng)過這一系列轉(zhuǎn)換后,得到最終的RDD,并通過foreachAsync提交到Spark集群上進(jìn)行計(jì)算。
運(yùn)行模式
Hive on Spark支持兩種運(yùn)行模式:本地和遠(yuǎn)程。當(dāng)用戶把Spark Master URL設(shè)置為local時,采用本地模式;其余情況則采用遠(yuǎn)程模式。本地模式下,SparkContext與客戶端運(yùn)行在同一個JVM中;遠(yuǎn)程模式 下,SparkContext運(yùn)行在一個獨(dú)立的JVM中。提供本地模式主要是為了方便調(diào)試,一般用戶不應(yīng)選擇該模式。因此我們這里也主要介紹遠(yuǎn)程模式 (Remote SparkContext,RSC)。下圖展示了RSC的工作原理。

用戶的每個Session會創(chuàng)建一個SparkClient,SparkClient會啟動RemoteDriver進(jìn)程,并由RemoteDriver創(chuàng) 建SparkContext。SparkTask執(zhí)行時,通過Session提交任務(wù),任務(wù)的主體就是對應(yīng)的SparkWork。SparkClient 將任務(wù)提交給RemoteDriver,并返回一個SparkJobRef,通過該SparkJobRef,客戶端可以監(jiān)控任務(wù)執(zhí)行進(jìn)度,進(jìn)行錯誤處理, 以及采集統(tǒng)計(jì)信息等。由于最終的RDD計(jì)算沒有返回結(jié)果,因此客戶端只需要監(jiān)控執(zhí)行進(jìn)度而不需要處理返回值。RemoteDriver通過 SparkListener收集任務(wù)級別的統(tǒng)計(jì)數(shù)據(jù),通過Accumulator收集Operator級別的統(tǒng)計(jì)數(shù)據(jù)(Accumulator被包裝為 SparkCounter),并在任務(wù)結(jié)束時返回給SparkClient。
SparkClient 與RemoteDriver之間通過基于Netty的RPC進(jìn)行通信。除了提交任務(wù),SparkClient還提供了諸如添加Jar包、獲取集群信息等接 口。如果客戶端需要使用更一般的SparkContext的功能,可以自定義一個任務(wù)并通過SparkClient發(fā)送到RemoteDriver上執(zhí) 行。
理論上來說,Hive on Spark對于Spark集群的部署方式?jīng)]有特別的要求,除了local以外,RemoteDriver可以連接到任意的Spark集群來執(zhí)行任務(wù)。在我 們的測試中,Hive on Spark在Standalone和Spark on YARN的集群上都能正常工作(需要動態(tài)添加Jar包的查詢在yarn-cluster模式下還不能運(yùn)行,請參考HIVE-9425)。
優(yōu)化
Yarn的配置
yarn.nodemanager.resource.cpu-vcores和yarn.nodemanager.resource.memory-mb,這兩個參數(shù)決定這集群資源管理器能夠有多少資源用于運(yùn)行yarn上的任務(wù)。這兩個參數(shù)的值是由機(jī)器的配置及同時在機(jī)器上運(yùn)行的其它進(jìn)程共同決定。本文假設(shè)僅有hdfs的datanode和yarn的nodemanager運(yùn)行于該節(jié)點(diǎn)。
配置cores
基本配置是datanode和nodemanager各一個核,操作系統(tǒng)兩個核,然后剩下28核配置作為yarn資源。也即是yarn.nodemanager.resource.cpu-vcores=28
配置內(nèi)存
對于內(nèi)存,預(yù)留20GB給操作系統(tǒng),datanode,nodemanager,剩余100GB作為yarn資源。也即是 yarn.nodemanager.resource.memory-mb=100*1024
Spark配置
假設(shè)Yarn節(jié)點(diǎn)機(jī)器配置,假設(shè)有32核,120GB內(nèi)存。
給Yarn分配資源以后,那就要想著spark如何使用這些資源了,主要配置對象:
execurtor 和driver內(nèi)存,executro配額,并行度。
executor內(nèi)存
設(shè)置executor內(nèi)存需要考慮如下因素:
executor內(nèi)存越多,越能為更多的查詢提供map join的優(yōu)化。由于垃圾回收的壓力會導(dǎo)致開銷增加。
某些情況下hdfs的客戶端不能很好的處理并發(fā)寫入,所以過多的核心可能會導(dǎo)致競爭。
為了最大化使用core,建議將core設(shè)置為4,5,6(多核心會導(dǎo)致并發(fā)問題,所以寫代碼的時候尤其是靜態(tài)的鏈接等要考慮并發(fā)問題)具體分配核心數(shù)要結(jié)合yarn所提供的核心數(shù)。由于本文中涉及到的node節(jié)點(diǎn)是28核,那么很明顯分配為4的化可以被整除,spark.executor.cores設(shè)置為4 不會有多余的核剩下,設(shè)置為5,6都會有core剩余。spark.executor.cores=4,由于總共有28個核,那么最大可以申請的executor數(shù)是7。總內(nèi)存處以7,也即是 100/7,可以得到每個executor約14GB內(nèi)存。
要知道 spark.executor.memory 和spark.executor.memoryOverhead共同決定著executor內(nèi)存。建議spark.executor.memoryOverhead站總內(nèi)存的 15%-20%。那么最終spark.executor.memoryOverhead=2G和spark.executor.memory=12G.
根據(jù)上面的配置的化,每個主機(jī)就可以申請7個executor,每個executor可以運(yùn)行4個任務(wù),每個core一個task。那么每個task的平均內(nèi)存是 14/4 = 3.5GB。在executor運(yùn)行的task共享內(nèi)存。其實(shí),executor內(nèi)部是用newCachedThreadPool運(yùn)行task的。
確保spark.executor.memoryOverhead和spark.executor.memory的和不超過yarn.scheduler.maximum-allocation-mb。
driver內(nèi)存
對于drvier的內(nèi)存配置,當(dāng)然也包括兩個參數(shù)。
spark.driver.memoryOverhead 每個driver能從yarn申請的堆外內(nèi)存的大小。
spark.driver.memory 當(dāng)運(yùn)行hive on spark的時候,每個spark driver能申請的最大jvm 堆內(nèi)存。該參數(shù)結(jié)合 spark.driver.memoryOverhead共同決定著driver的內(nèi)存大小。
driver的內(nèi)存大小并不直接影響性能,但是也不要job的運(yùn)行受限于driver的內(nèi)存. 這里給出spark driver內(nèi)存申請的方案,假設(shè)yarn.nodemanager.resource.memory-mb是 X。
driver內(nèi)存申請12GB,假設(shè) X > 50GB driver內(nèi)存申請 4GB,假設(shè) 12GB < X <50GB driver內(nèi)存申請1GB,假設(shè) 1GB < X < 12 GB driver內(nèi)存申請256MB,假設(shè) X < 1GB
這些數(shù)值是spark.driver.memory和 spark.driver.memoryOverhead內(nèi)存的總和。對外內(nèi)存站總內(nèi)存的10%-15%。假設(shè) yarn.nodemanager.resource.memory-mb=100*1024MB,那么driver內(nèi)存設(shè)置為12GB,此時 spark.driver.memory=10.5gb和spark.driver.memoryOverhead=1.5gb
注意,資源多少直接對應(yīng)的是數(shù)據(jù)量的大小。所以要結(jié)合資源和數(shù)據(jù)量進(jìn)行適當(dāng)縮減和增加。
executor數(shù)
executor的數(shù)目是由每個節(jié)點(diǎn)運(yùn)行的executor數(shù)目和集群的節(jié)點(diǎn)數(shù)共同決定。如果你有四十個節(jié)點(diǎn),那么hive可以使用的最大executor數(shù)就是 280(40*7). 最大數(shù)目可能比這個小點(diǎn),因?yàn)閐river也會消耗1core和12GB。
當(dāng)前假設(shè)是沒有yarn應(yīng)用在跑。
Hive性能與用于運(yùn)行查詢的executor數(shù)量直接相關(guān)。但是,不通查詢還是不通。通常,性能與executor的數(shù)量成比例。例如,查詢使用四個executor大約需要使用兩個executor的一半時間。但是,性能在一定數(shù)量的executor中達(dá)到峰值,高于此值時,增加數(shù)量不會改善性能并且可能產(chǎn)生不利影響。
在大多數(shù)情況下,使用一半的集群容量(executor數(shù)量的一半)可以提供良好的性能。為了獲得最佳性能,最好使用所有可用的executor。例如,設(shè)置spark.executor.instances = 280。對于基準(zhǔn)測試和性能測量,強(qiáng)烈建議這樣做。
動態(tài)executor申請
雖然將spark.executor.instances設(shè)置為最大值通常可以最大限度地提高性能,但不建議在多個用戶運(yùn)行Hive查詢的生產(chǎn)環(huán)境中這樣做。避免為用戶會話分配固定數(shù)量的executor,因?yàn)槿绻鹐xecutor空閑,executor不能被其他用戶查詢使用。在生產(chǎn)環(huán)境中,應(yīng)該好好計(jì)劃executor分配,以允許更多的資源共享。
Spark允許您根據(jù)工作負(fù)載動態(tài)擴(kuò)展分配給Spark應(yīng)用程序的集群資源集。要啟用動態(tài)分配,請按照動態(tài)分配中的步驟進(jìn)行操作。除了在某些情況下,強(qiáng)烈建議啟用動態(tài)分配。
并行度
要使可用的executor得到充分利用,必須同時運(yùn)行足夠的任務(wù)(并行)。在大多數(shù)情況下,Hive會自動確定并行度,但也可以在調(diào)優(yōu)并發(fā)度方面有一些控制權(quán)。在輸入端,map任務(wù)的數(shù)量等于輸入格式生成的split數(shù)。對于Hive on Spark,輸入格式為CombineHiveInputFormat,它可以根據(jù)需要對基礎(chǔ)輸入格式生成的split進(jìn)行分組。可以更好地控制stage邊界的并行度。調(diào)整hive.exec.reducers.bytes.per.reducer以控制每個reducer處理的數(shù)據(jù)量,Hive根據(jù)可用的executor,執(zhí)行程序內(nèi)存,以及其他因素來確定最佳分區(qū)數(shù)。實(shí)驗(yàn)表明,只要生成足夠的任務(wù)來保持所有可用的executor繁忙,Spark就比MapReduce對hive.exec.reducers.bytes.per.reducer指定的值敏感度低。為獲得最佳性能,請為該屬性選擇一個值,以便Hive生成足夠的任務(wù)以完全使用所有可用的executor。
Hive配置
Hive on spark 共享了很多hive性能相關(guān)的配置。可以像調(diào)優(yōu)hive on mapreduce一樣調(diào)優(yōu)hive on spark。然而,hive.auto.convert.join.noconditionaltask.size是基于統(tǒng)計(jì)信息將基礎(chǔ)join轉(zhuǎn)化為map join的閾值,可能會對性能產(chǎn)生重大影響。盡管該配置可以用hive on mr和hive on spark,但是兩者的解釋不同。
數(shù)據(jù)的大小有兩個統(tǒng)計(jì)指標(biāo):
totalSize- 數(shù)據(jù)在磁盤上的近似大小 rawDataSize- 數(shù)據(jù)在內(nèi)存中的近似大小
hive on mr用的是totalSize。hive on spark使用的是rawDataSize。由于可能存在壓縮和序列化,這兩個值會有較大的差別。對于hive on spark 需要將 hive.auto.convert.join.noconditionaltask.size指定為更大的值,才能將與hive on mr相同的join轉(zhuǎn)化為map join。
可以增加此參數(shù)的值,以使地圖連接轉(zhuǎn)換更具兇猛。將common join 轉(zhuǎn)換為 map join 可以提高性能。如果此值設(shè)置得太大,則來自小表的數(shù)據(jù)將使用過多內(nèi)存,任務(wù)可能會因內(nèi)存不足而失敗。根據(jù)群集環(huán)境調(diào)整此值。
通過參數(shù) hive.stats.collect.rawdatasize 可以控制是否收集 rawDataSize 統(tǒng)計(jì)信息。
對于hiveserver2,建議再配置兩個額外的參數(shù): hive.stats.fetch.column.stats=true 和 hive.optimize.index.filter=true.
Hive性能調(diào)優(yōu)通常建議使用以下屬性:
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.merge.mapfiles=true
hive.merge.mapredfiles=false
hive.merge.smallfiles.avgsize=16000000
hive.merge.size.per.task=256000000
hive.merge.sparkfiles=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=20M(might?need?to?increase?for?Spark,?200M)
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4?(MR?and?Spark)
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.optimize.ppd=true
預(yù)啟動YARN容器
在開始新會話后提交第一個查詢時,在查看查詢開始之前可能會遇到稍長的延遲。還會注意到,如果再次運(yùn)行相同的查詢,它的完成速度比第一個快得多。
Spark執(zhí)行程序需要額外的時間來啟動和初始化yarn上的Spark,這會導(dǎo)致較長的延遲。此外,Spark不會等待所有executor在啟動作業(yè)之前全部啟動完成,因此在將作業(yè)提交到群集后,某些executor可能仍在啟動。但是,對于在Spark上運(yùn)行的作業(yè),作業(yè)提交時可用executor的數(shù)量部分決定了reducer的數(shù)量。當(dāng)就緒executor的數(shù)量未達(dá)到最大值時,作業(yè)可能沒有最大并行度。這可能會進(jìn)一步影響第一個查詢的性能。
在用戶較長期會話中,這個額外時間不會導(dǎo)致任何問題,因?yàn)樗辉诘谝淮尾樵儓?zhí)行時發(fā)生。然而,諸如Oozie發(fā)起的Hive工作之類的短期繪畫可能無法實(shí)現(xiàn)最佳性能。
為減少啟動時間,可以在作業(yè)開始前啟用容器預(yù)熱。只有在請求的executor準(zhǔn)備就緒時,作業(yè)才會開始運(yùn)行。這樣,在reduce那一側(cè)不會減少短會話的并行性。
要啟用預(yù)熱功能,請?jiān)诎l(fā)出查詢之前將hive.prewarm.enabled設(shè)置為true。還可以通過設(shè)置hive.prewarm.numcontainers來設(shè)置容器數(shù)量。默認(rèn)值為10。
預(yù)熱的executor的實(shí)際數(shù)量受spark.executor.instances(靜態(tài)分配)或spark.dynamicAllocation.maxExecutors(動態(tài)分配)的值限制。hive.prewarm.numcontainers的值不應(yīng)超過分配給用戶會話的值。
注意:預(yù)熱需要幾秒鐘,對于短會話來說是一個很好的做法,特別是如果查詢涉及reduce階段。但是,如果hive.prewarm.numcontainers的值高于群集中可用的值,則該過程最多可能需要30秒。請謹(jǐn)慎使用預(yù)熱。
另外,一個完整調(diào)優(yōu)案例你可以參考:https://blog.csdn.net/u010010664/article/details/77066031

八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南
我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?
193篇文章暴揍Flink,這個合集你需要關(guān)注一下
Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS
Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)
我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?
在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!
硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)
4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
【面試&個人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談
大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)
我寫過的關(guān)于成長/面試/職場進(jìn)階的文章
你好,我是王知無,一個大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。
做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)、算法工程化。
專注大數(shù)據(jù)領(lǐng)域?qū)崟r動態(tài)&技術(shù)提升&個人成長&職場進(jìn)階,歡迎關(guān)注。
