<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Java封裝 bigQuery 查詢工具類,有用

          共 10255字,需瀏覽 21分鐘

           ·

          2022-03-06 00:43

          點擊上方“程序員大白”,選擇“星標”公眾號

          重磅干貨,第一時間送達

          緣起

          最近在公司基于bigQuery開發(fā)埋點數據分析功能,所以總結一下自己封裝的bigQuery查詢工具類(網上關于bigQuery的文章比較少)

          關于bigQuery的概念功能可以參考:

          https://cloud.google.com/bigquery/docs?hl=zh-CN

          在示例那包含了很多操作

          示例中一段查詢代碼

          import?com.google.cloud.bigquery.BigQuery;
          import?com.google.cloud.bigquery.BigQueryOptions;
          import?com.google.cloud.bigquery.FieldValueList;
          import?com.google.cloud.bigquery.Job;
          import?com.google.cloud.bigquery.JobId;
          import?com.google.cloud.bigquery.JobInfo;
          import?com.google.cloud.bigquery.QueryJobConfiguration;
          import?com.google.cloud.bigquery.TableResult;
          import?java.util.UUID;

          public?class?SimpleApp?{
          ??public?static?void?main(String...?args)?throws?Exception?{
          ????BigQuery?bigquery?=?BigQueryOptions.getDefaultInstance().getService();
          ????QueryJobConfiguration?queryConfig?=
          ????????QueryJobConfiguration.newBuilder(
          ????????????????"SELECT?commit,?author,?repo_name?"
          ????????????????????+?"FROM?`bigquery-public-data.github_repos.commits`?"
          ????????????????????+?"WHERE?subject?like?'%bigquery%'?"
          ????????????????????+?"ORDER?BY?subject?DESC?LIMIT?10")
          ????????????//?Use?standard?SQL?syntax?for?queries.
          ????????????//?See:?https://cloud.google.com/bigquery/sql-reference/
          ????????????.setUseLegacySql(false)
          ????????????.build();

          ????//?Create?a?job?ID?so?that?we?can?safely?retry.
          ????JobId?jobId?=?JobId.of(UUID.randomUUID().toString());
          ????Job?queryJob?=?bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

          ????//?Wait?for?the?query?to?complete.
          ????queryJob?=?queryJob.waitFor();

          ????//?Check?for?errors
          ????if?(queryJob?==?null)?{
          ??????throw?new?RuntimeException("Job?no?longer?exists");
          ????}?else?if?(queryJob.getStatus().getError()?!=?null)?{
          ??????//?You?can?also?look?at?queryJob.getStatus().getExecutionErrors()?for?all
          ??????//?errors,?not?just?the?latest?one.
          ??????throw?new?RuntimeException(queryJob.getStatus().getError().toString());
          ????}

          ????//?Get?the?results.
          ????TableResult?result?=?queryJob.getQueryResults();

          ????//?Print?all?pages?of?the?results.
          ????for?(FieldValueList?row?:?result.iterateAll())?{
          ??????//?String?type
          ??????String?commit?=?row.get("commit").getStringValue();
          ??????//?Record?type
          ??????FieldValueList?author?=?row.get("author").getRecordValue();
          ??????String?name?=?author.get("name").getStringValue();
          ??????String?email?=?author.get("email").getStringValue();
          ??????//?String?Repeated?type
          ??????String?repoName?=?row.get("repo_name").getRecordValue().get(0).getStringValue();
          ??????System.out.printf(
          ??????????"Repo?name:?%s?Author?name:?%s?email:?%s?commit:?%s\n",?repoName,?name,?email,?commit);
          ????}
          ??}
          }

          以上這段查詢的代碼,給我的感覺就是步驟都是類似的,都是創(chuàng)建一個JOB,等待查詢,處理查詢的結果集。如果我每寫一個查詢都要寫這一大坨,那簡直要惡心死。

          其實以上的查詢代碼除了SQL和結果集的處理不一樣,其他的都是一樣,基于這一點來封裝一個 bigQuery 查詢工具類,想要達到的效果就是:我給你一段SQL,你給我處理好的結果。

          Maven 依賴

          首先需要引入 Maven 依賴,當然這個在 bigQuery 官方示例上是有的

          <dependencyManagement>
          ??<dependencies>
          ????<dependency>
          ??????<groupId>com.google.cloudgroupId>
          ??????<artifactId>libraries-bomartifactId>
          ??????<version>24.0.0version>
          ??????<type>pomtype>
          ??????<scope>importscope>
          ????dependency>
          ??dependencies>
          dependencyManagement>

          <dependencies>
          ??<dependency>
          ????<groupId>com.google.cloudgroupId>
          ????<artifactId>google-cloud-bigqueryartifactId>
          ??dependency>

          設置身份驗證

          要運行客戶端庫,必須先設置身份驗證,也就是說需要你的服務帳號密鑰才能去連接和操作bigQuery,官方文檔中也說到這點,他提供的方案是:設置環(huán)境變量 GOOGLE_APPLICATION_CREDENTIALS 向應用代碼提供身份驗證憑據

          我覺得這種方式對于我們來說不太友好,我不能在服務器上設置環(huán)境變量,我們現在都是微服務,部署在k8s上,所以這種方案也不知道如何使用(會這種方式的小伙伴一定要告訴我怎么使用)

          所以我的做法是將 身份驗證憑據 JSON 文件加在項目 resource 里,通過流的方式讀取憑據。

          @Value(value?=?"classpath:netpop-e792a-data-analytics.json")
          private?Resource?dataAnalyticsResource;

          配置 BigQuery Bean

          由上面那段查詢案例可知,重要的一個 Bean 就是 BigQuery,所以把這個Bean 注冊到IOC容器中。

          @Configuration
          public?class?BigQueryConfiguration?{

          ????//?加載?身份驗證憑據
          ????@Value(value?=?"classpath:netpop-e792a-data-analytics.json")
          ????private?Resource?dataAnalyticsResource;

          ????//?配置核心Bean
          ????@Bean
          ????BigQuery?bigQuery()?throws?IOException?{
          ????????GoogleCredentials?credentials?=?GoogleCredentials.fromStream(dataAnalyticsResource.getInputStream());
          ????????BigQuery?bigquery?=?BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
          ????????return?bigquery;
          ????}

          ????//?將bigQuery?分裝工具類注冊到IOC容器中
          ????@Bean
          ????BigQueryHelper?bigQueryHelper(@Autowired?BigQuery?bigQuery)?{
          ????????return?new?BigQueryHelper(bigQuery);
          ????}

          }

          工具類


          package?groot.data.analysis.support;

          import?com.google.cloud.bigquery.*;
          import?lombok.extern.slf4j.Slf4j;
          import?org.springframework.util.Assert;
          import?org.springframework.util.ReflectionUtils;

          import?java.lang.reflect.InvocationTargetException;
          import?java.math.BigDecimal;
          import?java.util.*;

          /**
          ?*?@Classname?BigQueryHelper
          ?*?@Description
          ?*?@Created?by?wangchangjiu
          ?*/

          @Slf4j
          public?class?BigQueryHelper?{

          ????private?BigQuery?bigQuery;

          ????public?BigQueryHelper()?{
          ????}

          ????public?BigQueryHelper(BigQuery?bigQuery)?{
          ????????this.bigQuery?=?bigQuery;
          ????}


          ????/**
          ?????*?獲取列表?返回類型的字段不支持復雜類型
          ?????*
          ?????*?@param?sql
          ?????*?@param?returnType
          ?????*?@param?
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public??List?queryForList(String?sql,?Class?returnType)?throws?InterruptedException?{
          ????????TableResult?result?=?execute(sql);
          ????????Map?fieldMap?=?getStringFieldMap(result);
          ????????List?results?=?new?ArrayList<>();
          ????????result.iterateAll().forEach(row?->?{
          ????????????T?returnObj;
          ????????????try?{
          ????????????????returnObj?=?returnType.getDeclaredConstructor().newInstance();
          ????????????}?catch?(InstantiationException?|?IllegalAccessException?|?InvocationTargetException?|?NoSuchMethodException?ex)?{
          ????????????????throw?new?RuntimeException("reflect?create?object?error?:",?ex);
          ????????????}
          ????????????ReflectionUtils.doWithFields(returnType,?field?->?{
          ????????????????Field?bigQueryField?=?fieldMap.get(field.getName());
          ????????????????if?(bigQueryField?!=?null)?{
          ????????????????????FieldValue?fieldValue?=?row.get(bigQueryField.getName());
          ????????????????????if?(bigQueryField.getType().getStandardType()?==?StandardSQLTypeName.STRUCT)?{
          ????????????????????????throw?new?UnsupportedOperationException("unsupported?returnType?field?include?complex?types");
          ????????????????????}
          ????????????????????field.setAccessible(true);
          ????????????????????ReflectionUtils.setField(field,?returnObj,?resultWrapper(fieldValue,?field.getType()));
          ????????????????}
          ????????????});
          ????????????results.add(returnObj);
          ????????});
          ????????return?results;
          ????}

          ????/**
          ?????*??字段名和字段映射
          ?????*?@param?result
          ?????*?@return
          ?????*/

          ????private?Map?getStringFieldMap(TableResult?result)?{
          ????????FieldList?fieldList?=?result.getSchema().getFields();
          ????????Map?fieldMap?=?new?HashMap<>(fieldList.size());
          ????????for?(int?i?=?0;?i?????????????Field?field?=?fieldList.get(i);
          ????????????fieldMap.put(field.getName(),?field);
          ????????}
          ????????return?fieldMap;
          ????}

          ????/**
          ?????*??執(zhí)行SQL?獲取結果集
          ?????*?@param?sql
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????private?TableResult?execute(String?sql)?throws?InterruptedException?{
          ????????Assert.notNull(sql,?"SQL?must?not?be?null");
          ????????QueryJobConfiguration?queryConfig?=?QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();

          ????????//?Create?a?job?ID?so?that?we?can?safely?retry.
          ????????JobId?jobId?=?JobId.of(UUID.randomUUID().toString());
          ????????Job?queryJob?=?bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

          ????????//?Wait?for?the?query?to?complete.
          ????????queryJob?=?queryJob.waitFor();

          ????????if?(queryJob?==?null)?{
          ????????????throw?new?RuntimeException("Job?no?longer?exists");
          ????????}?else?if?(queryJob.getStatus().getError()?!=?null)?{
          ????????????throw?new?RuntimeException(queryJob.getStatus().getError().toString());
          ????????}
          ????????//?Get?the?results.
          ????????return?queryJob.getQueryResults();
          ????}


          ????/**
          ?????*??查詢列表,實現?ResultSetExtractor?接口?自定義提取數據
          ?????*?@param?sql
          ?????*?@param?rse
          ?????*?@param?
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public??List?queryForList(String?sql,?ResultSetExtractor?rse)?throws?InterruptedException?{
          ????????TableResult?tableResult?=?execute(sql);
          ????????List?results?=?new?ArrayList<>();
          ????????tableResult.iterateAll().forEach(row?->?results.add(rse.extractData(row)));
          ????????return?results;
          ????}

          ????/**
          ?????*??查詢返回單個結果集
          ?????*?@param?sql
          ?????*?@param?returnType
          ?????*?@param?
          ?????*?@return
          ?????*?@throws?InterruptedException
          ?????*/

          ????public??T?queryForSingleResult(String?sql,?Class?returnType)?throws?InterruptedException?{
          ????????TableResult?tableResult?=?execute(sql);
          ????????if?(tableResult.iterateAll().iterator().hasNext())?{
          ????????????//?只有一行
          ????????????FieldValueList?fieldValues?=?tableResult.iterateAll().iterator().next();
          ????????????if?(isBasicType(returnType))?{
          ????????????????return?(T)?resultWrapper(fieldValues.get(0),?returnType);
          ????????????}?else?{
          ????????????????T?returnObj;
          ????????????????try?{
          ????????????????????returnObj?=?returnType.getDeclaredConstructor().newInstance();
          ????????????????}?catch?(InstantiationException?|?IllegalAccessException?|?InvocationTargetException?|?NoSuchMethodException?ex)?{
          ????????????????????throw?new?RuntimeException("reflect?create?object?error?:",?ex);
          ????????????????}

          ????????????????Map?fieldMap?=?getStringFieldMap(tableResult);
          ????????????????ReflectionUtils.doWithFields(returnType,?field?->?{
          ????????????????????Field?bigQueryField?=?fieldMap.get(field.getName());
          ????????????????????if?(bigQueryField?!=?null)?{
          ????????????????????????FieldValue?fieldValue?=?fieldValues.get(bigQueryField.getName());
          ????????????????????????if?(bigQueryField.getType().getStandardType()?==?StandardSQLTypeName.STRUCT)?{
          ????????????????????????????throw?new?UnsupportedOperationException("unsupported?returnType?field?include?complex?types");
          ????????????????????????}
          ????????????????????????field.setAccessible(true);
          ????????????????????????ReflectionUtils.setField(field,?returnObj,?resultWrapper(fieldValue,?field.getType()));
          ????????????????????}
          ????????????????});
          ????????????????return?returnObj;
          ????????????}
          ????????}
          ????????return?null;
          ????}

          ????/**
          ?????*??結果類型處理
          ?????*?@param?fieldValue
          ?????*?@param?returnType
          ?????*?@return
          ?????*/

          ????private?Object?resultWrapper(FieldValue?fieldValue,?Class?returnType)?{
          ????????if?(returnType?==?Boolean.class?||?returnType?==?boolean.class)?{
          ????????????return?fieldValue.getBooleanValue();
          ????????}?else?if?(returnType?==?Long.class?||?returnType?==?long.class)?{
          ????????????return?fieldValue.getLongValue();
          ????????}?else?if?(returnType?==?Double.class?||?returnType?==?double.class)?{
          ????????????return?fieldValue.getDoubleValue();
          ????????}?else?if?(returnType?==?BigDecimal.class)?{
          ????????????return?fieldValue.getNumericValue();
          ????????}?else?if?(returnType?==?String.class)?{
          ????????????return?fieldValue.getStringValue();
          ????????}
          ????????return?fieldValue.getValue();
          ????}

          ????/**
          ?????*??判斷是否是簡單類型
          ?????*?@param?returnType
          ?????*?@param?
          ?????*?@return
          ?????*/

          ????private??boolean?isBasicType(Class?returnType)?{
          ????????return?returnType?==?String.class?||?returnType.isPrimitive()
          ????????????????||?returnType?
          ==?Boolean.class?||?returnType?==?Byte.class
          ????????????????||?returnType?
          ==?Integer.class?||?returnType?==?Long.class
          ????????????????||?returnType?
          ==?Double.class?||?returnType?==?Short.class
          ????????????????||?returnType?
          ==?Float.class?||?returnType?==?BigDecimal.class;
          ????}

          }

          這里對外主要提供了

          //?獲取列表?返回類型的字段不支持復雜類型
          public??List?queryForList(String?sql,?Class?returnType)?throws?InterruptedException?

          //?查詢列表,實現?ResultSetExtractor?接口?自定義提取數據
          public??List?queryForList(String?sql,?ResultSetExtractor?rse)?throws?InterruptedException

          //?查詢返回單個結果集
          public??T?queryForSingleResult(String?sql,?Class?returnType)?throws?InterruptedException?

          我這里主要的思想就是利用反射創(chuàng)建目標對象,將字段賦值進去

          當然這里不支持返回類型是對象嵌套對象的形式,原因是那種比較復雜,而且我現在這里也沒有這種場景。

          還有就是這里沒有支持分頁等其他操作

          使用工具類

          使用的話就是注入 bigQueryHelper 工具類

          以上就是簡單的一個bigQuery 分裝類,當然還可以進一步優(yōu)化封裝的更全一點。

          來源:juejin.cn/post/7036337598025072653


          13個你一定要知道的PyTorch特性

          解讀:為什么要做特征歸一化/標準化?

          一文搞懂 PyTorch 內部機制

          張一鳴:每個逆襲的年輕人,都具備的底層能力




          ,,西,,,[]!


          瀏覽 65
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  影音先锋啪 | 一区二区三四区五区视频 | 中文字幕手机在线视频 | www.在线看黄 | 色久在线 |