Hive 如何快速拉取大批量數(shù)據(jù)
用hive來做數(shù)倉類操作,或者大數(shù)據(jù)的運算,是沒有疑問的,至少在你沒有更多選擇之前。
當我們要hive來做類似于大批量數(shù)據(jù)的select時,也許問題就會發(fā)生了變化。
1:通用解決方案:分頁拉取
首先,我們要基于一個事實,就是沒有哪個數(shù)據(jù)庫可以無限制的提供我們select任意數(shù)據(jù)量的數(shù)據(jù)。比如常用的 mysql, oracle, 一般你select 10w左右的數(shù)據(jù)量時已經(jīng)非常厲害了。而我們的解決方法也比較簡單,那就是分頁獲取,比如我一頁取1w條,直到取完為止。同樣,因為hive基于都支持sql92協(xié)議,所以你也可以同樣的方案去解決大數(shù)據(jù)量的問題。
分頁的解決方案會有什么問題?首先,我們要明白分頁是如何完成的,首先數(shù)據(jù)庫server會根據(jù)條件運算出所有或部分符合條件的數(shù)據(jù)(取決是否有額外的排序),然后再根據(jù)分頁偏移信息,獲取相應的數(shù)據(jù)。所以,一次次的分頁,則必定涉及到一次次的數(shù)據(jù)運算。這在小數(shù)據(jù)量的情況下是可以接受的,因為計算機的高速運轉能力。但是當數(shù)據(jù)量大到一定程度時,就不行了。比如我們停滯了許多年的大數(shù)據(jù)領域解決方案就是很好的證明。
本文基于hive處理數(shù)據(jù),也就是說數(shù)據(jù)量自然也是大到了一定的級別,那么用分頁也許就不好解決問題了。比如,單次地運算也許就是3-5分鐘(基于分布式并行計算系統(tǒng)能力),當你要select 100w數(shù)據(jù)時,如果用一頁1w的運算,那么就是100次來回,1次3-5分鐘,100次就是5-8小時的時間,這就完全jj了。誰能等這么長時間?這樣處理的最終結果就是,業(yè)務被砍掉,等著財務結賬了。
所以,我們得改變點什么!
2. 使用hive-jdbc:持續(xù)輸出
jdbc本身不算啥,只是一個連接協(xié)議。但它的好處在于,可以維持長連接。這個連接有個好處,就是server可以隨時輸出數(shù)據(jù),而client端則可以隨時處理數(shù)據(jù)。這就給了我們一個機會,即比如100w的數(shù)據(jù)運算好之后,server只需源源不斷的輸出結果,而client端則源源不斷地接收處理數(shù)據(jù)。
所以,我們解決方案是,基于hive-jdbc, 不使用分頁,而全量獲取數(shù)據(jù)即可。這給我們帶來莫大的好處,即一次運算即可。比如1次運算3-5分鐘,那么總共的運算也就是3-5分鐘。
看起來不錯,解決了重復運算的問題。好似萬事大吉了。
具體實現(xiàn)就是引入幾個hive-jdbc的依賴,然后提交查詢,依次獲取結果即可。樣例如下:
<!-- pom 依賴 --><!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc --><dependency><groupId>org.apache.hive</groupId><artifactId>hive-jdbc</artifactId><version>2.3.4</version></dependency>
// 測試hive-jdbcimport java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.sql.DriverManager;public class HiveJdbcTest {private static Connection conn = getConnnection();private static PreparedStatement ps;private static ResultSet rs;// 獲取所有數(shù)據(jù)public static void getAll(String tablename) {String sql="select * from " + tablename;System.out.println(sql);try {ps = prepare(conn, sql);rs = ps.executeQuery();int columns = rs.getMetaData().getColumnCount();while(rs.next()) {for(int i=1;i<=columns;i++) {System.out.print(rs.getString(i));System.out.print("\t\t");}System.out.println();}}catch (SQLException e) {e.printStackTrace();}}// 測試public static void main(String[] args) {String tablename="t1";HiveJdbcTest.getAll(tablename);}private static String driverName = "org.apache.hive.jdbc.HiveDriver";private static String url = "jdbc:hive2://127.0.0.1:10000/";private static Connection conn;// 連接hive庫public static Connection getConnnection() {try {Class.forName(driverName);conn = DriverManager.getConnection(url, "hive", "123");}catch(ClassNotFoundException e) {e.printStackTrace();}catch (SQLException e) {e.printStackTrace();}return conn;}public static PreparedStatement prepare(Connection conn, String sql) {PreparedStatement ps = null;try {ps = conn.prepareStatement(sql);}catch (SQLException e) {e.printStackTrace();}return ps;}}
樣例代碼,無需糾結。簡單的jdbc操作樣板??傮w來說就是,不帶分頁的接收全量數(shù)據(jù)。
但是,這個會有什么問題?同樣,小數(shù)據(jù)量時無任何疑問,但當數(shù)據(jù)量足夠大時,每一次的數(shù)據(jù)接收,都需要一次網(wǎng)絡通信請求,且都是單線程的。我們假設接受一條數(shù)據(jù)花費1ms, 那么接收1000條數(shù)就是1s, 6w條數(shù)據(jù)就是1min。360w條數(shù)據(jù)就是1h, 額,后面就無需再算了。同樣是不可估量的時間消耗。(實際情況也許會好點,因為會有buffer緩沖的存在)
為什么會這樣呢?運算量已經(jīng)減小了,但是這網(wǎng)絡通信量,我們又能如何?實際上,問題不在于網(wǎng)絡通信問題,而在于我們使用這種方式,使我們從并行計算轉到了串行計算的過程了。因為只有單點的數(shù)據(jù)接收,所以只能將數(shù)據(jù)匯集處理。從而就是一個串行化的東西了。
所以,我們更多應該從并行這一層面去解決問題。
3. 基于臨時表實現(xiàn):高效并行
要解決并行變串行的問題,最根本的辦法就是避免一條條讀取數(shù)據(jù)。而要避免這個問題,一個很好想到的辦法就是使用臨時表,繞開自己代碼的限制。讓大數(shù)據(jù)集群自行處理并行計算問題,這是個不錯的想法。
但具體如何做呢?我們面臨至少這么幾個問題:
1. 如何將數(shù)據(jù)寫入臨時表?
2. 寫入臨時表的數(shù)據(jù)如何取回?是否存在瓶頸問題?
3. 臨時表后續(xù)如何處理?
我們一個個問題來,第1個,如何寫臨時表問題:我們可以選擇先創(chuàng)建一個臨時表,然后再使用insert into select ... from ... 的方式寫入,但這種方式非常費力,首先你得固化下臨時表的數(shù)據(jù)結構,其次你要處理多次寫入問題??雌饋聿皇亲詈玫霓k法。幸好,hive中或者相關數(shù)據(jù)庫產(chǎn)品都提供了另一種更方便的建臨時表的方法: create table xxx as select ... from ... 你只需要使用一個語句就可以將結果寫入到臨時表了。但需要注意的是,我們創(chuàng)建時,需要指定好我們需要的格式,否則最終結果也許不是我們想要的,比如我們需要使用','分隔數(shù)據(jù)而非tab, 我們需要使用 text 形式的數(shù)據(jù),而非壓縮的二進制格式。
以下是個使用樣例:
-- 外部使用 create table 包裹CREATE TABLE tmp_2020110145409001ROW FORMAT DELIMITEDFIELDS TERMINATED BY ','STORED AS TEXTFILE as-- 具體的業(yè)務select sqlselect t1.*, t2.* from test t1 left join test2 t2 on t1.id = t2.t_id;
如此,我們就得到所需的結果了。以上結果,在hive中表現(xiàn)為一個臨時表。而其背后則是一個個切分的文件,以','號分隔的文本文件,且會按照hive的默認存儲目錄存放。(更多具體語法請查詢官網(wǎng)資料)
接下來,我們要解決第2個問題:如何將數(shù)據(jù)取回?這個問題也不難,首先,現(xiàn)在結果已經(jīng)有了,我們可以一行行地讀取返回,就像前面一樣。但這時已經(jīng)沒有了數(shù)據(jù)運算,應該會好很多。但明顯還是不夠好,我們仍然需要反復的網(wǎng)絡通信。我們知道,hive存儲的背后,是一個個切分的文件,如果我們能夠將該文件直接下載下來,那將會是非常棒的事。不錯,最好的辦法就是,直接下載hive的數(shù)據(jù)文件,hive會在存儲目錄下,以類似于 part_0000, part_0001... 之類的文件存放。
那么,我們如何才能下載到這些文件呢?hive是基于hadoop的,所以,很明顯我們要回到這個問題,基于hadoop去獲取這些文件。即 hdfs 獲取,命令如下:
// 查看所有分片數(shù)據(jù)文件列表hdfs dfs -ls hdfs://xx/hive/mydb.db/*// 下載所有數(shù)據(jù)文件到 /tmp/local_hdfs 目錄hdfs dfs -get hdfs://xx/hive/mydb.db/* /tmp/local_hdfs
我們可以通過以上命令,將數(shù)據(jù)文件下載到本地,也可以hdfs的jar包,使用 hdfs-client 進行下載。優(yōu)缺點是:使用cli的方式簡單穩(wěn)定但依賴于服務器環(huán)境,而使用jar包的方式則部署方便但需要自己寫更多代碼保證穩(wěn)定性。各自選擇即可?!?/span>
最后,我們還剩下1個問題:如何處理臨時表的問題?hive目前尚不支持設置表的生命周期(阿里云的maxcompute則只是一個 lifecycle 選項的問題),所以,需要自行清理文件。這個問題的實現(xiàn)方式很多,比如你可以自行記錄這些臨時表的創(chuàng)建時間、位置、過期時間,然后再每天運行腳本清理表即可。再簡單點就是你可以直接通過表名進行清理,比如你以年月日作為命令開頭,那么你可以根據(jù)這日期刪除臨時表即可。如:
-- 列舉表名show tables like 'dbname.tmp_20201101*';-- 刪除具體表名drop table dbname.tmp_2020110100001 ;
至此,我們的所有問題已解決??偨Y下:首先使用臨時表并行地將結果寫入;其次通過hdfs將文件快速下載到本地即可;最后需要定時清理臨時表;這樣,你就可以高效,無限制的為用戶拉取大批量數(shù)據(jù)了。
不過需要注意的是,我們的步驟從1個步驟變成了3個步驟,增加了復雜度。(實際上你可能還會處理更多的問題,比如元數(shù)據(jù)信息的對應問題)復雜度增加的最大問題就在于,它會帶來更多的問題,所以我們一定要善于處理好這些問題,否則將會帶來一副作用。

騰訊、阿里、滴滴后臺面試題匯總總結 — (含答案)
面試:史上最全多線程面試題 !
最新阿里內推Java后端面試題
JVM難學?那是因為你沒認真看完這篇文章

關注作者微信公眾號 —《JAVA爛豬皮》
了解更多java后端架構知識以及最新面試寶典


看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力
作者:等你歸去來
出處:https://www.cnblogs.com/yougewe/p/13909575.html
