2w 字帶你實(shí)戰(zhàn) ElasticSearch !
一、 基礎(chǔ)知識
你會(huì)如何處理實(shí)時(shí)或準(zhǔn)實(shí)時(shí)數(shù)據(jù)流?
在大數(shù)據(jù)時(shí)代,有很多方案可以幫助你完成這項(xiàng)任務(wù)。
接下來,我將通過一個(gè)系列的教程,我將利用Storm、Kafka、ElasticSearch逐步教你搭建一個(gè)實(shí)時(shí)計(jì)算系統(tǒng)。
搭建系統(tǒng)之前,我們首先需要了解一些定義。
通過考慮四個(gè)不同的屬性,幫助你更好地理解大數(shù)據(jù):數(shù)據(jù)量,速度,多樣性和準(zhǔn)確性。
數(shù)據(jù)量:海量數(shù)據(jù) 速度:數(shù)據(jù)處理的速度 多樣性:任何類型的數(shù)據(jù),包括結(jié)構(gòu)化和非結(jié)構(gòu)化 準(zhǔn)確性:傳入和傳出數(shù)據(jù)的準(zhǔn)確性
存在具有不同用途的大數(shù)據(jù)工具:
數(shù)據(jù)處理工具對數(shù)據(jù)執(zhí)行某種形式的計(jì)算 數(shù)據(jù)傳輸工具將數(shù)據(jù)收集和引入數(shù)據(jù)處理工具 數(shù)據(jù)存儲工具在不同處理階段存儲數(shù)據(jù)
數(shù)據(jù)處理工具可進(jìn)一步分類為:
批處理:批處理是要一起處理的數(shù)據(jù)的集合。批處理允許你將不同的數(shù)據(jù)點(diǎn)連接、合并或聚合在一起。在整個(gè)批處理完成之前,其結(jié)果通常不可用。批處理越大,等待從中獲取有用信息的時(shí)間越長。如果需要更直接的結(jié)果,流處理是更好的解決方案。 流處理:流處理器作用于無限制的數(shù)據(jù)流,而不是連續(xù)攝取的一批數(shù)據(jù)點(diǎn)(“流”)。與批處理過程不同,流中沒有明確定義的數(shù)據(jù)流起點(diǎn)或終點(diǎn),而且,它是連續(xù)的。
批處理
流梳理
示例數(shù)據(jù)和方案
我們將使用一些實(shí)際數(shù)據(jù)來數(shù)據(jù)規(guī)約系統(tǒng)(DRS)。根據(jù)維基百科,“數(shù)據(jù)規(guī)約是將數(shù)字或字母數(shù)字信息…轉(zhuǎn)換為校正,有序和簡化的形式。基本概念是將大量數(shù)據(jù)規(guī)約為有意義的形式。”
數(shù)據(jù)源將是實(shí)際的航空公司歷史飛行數(shù)據(jù),我們的最終目標(biāo)是能夠在地圖上顯示航班歷史數(shù)據(jù)。
我們將構(gòu)建的最終數(shù)據(jù)處理鏈路如下圖所示:

可以使用SMACK替代上述方案:
Spark:引擎(替代Storm) Mesos:容器 Akka:模型 Cassandra:存儲(替代ElasticSearch) Kafka:消息隊(duì)列
或者,你可以嘗試自己使用自己喜歡的編程語言來實(shí)現(xiàn)它。

單線程調(diào)度程序使用以下方式以循環(huán)方式將工作分配給多個(gè)處理器(例如,可以是Raspberry Pi的陣列)。MQTT用于數(shù)據(jù)交換。每個(gè)處理器并行處理數(shù)據(jù)并產(chǎn)生結(jié)果,這些結(jié)果由收集器收集,收集器負(fù)責(zé)將其存儲到數(shù)據(jù)庫,NAS或?qū)崟r(shí)呈現(xiàn)。由于我們沒有與用于接收實(shí)時(shí)飛行數(shù)據(jù)的真實(shí)傳感器(例如雷達(dá))建立任何連接以演示實(shí)際流處理,因此我們只能選擇批處理(即下載歷史飛行數(shù)據(jù)并離線處理它們)。
我們將首先將數(shù)據(jù)直接存儲到ElasticSearch并在Kibana或其他UI應(yīng)用程序中可視化它們。
ElasticSearch
ElasticSearch是一個(gè)面向文檔的分布式搜索引擎,用于處理以文檔形式存儲數(shù)據(jù)。
ElasticSearch具有如下優(yōu)勢:
跨多個(gè)節(jié)點(diǎn)可擴(kuò)展 搜索結(jié)果速度非常快 多語種 面向文檔 支持即時(shí)搜索 支持模糊搜索 開源,不收費(fèi)
ElasticStack由許多產(chǎn)品組成:
ElasticSearch:我們將在本文中重點(diǎn)介紹的 Kibana:一個(gè)分析和可視化平臺,可讓你輕松地可視化Elasticsearch中的數(shù)據(jù)并進(jìn)行分析 LogStash:數(shù)據(jù)處理管道 Beats:數(shù)據(jù)傳輸集合 X-pack:可為Elasticsearch和Kibana添加其他功能,例如安全性(身份驗(yàn)證和授權(quán)),性能(監(jiān)控),報(bào)告和機(jī)器學(xué)習(xí)
綜上所述,可以使用Beats和/或Logstash將數(shù)據(jù)導(dǎo)入Elasticsearch,也可以直接通過ElasticSearch的API。Kibana用于可視化ElasticSearch中的數(shù)據(jù)。

接下來,我們將學(xué)習(xí)如何安裝,啟動(dòng)和停止ElasticSearch和Kibana。在下一篇文章中,我們將提供產(chǎn)品概述,并學(xué)習(xí)如何將批量航班數(shù)據(jù)導(dǎo)入ElasticSearch。
1. 安裝ElasticSearch & Kibana
訪問ElasticSearch網(wǎng)站,下載安裝包,解壓,然后進(jìn)行接下,你會(huì)發(fā)現(xiàn)它包含如下內(nèi)容:
bin
config
data
jdk
lib
logs
modules
plugins它的主要配置文件是config/elasticsearch.yml。
通過如下命令,可以運(yùn)行ElasticSearch,
cd <elasticsearch-installation>
bin/elasticsearch使用瀏覽器打開鏈接http:// localhost:9200/,如果看到類似以下的內(nèi)容,那么恭喜你,你已經(jīng)正常運(yùn)行ElasticSearch實(shí)例了。
{
"name" : "MacBook-Pro.local",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "jyxqsR0HTOu__iUmi3m3eQ",
"version" : {
"number" : "7.9.0",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "a479a2a7fce0389512d6a9361301708b92dff667",
"build_date" : "2020-08-11T21:36:48.204330Z",
"build_snapshot" : false,
"lucene_version" : "8.6.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
ElasticSearch由一組節(jié)點(diǎn)組成(也就是存儲數(shù)據(jù)的ElasticSearch實(shí)例),每個(gè)節(jié)點(diǎn)存儲部分?jǐn)?shù)據(jù),同一臺計(jì)算機(jī)上運(yùn)行多個(gè)實(shí)例。
http://localhost:9200/_cluster/health?pretty
{
"cluster_name" : "elasticsearch",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 0,
"active_shards" : 0,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}
集群狀態(tài)為green,我們看到它僅包含1個(gè)節(jié)點(diǎn)。數(shù)據(jù)作為JSON對象(或文檔)存儲在ElasticSearch中,使用索引將文檔組織在群集內(nèi)。索引是具有相似特征并在邏輯上相關(guān)的文檔的集合,通過索引,在邏輯上將文檔分組在一起,并提供與可伸縮性和可用性相關(guān)的配置選項(xiàng)。
數(shù)據(jù)分布在各個(gè)節(jié)點(diǎn)中,但是,實(shí)際上是如何實(shí)現(xiàn)的呢?
ElasticSearch使用分片。
分片是一種將索引分為不同部分的方法,其中每個(gè)部分稱為分片,分片可水平縮放數(shù)據(jù)。
如果發(fā)生磁盤故障并且存儲分片的節(jié)點(diǎn)發(fā)生故障,該怎么辦?
如果我們只有一個(gè)節(jié)點(diǎn),那么所有數(shù)據(jù)都會(huì)丟失。
默認(rèn)情況下,ElasticSearch支持分片復(fù)制以實(shí)現(xiàn)容錯(cuò)功能。主碎片的副本碎片在存儲主碎片的節(jié)點(diǎn)以外的節(jié)點(diǎn)中創(chuàng)建。主分片和副本分片都稱為復(fù)制組。在我們只有一個(gè)節(jié)點(diǎn)的示例中,沒有復(fù)制發(fā)生。如果磁盤出現(xiàn)故障,我的所有數(shù)據(jù)都會(huì)丟失。我們添加的節(jié)點(diǎn)越多,通過在節(jié)點(diǎn)周圍散布碎片就可以提高可用性。
ElasticSearch集群暴露REST API,使得開發(fā)者可以通過GET POST PUT DELETE命令進(jìn)行訪問。
有多種方法可以向ElasticSearch發(fā)出命令。
通過在瀏覽器中或使用 curl命令通過Kibana的控制臺工具
curl的訪問語法如下:
curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'參數(shù)解釋:
<VERB>:HTTP請求方法,GETPOSTPUTDELETE<PROTOCOL>或.:如果你在Elasticsearch前面有HTTPS代理,或者使用Elasticsearch安全功能來加密HTTP通信,請使用后者<HOST>:Elasticsearch集群中任何節(jié)點(diǎn)的主機(jī)名<PORT>:運(yùn)行Elasticsearch HTTP服務(wù)的端口,默認(rèn)為9200<PATH>:API的endpoint<BODY>:JSON編碼的請求正文
例如:
curl -X GET "localhost:9200/flight/_doc/1?pretty"將返回存儲在索引中的所有文檔,由于我們尚未在ElasticSearch中插入任何文檔,因此該查詢將返回錯(cuò)誤。
前面介紹了ElasticSearch的安裝方法,下面介紹一下Kibana的安裝。
訪問網(wǎng)站下載安裝包,解壓,通過下方命令運(yùn)行Kibana:
cd <kibana-installation>
bin/kibana在啟動(dòng)Kibana之前,請確保ElasticSearch已啟動(dòng)并正在運(yùn)行。
Kibana的目錄結(jié)構(gòu)如下:
bin
built_assets
config
data
node
node_modules
optimize
package.json
plugins
src
webpackShims
x-pack首次運(yùn)行Kibana(http://localhost:5601)時(shí),會(huì)讓你提供樣本數(shù)據(jù)或自行探索。
使用瀏覽器發(fā)送下方命令:
GET /_cat/health?v會(huì)得到下方信息:
epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1585684478 19:54:38 elasticsearch green 1 1 6 6 0 0 7 0 - 100.0%_catAPI提供有關(guān)屬于群集的節(jié)點(diǎn)的信息。
有一個(gè)更方便的API GET /_cat/indices?pretty,它提供了有關(guān)節(jié)點(diǎn)的更多詳細(xì)信息。
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .apm-custom-link ticRJ0PoTk26n8Ab7-BQew 1 0 0 0 208b 208b
green open .kibana_task_manager_1 SCJGLrjpTQmxAD7yRRykvw 1 0 6 99 34.4kb 34.4kb
green open .kibana-event-log-7.9.0-000001 _RqV43r_RHaa-ztSvhV-pA 1 0 1 0 5.5kb 5.5kb
green open .apm-agent-configuration 61x6ihufQfOiII0SaLHrrw 1 0 0 0 208b 208b
green open .kibana_1 lxQoYjPiStuVyK0pQ5_kaA 1 0 22 1 10.4mb 10.4mb在這一部分,我主要介紹了一下搭建數(shù)據(jù)規(guī)約系統(tǒng)涉及到的一些基本概念,以及ElasticSearch、Kibana的安裝,確保,這兩款關(guān)鍵工具能夠正常運(yùn)行。
在下一篇文章中,我們將看到如何將批量航班數(shù)據(jù)導(dǎo)入到ElasticSearch,并查看如何實(shí)際搜索它們。
二、ElasticSearch操作
在前面這一部分,我已經(jīng)解釋了ElasticSearch的基礎(chǔ)知識及其工作原理。
在這一部分,我們將學(xué)習(xí)如何在ElasticSearch中執(zhí)行搜索。
CRUD
在開發(fā)過程中,主要都在圍繞著數(shù)據(jù)的CRUD進(jìn)行處理,具體來說就是:
C – Create R – Retrieve or Read U – Update D – Delete
下表將每個(gè)CRUD命令與其各自的ElasticSearch HTTP / REST命令進(jìn)行了一一對應(yīng),
| CRUD command | HTTP/REST command |
|---|---|
| Create | PUT or POST |
| Read | GET |
| Update | PUT or POST |
| Delete | DELETE |
上一篇文章中,我們學(xué)習(xí)了Kibana,接下來,就切換到Kibana的控制臺。
1. 創(chuàng)建索引
通過如下命令,創(chuàng)建一個(gè)flight索引:
PUT /flight
GET /_cluster/health請注意,現(xiàn)在群集的運(yùn)行狀況已從綠色變?yōu)辄S色。發(fā)生這種情況是因?yàn)槲覀儍H運(yùn)行一個(gè)Elasticsearch實(shí)例。單節(jié)點(diǎn)群集具有完整的功能,但是無法將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn)以提供彈性。副本分片必須有其他可用節(jié)點(diǎn),群集狀態(tài)才能變?yōu)榫G色。如果群集狀態(tài)為紅色,則標(biāo)識某些數(shù)據(jù)不可用。
為了解決這個(gè)問題,您需要安裝另一個(gè)同樣的Elasticsearch,并在elasticsearch.yml中更改node.name;兩個(gè)實(shí)例中的cluster.name必須保持相同(默認(rèn)為elasticsearch)。
另一種方法是在命令行上將配置參數(shù)傳遞給Elasticsearch。
bin/elasticsearch -Enode.name=node-2 -Epath.data=./node-2/data -Epath.logs=./node-2/logs
GET /_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open flight w696u4y3SYWuGW--8VzW6Q 1 1 0 0 208b 208b2. 創(chuàng)建文檔
下面,向我們的索引添加一些示例數(shù)據(jù):
PUT /flight/_doc/1
{
"Icao":"A0835D",
"Alt":2400,
"Lat":39.984322,
"Long":-82.925616
}也可以使用curl命令:
curl -X PUT "localhost:9200/flight/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
"Icao":"A0835D",
"Alt":2400,
"Lat":39.984322,
"Long":-82.925616
}'在這種情況下,ElasticSearch將為我們的文檔生成一個(gè)自動(dòng)ID。這是ElasticSearch返回的結(jié)果:
Content-Type對于查詢成功至關(guān)重要, 我們創(chuàng)建了一個(gè)ID = 1的新排期。我們也可以使用POST代替PUT,但是在這種情況下,我們無法傳遞ID。
在這種情況下,ElasticSearch將為我們的文檔生成一個(gè)自動(dòng)ID。
下面是ElasticSearch返回的結(jié)果:
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "flight",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"Icao" : "A0835D",
"Alt" : 2400,
"Lat" : 39.984322,
"Long" : -82.925616
}
}
]
}
}結(jié)果文檔存儲在鍵值_source內(nèi)。
3. 刪除文檔
如果你知道文檔索引,可以直接通過索引進(jìn)行刪除:
DELETE /flight/_doc/14. 刪除索引
通過下方命令刪除索引:
DELETE /flight5. 批量導(dǎo)入數(shù)據(jù)
我們的方案是處理航班數(shù)據(jù),理想情況下,這些數(shù)據(jù)是從多個(gè)傳感器(雷達(dá))實(shí)時(shí)獲得的,但是由于這很難實(shí)現(xiàn)。
因此,我們將使用可從此處下載的批量歷史飛行數(shù)據(jù)。
在下載批處理文件的目錄中,發(fā)送以下命令(每個(gè).json文件):
curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flights/_bulk --data-binary "@2016-07-01-1300Z.json"請注意,內(nèi)容類型是application/x-ndjson,而不是application/x-json。
另外,請注意,我們將數(shù)據(jù)表示為二進(jìn)制以便保留換行符。
磁瓦ElasticSearch需要json文檔滿足特定格式:
{"index":{"_id":4800770}}
{"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...}
...這意味著你必須將每個(gè)下載的.json文件轉(zhuǎn)換為上述格式。
如果你不想花時(shí)間手動(dòng)修改.json文檔,則在下一篇文章中,我們將開發(fā)一個(gè)Java程序來解析它們,并使用ElasticSearch的REST API將文件插入ElasticSearch中。
6. 搜索查詢
ElasticSearch是一款搜索相關(guān)的工具,它允許你進(jìn)行符合條件的搜索查詢。
GET /flight/_search?pretty
{ "query": {
"match_all" : {
}
}
}上面的搜索查詢匹配索引對應(yīng)的所有文檔。也可以這樣簡化:
GET /flight/_search下面是根據(jù)給定字段Icao進(jìn)行查詢:
GET /flight/_search?pretty
{ "query": {
"match" : {
"Icao" : "A0835D"
}
}
}也可以用嵌入U(xiǎn)RL進(jìn)行搜索:
GET /flight/_search?q=Icao:A0835D也可以這樣寫:
GET /flight/_search?pretty
{ "query": {
"query_string": {
"query": "Icao:A0835D"
}
}
}除了“match”和“query_string”以外,還可以使用“term”。使用“ term”表示精確匹配。
GET /flight/_search?pretty
{ "query": {
"term": {
"Mil": true
}
}
}你也可以使用“term”來搜索值數(shù)組。
除此之外,還可以使用通配符“wildcard”進(jìn)行搜索,包括*/?。
GET /flight/_search?pretty
{ "query": {
"wildcard": {
"Call": "NJ*"
}
}
}7. 更新文檔
如果你知道索引的ID,可以通過_updateAPI進(jìn)行更新。
POST /flight/_update/4800770
{
"doc": {
"Mil": true
}
}使用上述命令,我們也可以將新字段添加到文檔中。
附帶說明一下,ElasticSearch文檔是不可變的!
因此,當(dāng)我們請求更新文檔時(shí),ElasticSearch會(huì)在后臺進(jìn)行操作,它檢索文檔,更改其字段并為具有相同ID的文檔重新索引,從而對它進(jìn)行替換。
可以使用腳本發(fā)送更復(fù)雜的查詢,
POST /flight/_update/4800770
{
"script": {
"source": "ctx._source.FlightsCount++"
}
}ctx表示上下文。
還有許多其他更新文檔的方法,例如,upserts,即根據(jù)文件是否已存在有條件地更新或插入文件。
POST /flight/_update/4800771
{
"script": {
"source": "ctx._source.FlightsCount++"
},
"upsert": {
"Rcvr":1,
"HasSig":false,
"Icao":"AE4839",
...
},
}8. 刪除文檔
使用_delete_by_query API可以刪除文檔:
POST /flight/_delete_by_query
{
"query": {
"match_all": {}
}
}9. 批量查詢
批量API可幫助我們通過一個(gè)查詢對許多文檔執(zhí)行同樣的操作。
該API包含4個(gè)動(dòng)作:索引,創(chuàng)建,更新,刪除:
POST /_bulk
{ "index": { "_index" : "flight", "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_index" : "flight", "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_index" : "flight", "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_index" : "flight", "_id": 4800770 } }索引和創(chuàng)建操作之間的區(qū)別如下:如果文檔已經(jīng)存在,則創(chuàng)建將引發(fā)錯(cuò)誤,而索引將替換文檔。
如果批量查詢要針對相同的索引運(yùn)行,那么我們可以像這樣簡化查詢:
POST /flight/_bulk
{ "index": { "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_id": 4800770 } }10. 映射
ElasticSearch是如何映射數(shù)據(jù)的呢?
動(dòng)態(tài)映射意味著沒有明確定義映射,或者至少?zèng)]有為某些字段定義。
ElasticSearch是通過檢查文檔字段的值類型來完成的。
要查看數(shù)據(jù)映射,請?jiān)贙ibana中執(zhí)行以下內(nèi)容:
GET /flight/_mapping我們也可以通過下方命令手動(dòng)添加映射關(guān)系,
PUT /flight/_mapping
{
"properties": {
"location": {
"type": "geo_point"
}
}
}請注意,一旦創(chuàng)建了字段映射,就不能對其進(jìn)行修改。唯一的方法是刪除并重新創(chuàng)建索引。
在下面的示例中,我們手動(dòng)創(chuàng)建了各種禁用動(dòng)態(tài)映射的映射。
PUT /flight/_mapping
{
"dynamic": false,
"properties": {
"Rcvr": {
"type": "integer"
},
"Icao": {
"type": "text"
},
...
"location": {
"type": "geo_point"
}
}
}如果你更新了映射,請?jiān)诮脛?dòng)態(tài)映射的情況下發(fā)出以下查詢來更新ElasticSearch,
POST /flight/_update_by_query?conflicts_proceed在這部分,我重點(diǎn)介紹了如何使用ElasticSearch的常用功能。
在下一一部分中,我們將學(xué)習(xí)如何在將json文件轉(zhuǎn)換為ElasticSearch的批量API所需的格式之后,以及通過使用JSON庫解析json文件,并將批處理json文件導(dǎo)入到ElasticSearch中。
三、數(shù)據(jù)導(dǎo)入
在第二部分中,我們學(xué)習(xí)了如何在ElasticSearch中執(zhí)行搜索。但是,我們無法使用其批量API將.json數(shù)據(jù)文件導(dǎo)入ElasticSearch。
在這部分中,我們將進(jìn)行一些編程,并學(xué)習(xí)一些有關(guān)如何將.json飛行數(shù)據(jù)文件導(dǎo)入ElasticSearch的方法:
通過將.json數(shù)據(jù)文件轉(zhuǎn)換為ElasticSearch的API需要的格式 通過解析.json數(shù)據(jù)文件,使用JSON庫(例如gson)提取其值,然后使用ElasticSearch的REST API導(dǎo)入數(shù)據(jù)
數(shù)據(jù)轉(zhuǎn)換
ElasticSearch對數(shù)據(jù)格式有特定的格式要求:
{``"index"``:{``"_id"``:4800770}}
{``"Rcvr"``:1,``"HasSig"``:``false``,``"Icao"``:``"494102"``, ``"Bad"``:``false``,``"Reg"``:``"CS-PHB"``, ...}
...
這就意味著,你需要把下載的每一份json數(shù)據(jù)按照上述格式進(jìn)行轉(zhuǎn)換。主要滿足如下2點(diǎn):
在每個(gè)數(shù)據(jù)文檔前面加入一行以 index開頭的數(shù)據(jù)把 "Id":<value>修改為{"_id":<value>}
我們可以通過編寫簡單的Java程序,快速把json文件轉(zhuǎn)換成對應(yīng)格式:
package com.jgc;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
* Converts a flight data json file to a format that can be imported to
* ElasticSearch using its bulk API.
*/
public class JsonFlightFileConverter {
private static final Path flightDataJsonFile =
Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
public static void main(String[] args) {
List<String> list = new ArrayList<>();
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
list = stream
.map(line -> line.split("\\{"))
.flatMap(Arrays::stream)
.collect(toList());
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(list);
}
}
最后,通過簡單的拼接,輸出我們想要的結(jié)果:
final String result = list.stream().skip(3)
.map(s -> "{" + s + "\n")
.collect(Collectors.joining());
System.out.println(result);
現(xiàn)在,可以看到輸出已經(jīng)非常接近我們想要的結(jié)果:
{"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...
實(shí)際上,我們可以將最后一個(gè)代碼片段添加到原始流中,如下所示:
String result = "";
try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
result = stream
.map(line -> line.split("\\{"))
.flatMap(Arrays::stream)
.skip(3)
.map(s -> "{" + s + "\n")
.collect(Collectors.joining());
} catch (IOException e) {
e.printStackTrace();
}
現(xiàn)在,我們需要在每行的上方插入新行,其中包含文檔的索引,如下所示:
{"index":{"_id":4800770}}
我們可以創(chuàng)建一個(gè)函數(shù),這樣處理會(huì)更加簡潔明了:
private static String insertIndex(String s) {
final String[] keyValues = s.split(",");
final String[] idKeyValue = keyValues[0].split(":");
return "{\"index\":{\"_id\":"+ idKeyValue[1] +"}}\n";
}
這樣,就可以對每個(gè)輸入進(jìn)行轉(zhuǎn)換,給出我們需要的輸出。
我們還需要解決的更多細(xì)節(jié),從每個(gè)文檔中刪除最后一個(gè)逗號。
private static String removeLastComma(String s) {
return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;
}
這時(shí)候,數(shù)據(jù)處理代碼就變成了下面這個(gè)樣子:
public class JsonFlightFileConverter {
public static void main(String[] args) {
if (args.length == 1) {
Path inDirectoryPath = Paths.get(args[0]);
if (inDirectoryPath != null) {
Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");
try {
if (Files.exists(outDirectoryPath)) {
Files.walk(outDirectoryPath)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
}
Files.createDirectory(Paths.get(inDirectoryPath.toString(), "out"));
} catch (IOException e) {
e.printStackTrace();
}
try (DirectoryStream ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {
for (Path inFlightDataJsonFile : ds) {
String result = "";
try (Stream stream =
Files.lines(inFlightDataJsonFile.toAbsolutePath())) {
result = stream
.parallel()
.map(line -> line.split("\\{"))
.flatMap(Arrays::stream)
.skip(3)
.map(s -> createResult(s))
.collect(Collectors.joining());
Path outFlightDataJsonFile =
Paths.get(outDirectoryPath.toString(),
inFlightDataJsonFile.getFileName().toString());
Files.createFile(outFlightDataJsonFile);
Files.writeString(outFlightDataJsonFile, result);
}
} catch (IOException e) {
e.printStackTrace();
}
}
} else {
System.out.println("Usage: java JsonFlightFileConverter ");
}
...
使用ElasticSearch的批量API導(dǎo)入數(shù)據(jù)
需要再次強(qiáng)調(diào),文件必須以空行結(jié)尾。如果不是,則添加一個(gè)(實(shí)際上前面的程序已經(jīng)在文件末尾添加了換行符)。
在產(chǎn)生新的.json文件的目錄(輸出目錄)內(nèi),執(zhí)行以下命令:
curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"請注意,內(nèi)容類型是application / x-ndjson,而不是application / x-json。
還要注意,我們將數(shù)據(jù)表示為二進(jìn)制以便保留換行符。文件名為2016-07-01-1300Z.json。
ElasticSearch中任何具有相同ID的現(xiàn)有文檔都將被.json文件中的文檔替換。
最后,可以發(fā)現(xiàn)有7679文件被導(dǎo)入:
"hits" : {
"total" : {
"value" : 7679,
"relation" : "eq"
},
GET /_cat/shards?v
返回結(jié)果:
index shard prirep state docs store ip node
flight 0 p STARTED 7679 71mb 127.0.0.1 MacBook-Pro.local
flight 0 r UNASSIGNED解析JSON數(shù)據(jù)
將這些文檔導(dǎo)入ElasticSearch的另一種方法是將JSON數(shù)據(jù)文件解析到內(nèi)存中,并使用ElasticSearch的REST API將其導(dǎo)入ElasticSearch。
有許多庫可用于解析Java中的JSON文件:
GSon Jackson mJson JSON-Simple JSON-P
我們將使用Google的GSon庫,但其他任何JSON庫都可以完成此工作。
GSon提供了多種表示JSON數(shù)據(jù)的方法,具體使用哪一種,則取決于下一步,即如何將數(shù)據(jù)導(dǎo)入到ElasticSearch。
ElasticSearch API要求數(shù)據(jù)的格式為:Map<String, Object>,這是我們將解析后的JSON數(shù)據(jù)存儲到的位置。
首先,將下面依賴加入到pom.xml中:
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
使用下方代碼解析json數(shù)據(jù):
package com.jcg;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
public class JsonFlightFileReader {
private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";
private static final Gson gson = new Gson();
public static void main(String[] args) {
parseJsonFile(flightDataJsonFile);
}
private static void parseJsonFile(String file) {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
Map<String, Object> map = gson.fromJson(reader,
new TypeToken<Map<String, Object>>() { }.getType());
List<Object> acList = (List<Object>) (map.get("acList"));
for (Object item : acList) {
LinkedTreeMap<String, Object> flight =
(LinkedTreeMap<String, Object>) item;
for (Map.Entry<String, Object> entry : flight.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";
System.out.print(outEntry);
}
System.out.println("}");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
通過下述方法可以使用數(shù)據(jù):
Map<String, Object> map = gson.fromJson(reader, new TypeToken<Map<String, Object>>() {}.getType());
List<Object> acList = (List<Object>) (map.get("acList"));
使用ElasticSearch REST API導(dǎo)入數(shù)據(jù)
首先,在pom.xml中加入下方依賴:
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.10.0</version>
</dependency>
我們可以通過RestClient與ElasticSearch進(jìn)行交互:
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"));
.setDefaultHeaders(new Header[]{
new BasicHeader("accept", "application/json"),
new BasicHeader("content-type", "application/json")})
.setFailureListener(new RestClient.FailureListener() {
public void onFailure(Node node) {
System.err.println("Low level Rest Client Failure on node " +
node.getName());
}
}).build();
創(chuàng)建好RestClient之后,下一步就是創(chuàng)建一個(gè)Request,并將json數(shù)據(jù)傳遞給它:
Request request = new Request("POST", "/flight/_doc/4800770");
String jsonDoc = "{\"Rcvr\":1,\"HasSig\":false,\"Icao\":\"494102\",...]}";
request.setJsonEntity(jsonDoc);
最后,我們發(fā)送請求。
有兩種方式,同步:
Response response = restClient.performRequest(request);
if (response.getStatusLine().getStatusCode() != 200) {
System.err.println("Could not add document with Id: " + id + " to index /flight");
}
異步:
Cancellable cancellable = restClient.performRequestAsync(request,
new ResponseListener() {
@Override
public void onSuccess(Response response) {
System.out.println("Document with Id: " + id + " was successfully added to index /flight");
}
@Override
public void onFailure(Exception exception) {
System.err.println("Could not add document with Id: " + id + " to index /flight");
}
});
最后,不要忘記關(guān)閉restClient連接:
} finally {
try {
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
這部分,我們重點(diǎn)介紹了如何將.json數(shù)據(jù)批處理文件導(dǎo)入到ElasticSearch。
我們看到了如何通過兩種方式做到這一點(diǎn):
使用ElasticSearch的批量API, 使用JSON庫解析.json文件
你可以根據(jù)自己的情況自行選擇其中一種方法。
四、Logstash
在本系列文章的第3部分關(guān)于實(shí)時(shí)流處理的文章中,我們學(xué)習(xí)了如何使用ElasticSearch的批量API以及利用REST API將.json航班數(shù)據(jù)文件導(dǎo)入ElasticSearch。
在這篇文章中,我們將介紹另一種方式,Logstash。
Logstash介紹
Logstash是一個(gè)開源的數(shù)據(jù)收集引擎,具有實(shí)時(shí)流水線功能。
它從多個(gè)源頭接收數(shù)據(jù),進(jìn)行數(shù)據(jù)處理,然后將轉(zhuǎn)化后的信息發(fā)送到stash,即存儲。
Logstash允許我們將任何格式的數(shù)據(jù)導(dǎo)入到任何數(shù)據(jù)存儲中,不僅僅是ElasticSearch。
它可以用來將數(shù)據(jù)并行導(dǎo)入到其他NoSQL數(shù)據(jù)庫,如MongoDB或Hadoop,甚至導(dǎo)入到AWS。
數(shù)據(jù)可以存儲在文件中,也可以通過流等方式進(jìn)行傳遞。
Logstash對數(shù)據(jù)進(jìn)行解析、轉(zhuǎn)換和過濾。它還可以從非結(jié)構(gòu)化數(shù)據(jù)中推導(dǎo)出結(jié)構(gòu),對個(gè)人數(shù)據(jù)進(jìn)行匿名處理,可以進(jìn)行地理位置查詢等等。
一個(gè)Logstash管道有兩個(gè)必要的元素,輸入和輸出,以及一個(gè)可選的元素,過濾器。
輸入組件從源頭消耗數(shù)據(jù),過濾組件轉(zhuǎn)換數(shù)據(jù),輸出組件將數(shù)據(jù)寫入一個(gè)或多個(gè)目的地。
所以,我們的示例場景的Logstash架構(gòu)基本如下。
我們從.json文件中讀取我們的航班數(shù)據(jù),我們對它們進(jìn)行處理/轉(zhuǎn)換,應(yīng)用一些過濾器并將它們存儲到ElasticSearch中。
Logstash安裝
有幾種選擇來安裝Logstash。
一種是訪問網(wǎng)站下載你平臺的存檔,然后解壓到一個(gè)文件夾。
你也可以使用你的平臺的包管理器來安裝,比如yum、apt-get或homebrew,或者作為docker鏡像來安裝。
確保你已經(jīng)定義了一個(gè)環(huán)境變量JAVA_HOME,指向JDK 8或11或14的安裝(Logstash自帶嵌入式AdoptJDK)。
Logstash工作流
一旦你安裝了它,讓我們通過運(yùn)行最基本的Logstash工作流來測試你的Logstash安裝情況。
bin/logstash -e 'input { stdin { } } output { stdout {} }'上面的工作流接受來自stdin(即你的鍵盤)的輸入,并將其輸出到stdout(即你的屏幕)。
上面的工作流中沒有定義任何過濾器。一旦你看到logstash被成功啟動(dòng)的消息,輸入一些東西(我輸入的是Hello world),按ENTER鍵,你應(yīng)該看到產(chǎn)生的消息的結(jié)構(gòu)格式,像下面這樣。
[2021-02-11T21:52:57,120][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
Hello world
{
"message" => "Hello world",
"@version" => "1",
"@timestamp" => 2021-02-11T19:57:46.208Z,
"host" => "MacBook-Pro.local"
}然而,通常Logstash是通過配置文件來工作的,配置文件告訴它該做什么,即在哪里找到它的輸入,如何轉(zhuǎn)換它,在哪里存儲它。Logstash配置文件的結(jié)構(gòu)基本上包括三個(gè)部分:輸入、過濾和輸出。
你在輸入部分指定數(shù)據(jù)的來源,在輸出部分指定目的地。在過濾器部分,你可以使用支持的過濾器插件來操作、測量和創(chuàng)建事件。
配置文件的結(jié)構(gòu)如下面的代碼示例所示。
input {...}
filter {...}
output{...}你需要?jiǎng)?chuàng)建一個(gè)配置文件,指定你要使用的組件和每個(gè)組件的設(shè)置。在config文件夾中已經(jīng)存在一個(gè)配置文件樣本,logstash-sample.conf。
其內(nèi)容如下所示。
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}這里input部分定義了Logstash應(yīng)該從哪里獲取數(shù)據(jù)。這里有一個(gè)可用的輸入插件列表。
我們的輸入不是來自Beats組件,而是來自文件系統(tǒng),所以我們使用文件輸入組件。
input {
file {
start_position => "beginning"
path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"
codec => "json"
}
}我們使用start_position參數(shù)來告訴插件從頭開始讀取文件。
需要注意,數(shù)據(jù)路徑必須是絕對的。
我們使用的是json編解碼器,除了json,還可以使用純文本形式。
在下載的數(shù)據(jù)中,可以找到一個(gè)名為test.json的文件。它只由2條航班數(shù)據(jù)組成的文件。
輸出塊定義了Logstash應(yīng)該在哪里存儲數(shù)據(jù)。我們將使用ElasticSearch來存儲我們的數(shù)據(jù)。
我們添加了第二個(gè)輸出作為我們的控制臺,并使用rubydebugger格式化輸出,第三個(gè)輸出作為文件系統(tǒng),最后兩個(gè)用于測試我們的輸出。我們將輸出存儲在output.json中。
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "testflight"
}
file {
path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"
}
stdout {
codec => rubydebug
}
}此外,還可以定義過濾器來對數(shù)據(jù)進(jìn)行轉(zhuǎn)換。
Logstash提供了大量的過濾器,下面介紹一些非常常用的的過濾器:
grok:解析任何任意文本并添加結(jié)構(gòu),它包含120種內(nèi)置模式 mutate:對字段進(jìn)行一般的轉(zhuǎn)換,例如重命名、刪除、替換和修改字段 drop:丟棄一個(gè)數(shù)據(jù) clone:復(fù)制一個(gè)數(shù)據(jù),可能增加或刪除字段 geoip:添加IP地址的地理位置信息 split:將多行消息、字符串或數(shù)組分割成不同的數(shù)據(jù)
可以通過執(zhí)行下方命令查看 Logstash 安裝中安裝的全部插件列表。
$ bin/logstash-plugin list你會(huì)注意到,有一個(gè)JSON過濾器插件。這個(gè)插件可以解析.json文件并創(chuàng)建相應(yīng)的JSON數(shù)據(jù)結(jié)構(gòu)。
正確地選擇和配置過濾器是非常重要的,否則,你最終的輸出中沒有數(shù)據(jù)。
所以,在我們的過濾塊中,我們啟用json插件,并告訴它我們的數(shù)據(jù)在消息字段中。
filter {
json {
source => "message"
}
}到此為止,完成的配置文件config/testflight.conf內(nèi)容如下:
input {
file {
start_position => "beginning"
path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"
codec => "json"
}
}
filter {
json {
source => "message"
}
}
output {
# elasticsearch {
# hosts => ["http://localhost:9200/"]
# index => "testflight"
# }
file {
path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"
}
stdout {
codec => rubydebug
}
}你可以通過如下命令進(jìn)行一下測試:
bin/logstash -f config/testflight.conf --config.test_and_exit
...
Configuration OK
[2021-02-11T23:15:38,997][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash如果配置文件通過了配置測試,用以下命令啟動(dòng)Logstash。
bin/logstash -f config/testflight.conf --config.reload.automatic
...--config.reload.automatic配置選項(xiàng)可以實(shí)現(xiàn)自動(dòng)重載配置,這樣你就不必每次修改配置文件時(shí)都要停止并重新啟動(dòng)Logstash。
如果一切順利,你應(yīng)該會(huì)看到如下的輸出結(jié)果。
{
"CMsgs" => 1,
"@version" => "1",
"PosTime" => 1467378028852,
"Rcvr" => 1,
"EngMount" => 0,
"Tisb" => false,
"Mil" => false,
"Trt" => 2,
"Icao" => "A0835D",
"Long" => -82.925616,
"InHg" => 29.9409447,
"VsiT" => 1,
"ResetTrail" => true,
"CallSus" => false,
"@timestamp" => 2021-02-14T18:32:16.337Z,
"host" => "MacBook-Pro.local",
"OpIcao" => "RPA",
"Man" => "Embraer",
"GAlt" => 2421,
"TT" => "a",
"Bad" => false,
"HasSig" => true,
"TSecs" => 1,
"Vsi" => 2176,
"EngType" => 3,
"Reg" => "N132HQ",
"Alt" => 2400,
"Species" => 1,
"FlightsCount" => 0,
"WTC" => 2,
"Cos" => [
[0] 39.984322,
[1] -82.925616,
[2] 1467378028852.0,
[3] nil
],"message" => "{\"Id\":10519389,\"Rcvr\":1,\"HasSig\":true,\"Sig\":0,\"Icao\":\"A0835D\",\"Bad\":false,\"Reg\":\"N132HQ\",\"FSeen\":\"\\/Date(1467378028852)\\/\",\"TSecs\":1,\"CMsgs\":1,\"Alt\":2400,\"GAlt\":2421,\"InHg\":29.9409447,\"AltT\":0,\"Lat\":39.984322,\"Long\":-82.925616,\"PosTime\":1467378028852,\"Mlat\":true,\"Tisb\":false,\"Spd\":135.8,\"Trak\":223.2,\"TrkH\":false,\"Type\":\"E170\",\"Mdl\":\"2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR\",\"Man\":\"Embraer\",\"CNum\":\"17000216\",\"Op\":\"REPUBLIC AIRLINE INC - INDIANAPOLIS, IN\",\"OpIcao\":\"RPA\",\"Sqk\":\"\",\"Vsi\":2176,\"VsiT\":1,\"WTC\":2,\"Species\":1,\"Engines\":\"2\",\"EngType\":3,\"EngMount\":0,\"Mil\":false,\"Cou\":\"United States\",\"HasPic\":false,\"Interested\":false,\"FlightsCount\":0,\"Gnd\":false,\"SpdTyp\":0,\"CallSus\":false,\"ResetTrail\":true,\"TT\":\"a\",\"Trt\":2,\"Year\":\"2008\",\"Cos\":[39.984322,-82.925616,1467378028852.0,null]}",
"Lat" => 39.984322,
"TrkH" => false,
"Op" => "REPUBLIC AIRLINE INC - INDIANAPOLIS, IN",
"Engines" => "2",
"Sqk" => "",
"Id" => 10519389,
"Gnd" => false,
"CNum" => "17000216",
"path" => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json",
"Cou" => "United States",
"HasPic" => false,
"FSeen" => "/Date(1467378028852)/",
"Interested" => false,
"Mdl" => "2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR",
"Spd" => 135.8,
"Sig" => 0,
"Trak" => 223.2,
"Year" => "2008",
"SpdTyp" => 0,
"AltT" => 0,
"Type" => "E170",
"Mlat" => true
}數(shù)據(jù)轉(zhuǎn)換
首先,讓我們從輸出中刪除path, @version, @timestamp, host和message,這些都是logstash添加的。
filter {
json {
source => "message"
}
mutate {
remove_field => ["path", "@version", "@timestamp", "host", "message"]
}
}mutate過濾器組件可以刪除不需要的字段。
重新運(yùn)行:
bin/logstash -f config/flightdata-logstash.conf –-config.test_and_exit
bin/logstash -f config/flightdata-logstash.conf --config.reload.automatic接下來,我們將_id設(shè)置為Id。
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "testflight"
document_id => "%{Id}"
}我們在輸出組件中通過設(shè)置document_id來實(shí)現(xiàn)。
然而,如果你重新運(yùn)行l(wèi)ogstash,你會(huì)發(fā)現(xiàn)Id字段仍然存在。
有一個(gè)竅門,在過濾插件中把它改名為[@metadata][Id],然后在輸出中使用,@metadata字段被自動(dòng)刪除。
filter {
json {
source => "message"
}
mutate {
remove_field => ["path", "@version", "@timestamp", "host", "message"]
rename => { "[Id]" => "[@metadata][Id]" }
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "flight-logstash"
document_id => "%{[@metadata][Id]}"
}
...現(xiàn)在讓我們嘗試解析日期。如果你還記得,這是我們在上一篇文章中沒有做的事情,我們需要將日期轉(zhuǎn)換為更適合人們熟悉的格式。
例如:
"FSeen" => "\/Date(1467378028852)\/"需要將時(shí)間1467378028852轉(zhuǎn)化成容易閱讀的格式,并且去掉前后多余的字符串,通過gsub組件可以實(shí)現(xiàn)這項(xiàng)功能:
gsub => [
# get rid of /Date(
"FSeen", "\/Date\(", "",
# get rid of )/
"FSeen", "\)\/", ""
]這里通過gsub去掉了數(shù)據(jù)中/Date()\等多余部分,輸出結(jié)果為:
"FSeen" : "1467378028852"然后把時(shí)間戳轉(zhuǎn)換成熟悉的格式:
date {
timezone => "UTC"
match => ["FSeen", "UNIX_MS"]
target => "FSeen"
}UNIX_MS是UNIX時(shí)間戳,單位是毫秒。我們匹配字段FSeen并將結(jié)果存儲在同一字段中,輸出結(jié)果為:
"FSeen" : "2016-07-01T13:00:28.852Z",上述轉(zhuǎn)換的完整代碼如下:
mutate {
gsub => [
# get rid of /Date(
"FSeen", "\/Date\(", "",
# get rid of )/
"FSeen", "\)\/", ""
]
}
date {
timezone => "UTC"
match => ["FSeen", "UNIX_MS"]
target => "FSeen"
}在這部分中,我們學(xué)習(xí)了如何使用Logstash將.json航班數(shù)據(jù)批量文件導(dǎo)入到ElasticSearch中。Logstash是一個(gè)非常方便的方式,它有很多過濾器,支持很多數(shù)據(jù)類型,你只需要學(xué)習(xí)如何編寫一個(gè)配置文件就可以了!
Logstash是否適合實(shí)時(shí)數(shù)據(jù)處理?
答案是:要看情況
Logstash主要是為批處理數(shù)據(jù)而設(shè)計(jì)的,比如日志數(shù)據(jù),也許不適合處理來自傳感器的實(shí)時(shí)航班數(shù)據(jù)。
不過,你可以參考一些參考資料,這些資料描述了如何創(chuàng)建可以擴(kuò)展的Logstash部署,并使用Redis作為Logstash代理和Logstash中央服務(wù)器之間的中介,以便處理許多事件并實(shí)時(shí)處理它們。
作者:Jackpop
鏈接:https://www.zhihu.com/question/469207536/answer/2550197723
往期推薦
??

