A Few Slick Ways to Access Hive Data

Key terms for Hive access
metadata: Hive's metadata, i.e. the table names, column names, types, partitions, and users that Hive defines. It is usually stored in a relational database such as MySQL; during testing, Hive's embedded Derby database can be used instead.
metastore: the Hive metastore service, which exposes Hive's metadata to clients (hiveserver2 included) over Thrift.
hiveserver2: the Hive server, which provides the Hive query service. Clients can connect to it through beeline, JDBC (that is, from Java code), and other interfaces.
beeline: a client tool for connecting to Hive; think of it as the equivalent of a MySQL client such as Navicat.
hive-cli is a legacy tool with two main use cases. The first is as a thick client for SQL on Hadoop; the second is as the command-line tool for hiveserver (what is now called "HiveServer1"). Since Hive 1.0, however, hiveserver has been deprecated, removed from the code base, and replaced by HiveServer2, so the second use case no longer applies. For the first use case, Beeline provides (or should provide) the same functionality, implemented differently from hive-cli.
Other languages reach Hive mainly through the hiveserver2 service. HiveServer2 (HS2) is a service that enables clients to execute Hive queries. It supports embedded and remote access, concurrent multi-client use, and authentication, and it is designed to give better support to open-API clients such as JDBC and ODBC.
When hiveserver2 starts, it brings up a Hive server on port 10000 by default, which clients can connect to via beeline, JDBC, or ODBC. On startup, hiveserver2 first checks whether hive.metastore.uris is configured: if not, it starts a metastore service of its own before starting hiveserver2 proper; if it is, it connects to the remote metastore service. The remote-metastore setup is the most commonly used deployment.
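For reference, pointing hiveserver2 at a standalone metastore is done with the hive.metastore.uris property in hive-site.xml. A minimal sketch (the host name is a placeholder; 9083 is the metastore's default Thrift port):

```xml
<property>
    <!-- Thrift address of the remote metastore; leave unset to start an embedded metastore -->
    <name>hive.metastore.uris</name>
    <value>thrift://metastore-host:9083</value>
</property>
```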
Python Access to Hive
Accessing Hive from Python 3 requires installing the following dependencies:
pip3 install thrift
pip3 install PyHive
pip3 install sasl
pip3 install thrift_sasl
Here is a utility class for accessing Hive from Python:
# -*- coding: utf-8 -*-
from pyhive import hive


class HiveClient(object):
    """docstring for HiveClient"""
    def __init__(self, host='hadoop-master', port=10000, username='hadoop',
                 password='hadoop', database='hadoop', auth='LDAP'):
        """
        create connection to hive server2
        """
        self.conn = hive.Connection(host=host,
            port=port,
            username=username,
            password=password,
            database=database,
            auth=auth)

    def query(self, sql):
        """
        query
        """
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()

    def insert(self, sql):
        """
        insert action
        """
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            # self.conn.commit()
            # self.conn.rollback()

    def close(self):
        """
        close connection
        """
        self.conn.close()
To use it, just import the module, create an instance, and call the query method with your SQL.
# get a connection
hclient = HiveClient()
# run query operations
...
# close the connection
hclient.close()
Note: in the insert method I commented out self.conn.commit() and self.conn.rollback(). These are transactional operations from traditional relational databases, and Hive does not support them.
Java Access to Hive
As the foundational language of big data, Java naturally has excellent support for connecting to Hive. This section introduces two approaches, JDBC and MyBatis, and then integrates the result into Spring Boot.
1. JDBC connection
Connecting Java to hiveserver2 via JDBC works just like a traditional JDBC connection to MySQL.
The hive-jdbc dependency is needed:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>
The code follows the same pattern as connecting to MySQL, using DriverManager.getConnection(url, username, password):
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class HiveConfigModel {
    private String url = "jdbc:hive2://localhost:10000";
    private String username = "hadoop";
    private String password = "hadoop";
}

@Test
public void test(){
    // load the configuration
    HiveConfigModel hiveConfigModel = ConfigureContext.getInstance("hive-config.properties")
            .addClass(HiveConfigModel.class)
            .getModelProperties(HiveConfigModel.class);
    try {
        Connection conn = DriverManager.getConnection(hiveConfigModel.getUrl(),
                hiveConfigModel.getUsername(), hiveConfigModel.getPassword());
        String sql = "show tables";
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        ResultSet rs = preparedStatement.executeQuery();
        List<String> tables = new ArrayList<>();
        while (rs.next()){
            tables.add(rs.getString(1));
        }
        System.out.println(tables);
    } catch (SQLException e) {
        e.printStackTrace();
    }
}
Inside hive-jdbc-1.2.1.jar, under META-INF there is a services directory containing a file named java.sql.Driver, whose content is:
org.apache.hive.jdbc.HiveDriver
java.sql.DriverManager uses SPI (the service provider interface) to decouple the service interface from its implementations: here, org.apache.hive.jdbc.HiveDriver implements the uniform contract defined by java.sql.Driver. Client code using JDBC never needs to change; it is enough to put a different SPI implementation on the classpath.
DriverManager.getConnection(url, username, password)
That one call yields a connection, provided the concrete driver follows the corresponding SPI contract.
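The discovery step can be observed directly with java.util.ServiceLoader, which is the same mechanism DriverManager relies on under the hood. A minimal sketch (which drivers it prints depends entirely on what sits on your classpath; with hive-jdbc present, org.apache.hive.jdbc.HiveDriver shows up):

```java
import java.sql.Driver;
import java.util.ServiceLoader;

public class DriverSpiDemo {
    public static void main(String[] args) {
        // ServiceLoader reads every META-INF/services/java.sql.Driver file on the
        // classpath and instantiates the implementation classes listed inside.
        ServiceLoader<Driver> drivers = ServiceLoader.load(Driver.class);
        for (Driver d : drivers) {
            System.out.println("Registered JDBC driver: " + d.getClass().getName());
        }
    }
}
```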
2. MyBatis integration
The DAO layer usually uses MyBatis to access the database, and accessing Hive works much the same way.
Configuration file sqlConfig.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <environments default="production">
        <environment id="production">
            <transactionManager type="JDBC"/>
            <dataSource type="POOLED">
                <property name="driver" value="org.apache.hive.jdbc.HiveDriver"/>
                <property name="url" value="jdbc:hive2://master:10000/default"/>
                <property name="username" value="hadoop"/>
                <property name="password" value="hadoop"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/hive/test/test.xml"/>
    </mappers>
</configuration>
The mapper code is omitted; the implementing class:
public class TestMapperImpl implements TestMapper {
    private static SqlSessionFactory sqlSessionFactory = HiveSqlSessionFactory.getInstance().getSqlSessionFactory();

    @Override
    public int getTestCount(String dateTime) {
        SqlSession sqlSession = sqlSessionFactory.openSession();
        TestMapper testMapper = sqlSession.getMapper(TestMapper.class);
        int count = testMapper.getTestCount(dateTime);
        sqlSession.close();
        return count;
    }
}
3. Spring Boot integration
Skill levels vary across a company's departments; not everyone can use the big-data analytics back end, let alone write SQL. In that case you can build a self-service data-retrieval system, so that the right data can be fetched through page interactions alone. That usually means using Spring Boot to connect to both MySQL and Hive to generate reports. The Spring Boot and Hive integration here also pulls in the Druid connection pool.
What the system needs to do:
Anyone can write SQL on a web page to run Hive query jobs; the amount of data queried must be bounded, to no more than a 60-day window (anything larger would be a disaster).
When a query job is submitted, fetch yarn's resource situation and reject the job if resources are tight; the back end reports exceptions, including the reason a request was refused, to the front-end page by throwing an exception.
If someone has already run a query, the result is stored in mysql; the next person to run the same query reads it from mysql instead of hitting Hive again.
1) Required dependencies
To save space, only the main Maven dependencies for connecting to Hive through hiveserver2 are shown; the parent Spring Boot dependencies are omitted.
<properties>
    <hadoop.version>2.6.5</hadoop.version>
    <mybatis.version>3.2.7</mybatis.version>
</properties>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>${mybatis.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.8.3</version>
</dependency>
2) The application-test.yml file:
# Spring configuration
spring:
  # data source configuration
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # primary data source
      master:
        url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # replica data source
      slave:
        # replica switch, disabled by default
        enabled: true
        url: jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # replica data source 2
      # ...omitted...
      # hive data source
      slave3:
        # replica switch, disabled by default
        enabled: true
        driverClassName: org.apache.hive.jdbc.HiveDriver
        url: jdbc:hive2://master:10000/default
        username: hive
        password: hive
      # initial pool size
      initialSize: 5
      # minimum number of idle connections
      minIdle: 10
      # maximum number of active connections
      maxActive: 20
      # max wait time for obtaining a connection (ms)
      maxWait: 60000
      # interval between checks for idle connections to close (ms)
      timeBetweenEvictionRunsMillis: 60000
      # minimum time a connection may stay idle in the pool (ms)
      minEvictableIdleTimeMillis: 300000
      # maximum time a connection may stay idle in the pool (ms)
      maxEvictableIdleTimeMillis: 900000
Both mysql and Hive data sources are configured here. By default the primary master data source (mysql) is used; switch data sources at the mapper layer when Hive is needed.
The implementation follows the usual mapper, service, and controller layering. Two yarn queues are set up, one real-time and one offline. Because people from other departments share the cluster and queue pressure can spike, each query is limited to a date range of at most 60 days, and queries are refused when cluster resource usage exceeds 55%. The rest of this section focuses on how the controller layer guards against oversized queries.
Entity class UserModel:
@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserModel extends BaseEntity {
    private String userId;
    private Integer count;
}
3) Keep cluster resource usage at or below 55%
Because many controllers with business query logic need this defense against oversized queries, a Spring-aspect-linked annotation is used to mark the relevant controllers.
Define the aspect YarnResourceAspect and tie it to the @YarnResource annotation:
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface YarnResource {
}

@Aspect
@Component
public class YarnResourceAspect {
    private static final Logger log = LoggerFactory.getLogger(YarnResourceAspect.class);

    /**
     * Pointcut definition
     */
    @Pointcut("@annotation(com.ruoyi.common.annotation.YarnResource)")
    public void yarnResourcePointCut(){
    }

    /**
     * Check whether yarn resources are available
     */
    @Before("yarnResourcePointCut()")
    public void before(){
        log.info("************************ checking whether yarn resources are available ************************");
        // yarn resources are tight
        if(!YarnClient.yarnResourceOk()){
            throw new InvalidStatusException();
        }
    }
}
4) Fetch yarn's resource usage data:
Since jobs can be submitted at any time, we check yarn's resource state at submission time to decide whether the Hive query may run, so that online jobs are not affected.
@Slf4j
public class YarnClient {
    /**
     * Upper bound on yarn resource usage (percent)
     */
    private static final int YARN_RESOURCE = 55;

    /**
     * @return true: resources are fine, false: resources are tight
     */
    public static boolean yarnResourceOk() {
        try {
            URL url = new URL("http://master:8088/cluster/scheduler");
            HttpURLConnection conn = null;
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setUseCaches(false);
            // request timeout: 5 seconds
            conn.setConnectTimeout(5000);
            // set HTTP headers
            conn.setRequestProperty("Accept", "*/*");
            conn.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36");
            // connect and send the HTTP request
            conn.connect();
            // make sure the HTTP response is 200
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("bad response");
            }
            // dump all response headers
            Map<String, List<String>> map = conn.getHeaderFields();
            for (String key : map.keySet()) {
                System.out.println(key + ": " + map.get(key));
            }
            // read the response body
            InputStream input = conn.getInputStream();
            byte[] datas = null;
            try {
                // read all bytes from the input stream
                datas = readInputStream(input);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String result = new String(datas, "UTF-8"); // convert the bytes to a String
            Document document = Jsoup.parse(result);
            Elements elements = document.getElementsByClass("qstats");
            String[] ratios = elements.text().split("used");
            return Double.valueOf(ratios[3].replace("%", "")) < YARN_RESOURCE;
        } catch (IOException e) {
            log.error("failed to fetch yarn resource usage");
        }
        return false;
    }

    private static byte[] readInputStream(InputStream inStream) throws Exception {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = inStream.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        byte[] data = outStream.toByteArray();
        outStream.close();
        inStream.close();
        return data;
    }
}
5) Mark the controller with the @YarnResource annotation:
@Controller
@RequestMapping("/hero/hive")
public class HiveController {
    /**
     * prefix of the html file paths
     */
    private String prefix = "hero";

    @Autowired
    IUserService iUserService;

    @RequestMapping("")
    @RequiresPermissions("hero:hive:view")
    public String heroHive(){
        return prefix + "/hive";
    }

    @YarnResource
    @RequestMapping("/user")
    @RequiresPermissions("hero:hive:user")
    @ResponseBody
    public TableDataInfo user(UserModel userModel){
        DateCheckUtils.checkInputDate(userModel);
        PageInfo<UserModel> pageInfo = iUserService.queryUser(userModel);
        TableDataInfo tableDataInfo = new TableDataInfo();
        tableDataInfo.setTotal(pageInfo.getTotal());
        tableDataInfo.setRows(pageInfo.getList());
        return tableDataInfo;
    }
}
6) Check that the queried date span does not exceed 60 days
With this in place, every request entering the controller automatically checks whether the queried date range exceeds 60 days, preventing loads of data so large that other jobs run short of resources.
public class DateCheckUtils {
    /**
     * Validate the dates passed in from the front end, to keep huge queries
     * from overloading the cluster
     * @param o
     */
    public static void checkInputDate(BaseEntity o){
        if("".equals(o.getParams().get("beginTime")) && "".equals(o.getParams().get("endTime"))){
            throw new InvalidTaskException();
        }
        String beginTime = "2019-01-01";
        String endTime = DateUtils.getDate();
        if(!"".equals(o.getParams().get("beginTime"))){
            beginTime = String.valueOf(o.getParams().get("beginTime"));
        }
        if(!"".equals(o.getParams().get("endTime"))){
            endTime = String.valueOf(o.getParams().get("endTime"));
        }
        // the queried time span is longer than two months
        if(DateUtils.getDayBetween(beginTime, endTime) > 60){
            throw new InvalidTaskException();
        }
    }
}
Note that accessing Hive here definitely requires switching data sources, since other pages still access mysql.
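How that switch is wired up depends on the project; a common pattern is a thread-local key that a routing data source (for example, Spring's AbstractRoutingDataSource) consults on each call. The holder itself is only a few lines; the class and method names below are illustrative sketches, not taken from the project above:

```java
// Minimal thread-local holder for the active data source key.
// A routing data source would call current() to pick "master" (mysql) or "slave3" (hive).
public final class DataSourceContext {
    private static final ThreadLocal<String> KEY = ThreadLocal.withInitial(() -> "master");

    private DataSourceContext() {}

    public static void use(String name) { KEY.set(name); }

    public static String current() { return KEY.get(); }

    public static void reset() { KEY.remove(); }
}
```

A mapper method that targets Hive would call DataSourceContext.use("slave3") before running its query and DataSourceContext.reset() in a finally block afterwards.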
7) Every query result is written back to mysql
If someone has queried before, the data is saved to mysql and then returned to the page; when a second person from another department runs the same query, mysql is consulted first, and Hive is queried only if nothing is found there. The code below is also part of the controller, pulled out here on its own.
// query mysql first and fall back to hive; mysql acts as a cache layer
PageInfo<ToplocationCountModel> pageInfo = iToplocationService.queryToplocation(toplocationCountModel);
if(pageInfo.getList().size() > 0){
    log.info("data exists, fetching directly from mysql...");
    tableDataInfo.setTotal(pageInfo.getTotal());
    tableDataInfo.setRows(pageInfo.getList());
}else if(iToplocationService.queryExistsToplocation(toplocationCountModel) == null){
    log.info("querying data from hive...");
    PageInfo<ToplocationCountModel> pageInfo2 = iToplocationService.query(toplocationCountModel);
    // save to mysql
    log.info("batch-saving to mysql...");
    List<ToplocationCountModel> toplocationCountModels = pageInfo2.getList();
    int i = 0;
    while (i < toplocationCountModels.size()) {
        if(toplocationCountModels.size() - i > 10000){
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, i + 10000));
        }else{
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, toplocationCountModels.size()));
        }
        i = i + 10000;
    }
}
The functionality looks simple for now, nothing fancy; it will be fleshed out step by step.
