<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Hive計(jì)算引擎大PK,萬字長文解析MapRuce、Tez、Spark三大引擎

          共 30724字,需瀏覽 62分鐘

           ·

          2021-10-26 03:08

          點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”

          回復(fù)”面試“獲取更多驚喜

          前言

          Hive從2008年始于FaceBook工程師之手,經(jīng)過10幾年的發(fā)展至今保持強(qiáng)大的生命力。截止目前Hive已經(jīng)更新至3.1.x版本,Hive從最開始的為人詬病的速度慢迅速發(fā)展,開始支持更多的計(jì)算引擎,計(jì)算速度大大提升。

          本文我們將從原理、應(yīng)用、調(diào)優(yōu)分別講解Hive所支持的MapReduce、Tez、Spark引擎。


          MapReduce引擎

          我們在之前的文章中:

          對Hive的MapReduce引擎已經(jīng)做過非常詳細(xì)的講解了。

          本文首發(fā)自公眾號:

          《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。

          在Hive2.x版本中,HiveSQL會被轉(zhuǎn)化為MR任務(wù),這也是我們經(jīng)常說的HiveSQL的執(zhí)行原理。

          我們先來看下 Hive 的底層執(zhí)行架構(gòu)圖, Hive 的主要組件與 Hadoop 交互的過程:

          Hive底層執(zhí)行架構(gòu)

          在 Hive 這一側(cè),總共有五個組件:

          • UI:用戶界面。可看作我們提交SQL語句的命令行界面。
          • DRIVER:驅(qū)動程序。接收查詢的組件。該組件實(shí)現(xiàn)了會話句柄的概念。
          • COMPILER:編譯器。負(fù)責(zé)將 SQL 轉(zhuǎn)化為平臺可執(zhí)行的執(zhí)行計(jì)劃。對不同的查詢塊和查詢表達(dá)式進(jìn)行語義分析,并最終借助表和從 metastore 查找的分區(qū)元數(shù)據(jù)來生成執(zhí)行計(jì)劃。
          • METASTORE:元數(shù)據(jù)庫。存儲 Hive 中各種表和分區(qū)的所有結(jié)構(gòu)信息。
          • EXECUTION ENGINE:執(zhí)行引擎。負(fù)責(zé)提交 COMPILER 階段編譯好的執(zhí)行計(jì)劃到不同的平臺上。

          上圖的基本流程是:

          • 步驟1:UI 調(diào)用 DRIVER 的接口;

          • 步驟2:DRIVER 為查詢創(chuàng)建會話句柄,并將查詢發(fā)送到 COMPILER(編譯器)生成執(zhí)行計(jì)劃;

          • 步驟3和4:編譯器從元數(shù)據(jù)存儲中獲取本次查詢所需要的元數(shù)據(jù),該元數(shù)據(jù)用于對查詢樹中的表達(dá)式進(jìn)行類型檢查,以及基于查詢謂詞修建分區(qū);

          • 步驟5:編譯器生成的計(jì)劃是分階段的DAG,每個階段要么是 map/reduce 作業(yè),要么是一個元數(shù)據(jù)或者HDFS上的操作。將生成的計(jì)劃發(fā)給 DRIVER。

          如果是 map/reduce 作業(yè),該計(jì)劃包括 map operator trees 和一個 ?reduce operator tree,執(zhí)行引擎將會把這些作業(yè)發(fā)送給 MapReduce :

          • 步驟6、6.1、6.2和6.3:執(zhí)行引擎將這些階段提交給適當(dāng)?shù)慕M件。在每個 task(mapper/reducer) 中,從HDFS文件中讀取與表或中間輸出相關(guān)聯(lián)的數(shù)據(jù),并通過相關(guān)算子樹傳遞這些數(shù)據(jù)。最終這些數(shù)據(jù)通過序列化器寫入到一個臨時HDFS文件中(如果不需要 reduce 階段,則在 map 中操作)。臨時文件用于向計(jì)劃中后面的 map/reduce 階段提供數(shù)據(jù)。

          • 步驟7、8和9:最終的臨時文件將移動到表的位置,確保不讀取臟數(shù)據(jù)(文件重命名在HDFS中是原子操作)。對于用戶的查詢,臨時文件的內(nèi)容由執(zhí)行引擎直接從HDFS讀取,然后通過Driver發(fā)送到UI。


          Hive SQL 編譯成 MapReduce 過程

          美團(tuán)博客中有一篇非常詳細(xì)的博客講解《Hive SQL的編譯過程》。

          你可以參考:

          https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html

          編譯 SQL 的任務(wù)是在上節(jié)中介紹的 COMPILER(編譯器組件)中完成的。Hive將SQL轉(zhuǎn)化為MapReduce任務(wù),整個編譯過程分為六個階段:

          Hive SQL編譯過程
          1. 詞法、語法解析: Antlr 定義 SQL 的語法規(guī)則,完成 SQL 詞法,語法解析,將 SQL 轉(zhuǎn)化為抽象語法樹 AST Tree;

          Antlr是一種語言識別的工具,可以用來構(gòu)造領(lǐng)域語言。使用Antlr構(gòu)造特定的語言只需要編寫一個語法文件,定義詞法和語法替換規(guī)則即可,Antlr完成了詞法分析、語法分析、語義分析、中間代碼生成的過程。

          1. 語義解析: 遍歷 AST Tree,抽象出查詢的基本組成單元 QueryBlock;

          2. 生成邏輯執(zhí)行計(jì)劃: 遍歷 QueryBlock,翻譯為執(zhí)行操作樹 OperatorTree;

          3. 優(yōu)化邏輯執(zhí)行計(jì)劃: 邏輯層優(yōu)化器進(jìn)行 OperatorTree 變換,合并 Operator,達(dá)到減少 MapReduce Job,減少數(shù)據(jù)傳輸及 shuffle 數(shù)據(jù)量;

          4. 生成物理執(zhí)行計(jì)劃: 遍歷 OperatorTree,翻譯為 MapReduce 任務(wù);

          5. 優(yōu)化物理執(zhí)行計(jì)劃: 物理層優(yōu)化器進(jìn)行 MapReduce 任務(wù)的變換,生成最終的執(zhí)行計(jì)劃。

          下面對這六個階段詳細(xì)解析:

          為便于理解,我們拿一個簡單的查詢語句進(jìn)行展示,對5月23號的地區(qū)維表進(jìn)行查詢:

          select?*?from?dim.dim_region?where?dt?=?'2021-05-23';

          階段一:詞法、語法解析

          根據(jù)Antlr定義的sql語法規(guī)則,將相關(guān)sql進(jìn)行詞法、語法解析,轉(zhuǎn)化為抽象語法樹AST Tree:

          ABSTRACT?SYNTAX?TREE:
          TOK_QUERY
          ????TOK_FROM?
          ????TOK_TABREF
          ???????????TOK_TABNAME
          ???????????????dim
          ?????????????????dim_region
          ????TOK_INSERT
          ??????TOK_DESTINATION
          ??????????TOK_DIR
          ??????????????TOK_TMP_FILE
          ????????TOK_SELECT
          ??????????TOK_SELEXPR
          ??????????????TOK_ALLCOLREF
          ????????TOK_WHERE
          ??????????=
          ??????????????TOK_TABLE_OR_COL
          ??????????????????dt
          ????????????????????'2021-05-23'

          階段二:語義解析

          遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock:

          AST Tree生成后由于其復(fù)雜度依舊較高,不便于翻譯為mapreduce程序,需要進(jìn)行進(jìn)一步抽象和結(jié)構(gòu)化,形成QueryBlock。

          QueryBlock是一條SQL最基本的組成單元,包括三個部分:輸入源,計(jì)算過程,輸出。簡單來講一個QueryBlock就是一個子查詢。

          QueryBlock的生成過程為一個遞歸過程,先序遍歷 AST Tree ,遇到不同的 Token 節(jié)點(diǎn)(理解為特殊標(biāo)記),保存到相應(yīng)的屬性中。

          階段三:生成邏輯執(zhí)行計(jì)劃

          遍歷QueryBlock,翻譯為執(zhí)行操作樹OperatorTree:

          Hive最終生成的MapReduce任務(wù),Map階段和Reduce階段均由OperatorTree組成。

          基本的操作符包括:

          TableScanOperator

          SelectOperator

          FilterOperator

          JoinOperator

          GroupByOperator

          ReduceSinkOperator`

          Operator在Map Reduce階段之間的數(shù)據(jù)傳遞都是一個流式的過程。每一個Operator對一行數(shù)據(jù)完成操作后之后將數(shù)據(jù)傳遞給childOperator計(jì)算。

          由于Join/GroupBy/OrderBy均需要在Reduce階段完成,所以在生成相應(yīng)操作的Operator之前都會先生成一個ReduceSinkOperator,將字段組合并序列化為Reduce Key/value, Partition Key。

          階段四:優(yōu)化邏輯執(zhí)行計(jì)劃

          Hive中的邏輯查詢優(yōu)化可以大致分為以下幾類:

          • 投影修剪
          • 推導(dǎo)傳遞謂詞
          • 謂詞下推
          • 將Select-Select,F(xiàn)ilter-Filter合并為單個操作
          • 多路 Join
          • 查詢重寫以適應(yīng)某些列值的Join傾斜

          階段五:生成物理執(zhí)行計(jì)劃

          生成物理執(zhí)行計(jì)劃即是將邏輯執(zhí)行計(jì)劃生成的OperatorTree轉(zhuǎn)化為MapReduce Job的過程,主要分為下面幾個階段:

          1. 對輸出表生成MoveTask
          2. 從OperatorTree的其中一個根節(jié)點(diǎn)向下深度優(yōu)先遍歷
          3. ReduceSinkOperator標(biāo)示Map/Reduce的界限,多個Job間的界限
          4. 遍歷其他根節(jié)點(diǎn),遇過碰到JoinOperator合并MapReduceTask
          5. 生成StatTask更新元數(shù)據(jù)
          6. 剪斷Map與Reduce間的Operator的關(guān)系

          階段六:優(yōu)化物理執(zhí)行計(jì)劃

          Hive中的物理優(yōu)化可以大致分為以下幾類:

          • 分區(qū)修剪(Partition Pruning)
          • 基于分區(qū)和桶的掃描修剪(Scan pruning)
          • 如果查詢基于抽樣,則掃描修剪
          • 在某些情況下,在 map 端應(yīng)用 Group By
          • 在 mapper 上執(zhí)行 Join
          • 優(yōu)化 Union,使Union只在 map 端執(zhí)行
          • 在多路 Join 中,根據(jù)用戶提示決定最后流哪個表
          • 刪除不必要的 ReduceSinkOperators
          • 對于帶有Limit子句的查詢,減少需要為該表掃描的文件數(shù)
          • 對于帶有Limit子句的查詢,通過限制 ReduceSinkOperator 生成的內(nèi)容來限制來自 mapper 的輸出
          • 減少用戶提交的SQL查詢所需的Tez作業(yè)數(shù)量
          • 如果是簡單的提取查詢,避免使用MapReduce作業(yè)
          • 對于帶有聚合的簡單獲取查詢,執(zhí)行不帶 MapReduce 任務(wù)的聚合
          • 重寫 Group By 查詢使用索引表代替原來的表
          • 當(dāng)表掃描之上的謂詞是相等謂詞且謂詞中的列具有索引時,使用索引掃描

          經(jīng)過以上六個階段,SQL 就被解析映射成了集群上的 MapReduce 任務(wù)。


          Explain語法

          Hive Explain 語句類似Mysql 的Explain 語句,提供了對應(yīng)查詢的執(zhí)行計(jì)劃,對于我們在理解Hive底層邏輯、Hive調(diào)優(yōu)、Hive SQL書寫等方面提供了一個參照,在我們的生產(chǎn)工作了是一個很有意義的工具。

          Hive Explain語法

          EXPLAIN [EXTENDED|CBO|AST|DEPENDENCY|AUTHORIZATION|LOCKS|VECTORIZATION|ANALYZE] query

          Hive Explain的語法規(guī)則如上,后面將按照對應(yīng)的子句進(jìn)行探討。

          EXTENDED 語句會在執(zhí)行計(jì)劃中產(chǎn)生關(guān)于算子(Operator)的額外信息,這些信息都是典型的物理信息,如文件名稱等。

          在執(zhí)行Explain QUERY 之后,一個查詢會被轉(zhuǎn)化為包含多個Stage的語句(看起來更像一個DAG)。這些Stages要么是map/reduce Stage,要么是做些元數(shù)據(jù)或文件系統(tǒng)操作的Stage (如 move 、rename等)。Explain的輸出包含2個部分:

          • 執(zhí)行計(jì)劃不同Stage之間的以來關(guān)系(Dependency)
          • 每個Stage的執(zhí)行描述信息(Description)

          以下將通過一個簡單的例子進(jìn)行解釋。

          執(zhí)行Explain 語句

          EXPLAIN?
          SELECT?SUM(id)?FROM?test1;

          Explain輸出結(jié)果解析

          • 依賴圖
          STAGE?DEPENDENCIES:
          ??Stage-1?is?a?root?stage
          ??Stage-0?depends?on?stages:?Stage-1

          STAGE?PLANS:
          ??Stage:?Stage-1
          ????Map?Reduce
          ??????Map?Operator?Tree:
          ??????????TableScan
          ????????????alias:?test1
          ????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????Select?Operator
          ??????????????expressions:?id?(type:?int)
          ??????????????outputColumnNames:?id
          ??????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????Group?By?Operator
          ????????????????aggregations:?sum(id)
          ????????????????mode:?hash
          ????????????????outputColumnNames:?_col0
          ????????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????Reduce?Output?Operator
          ??????????????????sort?order:
          ??????????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????????value?expressions:?_col0?(type:?bigint)
          ??????Reduce?Operator?Tree:
          ????????Group?By?Operator
          ??????????aggregations:?sum(VALUE._col0)
          ??????????mode:?mergepartial
          ??????????outputColumnNames:?_col0
          ??????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????File?Output?Operator
          ????????????compressed:?false
          ????????????Statistics:?Num?rows:?1?Data?size:?8?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????table:
          ????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
          ????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          ????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

          ??Stage:?Stage-0
          ????Fetch?Operator
          ??????limit:?-1
          ??????Processor?Tree:
          ????????ListSink

          一個HIVE查詢被轉(zhuǎn)換為一個由一個或多個stage組成的序列(有向無環(huán)圖DAG)。這些stage可以是MapReduce stage,也可以是負(fù)責(zé)元數(shù)據(jù)存儲的stage,也可以是負(fù)責(zé)文件系統(tǒng)的操作(比如移動和重命名)的stage。

          我們將上述結(jié)果拆分看,先從最外層開始,包含兩個大的部分:

          • stage dependencies:各個stage之間的依賴性
          • stage plan:各個stage的執(zhí)行計(jì)劃

          先看第一部分 stage dependencies ,包含兩個 stage,Stage-1 是根stage,說明這是開始的stage,Stage-0 依賴 Stage-1,Stage-1執(zhí)行完成后執(zhí)行Stage-0。

          再看第二部分 stage plan,里面有一個 Map Reduce,一個MR的執(zhí)行計(jì)劃分為兩個部分

          • Map Operator Tree:MAP端的執(zhí)行計(jì)劃樹
          • Reduce Operator Tree:Reduce端的執(zhí)行計(jì)劃樹

          這兩個執(zhí)行計(jì)劃樹里面包含這條sql語句的 operator

          1. TableScan:表掃描操作,map端第一個操作肯定是加載表,所以就是表掃描操作,常見的屬性:
          alias:表名稱
          Statistics:表統(tǒng)計(jì)信息,包含表中數(shù)據(jù)條數(shù),數(shù)據(jù)大小等
          1. Select Operator:選取操作,常見的屬性 :
          expressions:需要的字段名稱及字段類型
          outputColumnNames:輸出的列名稱
          Statistics:表統(tǒng)計(jì)信息,包含表中數(shù)據(jù)條數(shù),數(shù)據(jù)大小等
          1. Group By Operator:分組聚合操作,常見的屬性:
          aggregations:顯示聚合函數(shù)信息.
          mode:聚合模式,值有?hash:隨機(jī)聚合,就是hash?partition;partial:局部聚合;final:最終聚合.
          keys:分組的字段,如果沒有分組,則沒有此字段.
          outputColumnNames:聚合之后輸出列名.
          Statistics:表統(tǒng)計(jì)信息,包含分組聚合之后的數(shù)據(jù)條數(shù),數(shù)據(jù)大小等.
          1. Reduce Output Operator:輸出到reduce操作,常見屬性:
          sort order:值為空?不排序;值為?+?正序排序,值為?-?倒序排序;值為?±?排序的列為兩列,第一列為正序,第二列為倒序.
          1. Filter Operator:過濾操作,常見的屬性:
          predicate:過濾條件,如sql語句中的where?id>=1,則此處顯示(id?>=?1).
          1. Map Join Operator:join 操作,常見的屬性:
          condition map:join方式?,如Inner Join 0 to 1 Left Outer Join0 to 2
          keys:?join?的條件字段
          outputColumnNames:join 完成之后輸出的字段
          Statistics:join 完成之后生成的數(shù)據(jù)條數(shù),大小等
          1. File Output Operator:文件輸出操作,常見的屬性:
          compressed:是否壓縮
          table:表的信息,包含輸入輸出文件格式化方式,序列化方式等
          1. Fetch Operator 客戶端獲取數(shù)據(jù)操作,常見的屬性:
          limit,值為?-1?表示不限制條數(shù),其他值為限制的條數(shù)

          Explain使用場景

          那么Explain能夠?yàn)槲覀冊谏a(chǎn)實(shí)踐中帶來哪些便利及解決我們哪些迷惑呢?

          本文首發(fā)自公眾號:

          《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。

          join 語句會過濾 Null 的值嗎?

          現(xiàn)在,我們在hive cli 輸入以下查詢計(jì)劃語句

          select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id;

          然后執(zhí)行:

          explain?select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id;

          我們來看結(jié)果:

          TableScan
          ?alias:?a
          ?Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ?Filter?Operator
          ????predicate:?id?is?not?null?(type:?boolean)
          ????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????Select?Operator
          ????????expressions:?id?(type:?int)
          ????????outputColumnNames:?_col0
          ????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????HashTable?Sink?Operator
          ???????????keys:
          ?????????????0?_col0?(type:?int)
          ?????????????1?_col0?(type:?int)
          ?...

          從上述結(jié)果可以看到 predicate: id is not null 這樣一行,說明 join 時會自動過濾掉關(guān)聯(lián)字段為 null 值的情況,但 left join 或 full join 是不會自動過濾null值的,大家可以自行嘗試下。

          group by 分組語句會進(jìn)行排序嗎?
          select?id,max(user_name)?from?test1?group?by?id;

          直接來看 explain 之后結(jié)果:

          TableScan
          ????alias:?test1
          ????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????Select?Operator
          ????????expressions:?id?(type:?int),?user_name?(type:?string)
          ????????outputColumnNames:?id,?user_name
          ????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????Group?By?Operator
          ???????????aggregations:?max(user_name)
          ???????????keys:?id?(type:?int)
          ???????????mode:?hash
          ???????????outputColumnNames:?_col0,?_col1
          ???????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
          ???????????Reduce?Output?Operator
          ?????????????key?expressions:?_col0?(type:?int)
          ?????????????sort?order:?+
          ?????????????Map-reduce?partition?columns:?_col0?(type:?int)
          ?????????????Statistics:?Num?rows:?9?Data?size:?108?Basic?stats:?COMPLETE?Column?stats:?NONE
          ?????????????value?expressions:?_col1?(type:?string)
          ?...

          我們看 Group By Operator,里面有 keys: id (type: int) 說明按照 id 進(jìn)行分組的,再往下看還有 sort order: + ,說明是按照 id 字段進(jìn)行正序排序的。

          哪條sql執(zhí)行效率高

          觀察如下兩條sql:

          SELECT
          ?a.id,
          ?b.user_name
          FROM
          ?test1?a
          JOIN?test2?b?ON?a.id?=?b.id
          WHERE
          ?a.id?>?2;
          SELECT
          ?a.id,
          ?b.user_name
          FROM
          ?(SELECT?*?FROM?test1?WHERE?id?>?2)?a
          JOIN?test2?b?ON?a.id?=?b.id;

          這兩條sql語句輸出的結(jié)果是一樣的,但是哪條sql執(zhí)行效率高呢?

          有人說第一條sql執(zhí)行效率高,因?yàn)榈诙lsql有子查詢,子查詢會影響性能;有人說第二條sql執(zhí)行效率高,因?yàn)橄冗^濾之后,在進(jìn)行join時的條數(shù)減少了,所以執(zhí)行效率就高了。到底哪條sql效率高呢,我們直接在sql語句前面加上 explain,看下執(zhí)行計(jì)劃不就知道了嘛!

          在第一條sql語句前加上 explain,得到如下結(jié)果:

          hive?(default)>?explain?select?a.id,b.user_name?from?test1?a?join?test2?b?on?a.id=b.id?where?a.id?>2;
          OK
          Explain
          STAGE?DEPENDENCIES:
          ??Stage-4?is?a?root?stage
          ??Stage-3?depends?on?stages:?Stage-4
          ??Stage-0?depends?on?stages:?Stage-3

          STAGE?PLANS:
          ??Stage:?Stage-4
          ????Map?Reduce?Local?Work
          ??????Alias?->?Map?Local?Tables:
          ????????$hdt$_0:a
          ??????????Fetch?Operator
          ????????????limit:?-1
          ??????Alias?->?Map?Local?Operator?Tree:
          ????????$hdt$_0:a
          ??????????TableScan
          ????????????alias:?a
          ????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????Filter?Operator
          ??????????????predicate:?(id?>?2)?(type:?boolean)
          ??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????Select?Operator
          ????????????????expressions:?id?(type:?int)
          ????????????????outputColumnNames:?_col0
          ????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????HashTable?Sink?Operator
          ??????????????????keys:
          ????????????????????0?_col0?(type:?int)
          ????????????????????1?_col0?(type:?int)

          ??Stage:?Stage-3
          ????Map?Reduce
          ??????Map?Operator?Tree:
          ??????????TableScan
          ????????????alias:?b
          ????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????Filter?Operator
          ??????????????predicate:?(id?>?2)?(type:?boolean)
          ??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????Select?Operator
          ????????????????expressions:?id?(type:?int),?user_name?(type:?string)
          ????????????????outputColumnNames:?_col0,?_col1
          ????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????Map?Join?Operator
          ??????????????????condition?map:
          ???????????????????????Inner?Join?0?to?1
          ??????????????????keys:
          ????????????????????0?_col0?(type:?int)
          ????????????????????1?_col0?(type:?int)
          ??????????????????outputColumnNames:?_col0,?_col2
          ??????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????????Select?Operator
          ????????????????????expressions:?_col0?(type:?int),?_col2?(type:?string)
          ????????????????????outputColumnNames:?_col0,?_col1
          ????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????????File?Output?Operator
          ??????????????????????compressed:?false
          ??????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????????????table:
          ??????????????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
          ??????????????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          ??????????????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
          ??????Local?Work:
          ????????Map?Reduce?Local?Work

          ??Stage:?Stage-0
          ????Fetch?Operator
          ??????limit:?-1
          ??????Processor?Tree:
          ????????ListSink

          在第二條sql語句前加上 explain,得到如下結(jié)果:

          hive?(default)>?explain?select?a.id,b.user_name?from(select?*?from??test1?where?id>2?)?a?join?test2?b?on?a.id=b.id;
          OK
          Explain
          STAGE?DEPENDENCIES:
          ??Stage-4?is?a?root?stage
          ??Stage-3?depends?on?stages:?Stage-4
          ??Stage-0?depends?on?stages:?Stage-3

          STAGE?PLANS:
          ??Stage:?Stage-4
          ????Map?Reduce?Local?Work
          ??????Alias?->?Map?Local?Tables:
          ????????$hdt$_0:test1
          ??????????Fetch?Operator
          ????????????limit:?-1
          ??????Alias?->?Map?Local?Operator?Tree:
          ????????$hdt$_0:test1
          ??????????TableScan
          ????????????alias:?test1
          ????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????Filter?Operator
          ??????????????predicate:?(id?>?2)?(type:?boolean)
          ??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????Select?Operator
          ????????????????expressions:?id?(type:?int)
          ????????????????outputColumnNames:?_col0
          ????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????HashTable?Sink?Operator
          ??????????????????keys:
          ????????????????????0?_col0?(type:?int)
          ????????????????????1?_col0?(type:?int)

          ??Stage:?Stage-3
          ????Map?Reduce
          ??????Map?Operator?Tree:
          ??????????TableScan
          ????????????alias:?b
          ????????????Statistics:?Num?rows:?6?Data?size:?75?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????Filter?Operator
          ??????????????predicate:?(id?>?2)?(type:?boolean)
          ??????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????Select?Operator
          ????????????????expressions:?id?(type:?int),?user_name?(type:?string)
          ????????????????outputColumnNames:?_col0,?_col1
          ????????????????Statistics:?Num?rows:?2?Data?size:?25?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????Map?Join?Operator
          ??????????????????condition?map:
          ???????????????????????Inner?Join?0?to?1
          ??????????????????keys:
          ????????????????????0?_col0?(type:?int)
          ????????????????????1?_col0?(type:?int)
          ??????????????????outputColumnNames:?_col0,?_col2
          ??????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????????Select?Operator
          ????????????????????expressions:?_col0?(type:?int),?_col2?(type:?string)
          ????????????????????outputColumnNames:?_col0,?_col1
          ????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ????????????????????File?Output?Operator
          ??????????????????????compressed:?false
          ??????????????????????Statistics:?Num?rows:?2?Data?size:?27?Basic?stats:?COMPLETE?Column?stats:?NONE
          ??????????????????????table:
          ??????????????????????????input?format:?org.apache.hadoop.mapred.SequenceFileInputFormat
          ??????????????????????????output?format:?org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
          ??????????????????????????serde:?org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
          ??????Local?Work:
          ????????Map?Reduce?Local?Work

          ??Stage:?Stage-0
          ????Fetch?Operator
          ??????limit:?-1
          ??????Processor?Tree:
          ????????ListSink
          • 大家有什么發(fā)現(xiàn),除了表別名不一樣,其他的執(zhí)行計(jì)劃完全一樣,都是先進(jìn)行 where 條件過濾,在進(jìn)行 join 條件關(guān)聯(lián)。說明 hive 底層會自動幫我們進(jìn)行優(yōu)化,所以這兩條sql語句執(zhí)行效率是一樣的。
          • 以上僅列舉了3個我們生產(chǎn)中既熟悉又有點(diǎn)迷糊的例子,explain 還有很多其他的用途,如查看stage的依賴情況、排查數(shù)據(jù)傾斜、hive 調(diào)優(yōu)等,小伙伴們可以自行嘗試。
          explain dependency的用法

          explain dependency用于描述一段SQL需要的數(shù)據(jù)來源,輸出是一個json格式的數(shù)據(jù),里面包含以下兩個部分的內(nèi)容:

          • input_partitions:描述一段SQL依賴的數(shù)據(jù)來源表分區(qū),里面存儲的是分區(qū)名的列表,如果整段SQL包含的所有表都是非分區(qū)表,則顯示為空。
          • input_tables:描述一段SQL依賴的數(shù)據(jù)來源表,里面存儲的是Hive表名的列表。

          使用explain dependency查看SQL查詢非分區(qū)普通表,在 hive cli 中輸入以下命令:

          explain?dependency?select?s_age,count(1)?num?from?student_orc;

          得到如下結(jié)果:

          {"input_partitions":[],"input_tables":[{"tablename":"default@student_tb?_orc","tabletype":"MANAGED_TABLE"}]}

          使用explain dependency查看SQL查詢分區(qū)表,在 hive cli 中輸入以下命令:

          explain?dependency?select?s_age,count(1)?num?from?student_orc_partition;

          得到結(jié)果:

          {"input_partitions":[{"partitionName":"default@student_orc_partition@?part=0"},?
          {"partitionName":"default@student_orc_partition@part=1"},?
          {"partitionName":"default@student_orc_partition@part=2"},?
          {"partitionName":"default@student_orc_partition@part=3"},
          {"partitionName":"default@student_orc_partition@part=4"},?
          {"partitionName":"default@student_orc_partition@part=5"},
          {"partitionName":"default@student_orc_partition@part=6"},
          {"partitionName":"default@student_orc_partition@part=7"},
          {"partitionName":"default@student_orc_partition@part=8"},
          {"partitionName":"default@student_orc_partition@part=9"}],?
          "input_tables":[{"tablename":"default@student_orc_partition",?"tabletype":"MANAGED_TABLE"}]

          explain dependency的使用場景有兩個:

          • 場景一:快速排除。快速排除因?yàn)樽x取不到相應(yīng)分區(qū)的數(shù)據(jù)而導(dǎo)致任務(wù)數(shù)據(jù)輸出異常。例如,在一個以天分區(qū)的任務(wù)中,上游任務(wù)因?yàn)樯a(chǎn)過程不可控因素出現(xiàn)異常或者空跑,導(dǎo)致下游任務(wù)引發(fā)異常。通過這種方式,可以快速查看SQL讀取的分區(qū)是否出現(xiàn)異常。
          • 場景二:理清表的輸入,幫助理解程序的運(yùn)行,特別是有助于理解有多重子查詢,多表連接的依賴輸入。

          下面通過兩個案例來看explain dependency的實(shí)際運(yùn)用:

          識別看似等價(jià)的代碼

          有如下兩條看似相等的sql:

          代碼一:

          select?
          a.s_no?
          from?student_orc_partition?a?
          inner?join?
          student_orc_partition_only?b?
          on?a.s_no=b.s_no?and?a.part=b.part?and?a.part>=1?and?a.part<=2;

          代碼二:

          select?
          a.s_no?
          from?student_orc_partition?a?
          inner?join?
          student_orc_partition_only?b?
          on?a.s_no=b.s_no?and?a.part=b.part?
          where?a.part>=1?and?a.part<=2;

          我們看下上述兩段代碼explain dependency的輸出結(jié)果:

          代碼1的explain dependency結(jié)果:

          {"input_partitions":?
          [{"partitionName":"default@student_orc_partition@part=0"},?
          {"partitionName":"default@student_orc_partition@part=1"},?
          {"partitionName":"default@student_orc_partition@part=2"},
          {"partitionName":"default@student_orc_partition_only@part=1"},?
          {"partitionName":"default@student_orc_partition_only@part=2"}],?
          "input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

          代碼2的explain dependency結(jié)果:

          {"input_partitions":?
          [{"partitionName":"default@student_orc_partition@part=1"},?
          {"partitionName"?:?"default@student_orc_partition@part=2"},
          {"partitionName"?:"default@student_orc_partition_only@part=1"},
          {"partitionName":"default@student_orc_partition_only@part=2"}],?
          "input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

          通過上面的輸出結(jié)果可以看到,其實(shí)上述的兩個SQL并不等價(jià),代碼1在內(nèi)連接(inner join)中的連接條件(on)中加入非等值的過濾條件后,并沒有將內(nèi)連接的左右兩個表按照過濾條件進(jìn)行過濾,內(nèi)連接在執(zhí)行時會多讀取part=0的分區(qū)數(shù)據(jù)。而在代碼2中,會過濾掉不符合條件的分區(qū)。

          識別SQL讀取數(shù)據(jù)范圍的差別

          有如下兩段代碼:

          代碼一:

          explain?dependency
          select
          a.s_no?
          from?student_orc_partition?a?
          left?join?
          student_orc_partition_only?b?
          on?a.s_no=b.s_no?and?a.part=b.part?and?b.part>=1?and?b.part<=2;

          代碼二:

          explain?dependency?
          select?
          a.s_no?
          from?student_orc_partition?a?
          left?join?
          student_orc_partition_only?b?
          on?a.s_no=b.s_no?and?a.part=b.part?and?a.part>=1?and?a.part<=2;

          以上兩個代碼的數(shù)據(jù)讀取范圍是一樣的嗎?答案是不一樣,我們通過explain dependency來看下:

          代碼1的explain dependency結(jié)果:

          {"input_partitions":?
          [{"partitionName":?"default@student_orc_partition@part=0"},?
          {"partitionName":"default@student_orc_partition@part=1"},?…中間省略7個分區(qū)
          {"partitionName":"default@student_orc_partition@part=9"},?
          {"partitionName":"default@student_orc_partition_only@part=1"},?
          {"partitionName":"default@student_orc_partition_only@part=2"}],?
          "input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

          代碼2的explain dependency結(jié)果:

          {"input_partitions":?
          [{"partitionName":"default@student_orc_partition@part=0"},?
          {"partitionName":"default@student_orc_partition@part=1"},?…中間省略7個分區(qū)?
          {"partitionName":"default@student_orc_partition@part=9"},?
          {"partitionName":"default@student_orc_partition_only@part=0"},?
          {"partitionName":"default@student_orc_partition_only@part=1"},?…中間省略7個分區(qū)?
          {"partitionName":"default@student_orc_partition_only@part=9"}],
          "input_tables":?[{"tablename":"default@student_orc_partition","tabletype":"MANAGED_TABLE"},?{"tablename":"default@student_orc_partition_only","tabletype":"MANAGED_TABLE"}]}

          可以看到,對左外連接在連接條件中加入非等值過濾的條件,如果過濾條件是作用于右表(b表)有起到過濾的效果,則右表只要掃描兩個分區(qū)即可,但是左表(a表)會進(jìn)行全表掃描。如果過濾條件是針對左表,則完全沒有起到過濾的作用,那么兩個表將進(jìn)行全表掃描。這時的情況就如同全外連接一樣都需要對兩個數(shù)據(jù)進(jìn)行全表掃描。

          在使用過程中,容易認(rèn)為代碼片段2可以像代碼片段1一樣進(jìn)行數(shù)據(jù)過濾,通過查看explain dependency的輸出結(jié)果,可以知道不是如此。

          explain authorization 的用法

          通過explain authorization可以知道當(dāng)前SQL訪問的數(shù)據(jù)來源(INPUTS) 和數(shù)據(jù)輸出(OUTPUTS),以及當(dāng)前Hive的訪問用戶 (CURRENT_USER)和操作(OPERATION)。

          在 hive cli 中輸入以下命令:

          explain?authorization?
          select?variance(s_score)?from?student_tb_orc;

          結(jié)果如下:

          INPUTS:?
          ??default@student_tb_orc?
          OUTPUTS:?
          ??hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-?90f1475a3ed5/-mr-10000?
          CURRENT_USER:?
          ??hdfs?
          OPERATION:?
          ??QUERY?
          AUTHORIZATION_FAILURES:?
          ??No?privilege?'Select'?found?for?inputs?{?database:default,?table:student_?tb_orc,?columnName:s_score}

          從上面的信息可知:

          • 上面案例的數(shù)據(jù)來源是defalut數(shù)據(jù)庫中的 student_tb_orc表;
          • 數(shù)據(jù)的輸出路徑是hdfs://node01:8020/tmp/hive/hdfs/cbf182a5-8258-4157-9194-90f1475a3ed5/-mr-10000;
          • 當(dāng)前的操作用戶是hdfs,操作是查詢;
          • 觀察上面的信息我們還會看到AUTHORIZATION_FAILURES信息,提示對當(dāng)前的輸入沒有查詢權(quán)限,但如果運(yùn)行上面的SQL的話也能夠正常運(yùn)行。為什么會出現(xiàn)這種情況?Hive在默認(rèn)不配置權(quán)限管理的情況下不進(jìn)行權(quán)限驗(yàn)證,所有的用戶在Hive里面都是超級管理員,即使不對特定的用戶進(jìn)行賦權(quán),也能夠正常查詢。

          Tez引擎

          Tez是Apache開源的支持DAG作業(yè)的計(jì)算框架,是支持HADOOP2.x的重要引擎。它源于MapReduce框架,核心思想是將Map和Reduce兩個操作進(jìn)一步拆分,分解后的元操作可以任意靈活組合,產(chǎn)生新的操作,這些操作經(jīng)過一些控制程序組裝后,可形成一個大的DAG作業(yè)。

          Tez將Map task和Reduce task進(jìn)一步拆分為如下圖所示:

          Tez的task由Input、processor、output階段組成,可以表達(dá)所有復(fù)雜的map、reduce操作,如下圖:

          本文首發(fā)自公眾號:

          《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。

          Tez的實(shí)現(xiàn)

          Tez對外提供了6種可編程組件,分別是:

          1)Input:對輸入數(shù)據(jù)源的抽象,它解析輸入數(shù)據(jù)格式,并吐出一個個Key/value

          2)Output:對輸出數(shù)據(jù)源的抽象,它將用戶程序產(chǎn)生的Key/value寫入文件系統(tǒng)

          3)Paritioner:對數(shù)據(jù)進(jìn)行分片,類似于MR中的Partitioner

          4)Processor:對計(jì)算的抽象,它從一個Input中獲取數(shù)據(jù),經(jīng)處理后,通過Output輸出

          5)Task:對任務(wù)的抽象,每個Task由一個Input、Ouput和Processor組成

          6)Maser:管理各個Task的依賴關(guān)系,并按順依賴關(guān)系執(zhí)行他們

          除了以上6種組件,Tez還提供了兩種算子,分別是Sort(排序)和Shuffle(混洗),為了用戶使用方便,它還提供了多種Input、Output、Task和Sort的實(shí)現(xiàn),具體如下:

          1)Input實(shí)現(xiàn):LocalMergedInput(文件本地合并后作為輸入),ShuffledMergedInput(遠(yuǎn)程拷貝數(shù)據(jù)且合并后作為輸入)

          2)Output實(shí)現(xiàn):InMemorySortedOutput(內(nèi)存排序后輸出),LocalOnFileSorterOutput(本地磁盤排序后輸出),OnFileSortedOutput(磁盤排序后輸出)

          3)Task實(shí)現(xiàn):RunTimeTask(非常簡單的Task,基本沒做什么事)

          4)Sort實(shí)現(xiàn):DefaultSorter(本地?cái)?shù)據(jù)排序),InMemoryShuffleSorter(遠(yuǎn)程拷貝數(shù)據(jù)并排序)

          為了展示Tez的使用方法和驗(yàn)證Tez框架的可用性,Apache在YARN MRAppMaster基礎(chǔ)上使用Tez編程接口重新設(shè)計(jì)了MapReduce框架,使之可運(yùn)行在YARN中。為此,Tez提供了以下幾個組件:

          1)Input:SimpleInput(直接使用MR InputFormat獲取數(shù)據(jù))

          2)Output:SimpleOutput(直接使用MR OutputFormat獲取數(shù)據(jù))

          3)Partition:MRPartitioner(直接使用MR Partitioner獲取數(shù)據(jù))

          4)Processor:MapProcessor(執(zhí)行Map Task),ReduceProcessor(執(zhí)行Reduce Task)

          5)Task:FinalTask,InitialTask,initialTaskWithInMemSort,InitialTaskWithLocalSort ,IntermediateTask,LocalFinalTask,MapOnlyTask。

          對于MapReduce作業(yè)而言,如果只有Map Task,則使用MapOnlyTask,否則,Map Task使用InitialTaskWithInMemSort而Reduce Task用FinalTask。當(dāng)然,如果你想編寫其他類型的作業(yè),可使用以上任何幾種Task進(jìn)行組合,比如”InitialTaskWithInMemSort –> FinalTask”是MapReduce作業(yè)。

          為了減少Tez開發(fā)工作量,并讓Tez能夠運(yùn)行在YARN之上,Tez重用了大部分YARN中MRAppMater的代碼,包括客戶端、資源申請、任務(wù)推測執(zhí)行、任務(wù)啟動等。

          Tez和MapReduce作業(yè)的比較:

          • Tez繞過了MapReduce很多不必要的中間的數(shù)據(jù)存儲和讀取的過程,直接在一個作業(yè)中表達(dá)了MapReduce需要多個作業(yè)共同協(xié)作才能完成的事情。

          • Tez和MapReduce一樣都運(yùn)行使用YARN作為資源調(diào)度和管理。但與MapReduce on YARN不同,Tez on YARN并不是將作業(yè)提交到ResourceManager,而是提交到AMPoolServer的服務(wù)上,AMPoolServer存放著若干已經(jīng)預(yù)先啟動ApplicationMaster的服務(wù)。

          • 當(dāng)用戶提交一個作業(yè)上來后,AMPoolServer從中選擇一個ApplicationMaster用于管理用戶提交上來的作業(yè),這樣既可以節(jié)省ResourceManager創(chuàng)建ApplicationMaster的時間,而又能夠重用每個ApplicationMaster的資源,節(jié)省了資源釋放和創(chuàng)建時間。

          Tez相比于MapReduce有幾點(diǎn)重大改進(jìn):

          • 當(dāng)查詢需要有多個reduce邏輯時,Hive的MapReduce引擎會將計(jì)劃分解,每個Redcue提交一個MR作業(yè)。這個鏈中的所有MR作業(yè)都需要逐個調(diào)度,每個作業(yè)都必須從HDFS中重新讀取上一個作業(yè)的輸出并重新洗牌。而在Tez中,幾個reduce接收器可以直接連接,數(shù)據(jù)可以流水線傳輸,而不需要臨時HDFS文件,這種模式稱為MRR(Map-reduce-reduce*)。

          • Tez還允許一次發(fā)送整個查詢計(jì)劃,實(shí)現(xiàn)應(yīng)用程序動態(tài)規(guī)劃,從而使框架能夠更智能地分配資源,并通過各個階段流水線傳輸數(shù)據(jù)。對于更復(fù)雜的查詢來說,這是一個巨大的改進(jìn),因?yàn)樗薎O/sync障礙和各個階段之間的調(diào)度開銷。

          • 在MapReduce計(jì)算引擎中,無論數(shù)據(jù)大小,在Shuffle階段都以相同的方式執(zhí)行,將數(shù)據(jù)序列化到磁盤,再由下游的程序去拉取,并反序列化。Tez可以允許小數(shù)據(jù)集完全在內(nèi)存中處理,而MapReduce中沒有這樣的優(yōu)化。倉庫查詢經(jīng)常需要在處理完大量的數(shù)據(jù)后對小型數(shù)據(jù)集進(jìn)行排序或聚合,Tez的優(yōu)化也能極大地提升效率。

          給 Hive 換上 Tez 非常簡單,只需給 hive-site.xml 中設(shè)置:


          ????hive.execution.engine
          ????tez

          設(shè)置hive.execution.engine為 tez 后進(jìn)入到 Hive 執(zhí)行 SQL:

          hive>?select?count(*)?as?c?from?userinfo;
          Query?ID?=?zhenqin_20161104150743_4155afab-4bfa-4e8a-acb0-90c8c50ecfb5
          Total?jobs?=?1
          Launching?Job?1?out?of?1
          ?
          ?
          Status:?Running?(Executing?on?YARN?cluster?with?App?id?application_1478229439699_0007)
          ?
          --------------------------------------------------------------------------------
          ????????VERTICES??????STATUS??TOTAL??COMPLETED??RUNNING??PENDING??FAILED??KILLED
          --------------------------------------------------------------------------------
          Map?1?..........???SUCCEEDED??????2??????????2????????0????????0???????0???????0
          Reducer?2?......???SUCCEEDED??????1??????????1????????0????????0???????0???????0
          --------------------------------------------------------------------------------
          VERTICES:?02/02??[==========================>>]?100%??ELAPSED?TIME:?6.19?s?????
          --------------------------------------------------------------------------------
          OK
          1000000
          Time?taken:?6.611?seconds,?Fetched:?1?row(s)

          可以看到,我的 userinfo 中有 100W 條記錄,執(zhí)行一遍 count 需要 6.19s。現(xiàn)在把 engine 換為 mr

          set?hive.execution.engine=mr;

          再次執(zhí)行 count userinfo:

          hive>?select?count(*)?as?c?from?userinfo;
          Query?ID?=?zhenqin_20161104152022_c7e6c5bd-d456-4ec7-b895-c81a369aab27
          Total?jobs?=?1
          Launching?Job?1?out?of?1
          Starting?Job?=?job_1478229439699_0010,?Tracking?URL?=?http://localhost:8088/proxy/application_1478229439699_0010/
          Kill?Command?=?/Users/zhenqin/software/hadoop/bin/hadoop?job??-kill?job_1478229439699_0010
          Hadoop?job?information?for?Stage-1:?number?of?mappers:?1;?number?of?reducers:?1
          2016-11-04?15:20:28,323?Stage-1?map?=?0%,??reduce?=?0%
          2016-11-04?15:20:34,587?Stage-1?map?=?100%,??reduce?=?0%
          2016-11-04?15:20:40,796?Stage-1?map?=?100%,??reduce?=?100%
          Ended?Job?=?job_1478229439699_0010
          MapReduce?Jobs?Launched:?
          Stage-Stage-1:?Map:?1??Reduce:?1???HDFS?Read:?215?HDFS?Write:?0?SUCCESS
          Total?MapReduce?CPU?Time?Spent:?0?msec
          OK
          1000000
          Time?taken:?19.46?seconds,?Fetched:?1?row(s)
          hive>?

          可以看到,使用 Tez 效率比 MapReduce 有近3倍的提升。而且,Hive 在使用 Tez 引擎執(zhí)行時,有 ==>> 動態(tài)的進(jìn)度指示。而在使用 mr 時,只有日志輸出 map and reduce 的進(jìn)度百分比。使用 tez,輸出的日志也清爽很多。

          在我測試的很多復(fù)雜的 SQL,Tez 的都比 MapReduce 快很多,快慢取決于 SQL 的復(fù)雜度。執(zhí)行簡單的 select 等并不能體現(xiàn) tez 的優(yōu)勢。Tez 內(nèi)部翻譯 SQL 能任意的 Map,Reduce,Reduce 組合,而 MR 只能 Map->Reduce->Map->Reduce,因此在執(zhí)行復(fù)雜 SQL 時, Tez 的優(yōu)勢明顯。


          Tez 參數(shù)優(yōu)化

          優(yōu)化參參數(shù)(在同樣條件下,使用了tez從300s+降到200s+)

          set?hive.execution.engine=tez;
          set?mapred.job.name=recommend_user_profile_$idate;
          set?mapred.reduce.tasks=-1;
          set?hive.exec.reducers.max=160;
          set?hive.auto.convert.join=true;
          set?hive.exec.parallel=true;
          set?hive.exec.parallel.thread.number=16;?
          set?hive.optimize.skewjoin=true;
          set?hive.exec.reducers.bytes.per.reducer=100000000;
          set?mapred.max.split.size=200000000;
          set?mapred.min.split.size.per.node=100000000;
          set?mapred.min.split.size.per.rack=100000000;
          set?hive.hadoop.supports.splittable.combineinputformat=true;
          set?hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

          Tez內(nèi)存優(yōu)化

          1. AM、Container大小設(shè)置

          tez.am.resource.memory.mb

          參數(shù)說明:Set tez.am.resource.memory.mb tobe the same as yarn.scheduler.minimum-allocation-mb the YARNminimum container size.

          hive.tez.container.size

          參數(shù)說明:Set hive.tez.container.size to be the same as or a small multiple(1 or 2 times that) of YARN container size yarn.scheduler.minimum-allocation-mb but NEVER more than yarn.scheduler.maximum-allocation-mb.

          2. AM、Container JVM參數(shù)設(shè)置

          tez.am.launch.cmd-opts?

          默認(rèn)值:80% * tez.am.resource.memory.mb,一般不需要調(diào)整

          hive.tez.java.ops

          默認(rèn)值:80% * hive.tez.container.size 參數(shù)說明:Hortonworks建議"–server –Djava.net.preferIPv4Stack=true–XX:NewRatio=8 –XX:+UseNUMA –XX:UseG1G"

          tez.container.max.java.heap.fraction

          默認(rèn)值:0.8,參數(shù)說明:task/AM占用JVM Xmx的比例,該參數(shù)建議調(diào)整,需根據(jù)具體業(yè)務(wù)情況修改;

          3. Hive內(nèi)存Map Join參數(shù)設(shè)置

          tez.runtime.io.sort.mb

          默認(rèn)值:100,參數(shù)說明:輸出排序需要的內(nèi)存大小。建議值:40% * hive.tez.container.size,一般不超過2G.

          hive.auto.convert.join.noconditionaltask

          默認(rèn)值:true,參數(shù)說明:是否將多個mapjoin合并為一個,使用默認(rèn)值

          hive.auto.convert.join.noconditionaltask.size

          默認(rèn)值為10MB,參數(shù)說明:多個mapjoin轉(zhuǎn)換為1個時,所有小表的文件大小總和的最大值,這個值只是限制輸入的表文件的大小,并不代表實(shí)際mapjoin時hashtable的大小。建議值:1/3 * hive.tez.container.size

          tez.runtime.unordered.output.buffer.size-mb

          默認(rèn)值:100,參數(shù)說明:Size of the buffer to use if not writing directly to disk。建議值: 10% * hive.tez.container.size.

          4. Container重用設(shè)置

          tez.am.container.reuse.enabled

          默認(rèn)值:true,參數(shù)說明:Container重用開關(guān)


          Spark引擎

          Hive社區(qū)于2014年推出了Hive on Spark項(xiàng)目(HIVE-7292),將Spark作為繼MapReduce和Tez之后Hive的第三個計(jì)算引擎。該項(xiàng)目由Cloudera、Intel和MapR等幾家公司共同開發(fā),并受到了來自Hive和Spark兩個社區(qū)的共同關(guān)注。通過該項(xiàng)目,可以提高Hive查詢的性能,同時為已經(jīng)部署了Hive或者Spark的用戶提供了更加靈活的選擇,從而進(jìn)一步提高Hive和Spark的普及率。

          本文首發(fā)自公眾號:

          《import_bigdata》,大數(shù)據(jù)技術(shù)與架構(gòu)。

          總體設(shè)計(jì)

          Hive on Spark總體的設(shè)計(jì)思路是,盡可能重用Hive邏輯層面的功能;從生成物理計(jì)劃開始,提供一整套針對Spark的實(shí)現(xiàn),比如 SparkCompiler、SparkTask等,這樣Hive的查詢就可以作為Spark的任務(wù)來執(zhí)行了。以下是幾點(diǎn)主要的設(shè)計(jì)原則。

          • 盡可能減少對Hive原有代碼的修改。這是和之前的Shark設(shè)計(jì)思路最大的不同。Shark對Hive的改動太大以至于無法被Hive社區(qū)接受,Hive on Spark盡可能少改動Hive的代碼,從而不影響Hive目前對MapReduce和Tez的支持。同時,Hive on Spark保證對現(xiàn)有的MapReduce和Tez模式在功能和性能方面不會有任何影響。
          • 對于選擇Spark的用戶,應(yīng)使其能夠自動的獲取Hive現(xiàn)有的和未來新增的功能。
          • 盡可能降低維護(hù)成本,保持對Spark依賴的松耦合。

          基于以上思路和原則,具體的一些設(shè)計(jì)架構(gòu)如下。

          Hive 的用戶可以通過hive.execution.engine來設(shè)置計(jì)算引擎,目前該參數(shù)可選的值為mr和tez。為了實(shí)現(xiàn)Hive on Spark,我們將spark作為該參數(shù)的第三個選項(xiàng)。要開啟Hive on Spark模式,用戶僅需將這個參數(shù)設(shè)置為spark即可。

          在hive中使用以下語句開啟:

          hive>?set?hive.execution.engine=spark;

          總體設(shè)計(jì)

          Spark 以分布式可靠數(shù)據(jù)集(Resilient Distributed Dataset,RDD)作為其數(shù)據(jù)抽象,因此我們需要將Hive的表轉(zhuǎn)化為RDD以便Spark處理。本質(zhì)上,Hive的表和Spark的 HadoopRDD都是HDFS上的一組文件,通過InputFormat和RecordReader讀取其中的數(shù)據(jù),因此這個轉(zhuǎn)化是自然而然的。

          Spark為RDD提供了一系列的轉(zhuǎn)換(Transformation),其中有些轉(zhuǎn)換也是面向SQL 的,如groupByKey、join等。但如果使用這些轉(zhuǎn)換(就如Shark所做的那樣),就意味著我們要重新實(shí)現(xiàn)一些Hive已有的功能;而且當(dāng) Hive增加新的功能時,我們需要相應(yīng)地修改Hive on Spark模式。有鑒于此,我們選擇將Hive的操作符包裝為Function,然后應(yīng)用到RDD上。這樣,我們只需要依賴較少的幾種RDD的轉(zhuǎn)換,而主要的計(jì)算邏輯仍由Hive提供。

          由于使用了Hive的原語,因此我們需要顯式地調(diào)用一些Transformation來實(shí)現(xiàn)Shuffle的功能。下表中列舉了Hive on Spark使用的所有轉(zhuǎn)換。

          Hive on Spark

          對repartitionAndSortWithinPartitions 簡單說明一下,這個功能由SPARK-2978引入,目的是提供一種MapReduce風(fēng)格的Shuffle。雖然sortByKey也提供了排序的功 能,但某些情況下我們并不需要全局有序,另外其使用的Range Partitioner對于某些Hive的查詢并不適用。

          物理執(zhí)行計(jì)劃

          通過SparkCompiler將Operator Tree轉(zhuǎn)換為Task Tree,其中需要提交給Spark執(zhí)行的任務(wù)即為SparkTask。不同于MapReduce中Map+Reduce的兩階段執(zhí)行模式,Spark采用DAG執(zhí)行模式,因此一個SparkTask包含了一個表示RDD轉(zhuǎn)換的DAG,我們將這個DAG包裝為SparkWork。執(zhí)行SparkTask 時,就根據(jù)SparkWork所表示的DAG計(jì)算出最終的RDD,然后通過RDD的foreachAsync來觸發(fā)運(yùn)算。使用foreachAsync是因?yàn)槲覀兪褂昧薍ive原語,因此不需要RDD返回結(jié)果;此外foreachAsync異步提交任務(wù)便于我們對任務(wù)進(jìn)行監(jiān)控。

          SparkContext生命周期

          SparkContext 是用戶與Spark集群進(jìn)行交互的接口,Hive on Spark應(yīng)該為每個用戶的會話創(chuàng)建一個SparkContext。但是Spark目前的使用方式假設(shè)SparkContext的生命周期是Spark應(yīng) 用級別的,而且目前在同一個JVM中不能創(chuàng)建多個SparkContext。這明顯無法滿足HiveServer2的應(yīng)用場景,因?yàn)槎鄠€客戶端需要通過同一個HiveServer2來提供服務(wù)。鑒于此,我們需要在單獨(dú)的JVM中啟動SparkContext,并通過RPC與遠(yuǎn)程的SparkContext進(jìn)行通信。

          任務(wù)監(jiān)控與統(tǒng)計(jì)信息收集

          Spark提供了SparkListener接口來監(jiān)聽任務(wù)執(zhí)行期間的各種事件,因此我們可以實(shí)現(xiàn)一個Listener來監(jiān)控任務(wù)執(zhí)行進(jìn)度以及收集任務(wù)級別的統(tǒng)計(jì)信 息(目前任務(wù)級別的統(tǒng)計(jì)由SparkListener采集,任務(wù)進(jìn)度則由Spark提供的專門的API來監(jiān)控)。另外Hive還提供了Operator級 別的統(tǒng)計(jì)數(shù)據(jù)信息,比如讀取的行數(shù)等。在MapReduce模式下,這些信息通過Hadoop Counter收集。我們可以使用Spark提供的Accumulator來實(shí)現(xiàn)該功能。

          細(xì)節(jié)實(shí)現(xiàn)

          Hive on Spark解析SQL的過程

          SQL語句在分析執(zhí)行過程中會經(jīng)歷下圖所示的幾個步驟

          1. 語法解析
          2. 操作綁定
          3. 優(yōu)化執(zhí)行策略
          4. 交付執(zhí)行

          語法解析

          語法解析之后,會形成一棵語法樹,如下圖所示。樹中的每個節(jié)點(diǎn)是執(zhí)行的rule,整棵樹稱之為執(zhí)行策略。

          策略優(yōu)化

          形成上述的執(zhí)行策略樹還只是第一步,因?yàn)檫@個執(zhí)行策略可以進(jìn)行優(yōu)化,所謂的優(yōu)化就是對樹中節(jié)點(diǎn)進(jìn)行合并或是進(jìn)行順序上的調(diào)整。

          以大家熟悉的join操作為例,下圖給出一個join優(yōu)化的示例。A JOIN B等同于B JOIN A,但是順序的調(diào)整可能給執(zhí)行的性能帶來極大的影響,下圖就是調(diào)整前后的對比圖。

          在Hash Join中,首先被訪問的表稱之為“內(nèi)部構(gòu)建表”,第二個表為“探針輸入”。創(chuàng)建內(nèi)部表時,會將數(shù)據(jù)移動到數(shù)據(jù)倉庫指向的路徑;創(chuàng)建外部表,僅記錄數(shù)據(jù)所在的路徑。

          再舉一例,一般來說盡可能的先實(shí)施聚合操作(Aggregate)然后再join

          這種優(yōu)化自動完成,在調(diào)優(yōu)時不需要考慮。

          SQL到Spark作業(yè)的轉(zhuǎn)換過程

          • native command的執(zhí)行流程

          由于native command是一些非耗時的操作,直接使用Hive中原有的exeucte engine來執(zhí)行即可。這些command的執(zhí)行示意圖如下:

          • SparkTask的生成和執(zhí)行

          我們通過一個例子來看一下一個簡單的兩表JOIN查詢?nèi)绾伪晦D(zhuǎn)換為SparkTask并被執(zhí)行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉(zhuǎn)化成SparkTask;右半部分展示了該SparkTask執(zhí)行時如何得到最終的RDD并通過foreachAsync提交Spark任務(wù)。

          SparkCompiler遍歷Operator Tree,將其劃分為不同的MapWork和ReduceWork。

          MapWork為根節(jié)點(diǎn),總是由TableScanOperator(Hive中對表進(jìn)行掃描的操作符)開始;后續(xù)的Work均為ReduceWork。ReduceSinkOperator(Hive中進(jìn)行Shuffle輸出的操作符)用來標(biāo)記兩個Work之間的界線,出現(xiàn)ReduceSinkOperator表示當(dāng)前Work到下一個Work之間的數(shù)據(jù)需要進(jìn)行Shuffle。因此,當(dāng)我們發(fā)現(xiàn)ReduceSinkOperator時,就會創(chuàng)建一個新的ReduceWork并作為當(dāng)前Work的子節(jié)點(diǎn)。包含了FileSinkOperator(Hive中將結(jié)果輸出到文件的操作符)的Work為葉子節(jié)點(diǎn)。

          與MapReduce最大的不同在于,我們并不要求ReduceWork一定是葉子節(jié)點(diǎn),即ReduceWork之后可以鏈接更多的ReduceWork,并在同一個SparkTask中執(zhí)行。

          從該圖可以看出,這個查詢的Operator Tree被轉(zhuǎn)化成了兩個MapWork和一個ReduceWork。

          執(zhí)行SparkTask步驟:

          1. 根據(jù)MapWork來生成最底層的HadoopRDD,
          2. 將各個MapWork和ReduceWork包裝成Function應(yīng)用到RDD上。
          3. 在有依賴的Work之間,需要顯式地調(diào)用Shuffle轉(zhuǎn)換,具體選用哪種Shuffle則要根據(jù)查詢的類型來確定。另外,由于這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進(jìn)行Union。
          4. 經(jīng)過這一系列轉(zhuǎn)換后,得到最終的RDD,并通過foreachAsync提交到Spark集群上進(jìn)行計(jì)算。

          在logicalPlan到physicalPlan的轉(zhuǎn)換過程中,toRdd最關(guān)鍵的元素

          override?lazy?val?toRdd:?RDD[Row]?=
          ??????analyzed?match?{
          ????????case?NativeCommand(cmd)?=>
          ??????????val?output?=?runSqlHive(cmd)

          ??????????if?(output.size?==?0)?{
          ????????????emptyResult
          ??????????}?else?{
          ????????????val?asRows?=?output.map(r?=>?new?GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
          ????????????sparkContext.parallelize(asRows,?1)
          ??????????}
          ????????case?_?=>
          ??????????executedPlan.execute().map(_.copy())
          ??????}

          SparkTask的生成和執(zhí)行

          我們通過一個例子來看一下一個簡單的兩表JOIN查詢?nèi)绾伪晦D(zhuǎn)換為SparkTask并被執(zhí)行。下圖左半部分展示了這個查詢的Operator Tree,以及該Operator Tree如何被轉(zhuǎn)化成SparkTask;右半部分展示了該SparkTask執(zhí)行時如何得到最終的RDD并通過foreachAsync提交Spark任務(wù)。

          SparkCompiler遍歷Operator Tree,將其劃分為不同的MapWork和ReduceWork。MapWork為根節(jié)點(diǎn),總是由TableScanOperator(Hive中對表 進(jìn)行掃描的操作符)開始;后續(xù)的Work均為ReduceWork。ReduceSinkOperator(Hive中進(jìn)行Shuffle輸出的操作符) 用來標(biāo)記兩個Work之間的界線,出現(xiàn)ReduceSinkOperator表示當(dāng)前Work到下一個Work之間的數(shù)據(jù)需要進(jìn)行Shuffle。因此, 當(dāng)我們發(fā)現(xiàn)ReduceSinkOperator時,就會創(chuàng)建一個新的ReduceWork并作為當(dāng)前Work的子節(jié)點(diǎn)。包含了 FileSinkOperator(Hive中將結(jié)果輸出到文件的操作符)的Work為葉子節(jié)點(diǎn)。與MapReduce最大的不同在于,我們并不要求 ReduceWork一定是葉子節(jié)點(diǎn),即ReduceWork之后可以鏈接更多的ReduceWork,并在同一個SparkTask中執(zhí)行。

          從該圖可以看出,這個查詢的Operator Tree被轉(zhuǎn)化成了兩個MapWork和一個ReduceWork。在執(zhí)行SparkTask時,首先根據(jù)MapWork來生成最底層的 HadoopRDD,然后將各個MapWork和ReduceWork包裝成Function應(yīng)用到RDD上。在有依賴的Work之間,需要顯式地調(diào)用 Shuffle轉(zhuǎn)換,具體選用哪種Shuffle則要根據(jù)查詢的類型來確定。另外,由于這個例子涉及多表查詢,因此在Shuffle之前還要對RDD進(jìn)行 Union。經(jīng)過這一系列轉(zhuǎn)換后,得到最終的RDD,并通過foreachAsync提交到Spark集群上進(jìn)行計(jì)算。

          運(yùn)行模式

          Hive on Spark支持兩種運(yùn)行模式:本地和遠(yuǎn)程。當(dāng)用戶把Spark Master URL設(shè)置為local時,采用本地模式;其余情況則采用遠(yuǎn)程模式。本地模式下,SparkContext與客戶端運(yùn)行在同一個JVM中;遠(yuǎn)程模式 下,SparkContext運(yùn)行在一個獨(dú)立的JVM中。提供本地模式主要是為了方便調(diào)試,一般用戶不應(yīng)選擇該模式。因此我們這里也主要介紹遠(yuǎn)程模式 (Remote SparkContext,RSC)。下圖展示了RSC的工作原理。

          用戶的每個Session會創(chuàng)建一個SparkClient,SparkClient會啟動RemoteDriver進(jìn)程,并由RemoteDriver創(chuàng) 建SparkContext。SparkTask執(zhí)行時,通過Session提交任務(wù),任務(wù)的主體就是對應(yīng)的SparkWork。SparkClient 將任務(wù)提交給RemoteDriver,并返回一個SparkJobRef,通過該SparkJobRef,客戶端可以監(jiān)控任務(wù)執(zhí)行進(jìn)度,進(jìn)行錯誤處理, 以及采集統(tǒng)計(jì)信息等。由于最終的RDD計(jì)算沒有返回結(jié)果,因此客戶端只需要監(jiān)控執(zhí)行進(jìn)度而不需要處理返回值。RemoteDriver通過 SparkListener收集任務(wù)級別的統(tǒng)計(jì)數(shù)據(jù),通過Accumulator收集Operator級別的統(tǒng)計(jì)數(shù)據(jù)(Accumulator被包裝為 SparkCounter),并在任務(wù)結(jié)束時返回給SparkClient。

          SparkClient 與RemoteDriver之間通過基于Netty的RPC進(jìn)行通信。除了提交任務(wù),SparkClient還提供了諸如添加Jar包、獲取集群信息等接 口。如果客戶端需要使用更一般的SparkContext的功能,可以自定義一個任務(wù)并通過SparkClient發(fā)送到RemoteDriver上執(zhí) 行。

          理論上來說,Hive on Spark對于Spark集群的部署方式?jīng)]有特別的要求,除了local以外,RemoteDriver可以連接到任意的Spark集群來執(zhí)行任務(wù)。在我 們的測試中,Hive on Spark在Standalone和Spark on YARN的集群上都能正常工作(需要動態(tài)添加Jar包的查詢在yarn-cluster模式下還不能運(yùn)行,請參考HIVE-9425)。

          優(yōu)化

          Yarn的配置

          yarn.nodemanager.resource.cpu-vcoresyarn.nodemanager.resource.memory-mb,這兩個參數(shù)決定這集群資源管理器能夠有多少資源用于運(yùn)行yarn上的任務(wù)。這兩個參數(shù)的值是由機(jī)器的配置及同時在機(jī)器上運(yùn)行的其它進(jìn)程共同決定。本文假設(shè)僅有hdfs的datanode和yarn的nodemanager運(yùn)行于該節(jié)點(diǎn)。

          1. 配置cores

          基本配置是datanode和nodemanager各一個核,操作系統(tǒng)兩個核,然后剩下28核配置作為yarn資源。也即是yarn.nodemanager.resource.cpu-vcores=28

          1. 配置內(nèi)存

          對于內(nèi)存,預(yù)留20GB給操作系統(tǒng),datanode,nodemanager,剩余100GB作為yarn資源。也即是 yarn.nodemanager.resource.memory-mb=100*1024

          Spark配置

          假設(shè)Yarn節(jié)點(diǎn)機(jī)器配置,假設(shè)有32核,120GB內(nèi)存。

          給Yarn分配資源以后,那就要想著spark如何使用這些資源了,主要配置對象:

          execurtor 和driver內(nèi)存,executro配額,并行度。

          1. executor內(nèi)存

          設(shè)置executor內(nèi)存需要考慮如下因素:

          • executor內(nèi)存越多,越能為更多的查詢提供map join的優(yōu)化。由于垃圾回收的壓力會導(dǎo)致開銷增加。

          • 某些情況下hdfs的客戶端不能很好的處理并發(fā)寫入,所以過多的核心可能會導(dǎo)致競爭。

          為了最大化使用core,建議將core設(shè)置為4,5,6(多核心會導(dǎo)致并發(fā)問題,所以寫代碼的時候尤其是靜態(tài)的鏈接等要考慮并發(fā)問題)具體分配核心數(shù)要結(jié)合yarn所提供的核心數(shù)。由于本文中涉及到的node節(jié)點(diǎn)是28核,那么很明顯分配為4的化可以被整除,spark.executor.cores設(shè)置為4 不會有多余的核剩下,設(shè)置為5,6都會有core剩余。spark.executor.cores=4,由于總共有28個核,那么最大可以申請的executor數(shù)是7。總內(nèi)存處以7,也即是 100/7,可以得到每個executor約14GB內(nèi)存。

          要知道 spark.executor.memoryspark.executor.memoryOverhead共同決定著executor內(nèi)存。建議spark.executor.memoryOverhead站總內(nèi)存的 15%-20%。那么最終spark.executor.memoryOverhead=2Gspark.executor.memory=12G.

          根據(jù)上面的配置的化,每個主機(jī)就可以申請7個executor,每個executor可以運(yùn)行4個任務(wù),每個core一個task。那么每個task的平均內(nèi)存是 14/4 = 3.5GB。在executor運(yùn)行的task共享內(nèi)存。其實(shí),executor內(nèi)部是用newCachedThreadPool運(yùn)行task的。

          確保spark.executor.memoryOverheadspark.executor.memory的和不超過yarn.scheduler.maximum-allocation-mb

          1. driver內(nèi)存

          對于drvier的內(nèi)存配置,當(dāng)然也包括兩個參數(shù)。

          • spark.driver.memoryOverhead 每個driver能從yarn申請的堆外內(nèi)存的大小。

          • spark.driver.memory 當(dāng)運(yùn)行hive on spark的時候,每個spark driver能申請的最大jvm 堆內(nèi)存。該參數(shù)結(jié)合 spark.driver.memoryOverhead共同決定著driver的內(nèi)存大小。

          driver的內(nèi)存大小并不直接影響性能,但是也不要job的運(yùn)行受限于driver的內(nèi)存. 這里給出spark driver內(nèi)存申請的方案,假設(shè)yarn.nodemanager.resource.memory-mb是 X。

          • driver內(nèi)存申請12GB,假設(shè) X > 50GB
          • driver內(nèi)存申請 4GB,假設(shè) 12GB < X <50GB
          • driver內(nèi)存申請1GB,假設(shè) 1GB < X < 12 GB
          • driver內(nèi)存申請256MB,假設(shè) X < 1GB

          這些數(shù)值是spark.driver.memoryspark.driver.memoryOverhead內(nèi)存的總和。對外內(nèi)存站總內(nèi)存的10%-15%。假設(shè) yarn.nodemanager.resource.memory-mb=100*1024MB,那么driver內(nèi)存設(shè)置為12GB,此時 spark.driver.memory=10.5gbspark.driver.memoryOverhead=1.5gb

          注意,資源多少直接對應(yīng)的是數(shù)據(jù)量的大小。所以要結(jié)合資源和數(shù)據(jù)量進(jìn)行適當(dāng)縮減和增加。

          1. executor數(shù)

          executor的數(shù)目是由每個節(jié)點(diǎn)運(yùn)行的executor數(shù)目和集群的節(jié)點(diǎn)數(shù)共同決定。如果你有四十個節(jié)點(diǎn),那么hive可以使用的最大executor數(shù)就是 280(40*7). 最大數(shù)目可能比這個小點(diǎn),因?yàn)閐river也會消耗1core和12GB。

          當(dāng)前假設(shè)是沒有yarn應(yīng)用在跑。

          Hive性能與用于運(yùn)行查詢的executor數(shù)量直接相關(guān)。但是,不通查詢還是不通。通常,性能與executor的數(shù)量成比例。例如,查詢使用四個executor大約需要使用兩個executor的一半時間。但是,性能在一定數(shù)量的executor中達(dá)到峰值,高于此值時,增加數(shù)量不會改善性能并且可能產(chǎn)生不利影響。

          在大多數(shù)情況下,使用一半的集群容量(executor數(shù)量的一半)可以提供良好的性能。為了獲得最佳性能,最好使用所有可用的executor。例如,設(shè)置spark.executor.instances = 280。對于基準(zhǔn)測試和性能測量,強(qiáng)烈建議這樣做。

          1. 動態(tài)executor申請

          雖然將spark.executor.instances設(shè)置為最大值通常可以最大限度地提高性能,但不建議在多個用戶運(yùn)行Hive查詢的生產(chǎn)環(huán)境中這樣做。避免為用戶會話分配固定數(shù)量的executor,因?yàn)槿绻鹐xecutor空閑,executor不能被其他用戶查詢使用。在生產(chǎn)環(huán)境中,應(yīng)該好好計(jì)劃executor分配,以允許更多的資源共享。

          Spark允許您根據(jù)工作負(fù)載動態(tài)擴(kuò)展分配給Spark應(yīng)用程序的集群資源集。要啟用動態(tài)分配,請按照動態(tài)分配中的步驟進(jìn)行操作。除了在某些情況下,強(qiáng)烈建議啟用動態(tài)分配。

          1. 并行度

          要使可用的executor得到充分利用,必須同時運(yùn)行足夠的任務(wù)(并行)。在大多數(shù)情況下,Hive會自動確定并行度,但也可以在調(diào)優(yōu)并發(fā)度方面有一些控制權(quán)。在輸入端,map任務(wù)的數(shù)量等于輸入格式生成的split數(shù)。對于Hive on Spark,輸入格式為CombineHiveInputFormat,它可以根據(jù)需要對基礎(chǔ)輸入格式生成的split進(jìn)行分組。可以更好地控制stage邊界的并行度。調(diào)整hive.exec.reducers.bytes.per.reducer以控制每個reducer處理的數(shù)據(jù)量,Hive根據(jù)可用的executor,執(zhí)行程序內(nèi)存,以及其他因素來確定最佳分區(qū)數(shù)。實(shí)驗(yàn)表明,只要生成足夠的任務(wù)來保持所有可用的executor繁忙,Spark就比MapReduce對hive.exec.reducers.bytes.per.reducer指定的值敏感度低。為獲得最佳性能,請為該屬性選擇一個值,以便Hive生成足夠的任務(wù)以完全使用所有可用的executor。

          Hive配置

          Hive on spark 共享了很多hive性能相關(guān)的配置。可以像調(diào)優(yōu)hive on mapreduce一樣調(diào)優(yōu)hive on spark。然而,hive.auto.convert.join.noconditionaltask.size是基于統(tǒng)計(jì)信息將基礎(chǔ)join轉(zhuǎn)化為map join的閾值,可能會對性能產(chǎn)生重大影響。盡管該配置可以用hive on mr和hive on spark,但是兩者的解釋不同。

          數(shù)據(jù)的大小有兩個統(tǒng)計(jì)指標(biāo):

          • totalSize- 數(shù)據(jù)在磁盤上的近似大小
          • rawDataSize- 數(shù)據(jù)在內(nèi)存中的近似大小

          hive on mr用的是totalSize。hive on spark使用的是rawDataSize。由于可能存在壓縮和序列化,這兩個值會有較大的差別。對于hive on spark 需要將 hive.auto.convert.join.noconditionaltask.size指定為更大的值,才能將與hive on mr相同的join轉(zhuǎn)化為map join。

          可以增加此參數(shù)的值,以使地圖連接轉(zhuǎn)換更具兇猛。將common join 轉(zhuǎn)換為 map join 可以提高性能。如果此值設(shè)置得太大,則來自小表的數(shù)據(jù)將使用過多內(nèi)存,任務(wù)可能會因內(nèi)存不足而失敗。根據(jù)群集環(huán)境調(diào)整此值。

          通過參數(shù) hive.stats.collect.rawdatasize 可以控制是否收集 rawDataSize 統(tǒng)計(jì)信息。

          對于hiveserver2,建議再配置兩個額外的參數(shù): hive.stats.fetch.column.stats=truehive.optimize.index.filter=true.

          Hive性能調(diào)優(yōu)通常建議使用以下屬性:

          hive.optimize.reducededuplication.min.reducer=4
          hive.optimize.reducededuplication=true
          hive.merge.mapfiles=true
          hive.merge.mapredfiles=false
          hive.merge.smallfiles.avgsize=16000000
          hive.merge.size.per.task=256000000
          hive.merge.sparkfiles=true
          hive.auto.convert.join=true
          hive.auto.convert.join.noconditionaltask=true
          hive.auto.convert.join.noconditionaltask.size=20M(might?need?to?increase?for?Spark,?200M)
          hive.optimize.bucketmapjoin.sortedmerge=false
          hive.map.aggr.hash.percentmemory=0.5
          hive.map.aggr=true
          hive.optimize.sort.dynamic.partition=false
          hive.stats.autogather=true
          hive.stats.fetch.column.stats=true
          hive.compute.query.using.stats=true
          hive.limit.pushdown.memory.usage=0.4?(MR?and?Spark)
          hive.optimize.index.filter=true
          hive.exec.reducers.bytes.per.reducer=67108864
          hive.smbjoin.cache.rows=10000
          hive.fetch.task.conversion=more
          hive.fetch.task.conversion.threshold=1073741824
          hive.optimize.ppd=true

          預(yù)啟動YARN容器

          在開始新會話后提交第一個查詢時,在查看查詢開始之前可能會遇到稍長的延遲。還會注意到,如果再次運(yùn)行相同的查詢,它的完成速度比第一個快得多。

          Spark執(zhí)行程序需要額外的時間來啟動和初始化yarn上的Spark,這會導(dǎo)致較長的延遲。此外,Spark不會等待所有executor在啟動作業(yè)之前全部啟動完成,因此在將作業(yè)提交到群集后,某些executor可能仍在啟動。但是,對于在Spark上運(yùn)行的作業(yè),作業(yè)提交時可用executor的數(shù)量部分決定了reducer的數(shù)量。當(dāng)就緒executor的數(shù)量未達(dá)到最大值時,作業(yè)可能沒有最大并行度。這可能會進(jìn)一步影響第一個查詢的性能。

          在用戶較長期會話中,這個額外時間不會導(dǎo)致任何問題,因?yàn)樗辉诘谝淮尾樵儓?zhí)行時發(fā)生。然而,諸如Oozie發(fā)起的Hive工作之類的短期繪畫可能無法實(shí)現(xiàn)最佳性能。

          為減少啟動時間,可以在作業(yè)開始前啟用容器預(yù)熱。只有在請求的executor準(zhǔn)備就緒時,作業(yè)才會開始運(yùn)行。這樣,在reduce那一側(cè)不會減少短會話的并行性。

          要啟用預(yù)熱功能,請?jiān)诎l(fā)出查詢之前將hive.prewarm.enabled設(shè)置為true。還可以通過設(shè)置hive.prewarm.numcontainers來設(shè)置容器數(shù)量。默認(rèn)值為10。

          預(yù)熱的executor的實(shí)際數(shù)量受spark.executor.instances(靜態(tài)分配)或spark.dynamicAllocation.maxExecutors(動態(tài)分配)的值限制。hive.prewarm.numcontainers的值不應(yīng)超過分配給用戶會話的值。

          注意:預(yù)熱需要幾秒鐘,對于短會話來說是一個很好的做法,特別是如果查詢涉及reduce階段。但是,如果hive.prewarm.numcontainers的值高于群集中可用的值,則該過程最多可能需要30秒。請謹(jǐn)慎使用預(yù)熱。

          另外,一個完整調(diào)優(yōu)案例你可以參考:https://blog.csdn.net/u010010664/article/details/77066031


          八千里路云和月 | 從零到大數(shù)據(jù)專家學(xué)習(xí)路徑指南

          我們在學(xué)習(xí)Flink的時候,到底在學(xué)習(xí)什么?

          193篇文章暴揍Flink,這個合集你需要關(guān)注一下

          Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS

          Flink CDC我吃定了耶穌也留不住他!| Flink CDC線上問題小盤點(diǎn)

          我們在學(xué)習(xí)Spark的時候,到底在學(xué)習(xí)什么?

          在所有Spark模塊中,我愿稱SparkSQL為最強(qiáng)!

          硬剛Hive | 4萬字基礎(chǔ)調(diào)優(yōu)面試小總結(jié)

          數(shù)據(jù)治理方法論和實(shí)踐小百科全書

          標(biāo)簽體系下的用戶畫像建設(shè)小指南

          4萬字長文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析

          【面試&個人成長】2021年過半,社招和校招的經(jīng)驗(yàn)之談

          大數(shù)據(jù)方向另一個十年開啟 |《硬剛系列》第一版完結(jié)

          我寫過的關(guān)于成長/面試/職場進(jìn)階的文章

          當(dāng)我們在學(xué)習(xí)Hive的時候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」


          你好,我是王知無,一個大數(shù)據(jù)領(lǐng)域的硬核原創(chuàng)作者。

          做過后端架構(gòu)、數(shù)據(jù)中間件、數(shù)據(jù)平臺&架構(gòu)、算法工程化。

          專注大數(shù)據(jù)領(lǐng)域?qū)崟r動態(tài)&技術(shù)提升&個人成長&職場進(jìn)階,歡迎關(guān)注。

          瀏覽 69
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  伊人导航网 | 午夜成人一区二区三区影院在线 | 3www.男人的天堂 | 欧美一级在线免费 | 青娱乐青青草论坛在线 |