Hive實(shí)戰(zhàn)UDF 外部依賴文件找不到的問題
目錄
關(guān)于外部依賴文件找不到的問題
為什么要使用外部依賴
為什么idea 里面可以運(yùn)行上線之后不行
依賴文件直接打包在jar 包里面不香嗎
學(xué)會獨(dú)立思考并且解決問題
總結(jié)
其實(shí)這篇文章的起源是,我司有數(shù)據(jù)清洗時將ip轉(zhuǎn)化為類似中國-湖北-武漢地區(qū)這種需求。由于ip服務(wù)商提供的Demo,只能在本地讀取,我需要將ip庫上傳到HDFS分布式存儲,每個計(jì)算節(jié)點(diǎn)再從HDFS下載到本地。
那么到底能不能直接從HDFS讀取呢?跟我強(qiáng)哥講了這件事后,不服輸?shù)乃迅蝺憾及竞诹?,終于給出了解決方案。
關(guān)于外部依賴文件找不到的問題

其實(shí)我在上一篇的總結(jié)中也說過了你需要確定的上傳的db 文件在那里,也就是你在hive 中調(diào)用add file之后 會出現(xiàn)添加后的文件路徑或者使用list 命令來看一下
今天我們不討論這個問題我們討論另外一個問題,外部依賴的問題,當(dāng)然這個問題的引入本來就很有意思,其實(shí)是一個很簡單的事情。
為什么要使用外部依賴
重點(diǎn)強(qiáng)調(diào)一下我們的外部依賴并不是單單指的是jar包依賴,我們的程序或者是UDF 依賴的一切外部文件都可以算作是外部依賴。
使用外部依賴的的原因是我們的程序可能需要一些外部的文件,或者是其他的一些信息,例如我們這里的UDF 中的IP 解析庫(DB 文件),或者是你需要在UDF 訪問一些網(wǎng)絡(luò)信息等等。
為什么idea 里面可以運(yùn)行上線之后不行
我們很多如人的一個誤區(qū)就是明明我在IDEA 里面都可以運(yùn)行為什么上線或者是打成jar 包之后就不行,其實(shí)你在idea 可以運(yùn)行之后不應(yīng)該直接上線的,或者說是不應(yīng)該直接創(chuàng)建UDF 的,而是先應(yīng)該測試一下jar 是否可以正常運(yùn)行,如果jar 都不能正常運(yùn)行那UDF 坑定就運(yùn)行報(bào)錯啊。
接下來我們就看一下為什么idea 可以運(yùn)行,但是jar 就不行,代碼我們就不全部粘貼了,只粘貼必要的,完整代碼可以看前面一篇文章
@Override
public?ObjectInspector?initialize(ObjectInspector[]?arguments)?throws?UDFArgumentException?{
????converter?=?ObjectInspectorConverters.getConverter(arguments[0],?PrimitiveObjectInspectorFactory.writableStringObjectInspector);
????
????String?dbPath?=?Ip2Region.class.getResource("/ip2region.db").getPath();
????File?file?=?new?File(dbPath);
????if?(file.exists()?==?false)?{
????????System.out.println("Error:?Invalid?ip2region.db?file");
????????return?null;
????}
????DbConfig?config?=?null;
????try?{
????????config?=?new?DbConfig();
????????searcher?=?new?DbSearcher(config,?dbPath);
????}?catch?(DbMakerConfigException?|?FileNotFoundException?e)?{
????????e.printStackTrace();
????}
????return?PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
這就是我們讀取外部配置文件的方法,我們接下來寫一個測試
@Test
public?void?ip2Region()?throws?HiveException?{
????Ip2Region?udf?=?new?Ip2Region();
????ObjectInspector?valueOI0?=?PrimitiveObjectInspectorFactory.javaStringObjectInspector;
????ObjectInspector[]?init_args?=?{valueOI0};
????udf.initialize(init_args);
????String?ip?=?"220.248.12.158";
????GenericUDF.DeferredObject?valueObj0?=?new?GenericUDF.DeferredJavaObject(ip);
????GenericUDF.DeferredObject[]?args?=?{valueObj0};
????Text?res?=?(Text)?udf.evaluate(args);
????System.out.println(res.toString());
}
我們發(fā)現(xiàn)是可以正常運(yùn)行的,這里我們把它打成jar 包再運(yùn)行一下,為了方便測試我們將這個測試方法改成main 方法,我們還是先在idea 里面運(yùn)行一下

我們發(fā)現(xiàn)還是可以正常運(yùn)行,我們接下來打個jar包試一下
Error:?Invalid?ip2region.db?file
java.io.FileNotFoundException:?file:?/Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar!/ip2region.db?(No?such?file?or?directory)
????????at?java.io.RandomAccessFile.open0(Native?Method)
????????at?java.io.RandomAccessFile.open(RandomAccessFile.java:316)
????????at?java.io.RandomAccessFile.(RandomAccessFile.java:243)
????????at?java.io.RandomAccessFile.(RandomAccessFile.java:124)
????????at?org.lionsoul.ip2region.DbSearcher.(DbSearcher.java:58)
????????at?com.kingcall.bigdata.HiveUDF.Ip2Region.main((Ip2Region.java:42)
Exception?in?thread?"main"?java.lang.NullPointerException
????????at?com.kingcall.bigdata.HiveUDF.Ip2Region.main(Ip2Region.java:48)
我們發(fā)現(xiàn)jar 包已經(jīng)報(bào)錯了,那你的UDF 肯定運(yùn)行不了了啊,其實(shí)如果你仔細(xì)看的話就知道為什么報(bào)錯了/Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar!/ip2region.db 其實(shí)就是這個路徑,我們很明顯看到這個路徑是不對的,所以這就是我們UDF報(bào)錯的原因
依賴文件直接打包在jar 包里面不香嗎
上面找到了這個問題,現(xiàn)在我們就看一下如何解決這個問題,出現(xiàn)這個問題的原因就是打包后的路徑不對,導(dǎo)致我們的不能找到這個依賴文件,那我們?yōu)槭惨@個路徑呢。這個主要是因?yàn)槲覀兪褂玫腁PI 的原因
DbConfig?config?=?new?DbConfig();
DbSearcher?searcher?=?new?DbSearcher(config,?dbPath);
也就是說我們的new DbSearcher(config, dbPath) 第二個參數(shù)傳的是DB 的路徑,所以我們很自然的想到看一下源碼是怎么使用這個路徑的,能不能傳一個其他特定的路徑進(jìn)去,其實(shí)我們從idea 里面可以運(yùn)行就知道,我們是可以傳入一個本地路徑的。
這里我們以memorySearch 方法作為入口
????//?構(gòu)造方法
????public?DbSearcher(DbConfig?dbConfig,?String?dbFile)?throws?FileNotFoundException?{
????????this.dbConfig?=?dbConfig;
????????this.raf?=?new?RandomAccessFile(dbFile,?"r");
????}
????//?構(gòu)造方法
????public?DbSearcher(DbConfig?dbConfig,?byte[]?dbBinStr)?{
????????this.dbConfig?=?dbConfig;
????????this.dbBinStr?=?dbBinStr;
????????this.firstIndexPtr?=?Util.getIntLong(dbBinStr,?0);
????????this.lastIndexPtr?=?Util.getIntLong(dbBinStr,?4);
????????this.totalIndexBlocks?=?(int)((this.lastIndexPtr?-?this.firstIndexPtr)?/?(long)IndexBlock.getIndexBlockLength())?+?1;
????}
??//?memorySearch?方法
????public?DataBlock?memorySearch(long?ip)?throws?IOException?{
????????int?blen?=?IndexBlock.getIndexBlockLength();
???????//?讀取文件到內(nèi)存數(shù)組
????????if?(this.dbBinStr?==?null)?{
????????????this.dbBinStr?=?new?byte[(int)this.raf.length()];
????????????this.raf.seek(0L);
????????????this.raf.readFully(this.dbBinStr,?0,?this.dbBinStr.length);
????????????this.firstIndexPtr?=?Util.getIntLong(this.dbBinStr,?0);
????????????this.lastIndexPtr?=?Util.getIntLong(this.dbBinStr,?4);
????????????this.totalIndexBlocks?=?(int)((this.lastIndexPtr?-?this.firstIndexPtr)?/?(long)blen)?+?1;
????????}
????????int?l?=?0;
????????int?h?=?this.totalIndexBlocks;
????????long?dataptr?=?0L;
????????int?m;
????????int?p;
????????while(l?<=?h)?{
????????????m?=?l?+?h?>>?1;
????????????p?=?(int)(this.firstIndexPtr?+?(long)(m?*?blen));
????????????long?sip?=?Util.getIntLong(this.dbBinStr,?p);
????????????if?(ip?????????????????h?=?m?-?1;
????????????}?else?{
????????????????long?eip?=?Util.getIntLong(this.dbBinStr,?p?+?4);
????????????????if?(ip?<=?eip)?{
????????????????????dataptr?=?Util.getIntLong(this.dbBinStr,?p?+?8);
????????????????????break;
????????????????}
????????????????l?=?m?+?1;
????????????}
????????}
????????if?(dataptr?==?0L)?{
????????????return?null;
????????}?else?{
????????????m?=?(int)(dataptr?>>?24?&?255L);
????????????p?=?(int)(dataptr?&?16777215L);
????????????int?city_id?=?(int)Util.getIntLong(this.dbBinStr,?p);
????????????String?region?=?new?String(this.dbBinStr,?p?+?4,?m?-?4,?"UTF-8");
????????????return?new?DataBlock(city_id,?region,?p);
????????}
????}
其實(shí)我們看到memorySearch 方法首先是讀取DB ?文件到內(nèi)存的字節(jié)數(shù)組然后使用,而且我們看到有這樣一個字節(jié)數(shù)組的構(gòu)造方法DbSearcher(DbConfig dbConfig, byte[] dbBinStr)
既然讀取文件不行,那我們能不能直接傳入字節(jié)數(shù)組呢?其實(shí)可以的
DbSearcher?searcher=null;
DbConfig?config?=?new?DbConfig();
try?{
????config?=?new?DbConfig();
}?catch?(DbMakerConfigException?e)?{
????e.printStackTrace();
}
InputStream?inputStream?=?Ip2Region.class.getResourceAsStream("/ip2region.db");
ByteArrayOutputStream?output?=?new?ByteArrayOutputStream();
byte[]?buffer?=?new?byte[4096];
int?n?=?0;
while?(-1?!=?(n?=?inputStream.read(buffer)))?{
????output.write(buffer,?0,?n);
}
byte[]?bytes?=?output.toByteArray();
searcher?=?new?DbSearcher(config,?bytes);
//?只能使用memorySearch?方法
DataBlock?block?=?searcher.memorySearch(ip);
//打印位置信息(格式:國家|大區(qū)|省份|城市|運(yùn)營商)
System.out.println(block.getRegion());
我們還是先在Idea 里面測試,我們發(fā)現(xiàn)是可以運(yùn)行的,然后我們還是打成jar包進(jìn)行測試,這次我們發(fā)現(xiàn)還是可以運(yùn)行中國|0|上海|上海市|聯(lián)通
也就是說我們已經(jīng)把這個問題解決了,有沒有什么問題呢?有那就是DB 文件在jar 包里面,不能單獨(dú)更新,前面我們將分詞的時候也水果,停用詞庫是隨著公司的業(yè)務(wù)發(fā)展需要更新的 DB庫也是一樣的。
也就是說可以這樣解決但是不完美,我看到有的人是這樣做的他使用getResourceAsStream 把數(shù)據(jù)讀取到內(nèi)存,然后再寫出成本地臨時文件,然后再使用,我只想說這個解決方式也太不友好了吧
文件不能更新 需要寫臨時文件(權(quán)限問題,如果被刪除了還得重寫)
只能使用memorySearch 方法
這個原因值得說明一下,因?yàn)槟闶褂闷渌麅蓚€search 方法的時候都會拋出異常Exception in thread "main" java.lang.NullPointerException
這主要是因?yàn)槠渌麅蓚€方法都是涉及到從文件讀取數(shù)據(jù)進(jìn)來,但是我們的raf 是null
學(xué)會獨(dú)立思考并且解決問題
上面我們的UDF 其實(shí)已經(jīng)可以正常使用了,但是有不足之處,這里我們就處理一下這個問題,前面我們說過了其實(shí)在IDEA 里的路徑參數(shù)可以使用,那就說明傳入本地文件是可以的,但是有一個問題就是我們的UDF 是可能在所有節(jié)點(diǎn)上運(yùn)行的,所以傳入本地路徑的前提是需要保證所有節(jié)點(diǎn)上這個本地路徑都可用,但是這樣維護(hù)成本也很高,還不如直接將依賴放在jar 包里面。
繼承DbSearcher
其實(shí)我們是可以將這個依賴放在OSS或者是HDFS 上的,但是這個時候你傳入路徑之后,還是有問題,因?yàn)闃?gòu)造方法里面讀取文件的時候默認(rèn)的是本地方法,其實(shí)這個時候你可以繼承DbSearcher 方法,然后添加新的構(gòu)造方法,完成從HDFS 上讀取文件。
//?構(gòu)造方法
public?DbSearcher(DbConfig?dbConfig,?byte[]?dbBinStr)?{
????this.dbConfig?=?dbConfig;
????this.dbBinStr?=?dbBinStr;
????this.firstIndexPtr?=?Util.getIntLong(dbBinStr,?0);
????this.lastIndexPtr?=?Util.getIntLong(dbBinStr,?4);
????this.totalIndexBlocks?=?(int)((this.lastIndexPtr?-?this.firstIndexPtr)?/?(long)IndexBlock.getIndexBlockLength())?+?1;
}
讀取文件傳入字節(jié)數(shù)組
還有一個方法就是我們直接使用第二個構(gòu)造方法,dbBinStr 就是我們讀取進(jìn)來的字節(jié)數(shù)組,這個時候不論這個依賴是在HDFS 還是OSS 上你只要調(diào)用相關(guān)的API 就可以了,其實(shí)這個方法我們在讀取jar包里面的文件的時候已經(jīng)使用過了
下面的ctx就是OSS的上下問,用來從OSS上讀取數(shù)據(jù),同理你可以從任何你需要的地方讀取數(shù)據(jù)。
DbConfig?config?=?null;
try?{
????config?=?new?DbConfig();
}?catch?(DbMakerConfigException?e)?{
????e.printStackTrace();
}
InputStream?inputStream?=?ctx.readResourceFileAsStream("ip2region.db");
ByteArrayOutputStream?output?=?new?ByteArrayOutputStream();
byte[]?buffer?=?new?byte[4096];
int?n?=?0;
while?(-1?!=?(n?=?inputStream.read(buffer)))?{
????output.write(buffer,?0,?n);
}
byte[]?bytes?=?output.toByteArray();
searcher?=?new?DbSearcher(config,?bytes);
總結(jié)
Idea里面使用文件路徑是可以的,但是jar里面不行,要使用也是本地文件或者是使用getResourceAsStream獲取InputStream;存儲在 HDFS或者OSS上的文件也不能使用路徑,因?yàn)槟J(rèn)是讀取本地文件的;多思考,為什么,看看源碼,最后請你思考一下怎么在外部依賴的情況下使用 binarySearch或者是btreeSearch方法;
數(shù)倉建模—寬表的設(shè)計(jì)
Spark SQL知識點(diǎn)與實(shí)戰(zhàn)
Hive計(jì)算最大連續(xù)登陸天數(shù)
Hadoop 數(shù)據(jù)遷移用法詳解
數(shù)倉建模分層理論
