
A Few Handy Ways to Access Hive Data

10,433 characters · ~21 min read


          2020-12-23 01:19


A few terms related to Hive access

• metadata: Hive's metadata, i.e. the table names, column names, types, partitions, and users defined in Hive. It is usually stored in a relational database such as MySQL; during testing you can also use Hive's built-in Derby database.

• metastore: the Hive metastore service (server side). It provides clients with unified access to the metadata above; hiveserver2 consults it when compiling DDL and DML statements into MapReduce jobs that run against HDFS.

• hiveserver2: the Hive server, which provides the Hive query service. Clients can connect to it in several ways, e.g. via beeline or JDBC (that is, from Java code).

• beeline: a client tool for connecting to Hive. Think of it as the counterpart of a MySQL client such as Navicat.

hive-cli is a legacy tool with two main use cases. The first is as a thick SQL-on-Hadoop client; the second is as the command-line tool for hiveserver (what is now called "HiveServer1"). Since Hive 1.0, however, hiveserver has been deprecated, removed from the codebase, and replaced by HiveServer2, so the second use case no longer applies. For the first use case, Beeline provides (or should provide) the same functionality, implemented differently from hive-cli.

Other languages access Hive mainly through the hiveserver2 service. HiveServer2 (HS2) is a service that lets clients execute Hive queries. It supports both embedded and remote access, concurrent multi-client use, and authentication, and is designed to offer better support for open-API clients such as JDBC and ODBC.

It starts a Hive server on port 10000 by default, which clients can reach via beeline, JDBC, or ODBC. On startup, hiveserver2 first checks whether hive.metastore.uris is configured. If not, it launches an embedded metastore service and then starts hiveserver2; if it is configured, it connects to the remote metastore service. The remote setup is by far the most common. The deployment looks like this:
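The startup decision just described can be sketched in a few lines of Python; the plain dict standing in for hive-site.xml is an assumption made for illustration only.

```python
def metastore_mode(hive_conf):
    """Decide how hiveserver2 reaches the metastore, per the startup logic above.

    `hive_conf` is a plain dict standing in for hive-site.xml (an assumption
    for illustration).
    """
    uris = hive_conf.get("hive.metastore.uris")
    if uris:
        # configured: connect to the remote metastore at the given thrift URI(s)
        return ("remote", uris)
    # not configured: start an embedded metastore first, then hiveserver2
    return ("embedded", None)
```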

(deployment diagram omitted)

Accessing Hive from Python

To access Hive from Python 3, install the following dependencies:

          • pip3 install thrift

          • pip3 install PyHive

          • pip3 install sasl

          • pip3 install thrift_sasl

Here is a utility class for accessing Hive from Python:

# -*- coding: utf-8 -*-

from pyhive import hive


class HiveClient(object):
    """docstring for HiveClient"""
    def __init__(self, host='hadoop-master', port=10000, username='hadoop',
                 password='hadoop', database='hadoop', auth='LDAP'):
        """
        create connection to hive server2
        """
        self.conn = hive.Connection(host=host,
                                    port=port,
                                    username=username,
                                    password=password,
                                    database=database,
                                    auth=auth)

    def query(self, sql):
        """
        query
        """
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            return cursor.fetchall()

    def insert(self, sql):
        """
        insert action
        """
        with self.conn.cursor() as cursor:
            cursor.execute(sql)
            # self.conn.commit()
            # self.conn.rollback()

    def close(self):
        """
        close connection
        """
        self.conn.close()

To use it, just import the class, create an instance, and pass your SQL to the query method.

# get a connection
hclient = HiveClient()

# run query operations
...

# close the connection
hclient.close()

Note: in the insert method I commented out self.conn.commit() and self.conn.rollback(). Those are transaction operations that only traditional relational databases have; Hive does not support them.

Connecting to Hive from Java

As the foundational language of big data, Java naturally has excellent Hive support. This section covers two ways to connect: JDBC and MyBatis.

1. JDBC connection

Connecting to hiveserver2 over JDBC from Java works exactly like a traditional JDBC connection to MySQL.

You need the hive-jdbc dependency:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>

The code follows the same pattern as connecting to MySQL, using DriverManager.getConnection(url, username, password):

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class HiveConfigModel {

    private String url = "jdbc:hive2://localhost:10000";
    private String username = "hadoop";
    private String password = "hadoop";

}

@Test
public void test(){
    // load the configuration
    HiveConfigModel hiveConfigModel = ConfigureContext.getInstance("hive-config.properties")
            .addClass(HiveConfigModel.class)
            .getModelProperties(HiveConfigModel.class);

    try {
        Connection conn = DriverManager.getConnection(hiveConfigModel.getUrl(),
                hiveConfigModel.getUsername(), hiveConfigModel.getPassword());

        String sql = "show tables";
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        ResultSet rs = preparedStatement.executeQuery();
        List<String> tables = new ArrayList<>();
        while (rs.next()){
            tables.add(rs.getString(1));
        }

        System.out.println(tables);
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

Inside hive-jdbc-1.2.1.jar, under META-INF there is a services directory containing a file named java.sql.Driver, whose content is:

          org.apache.hive.jdbc.HiveDriver

java.sql.DriverManager uses SPI (the service provider interface mechanism) to separate the service interface from its implementations and keep them decoupled. Here, the JDBC implementation org.apache.hive.jdbc.HiveDriver implements its logic against the uniform contract defined by java.sql.Driver. Client code using JDBC never needs to change; it just pulls in a different SPI service implementation.

DriverManager.getConnection(url, username, password)

That call alone is enough to obtain a connection, provided the concrete implementation follows the SPI contract.
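For illustration, the decoupling DriverManager gets from SPI can be sketched in Python: drivers register themselves behind one common interface, and callers ask the manager for a connection by URL without ever naming a concrete driver class. The HiveDriverStub class below is a purely hypothetical stand-in for org.apache.hive.jdbc.HiveDriver.

```python
# registry of available drivers (plays the role of java.sql.DriverManager's list)
_drivers = []


class HiveDriverStub:
    """Hypothetical stand-in for org.apache.hive.jdbc.HiveDriver."""

    def accepts(self, url):
        # each driver declares which URL scheme it can handle
        return url.startswith("jdbc:hive2://")

    def connect(self, url, user, password):
        return "hive-connection to %s as %s" % (url, user)


def register_driver(driver):
    """Called by the driver itself, like the SPI services file does in Java."""
    _drivers.append(driver)


def get_connection(url, user, password):
    """Client-facing entry point: no concrete driver class is referenced."""
    for d in _drivers:
        if d.accepts(url):
            return d.connect(url, user, password)
    raise LookupError("no suitable driver for " + url)


register_driver(HiveDriverStub())
```

Swapping databases then means registering a different driver, not changing client code.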

2. Integrating MyBatis

MyBatis is usually used in the DAO layer for database access, and accessing Hive works much the same way.

Configuration file sqlConfig.xml:


<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <environments default="production">
        <environment id="production">
            <transactionManager type="JDBC"/>
            <dataSource type="POOLED">
                <property name="driver" value="org.apache.hive.jdbc.HiveDriver"/>
                <property name="url" value="jdbc:hive2://master:10000/default"/>
                <property name="username" value="hadoop"/>
                <property name="password" value="hadoop"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/hive/test/test.xml"/>
    </mappers>
</configuration>

The mapper code is omitted; the implementation looks like this:

public class TestMapperImpl implements TestMapper {

    private static SqlSessionFactory sqlSessionFactory = HiveSqlSessionFactory.getInstance().getSqlSessionFactory();

    @Override
    public int getTestCount(String dateTime) {
        SqlSession sqlSession = sqlSessionFactory.openSession();
        TestMapper testMapper = sqlSession.getMapper(TestMapper.class);

        int count = testMapper.getTestCount(dateTime);

        sqlSession.close();

        return count;
    }
}

3. Integrating Spring Boot

Skill levels vary widely across a company's departments; not everyone can use a big-data analytics backend, let alone write SQL. In that case you can build a self-service data-retrieval system where users get their data through simple page operations. This typically means using Spring Boot to connect to both MySQL and Hive to generate reports. The Spring Boot + Hive integration here also incorporates the Druid connection pool.


What the system needs to do

• Anyone can write SQL on a web page and run a Hive query;

• Queries must be bounded: no more than 60 days of data (anything beyond that would be a disaster);

• When a query is submitted, check YARN's resource situation; if resources are tight, reject the query;

• The backend surfaces exceptions, including the reason a query was rejected, to the front-end page by throwing them;

• If someone has already run a query, its result is stored in MySQL; the next person who runs it gets the result from MySQL and never touches Hive.
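The requirements above amount to a simple request flow, sketched here in Python with stand-in callables; every name is an assumption for illustration, not one of the real services.

```python
def handle_query(yarn_ok, date_ok, cached, run_hive, save_to_mysql):
    """Sketch of the request flow the requirements describe.

    yarn_ok / date_ok are precomputed booleans; `cached` is the MySQL-cached
    result (or None); run_hive and save_to_mysql are stand-in callables.
    """
    if not yarn_ok:
        # YARN is under pressure: reject before any work is done
        raise RuntimeError("yarn resources tight, query rejected")
    if not date_ok:
        # queried span exceeds the 60-day limit
        raise ValueError("query span exceeds 60 days")
    if cached is not None:
        # a previous user already ran this query: serve from MySQL
        return cached
    rows = run_hive()          # only now do we hit Hive
    save_to_mysql(rows)        # cache for the next requester
    return rows
```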


1) Dependencies

To save space, only the main Maven dependencies for connecting to Hive via hiveserver2 are listed; the parent Spring Boot dependencies are omitted.


<properties>
    <hadoop.version>2.6.5</hadoop.version>
    <mybatis.version>3.2.7</mybatis.version>
</properties>

<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>${mybatis.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>

<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.8.3</version>
</dependency>

2) The application-test.yml file:

# Spring configuration
spring:
  # datasource configuration
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # primary (master) datasource
      master:
        url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # replica datasource
      slave:
        # replica datasource switch, off by default
        enabled: true
        url: jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # replica datasource 2
      # ...omitted...
      # hive datasource
      slave3:
      # replica datasource switch, off by default
        enabled: true
        driverClassName: org.apache.hive.jdbc.HiveDriver
        url: jdbc:hive2://master:10000/default
        username: hive
        password: hive
      # initial connection count
      initialSize: 5
      # minimum pool size
      minIdle: 10
      # maximum pool size
      maxActive: 20
      # maximum wait time when acquiring a connection (ms)
      maxWait: 60000
      # interval between eviction runs that close idle connections (ms)
      timeBetweenEvictionRunsMillis: 60000
      # minimum time a connection lives in the pool (ms)
      minEvictableIdleTimeMillis: 300000
      # maximum time a connection lives in the pool (ms)
      maxEvictableIdleTimeMillis: 900000

Both MySQL and Hive datasources are configured here. By default the master datasource is used, which points at MySQL; to hit Hive instead, you only need to switch datasources at the mapper layer.
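Conceptually, the mapper-level switch is just a routing table from mapper to datasource name, with everything unrouted falling through to the MySQL master. A Python sketch; the mapper-to-datasource mapping below is a hypothetical example, not the real switch logic.

```python
def pick_datasource(mapper_name, routing=None):
    """Return the datasource name a given mapper should use.

    `routing` stands in for whatever annotation- or AOP-based switch the real
    project uses (an assumption); unrouted mappers fall back to 'master'.
    """
    # slave3 is the Hive datasource from the YAML above
    routing = routing or {"TestMapper": "slave3"}
    return routing.get(mapper_name, "master")
```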

The implementation follows the usual mapper/service/controller layering, the same pattern as any other application. Two YARN queues are set up, one real-time and one offline. Because people from other departments share the cluster and queue pressure can spike, each query is limited to a date range of at most 60 days, and queries are refused when cluster resource usage exceeds 55%. The controller-level guard against oversized queries deserves particular attention.

Entity class UserModel:

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserModel extends BaseEntity {

    private String userId;
    private Integer count;
}

3) Rejecting queries when cluster usage exceeds 55%

          因?yàn)楹芏鄻I(yè)務(wù)查詢邏輯controller都要用到數(shù)據(jù)量防御過大的問題,這里使用了被Spring切面關(guān)聯(lián)的注解來標(biāo)識controller。

Define the aspect YarnResourceAspect and its associated annotation @YarnResource:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface YarnResource {

}

@Aspect
@Component
public class YarnResourceAspect {

    private static final Logger log = LoggerFactory.getLogger(YarnResourceAspect.class);

    /**
     * pointcut definition
     */
    @Pointcut("@annotation(com.ruoyi.common.annotation.YarnResource)")
    public void yarnResourcePointCut(){
    }

    /**
     * check whether yarn resources are available
     */
    @Before("yarnResourcePointCut()")
    public void before(){
        log.info("************ checking whether yarn resources are available ************");
        // yarn resources are tight
        if(!YarnClient.yarnResourceOk()){
            throw new InvalidStatusException();
        }
    }

}
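The annotation-plus-aspect pairing is essentially what a decorator does in Python: run a check before the wrapped handler, and reject the request when it fails. A minimal sketch, with `check` standing in for YarnClient.yarnResourceOk():

```python
def yarn_resource(check):
    """Decorator analog of @YarnResource + the @Before advice above."""
    def wrap(handler):
        def guarded(*args, **kwargs):
            if not check():
                # same effect as throwing InvalidStatusException
                raise RuntimeError("yarn resources tight, request rejected")
            return handler(*args, **kwargs)
        return guarded
    return wrap


@yarn_resource(check=lambda: True)  # lambda stands in for the real resource probe
def user_endpoint():
    return "ok"
```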

4) Fetching YARN resource usage:

          因?yàn)樘峤蝗蝿?wù)的時間是不定的,我們需要根據(jù)用戶提交時候的yarn資源狀態(tài)來判斷當(dāng)前是否能執(zhí)行Hive查詢,以免影響線上任務(wù)。

@Slf4j
public class YarnClient {

    /**
     * maximum allowed yarn resource usage (%)
     */
    private static final int YARN_RESOURCE = 55;

    /**
     * @return true if resources are fine, false if resources are tight
     */
    public static boolean yarnResourceOk() {
        try {
            URL url = new URL("http://master:8088/cluster/scheduler");
            HttpURLConnection conn = null;
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setUseCaches(false);
            // 5-second connect timeout
            conn.setConnectTimeout(5000);
            // set HTTP headers:
            conn.setRequestProperty("Accept", "*/*");
            conn.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36");
            // connect and send the HTTP request:
            conn.connect();

            // check for HTTP 200:
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("bad response");
            }
            // dump all response headers:
            Map<String, List<String>> map = conn.getHeaderFields();
            for (String key : map.keySet()) {
                System.out.println(key + ": " + map.get(key));
            }
            // read the response body:
            InputStream input = conn.getInputStream();
            byte[] datas = null;

            try {
                // read all bytes from the input stream
                datas = readInputStream(input);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String result = new String(datas, "UTF-8"); // decode the bytes to a String

            Document document = Jsoup.parse(result);

            Elements elements = document.getElementsByClass("qstats");

            String[] ratios = elements.text().split("used");

            return Double.valueOf(ratios[3].replace("%", "")) < YARN_RESOURCE;
        } catch (IOException e) {
            log.error("failed to fetch yarn resource usage");
        }

        return false;

    }

    private static byte[] readInputStream(InputStream inStream) throws Exception {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = inStream.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        byte[] data = outStream.toByteArray();
        outStream.close();
        inStream.close();
        return data;
    }
}
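The heart of yarnResourceOk is scraping a "used" percentage out of the scheduler page and comparing it to the 55% ceiling. That step can be sketched in Python; note that the regex and sample markup here are illustrative assumptions, since the real scheduler page is laid out differently (the Java code above takes the fourth "used" segment of the qstats element).

```python
import re


def yarn_usage_ok(html, limit=55.0):
    """Pull a 'used' percentage out of scheduler HTML and compare to the cap.

    The pattern below is an assumption for illustration; adapt it to the
    actual markup of your ResourceManager version.
    """
    m = re.search(r"([\d.]+)%\s*used", html)
    if m is None:
        # fail closed when the page can't be parsed, like the Java catch block
        return False
    return float(m.group(1)) < limit
```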

5) Marking controllers with the @YarnResource annotation:

@Controller
@RequestMapping("/hero/hive")
public class HiveController {

    /**
     * prefix for html template paths
     */
    private String prefix = "hero";

    @Autowired
    IUserService iUserService;

    @RequestMapping("")
    @RequiresPermissions("hero:hive:view")
    public String heroHive(){
        return prefix + "/hive";
    }

    @YarnResource
    @RequestMapping("/user")
    @RequiresPermissions("hero:hive:user")
    @ResponseBody
    public TableDataInfo user(UserModel userModel){
        DateCheckUtils.checkInputDate(userModel);

        PageInfo<UserModel> pageInfo = iUserService.queryUser(userModel);
        TableDataInfo tableDataInfo = new TableDataInfo();

        tableDataInfo.setTotal(pageInfo.getTotal());
        tableDataInfo.setRows(pageInfo.getList());

        return tableDataInfo;
    }
}

6) Checking that the query span does not exceed 60 days

With this in place, every request entering the controller automatically checks whether the queried date range exceeds 60 days, preventing oversized loads from starving other jobs of resources.

public class DateCheckUtils {

    /**
     * Validate the dates passed in from the front end, to keep huge queries
     * from overloading the cluster.
     * @param o
     */
    public static void checkInputDate(BaseEntity o){
        if("".equals(o.getParams().get("beginTime")) && "".equals(o.getParams().get("endTime"))){
            throw new InvalidTaskException();
        }

        String beginTime = "2019-01-01";
        String endTime = DateUtils.getDate();

        if(!"".equals(o.getParams().get("beginTime"))){
            beginTime = String.valueOf(o.getParams().get("beginTime"));
        }

        if(!"".equals(o.getParams().get("endTime"))){
            endTime = String.valueOf(o.getParams().get("endTime"));
        }

        // the queried span is longer than two months
        if(DateUtils.getDayBetween(beginTime, endTime) > 60){
            throw new InvalidTaskException();
        }
    }
}
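The same guard can be sketched in Python with the standard library; the defaults mirror the Java version (begin falls back to 2019-01-01, end falls back to today), and dates are ISO "YYYY-MM-DD" strings.

```python
from datetime import date


def check_input_date(begin, end, max_days=60):
    """Reject query spans longer than max_days; return the span otherwise."""
    begin = begin or "2019-01-01"           # default lower bound, as above
    end = end or date.today().isoformat()   # default upper bound: today
    span = (date.fromisoformat(end) - date.fromisoformat(begin)).days
    if span > max_days:
        raise ValueError("query span of %d days exceeds %d" % (span, max_days))
    return span
```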

Note that accessing Hive here definitely requires switching datasources, since other pages still access MySQL.

7) Every query result is written to MySQL

If someone has queried before, the data was saved to MySQL before being returned to the page; when a second person, perhaps from another department, runs the same query, it is served from MySQL first and only falls back to Hive if nothing is there. The code below also lives in the controller; it is pulled out here on its own.

// query mysql first and fall back to hive; mysql acts as a cache layer
PageInfo pageInfo = iToplocationService.queryToplocation(toplocationCountModel);
if(pageInfo.getList().size() > 0){
    log.info("data exists, serving directly from mysql...");
    tableDataInfo.setTotal(pageInfo.getTotal());
    tableDataInfo.setRows(pageInfo.getList());
}else if(iToplocationService.queryExistsToplocation(toplocationCountModel) == null){
    log.info("querying hive...");
    PageInfo pageInfo2 = iToplocationService.query(toplocationCountModel);

    // save to mysql
    log.info("batch-saving to mysql...");
    List toplocationCountModels = pageInfo2.getList();
    int i = 0;
    while (i < toplocationCountModels.size()){
        if(toplocationCountModels.size() - i > 10000){
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, i + 10000));
        }else{
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, toplocationCountModels.size()));
        }

        i = i + 10000;
    }
}
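The batching loop generalizes to a small helper: write the rows in slices of at most 10,000 so a single huge INSERT doesn't overwhelm MySQL. A Python sketch, with insert_fn standing in for iToplocationService.insertToplocation:

```python
def insert_in_batches(rows, insert_fn, batch_size=10000):
    """Feed `rows` to insert_fn in slices of at most batch_size, in order."""
    i = 0
    while i < len(rows):
        # the slice handles the short final batch automatically
        insert_fn(rows[i:i + batch_size])
        i += batch_size
```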

The functionality is quite simple so far, nothing fancy; it will be fleshed out over time.
