A Java helper class that wraps BigQuery queries (useful)
Background
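Recently at work I have been building analytics for our event-tracking data on top of BigQuery. This post summarizes the query helper class I wrote, since there are few articles about BigQuery online.
For BigQuery concepts and features, see:
https://cloud.google.com/bigquery/docs?hl=zh-CN
The samples there cover many operations.
Here is a query snippet from those samples: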
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;

public class SimpleApp {
  public static void main(String... args) throws Exception {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                "SELECT commit, author, repo_name "
                    + "FROM `bigquery-public-data.github_repos.commits` "
                    + "WHERE subject like '%bigquery%' "
                    + "ORDER BY subject DESC LIMIT 10")
            // Use standard SQL syntax for queries.
            // See: https://cloud.google.com/bigquery/sql-reference/
            .setUseLegacySql(false)
            .build();
    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
    // Wait for the query to complete.
    queryJob = queryJob.waitFor();
    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }
    // Get the results.
    TableResult result = queryJob.getQueryResults();
    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      // String type
      String commit = row.get("commit").getStringValue();
      // Record type
      FieldValueList author = row.get("author").getRecordValue();
      String name = author.get("name").getStringValue();
      String email = author.get("email").getStringValue();
      // String Repeated type
      String repoName = row.get("repo_name").getRecordValue().get(0).getStringValue();
      System.out.printf(
          "Repo name: %s Author name: %s email: %s commit: %s\n", repoName, name, email, commit);
    }
  }
}
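Every query in this style goes through the same steps: create a job, wait for the query to finish, then process the result set. Writing that whole block again for every query would be unbearable.
In fact, only the SQL and the result handling differ between queries; everything else is identical. I used that observation to build a BigQuery query helper class. The goal is simple: I hand it a SQL statement, and it hands me back processed results.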
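Here is a library-free sketch of that idea, for illustration only. The fixed steps are written once, and each caller supplies just the SQL and a row-mapping function. QueryExecutor and QueryTemplate are hypothetical names. Rows are modeled as Map<String, String> rather than BigQuery's FieldValueList, so the pattern can be shown without the client library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for the repeated boilerplate: create the job,
// wait for it, check for errors, and return the rows.
interface QueryExecutor {
    List<Map<String, String>> run(String sql);
}

// The fixed steps are written once; callers pass only the SQL and a row mapper.
final class QueryTemplate {
    private final QueryExecutor executor;

    QueryTemplate(QueryExecutor executor) {
        this.executor = Objects.requireNonNull(executor, "executor");
    }

    <T> List<T> query(String sql, Function<Map<String, String>, T> rowMapper) {
        Objects.requireNonNull(sql, "SQL must not be null");
        List<T> results = new ArrayList<>();
        for (Map<String, String> row : executor.run(sql)) {
            results.add(rowMapper.apply(row));
        }
        return results;
    }
}
```

Replacing the fake executor with one that calls bigQuery.create(...), waitFor() and getQueryResults() gives roughly the shape of a real wrapper.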
Maven dependencies
First add the Maven dependencies. These are also listed in the official BigQuery samples:
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>24.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
  </dependency>
</dependencies>
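Setting up authentication
Before the client library can run, you must set up authentication. In other words, you need a service account key to connect to and operate on BigQuery. The official documentation covers this. Its suggested approach is to set the environment variable GOOGLE_APPLICATION_CREDENTIALS so that application code can find the authentication credentials.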

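This approach is not very convenient for us. I cannot set environment variables on the servers: we run microservices deployed on k8s, so I wasn't sure how to apply it. If you know how to use it in that setup, please tell me.
Instead, I put the credentials JSON file in the project's resources folder and read the credentials as a stream.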
@Value(value = "classpath:netpop-e792a-data-analytics.json")
private Resource dataAnalyticsResource;
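On the k8s question above, one common approach works like this. Store the key file in a Kubernetes Secret and mount it into the pod as a file. Then set GOOGLE_APPLICATION_CREDENTIALS to the mounted path in the pod spec.

The sketch below is an assumption, not the author's code, and supports both setups. It opens the file named by that variable when one is given, and otherwise falls back to the JSON bundled on the classpath. CredentialsSource and open are hypothetical names. The environment value is passed in as a parameter so the logic stays testable.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: open the credentials JSON from an explicit file path
// (e.g. the value of GOOGLE_APPLICATION_CREDENTIALS) or, if none is given,
// from a resource bundled on the classpath.
final class CredentialsSource {
    private CredentialsSource() {
    }

    static InputStream open(String envPath, String classpathResource) throws IOException {
        if (envPath != null && !envPath.isBlank()) {
            Path path = Paths.get(envPath);
            // Throws (NoSuchFileException) if the variable points at a missing file.
            return Files.newInputStream(path);
        }
        InputStream in = CredentialsSource.class.getClassLoader().getResourceAsStream(classpathResource);
        if (in == null) {
            throw new FileNotFoundException("classpath resource not found: " + classpathResource);
        }
        return in;
    }
}
```

In a Spring configuration, you would pass the stream from CredentialsSource.open(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"), "netpop-e792a-data-analytics.json") to GoogleCredentials.fromStream inside a try-with-resources block.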

Configuring the BigQuery bean
As the sample query above shows, the key object is BigQuery, so register it as a bean in the IoC container.
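import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

import java.io.IOException;
import java.io.InputStream;

@Configuration
public class BigQueryConfiguration {
    // Load the authentication credentials from the classpath
    @Value(value = "classpath:netpop-e792a-data-analytics.json")
    private Resource dataAnalyticsResource;

    // Configure the core bean
    @Bean
    BigQuery bigQuery() throws IOException {
        // try-with-resources closes the stream once the credentials are parsed
        try (InputStream in = dataAnalyticsResource.getInputStream()) {
            GoogleCredentials credentials = GoogleCredentials.fromStream(in);
            return BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
        }
    }

    // Register the BigQuery helper in the IoC container;
    // Spring passes in the BigQuery bean defined above
    @Bean
    BigQueryHelper bigQueryHelper(BigQuery bigQuery) {
        return new BigQueryHelper(bigQuery);
    }
}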
The helper class
package groot.data.analysis.support;

import com.google.cloud.bigquery.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.util.*;

/**
 * @Classname BigQueryHelper
 * @Description
 * @Created by wangchangjiu
 */
@Slf4j
public class BigQueryHelper {

    private BigQuery bigQuery;

    public BigQueryHelper() {
    }

    public BigQueryHelper(BigQuery bigQuery) {
        this.bigQuery = bigQuery;
    }

    /**
     * Query a list. Fields of the return type must not be complex (STRUCT) types.
     *
     * @param sql        the SQL to execute
     * @param returnType the target class (needs a no-arg constructor)
     * @param <T>        the element type
     * @return one mapped object per result row
     * @throws InterruptedException if interrupted while waiting for the query job
     */
    public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException {
        TableResult result = execute(sql);
        Map<String, Field> fieldMap = getStringFieldMap(result);
        List<T> results = new ArrayList<>();
        result.iterateAll().forEach(row -> {
            T returnObj;
            try {
                returnObj = returnType.getDeclaredConstructor().newInstance();
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                throw new RuntimeException("reflect create object error :", ex);
            }
            ReflectionUtils.doWithFields(returnType, field -> {
                Field bigQueryField = fieldMap.get(field.getName());
                if (bigQueryField != null) {
                    FieldValue fieldValue = row.get(bigQueryField.getName());
                    if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                        throw new UnsupportedOperationException("unsupported returnType field include complex types");
                    }
                    // Skip NULL columns so the field keeps its default value
                    if (fieldValue.isNull()) {
                        return;
                    }
                    field.setAccessible(true);
                    ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                }
            });
            results.add(returnObj);
        });
        return results;
    }

    /**
     * Map column names to their BigQuery schema fields.
     *
     * @param result the query result
     * @return column name -> schema field
     */
    private Map<String, Field> getStringFieldMap(TableResult result) {
        FieldList fieldList = result.getSchema().getFields();
        Map<String, Field> fieldMap = new HashMap<>(fieldList.size());
        for (int i = 0; i < fieldList.size(); i++) {
            Field field = fieldList.get(i);
            fieldMap.put(field.getName(), field);
        }
        return fieldMap;
    }

    /**
     * Execute the SQL and get the result set.
     *
     * @param sql the SQL to execute
     * @return the query result
     * @throws InterruptedException if interrupted while waiting for the query job
     */
    private TableResult execute(String sql) throws InterruptedException {
        Assert.notNull(sql, "SQL must not be null");
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();
        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
        // Wait for the query to complete.
        queryJob = queryJob.waitFor();
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }
        // Get the results.
        return queryJob.getQueryResults();
    }

    /**
     * Query a list; implement the ResultSetExtractor interface to extract each row yourself.
     *
     * @param sql the SQL to execute
     * @param rse converts one result row into a T
     * @param <T> the element type
     * @return one extracted object per result row
     * @throws InterruptedException if interrupted while waiting for the query job
     */
    public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException {
        TableResult tableResult = execute(sql);
        List<T> results = new ArrayList<>();
        tableResult.iterateAll().forEach(row -> results.add(rse.extractData(row)));
        return results;
    }

    /**
     * Query a single result.
     *
     * @param sql        the SQL to execute
     * @param returnType a simple type (first column is returned) or a flat class (first row is mapped)
     * @param <T>        the result type
     * @return the result, or null if the query returns no rows
     * @throws InterruptedException if interrupted while waiting for the query job
     */
    @SuppressWarnings("unchecked")
    public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException {
        TableResult tableResult = execute(sql);
        Iterator<FieldValueList> iterator = tableResult.iterateAll().iterator();
        if (iterator.hasNext()) {
            // Only the first row is used
            FieldValueList fieldValues = iterator.next();
            if (isBasicType(returnType)) {
                FieldValue first = fieldValues.get(0);
                return first.isNull() ? null : (T) resultWrapper(first, returnType);
            } else {
                T returnObj;
                try {
                    returnObj = returnType.getDeclaredConstructor().newInstance();
                } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
                    throw new RuntimeException("reflect create object error :", ex);
                }
                Map<String, Field> fieldMap = getStringFieldMap(tableResult);
                ReflectionUtils.doWithFields(returnType, field -> {
                    Field bigQueryField = fieldMap.get(field.getName());
                    if (bigQueryField != null) {
                        FieldValue fieldValue = fieldValues.get(bigQueryField.getName());
                        if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                            throw new UnsupportedOperationException("unsupported returnType field include complex types");
                        }
                        // Skip NULL columns so the field keeps its default value
                        if (fieldValue.isNull()) {
                            return;
                        }
                        field.setAccessible(true);
                        ReflectionUtils.setField(field, returnObj, resultWrapper(fieldValue, field.getType()));
                    }
                });
                return returnObj;
            }
        }
        return null;
    }

    /**
     * Convert a column value to the target type.
     *
     * @param fieldValue the column value
     * @param returnType the target type
     * @return the converted value
     */
    private Object resultWrapper(FieldValue fieldValue, Class<?> returnType) {
        if (returnType == Boolean.class || returnType == boolean.class) {
            return fieldValue.getBooleanValue();
        } else if (returnType == Long.class || returnType == long.class) {
            return fieldValue.getLongValue();
        } else if (returnType == Integer.class || returnType == int.class) {
            // INT64 is read as long; toIntExact fails instead of silently overflowing
            return Math.toIntExact(fieldValue.getLongValue());
        } else if (returnType == Double.class || returnType == double.class) {
            return fieldValue.getDoubleValue();
        } else if (returnType == BigDecimal.class) {
            return fieldValue.getNumericValue();
        } else if (returnType == String.class) {
            return fieldValue.getStringValue();
        }
        return fieldValue.getValue();
    }

    /**
     * Whether the type is a simple (non-composite) type.
     *
     * @param returnType the type to check
     * @return true for String, primitives, their wrappers and BigDecimal
     */
    private boolean isBasicType(Class<?> returnType) {
        return returnType == String.class || returnType.isPrimitive()
                || returnType == Boolean.class || returnType == Byte.class
                || returnType == Integer.class || returnType == Long.class
                || returnType == Double.class || returnType == Short.class
                || returnType == Float.class || returnType == BigDecimal.class;
    }
}

// ResultSetExtractor is not shown in the original post. A minimal definition
// consistent with how it is used above (assumed), in its own file:
@FunctionalInterface
public interface ResultSetExtractor<T> {
    T extractData(FieldValueList row);
}
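The helper mainly exposes these methods:
// Query a list; fields of the return type must not be complex types
public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException
// Query a list; implement the ResultSetExtractor interface to extract each row yourself
public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException
// Query a single result: the first column of the first row for simple types, otherwise the first row mapped to returnType
public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException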
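The main idea is to create the target object through reflection and then assign each column value to the field with the same name.
Return types that nest objects inside objects are not supported. That case is more complicated, and I don't have such a scenario yet.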
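The reflection idea can be shown in isolation with the JDK alone. This is a minimal sketch. It assumes each row is a Map<String, String> from column name to value, which mirrors the fact that primitive cell values arrive as strings in BigQuery's REST responses and is why the helper converts by target field type.

RowMapper, mapRow and the example type EventStat are hypothetical names. Like the helper, the sketch handles flat fields only. Fields with no matching column (or a NULL value) keep their defaults.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.util.Map;

// Example target type (hypothetical), shaped like one flat result row.
class EventStat {
    String eventName;
    long total;
    Double ratio;
}

// Hypothetical JDK-only mapper illustrating the reflection approach.
final class RowMapper {
    private RowMapper() {
    }

    // Create the target with its no-arg constructor, then copy each column
    // into the declared field of the same name.
    static <T> T mapRow(Map<String, String> row, Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            T target = ctor.newInstance();
            for (Field field : type.getDeclaredFields()) {
                if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                    continue; // only ordinary instance fields are populated
                }
                String raw = row.get(field.getName());
                if (raw == null) {
                    continue; // missing column or NULL: keep the default value
                }
                field.setAccessible(true);
                field.set(target, convert(raw, field.getType()));
            }
            return target;
        } catch (ReflectiveOperationException ex) {
            throw new IllegalStateException("cannot map row to " + type.getName(), ex);
        }
    }

    // Flat types only, similar to resultWrapper; anything else is rejected.
    private static Object convert(String raw, Class<?> type) {
        if (type == String.class) {
            return raw;
        } else if (type == long.class || type == Long.class) {
            return Long.parseLong(raw);
        } else if (type == int.class || type == Integer.class) {
            return Integer.parseInt(raw);
        } else if (type == double.class || type == Double.class) {
            return Double.parseDouble(raw);
        } else if (type == boolean.class || type == Boolean.class) {
            return Boolean.parseBoolean(raw);
        } else if (type == BigDecimal.class) {
            return new BigDecimal(raw);
        }
        throw new UnsupportedOperationException("unsupported field type: " + type.getName());
    }
}
```

Unlike the helper, this sketch reads only the class's own declared fields. The helper uses Spring's ReflectionUtils.doWithFields, which also visits superclass fields.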
Pagination and other operations are not supported either.
Using the helper
To use it, simply inject the bigQueryHelper bean.


That is a simple BigQuery wrapper class. It could of course be optimized further and made more complete.
Source: juejin.cn/post/7036337598025072653


