<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          2w 字帶你實(shí)戰(zhàn) ElasticSearch !

          共 38383字,需瀏覽 77分鐘

           ·

          2022-07-13 07:18

          一、 基礎(chǔ)知識

          你會(huì)如何處理實(shí)時(shí)或準(zhǔn)實(shí)時(shí)數(shù)據(jù)流?

          在大數(shù)據(jù)時(shí)代,有很多方案可以幫助你完成這項(xiàng)任務(wù)。

          接下來,我將通過一個(gè)系列的教程,我將利用Storm、Kafka、ElasticSearch逐步教你搭建一個(gè)實(shí)時(shí)計(jì)算系統(tǒng)。

          搭建系統(tǒng)之前,我們首先需要了解一些定義。

          通過考慮四個(gè)不同的屬性,幫助你更好地理解大數(shù)據(jù):數(shù)據(jù)量,速度,多樣性和準(zhǔn)確性。

          • 數(shù)據(jù)量:海量數(shù)據(jù)
          • 速度:數(shù)據(jù)處理的速度
          • 多樣性:任何類型的數(shù)據(jù),包括結(jié)構(gòu)化和非結(jié)構(gòu)化
          • 準(zhǔn)確性:傳入和傳出數(shù)據(jù)的準(zhǔn)確性

          存在具有不同用途的大數(shù)據(jù)工具:

          • 數(shù)據(jù)處理工具對數(shù)據(jù)執(zhí)行某種形式的計(jì)算
          • 數(shù)據(jù)傳輸工具將數(shù)據(jù)收集和引入數(shù)據(jù)處理工具
          • 數(shù)據(jù)存儲工具在不同處理階段存儲數(shù)據(jù)

          數(shù)據(jù)處理工具可進(jìn)一步分類為:

          • 批處理:批處理是要一起處理的數(shù)據(jù)的集合。批處理允許你將不同的數(shù)據(jù)點(diǎn)連接、合并或聚合在一起。在整個(gè)批處理完成之前,其結(jié)果通常不可用。批處理越大,等待從中獲取有用信息的時(shí)間越長。如果需要更直接的結(jié)果,流處理是更好的解決方案。
          • 流處理:流處理器作用于無限制的數(shù)據(jù)流,而不是連續(xù)攝取的一批數(shù)據(jù)點(diǎn)(“流”)。與批處理過程不同,流中沒有明確定義的數(shù)據(jù)流起點(diǎn)或終點(diǎn),而且,它是連續(xù)的。

          批處理

          流梳理

          示例數(shù)據(jù)和方案

          我們將使用一些實(shí)際數(shù)據(jù)來數(shù)據(jù)規(guī)約系統(tǒng)(DRS)。根據(jù)維基百科,“數(shù)據(jù)規(guī)約是將數(shù)字或字母數(shù)字信息…轉(zhuǎn)換為校正,有序和簡化的形式。基本概念是將大量數(shù)據(jù)規(guī)約為有意義的形式。”

          數(shù)據(jù)源將是實(shí)際的航空公司歷史飛行數(shù)據(jù),我們的最終目標(biāo)是能夠在地圖上顯示航班歷史數(shù)據(jù)。

          我們將構(gòu)建的最終數(shù)據(jù)處理鏈路如下圖所示:

          可以使用SMACK替代上述方案:

          • Spark:引擎(替代Storm)
          • Mesos:容器
          • Akka:模型
          • Cassandra:存儲(替代ElasticSearch)
          • Kafka:消息隊(duì)列

          或者,你可以嘗試自己使用自己喜歡的編程語言來實(shí)現(xiàn)它。

          單線程調(diào)度程序使用以下方式以循環(huán)方式將工作分配給多個(gè)處理器(例如,可以是Raspberry Pi的陣列)。MQTT用于數(shù)據(jù)交換。每個(gè)處理器并行處理數(shù)據(jù)并產(chǎn)生結(jié)果,這些結(jié)果由收集器收集,收集器負(fù)責(zé)將其存儲到數(shù)據(jù)庫,NAS或?qū)崟r(shí)呈現(xiàn)。由于我們沒有與用于接收實(shí)時(shí)飛行數(shù)據(jù)的真實(shí)傳感器(例如雷達(dá))建立任何連接以演示實(shí)際流處理,因此我們只能選擇批處理(即下載歷史飛行數(shù)據(jù)并離線處理它們)。

          我們將首先將數(shù)據(jù)直接存儲到ElasticSearch并在Kibana或其他UI應(yīng)用程序中可視化它們。

          ElasticSearch

          ElasticSearch是一個(gè)面向文檔的分布式搜索引擎,用于處理以文檔形式存儲數(shù)據(jù)。

          ElasticSearch具有如下優(yōu)勢:

          • 跨多個(gè)節(jié)點(diǎn)可擴(kuò)展
          • 搜索結(jié)果速度非常快
          • 多語種
          • 面向文檔
          • 支持即時(shí)搜索
          • 支持模糊搜索
          • 開源,不收費(fèi)

          ElasticStack由許多產(chǎn)品組成:

          • ElasticSearch:我們將在本文中重點(diǎn)介紹的
          • Kibana:一個(gè)分析和可視化平臺,可讓你輕松地可視化Elasticsearch中的數(shù)據(jù)并進(jìn)行分析
          • LogStash:數(shù)據(jù)處理管道
          • Beats:數(shù)據(jù)傳輸集合
          • X-pack:可為Elasticsearch和Kibana添加其他功能,例如安全性(身份驗(yàn)證和授權(quán)),性能(監(jiān)控),報(bào)告和機(jī)器學(xué)習(xí)

          綜上所述,可以使用Beats和/或Logstash將數(shù)據(jù)導(dǎo)入Elasticsearch,也可以直接通過ElasticSearch的API。Kibana用于可視化ElasticSearch中的數(shù)據(jù)。

          接下來,我們將學(xué)習(xí)如何安裝,啟動(dòng)和停止ElasticSearch和Kibana。在下一篇文章中,我們將提供產(chǎn)品概述,并學(xué)習(xí)如何將批量航班數(shù)據(jù)導(dǎo)入ElasticSearch。

          1. 安裝ElasticSearch & Kibana

          訪問ElasticSearch網(wǎng)站,下載安裝包,解壓,然后進(jìn)行接下,你會(huì)發(fā)現(xiàn)它包含如下內(nèi)容:

          bin
          config
          data
          jdk
          lib
          logs
          modules
          plugins

          它的主要配置文件是config/elasticsearch.yml

          通過如下命令,可以運(yùn)行ElasticSearch,

          cd <elasticsearch-installation>
          bin/elasticsearch

          使用瀏覽器打開鏈接http:// localhost:9200/,如果看到類似以下的內(nèi)容,那么恭喜你,你已經(jīng)正常運(yùn)行ElasticSearch實(shí)例了。

          {
            "name" : "MacBook-Pro.local",
            "cluster_name" : "elasticsearch",
            "cluster_uuid" : "jyxqsR0HTOu__iUmi3m3eQ",
            "version" : {
              "number" : "7.9.0",
              "build_flavor" : "default",
              "build_type" : "tar",
              "build_hash" : "a479a2a7fce0389512d6a9361301708b92dff667",
              "build_date" : "2020-08-11T21:36:48.204330Z",
              "build_snapshot" : false,
              "lucene_version" : "8.6.0",
              "minimum_wire_compatibility_version" : "6.8.0",
              "minimum_index_compatibility_version" : "6.0.0-beta1"
            },
            "tagline" : "You Know, for Search"
          }

          ElasticSearch由一組節(jié)點(diǎn)組成(也就是存儲數(shù)據(jù)的ElasticSearch實(shí)例),每個(gè)節(jié)點(diǎn)存儲部分?jǐn)?shù)據(jù),同一臺計(jì)算機(jī)上運(yùn)行多個(gè)實(shí)例。

          http://localhost:9200/_cluster/health?pretty
          {
            "cluster_name" : "elasticsearch",
            "status" : "green",
            "timed_out" : false,
            "number_of_nodes" : 1,
            "number_of_data_nodes" : 1,
            "active_primary_shards" : 0,
            "active_shards" : 0,
            "relocating_shards" : 0,
            "initializing_shards" : 0,
            "unassigned_shards" : 0,
            "delayed_unassigned_shards" : 0,
            "number_of_pending_tasks" : 0,
            "number_of_in_flight_fetch" : 0,
            "task_max_waiting_in_queue_millis" : 0,
            "active_shards_percent_as_number" : 100.0
          }

          集群狀態(tài)為green,我們看到它僅包含1個(gè)節(jié)點(diǎn)。數(shù)據(jù)作為JSON對象(或文檔)存儲在ElasticSearch中,使用索引將文檔組織在群集內(nèi)。索引是具有相似特征并在邏輯上相關(guān)的文檔的集合,通過索引,在邏輯上將文檔分組在一起,并提供與可伸縮性和可用性相關(guān)的配置選項(xiàng)。

          數(shù)據(jù)分布在各個(gè)節(jié)點(diǎn)中,但是,實(shí)際上是如何實(shí)現(xiàn)的呢?

          ElasticSearch使用分片。

          分片是一種將索引分為不同部分的方法,其中每個(gè)部分稱為分片,分片可水平縮放數(shù)據(jù)。

          如果發(fā)生磁盤故障并且存儲分片的節(jié)點(diǎn)發(fā)生故障,該怎么辦?

          如果我們只有一個(gè)節(jié)點(diǎn),那么所有數(shù)據(jù)都會(huì)丟失。

          默認(rèn)情況下,ElasticSearch支持分片復(fù)制以實(shí)現(xiàn)容錯(cuò)功能。主碎片的副本碎片在存儲主碎片的節(jié)點(diǎn)以外的節(jié)點(diǎn)中創(chuàng)建。主分片和副本分片都稱為復(fù)制組。在我們只有一個(gè)節(jié)點(diǎn)的示例中,沒有復(fù)制發(fā)生。如果磁盤出現(xiàn)故障,我的所有數(shù)據(jù)都會(huì)丟失。我們添加的節(jié)點(diǎn)越多,通過在節(jié)點(diǎn)周圍散布碎片就可以提高可用性。

          ElasticSearch集群暴露REST API,使得開發(fā)者可以通過GET POST PUT DELETE命令進(jìn)行訪問。

          有多種方法可以向ElasticSearch發(fā)出命令。

          • 通過在瀏覽器中或使用curl命令
          • 通過Kibana的控制臺工具

          curl的訪問語法如下:

          curl -X<VERB> '<PROTOCOL>://<HOST>:<PORT>/<PATH>?<QUERY_STRING>' -d '<BODY>'

          參數(shù)解釋:

          • <VERB>:HTTP請求方法,GET POST PUT DELETE
          • <PROTOCOL>.:如果你在Elasticsearch前面有HTTPS代理,或者使用Elasticsearch安全功能來加密HTTP通信,請使用后者
          • <HOST>:Elasticsearch集群中任何節(jié)點(diǎn)的主機(jī)名
          • <PORT>:運(yùn)行Elasticsearch HTTP服務(wù)的端口,默認(rèn)為9200
          • <PATH>:API的endpoint
          • <BODY>:JSON編碼的請求正文

          例如:

          curl -X GET "localhost:9200/flight/_doc/1?pretty"

          將返回存儲在索引中的所有文檔,由于我們尚未在ElasticSearch中插入任何文檔,因此該查詢將返回錯(cuò)誤。

          前面介紹了ElasticSearch的安裝方法,下面介紹一下Kibana的安裝。

          訪問網(wǎng)站下載安裝包,解壓,通過下方命令運(yùn)行Kibana:

          cd <kibana-installation>
          bin/kibana

          在啟動(dòng)Kibana之前,請確保ElasticSearch已啟動(dòng)并正在運(yùn)行。

          Kibana的目錄結(jié)構(gòu)如下:

          bin
          built_assets
          config
          data
          node
          node_modules
          optimize
          package.json
          plugins
          src
          webpackShims
          x-pack

          首次運(yùn)行Kibana(http://localhost:5601)時(shí),會(huì)讓你提供樣本數(shù)據(jù)或自行探索。

          使用瀏覽器發(fā)送下方命令:

          GET /_cat/health?v

          會(huì)得到下方信息:

          epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
          1585684478 19:54:38 elasticsearch green 1 1 6 6 0 0 7 0 - 100.0%

          _catAPI提供有關(guān)屬于群集的節(jié)點(diǎn)的信息。

          有一個(gè)更方便的API GET /_cat/indices?pretty,它提供了有關(guān)節(jié)點(diǎn)的更多詳細(xì)信息。

          health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
          green open .apm-custom-link ticRJ0PoTk26n8Ab7-BQew 1 0 0 0 208b 208b
          green open .kibana_task_manager_1 SCJGLrjpTQmxAD7yRRykvw 1 0 6 99 34.4kb 34.4kb
          green open .kibana-event-log-7.9.0-000001 _RqV43r_RHaa-ztSvhV-pA 1 0 1 0 5.5kb 5.5kb
          green open .apm-agent-configuration 61x6ihufQfOiII0SaLHrrw 1 0 0 0 208b 208b
          green open .kibana_1 lxQoYjPiStuVyK0pQ5_kaA 1 0 22 1 10.4mb 10.4mb

          在這一部分,我主要介紹了一下搭建數(shù)據(jù)規(guī)約系統(tǒng)涉及到的一些基本概念,以及ElasticSearch、Kibana的安裝,確保,這兩款關(guān)鍵工具能夠正常運(yùn)行。

          在下一篇文章中,我們將看到如何將批量航班數(shù)據(jù)導(dǎo)入到ElasticSearch,并查看如何實(shí)際搜索它們。

          二、ElasticSearch操作

          在前面這一部分,我已經(jīng)解釋了ElasticSearch的基礎(chǔ)知識及其工作原理。

          在這一部分,我們將學(xué)習(xí)如何在ElasticSearch中執(zhí)行搜索。

          CRUD

          在開發(fā)過程中,主要都在圍繞著數(shù)據(jù)的CRUD進(jìn)行處理,具體來說就是:

          • C – Create
          • R – Retrieve or Read
          • U – Update
          • D – Delete

          下表將每個(gè)CRUD命令與其各自的ElasticSearch HTTP / REST命令進(jìn)行了一一對應(yīng),

          CRUD commandHTTP/REST command
          CreatePUT or POST
          ReadGET
          UpdatePUT or POST
          DeleteDELETE

          上一篇文章中,我們學(xué)習(xí)了Kibana,接下來,就切換到Kibana的控制臺。

          1. 創(chuàng)建索引

          通過如下命令,創(chuàng)建一個(gè)flight索引:

          PUT /flight
          GET /_cluster/health

          請注意,現(xiàn)在群集的運(yùn)行狀況已從綠色變?yōu)辄S色。發(fā)生這種情況是因?yàn)槲覀儍H運(yùn)行一個(gè)Elasticsearch實(shí)例。單節(jié)點(diǎn)群集具有完整的功能,但是無法將數(shù)據(jù)復(fù)制到其他節(jié)點(diǎn)以提供彈性。副本分片必須有其他可用節(jié)點(diǎn),群集狀態(tài)才能變?yōu)榫G色。如果群集狀態(tài)為紅色,則標(biāo)識某些數(shù)據(jù)不可用。

          為了解決這個(gè)問題,您需要安裝另一個(gè)同樣的Elasticsearch,并在elasticsearch.yml中更改node.name;兩個(gè)實(shí)例中的cluster.name必須保持相同(默認(rèn)為elasticsearch)。

          另一種方法是在命令行上將配置參數(shù)傳遞給Elasticsearch。

          bin/elasticsearch -Enode.name=node-2 -Epath.data=./node-2/data -Epath.logs=./node-2/logs
          GET /_cat/indices?v

          health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
          yellow open flight w696u4y3SYWuGW--8VzW6Q 1 1 0 0 208b 208b

          2. 創(chuàng)建文檔

          下面,向我們的索引添加一些示例數(shù)據(jù):

          PUT /flight/_doc/1 
          {
          "Icao":"A0835D",
          "Alt":2400,
          "Lat":39.984322,
          "Long":-82.925616
          }

          也可以使用curl命令:

          curl -X PUT "localhost:9200/flight/_doc/1?pretty" -H 'Content-Type: application/json' -d'
          {
          "Icao":"A0835D",
          "Alt":2400,
          "Lat":39.984322,
          "Long":-82.925616
          }'

          在這種情況下,ElasticSearch將為我們的文檔生成一個(gè)自動(dòng)ID。這是ElasticSearch返回的結(jié)果:

          Content-Type對于查詢成功至關(guān)重要, 我們創(chuàng)建了一個(gè)ID = 1的新排期。我們也可以使用POST代替PUT,但是在這種情況下,我們無法傳遞ID。

          在這種情況下,ElasticSearch將為我們的文檔生成一個(gè)自動(dòng)ID。

          下面是ElasticSearch返回的結(jié)果:

          {
          "took" : 2,
          "timed_out" : false,
          "_shards" : {
          "total" : 1,
          "successful" : 1,
          "skipped" : 0,
          "failed" : 0
          },
          "hits" : {
          "total" : {
          "value" : 1,
          "relation" : "eq"
          },
          "max_score" : 1.0,
          "hits" : [
          {
          "_index" : "flight",
          "_type" : "_doc",
          "_id" : "1",
          "_score" : 1.0,
          "_source" : {
          "Icao" : "A0835D",
          "Alt" : 2400,
          "Lat" : 39.984322,
          "Long" : -82.925616
          }
          }
          ]
          }
          }

          結(jié)果文檔存儲在鍵值_source內(nèi)。

          3. 刪除文檔

          如果你知道文檔索引,可以直接通過索引進(jìn)行刪除:

          DELETE /flight/_doc/1

          4. 刪除索引

          通過下方命令刪除索引:

          DELETE /flight

          5. 批量導(dǎo)入數(shù)據(jù)

          我們的方案是處理航班數(shù)據(jù),理想情況下,這些數(shù)據(jù)是從多個(gè)傳感器(雷達(dá))實(shí)時(shí)獲得的,但是由于這很難實(shí)現(xiàn)。

          因此,我們將使用可從此處下載的批量歷史飛行數(shù)據(jù)。

          在下載批處理文件的目錄中,發(fā)送以下命令(每個(gè).json文件):

          curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flights/_bulk --data-binary "@2016-07-01-1300Z.json"

          請注意,內(nèi)容類型是application/x-ndjson,而不是application/x-json

          另外,請注意,我們將數(shù)據(jù)表示為二進(jìn)制以便保留換行符。

          磁瓦ElasticSearch需要json文檔滿足特定格式:

          {"index":{"_id":4800770}}
          {"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...}
          ...

          這意味著你必須將每個(gè)下載的.json文件轉(zhuǎn)換為上述格式。

          如果你不想花時(shí)間手動(dòng)修改.json文檔,則在下一篇文章中,我們將開發(fā)一個(gè)Java程序來解析它們,并使用ElasticSearch的REST API將文件插入ElasticSearch中。

          6. 搜索查詢

          ElasticSearch是一款搜索相關(guān)的工具,它允許你進(jìn)行符合條件的搜索查詢。

          GET /flight/_search?pretty
          { "query": {
          "match_all" : {
          }
          }
          }

          上面的搜索查詢匹配索引對應(yīng)的所有文檔。也可以這樣簡化:

          GET /flight/_search

          下面是根據(jù)給定字段Icao進(jìn)行查詢:

          GET /flight/_search?pretty 
          { "query": {
          "match" : {
          "Icao" : "A0835D"
          }
          }
          }

          也可以用嵌入U(xiǎn)RL進(jìn)行搜索:

          GET /flight/_search?q=Icao:A0835D

          也可以這樣寫:

          GET /flight/_search?pretty 
          { "query": {
          "query_string": {
          "query": "Icao:A0835D"
          }
          }
          }

          除了“match”和“query_string”以外,還可以使用“term”。使用“ term”表示精確匹配。

          GET /flight/_search?pretty 
          { "query": {
          "term": {
          "Mil": true
          }
          }
          }

          你也可以使用“term”來搜索值數(shù)組。

          除此之外,還可以使用通配符“wildcard”進(jìn)行搜索,包括*/?

          GET /flight/_search?pretty 
          { "query": {
          "wildcard": {
          "Call": "NJ*"
          }
          }
          }

          7. 更新文檔

          如果你知道索引的ID,可以通過_updateAPI進(jìn)行更新。

          POST /flight/_update/4800770
          {
          "doc": {
          "Mil": true
          }
          }

          使用上述命令,我們也可以將新字段添加到文檔中。

          附帶說明一下,ElasticSearch文檔是不可變的!

          因此,當(dāng)我們請求更新文檔時(shí),ElasticSearch會(huì)在后臺進(jìn)行操作,它檢索文檔,更改其字段并為具有相同ID的文檔重新索引,從而對它進(jìn)行替換。

          可以使用腳本發(fā)送更復(fù)雜的查詢,

          POST /flight/_update/4800770
          {
          "script": {
          "source": "ctx._source.FlightsCount++"
          }
          }

          ctx表示上下文。

          還有許多其他更新文檔的方法,例如,upserts,即根據(jù)文件是否已存在有條件地更新或插入文件。

          POST /flight/_update/4800771
          {
          "script": {
          "source": "ctx._source.FlightsCount++"
          },
          "upsert": {
          "Rcvr":1,
          "HasSig":false,
          "Icao":"AE4839",
          ...
          },
          }

          8. 刪除文檔

          使用_delete_by_query API可以刪除文檔:

          POST /flight/_delete_by_query
          {
          "query": {
          "match_all": {}
          }
          }

          9. 批量查詢

          批量API可幫助我們通過一個(gè)查詢對許多文檔執(zhí)行同樣的操作。

          該API包含4個(gè)動(dòng)作:索引,創(chuàng)建,更新,刪除:

          POST /_bulk
          { "index": { "_index" : "flight", "_id": 10519389 } }
          { "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
          { "create": { "_index" : "flight", "_id": 4800770 } }
          {"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
          { "update": { "_index" : "flight", "_id": 4800770 } }
          { "doc": {"Mil": true } }
          { "delete": { "_index" : "flight", "_id": 4800770 } }

          索引和創(chuàng)建操作之間的區(qū)別如下:如果文檔已經(jīng)存在,則創(chuàng)建將引發(fā)錯(cuò)誤,而索引將替換文檔。

          如果批量查詢要針對相同的索引運(yùn)行,那么我們可以像這樣簡化查詢:

          POST /flight/_bulk
          { "index": { "_id": 10519389 } }
          { "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
          { "create": { "_id": 4800770 } }
          {"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
          { "update": { "_id": 4800770 } }
          { "doc": {"Mil": true } }
          { "delete": { "_id": 4800770 } }

          10. 映射

          ElasticSearch是如何映射數(shù)據(jù)的呢?

          動(dòng)態(tài)映射意味著沒有明確定義映射,或者至少?zèng)]有為某些字段定義。

          ElasticSearch是通過檢查文檔字段的值類型來完成的。

          要查看數(shù)據(jù)映射,請?jiān)贙ibana中執(zhí)行以下內(nèi)容:

          GET /flight/_mapping

          我們也可以通過下方命令手動(dòng)添加映射關(guān)系,

          PUT /flight/_mapping 
          {
          "properties": {
          "location": {
          "type": "geo_point"
          }
          }
          }

          請注意,一旦創(chuàng)建了字段映射,就不能對其進(jìn)行修改。唯一的方法是刪除并重新創(chuàng)建索引。

          在下面的示例中,我們手動(dòng)創(chuàng)建了各種禁用動(dòng)態(tài)映射的映射。

          PUT /flight/_mapping
          {
          "dynamic": false,
          "properties": {
          "Rcvr": {
          "type": "integer"
          },
          "Icao": {
          "type": "text"
          },
          ...
          "location": {
          "type": "geo_point"
          }
          }
          }

          如果你更新了映射,請?jiān)诮脛?dòng)態(tài)映射的情況下發(fā)出以下查詢來更新ElasticSearch,

          POST /flight/_update_by_query?conflicts_proceed

          在這部分,我重點(diǎn)介紹了如何使用ElasticSearch的常用功能。

          在下一一部分中,我們將學(xué)習(xí)如何在將json文件轉(zhuǎn)換為ElasticSearch的批量API所需的格式之后,以及通過使用JSON庫解析json文件,并將批處理json文件導(dǎo)入到ElasticSearch中。

          三、數(shù)據(jù)導(dǎo)入

          在第二部分中,我們學(xué)習(xí)了如何在ElasticSearch中執(zhí)行搜索。但是,我們無法使用其批量API將.json數(shù)據(jù)文件導(dǎo)入ElasticSearch。

          在這部分中,我們將進(jìn)行一些編程,并學(xué)習(xí)一些有關(guān)如何將.json飛行數(shù)據(jù)文件導(dǎo)入ElasticSearch的方法:

          • 通過將.json數(shù)據(jù)文件轉(zhuǎn)換為ElasticSearch的API需要的格式
          • 通過解析.json數(shù)據(jù)文件,使用JSON庫(例如gson)提取其值,然后使用ElasticSearch的REST API導(dǎo)入數(shù)據(jù)

          數(shù)據(jù)轉(zhuǎn)換

          ElasticSearch對數(shù)據(jù)格式有特定的格式要求:

          {``"index"``:{``"_id"``:4800770}}
          {``"Rcvr"``:1,``"HasSig"``:``false``,``"Icao"``:``"494102"``, ``"Bad"``:``false``,``"Reg"``:``"CS-PHB"``, ...}
          ...

          這就意味著,你需要把下載的每一份json數(shù)據(jù)按照上述格式進(jìn)行轉(zhuǎn)換。主要滿足如下2點(diǎn):

          • 在每個(gè)數(shù)據(jù)文檔前面加入一行以index開頭的數(shù)據(jù)
          • "Id":<value>修改為{"_id":<value>}

          我們可以通過編寫簡單的Java程序,快速把json文件轉(zhuǎn)換成對應(yīng)格式:

          package com.jgc;
          import java.io.IOException;
          import java.nio.file.Files;
          import java.nio.file.Path;
          import java.nio.file.Paths;
          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.List;
          import static java.util.stream.Collectors.toList;
          /**
           * Converts a flight data json file to a format that can be imported to 
           * ElasticSearch using its bulk API.
           */

          public class JsonFlightFileConverter {
              private static final Path flightDataJsonFile = 
                  Paths.get("src/main/resources/flightdata/2016-07-01-1300Z.json");
              public static void main(String[] args) {
                  List<String> list = new ArrayList<>();
                  try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
                      list = stream
                              .map(line -> line.split("\\{"))
                              .flatMap(Arrays::stream)
                              .collect(toList());
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  System.out.println(list);
              }
          }

          最后,通過簡單的拼接,輸出我們想要的結(jié)果:

          final String result = list.stream().skip(3)
                          .map(s -> "{" + s + "\n")
                          .collect(Collectors.joining());
          System.out.println(result);

          現(xiàn)在,可以看到輸出已經(jīng)非常接近我們想要的結(jié)果:

          {"Id":4800770,"Rcvr":1,"HasSig":false,"Icao":"494102", ...

          實(shí)際上,我們可以將最后一個(gè)代碼片段添加到原始流中,如下所示:

          String result = "";
          try (Stream<String> stream = Files.lines(flightDataJsonFile.toAbsolutePath())) {
               result = stream
                      .map(line -> line.split("\\{"))
                      .flatMap(Arrays::stream)
                      .skip(3)
                      .map(s -> "{" + s + "\n")
                      .collect(Collectors.joining());
          catch (IOException e) {
              e.printStackTrace();
          }

          現(xiàn)在,我們需要在每行的上方插入新行,其中包含文檔的索引,如下所示:

          {"index":{"_id":4800770}}

          我們可以創(chuàng)建一個(gè)函數(shù),這樣處理會(huì)更加簡潔明了:

          private static String insertIndex(String s) {
              final String[] keyValues = s.split(",");
              final String[] idKeyValue = keyValues[0].split(":");
              return "{\"index\":{\"_id\":"+ idKeyValue[1] +"}}\n";
          }

          這樣,就可以對每個(gè)輸入進(jìn)行轉(zhuǎn)換,給出我們需要的輸出。

          我們還需要解決的更多細(xì)節(jié),從每個(gè)文檔中刪除最后一個(gè)逗號。

          private static String removeLastComma(String s) {
              return s.charAt(s.length() - 1) == ',' ? s.substring(0, s.length() - 1) : s;
          }

          這時(shí)候,數(shù)據(jù)處理代碼就變成了下面這個(gè)樣子:

          public class JsonFlightFileConverter {
           
           public static void main(String[] args) {
            if (args.length == 1) {
              Path inDirectoryPath = Paths.get(args[0]);
              if (inDirectoryPath != null) {
                  Path outDirectoryPath = Paths.get(inDirectoryPath.toString(), "out");
                  try {
                      if (Files.exists(outDirectoryPath)) {
                          Files.walk(outDirectoryPath)
                                  .sorted(Comparator.reverseOrder())
                                  .map(Path::toFile)
                                  .forEach(File::delete);
                      }
                      Files.createDirectory(Paths.get(inDirectoryPath.toString(), "out"));
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
                  try (DirectoryStream ds = Files.newDirectoryStream(inDirectoryPath, "*.json")) {
                      for (Path inFlightDataJsonFile : ds) {
                          String result = "";
                          try (Stream stream = 
                               Files.lines(inFlightDataJsonFile.toAbsolutePath())) {
                      result = stream
                                .parallel()
                                .map(line -> line.split("\\{"))
                                .flatMap(Arrays::stream)
                                .skip(3)
                                .map(s -> createResult(s))
                                .collect(Collectors.joining());
                          Path outFlightDataJsonFile = 
                               Paths.get(outDirectoryPath.toString(), 
                                         inFlightDataJsonFile.getFileName().toString());
                          Files.createFile(outFlightDataJsonFile);
                          Files.writeString(outFlightDataJsonFile, result);
                      }
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
           } else {
              System.out.println("Usage: java JsonFlightFileConverter ");
           }
          ...

          使用ElasticSearch的批量API導(dǎo)入數(shù)據(jù)

          需要再次強(qiáng)調(diào),文件必須以空行結(jié)尾。如果不是,則添加一個(gè)(實(shí)際上前面的程序已經(jīng)在文件末尾添加了換行符)。

          在產(chǎn)生新的.json文件的目錄(輸出目錄)內(nèi),執(zhí)行以下命令:

          curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"

          請注意,內(nèi)容類型是application / x-ndjson,而不是application / x-json

          還要注意,我們將數(shù)據(jù)表示為二進(jìn)制以便保留換行符。文件名為2016-07-01-1300Z.json

          ElasticSearch中任何具有相同ID的現(xiàn)有文檔都將被.json文件中的文檔替換。

          最后,可以發(fā)現(xiàn)有7679文件被導(dǎo)入:

          "hits" : {
              "total" : {
                "value" : 7679,
                "relation" : "eq"
              },
          GET /_cat/shards?v

          返回結(jié)果:

          index   shard prirep state      docs   store ip        node
          flight 0 p STARTED 7679 71mb 127.0.0.1 MacBook-Pro.local
          flight 0 r UNASSIGNED

          解析JSON數(shù)據(jù)

          將這些文檔導(dǎo)入ElasticSearch的另一種方法是將JSON數(shù)據(jù)文件解析到內(nèi)存中,并使用ElasticSearch的REST API將其導(dǎo)入ElasticSearch。

          有許多庫可用于解析Java中的JSON文件:

          • GSon
          • Jackson
          • mJson
          • JSON-Simple
          • JSON-P

          我們將使用Google的GSon庫,但其他任何JSON庫都可以完成此工作。

          GSon提供了多種表示JSON數(shù)據(jù)的方法,具體使用哪一種,則取決于下一步,即如何將數(shù)據(jù)導(dǎo)入到ElasticSearch。

          ElasticSearch API要求數(shù)據(jù)的格式為:Map<String, Object>,這是我們將解析后的JSON數(shù)據(jù)存儲到的位置。

          首先,將下面依賴加入到pom.xml中:

          <dependency>
              <groupId>com.google.code.gson</groupId>
              <artifactId>gson</artifactId>
              <version>2.8.6</version>
          </dependency>

          使用下方代碼解析json數(shù)據(jù):

          package com.jcg;
           
          import com.google.gson.Gson;
          import com.google.gson.internal.LinkedTreeMap;
          import com.google.gson.reflect.TypeToken;
          import java.io.BufferedReader;
          import java.io.IOException;
          import java.nio.file.Files;
          import java.nio.file.Paths;
          import java.util.List;
          import java.util.Map;
           
          public class JsonFlightFileReader {
              private static final String flightDataJsonFile = "src/main/resources/flightdata/2016-07-01-1300Z.json";
              private static final Gson gson = new Gson();
              public static void main(String[] args) {
                  parseJsonFile(flightDataJsonFile);
              }
              private static void parseJsonFile(String file) {
                  try (BufferedReader reader = Files.newBufferedReader(Paths.get(file))) {
                      Map<String, Object> map = gson.fromJson(reader, 
                                 new TypeToken<Map<String, Object>>() { }.getType());
                      List<Object> acList = (List<Object>) (map.get("acList"));
                      for (Object item : acList) {
                          LinkedTreeMap<String, Object> flight = 
                                  (LinkedTreeMap<String, Object>) item;
                          for (Map.Entry<String, Object> entry : flight.entrySet()) {
                              String key = entry.getKey();
                              Object value = entry.getValue();
                              String outEntry = (key.equals("Id") ? "{" + key : key) + " : " + value + ", ";
                              System.out.print(outEntry);
                          }
                          System.out.println("}");
                      }
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }

          通過下述方法可以使用數(shù)據(jù):

          Map<String, Object> map = gson.fromJson(reader, new TypeToken<Map<String, Object>>() {}.getType());
          List<Object> acList = (List<Object>) (map.get("acList"));

          使用ElasticSearch REST API導(dǎo)入數(shù)據(jù)

          首先,在pom.xml中加入下方依賴:

          <dependency>
              <groupId>org.elasticsearch.client</groupId>
              <artifactId>elasticsearch-rest-client</artifactId>
              <version>7.10.0</version>
          </dependency>

          我們可以通過RestClient與ElasticSearch進(jìn)行交互:

          RestClient restClient = RestClient.builder(
              new HttpHost("localhost"9200"http"));
          .setDefaultHeaders(new Header[]{
                  new BasicHeader("accept""application/json"),
                  new BasicHeader("content-type""application/json")})
          .setFailureListener(new RestClient.FailureListener() {
              public void onFailure(Node node) {
                  System.err.println("Low level Rest Client Failure on node " +
                          node.getName());
              }
          }).build();

          創(chuàng)建好RestClient之后,下一步就是創(chuàng)建一個(gè)Request,并將json數(shù)據(jù)傳遞給它:

          Request request = new Request("POST""/flight/_doc/4800770");
          String jsonDoc = "{\"Rcvr\":1,\"HasSig\":false,\"Icao\":\"494102\",...]}";
          request.setJsonEntity(jsonDoc);

          最后,我們發(fā)送請求。

          有兩種方式,同步

          Response response = restClient.performRequest(request);
          if (response.getStatusLine().getStatusCode() != 200) {
              System.err.println("Could not add document with Id: " + id + " to index /flight");
          }

          異步

          Cancellable cancellable = restClient.performRequestAsync(request,
              new ResponseListener() {
                  @Override
                  public void onSuccess(Response response) {
                      System.out.println("Document with Id: " + id + " was successfully added to index /flight");
                  }
           
                  @Override
                  public void onFailure(Exception exception) {
                      System.err.println("Could not add document with Id: " + id + " to index /flight");
                  }
          });

          最后,不要忘記關(guān)閉restClient連接:

          finally {
              try {
                  restClient.close();
              } catch (IOException e) {
                  e.printStackTrace();
              }
          }

          這部分,我們重點(diǎn)介紹了如何將.json數(shù)據(jù)批處理文件導(dǎo)入到ElasticSearch。

          我們看到了如何通過兩種方式做到這一點(diǎn):

          • 使用ElasticSearch的批量API,
          • 使用JSON庫解析.json文件

          你可以根據(jù)自己的情況自行選擇其中一種方法。

          四、Logstash

          在本系列文章的第3部分關(guān)于實(shí)時(shí)流處理的文章中,我們學(xué)習(xí)了如何使用ElasticSearch的批量API以及利用REST API將.json航班數(shù)據(jù)文件導(dǎo)入ElasticSearch。

          在這篇文章中,我們將介紹另一種方式,Logstash。

          Logstash介紹

          Logstash是一個(gè)開源的數(shù)據(jù)收集引擎,具有實(shí)時(shí)流水線功能。

          它從多個(gè)源頭接收數(shù)據(jù),進(jìn)行數(shù)據(jù)處理,然后將轉(zhuǎn)化后的信息發(fā)送到stash,即存儲。

          Logstash允許我們將任何格式的數(shù)據(jù)導(dǎo)入到任何數(shù)據(jù)存儲中,不僅僅是ElasticSearch。

          它可以用來將數(shù)據(jù)并行導(dǎo)入到其他NoSQL數(shù)據(jù)庫,如MongoDB或Hadoop,甚至導(dǎo)入到AWS。

          數(shù)據(jù)可以存儲在文件中,也可以通過流等方式進(jìn)行傳遞。

          Logstash對數(shù)據(jù)進(jìn)行解析、轉(zhuǎn)換和過濾。它還可以從非結(jié)構(gòu)化數(shù)據(jù)中推導(dǎo)出結(jié)構(gòu),對個(gè)人數(shù)據(jù)進(jìn)行匿名處理,可以進(jìn)行地理位置查詢等等。

          一個(gè)Logstash管道有兩個(gè)必要的元素,輸入輸出,以及一個(gè)可選的元素,過濾器

          輸入組件從源頭消耗數(shù)據(jù),過濾組件轉(zhuǎn)換數(shù)據(jù),輸出組件將數(shù)據(jù)寫入一個(gè)或多個(gè)目的地。

          所以,我們的示例場景的Logstash架構(gòu)基本如下。

          我們從.json文件中讀取我們的航班數(shù)據(jù),我們對它們進(jìn)行處理/轉(zhuǎn)換,應(yīng)用一些過濾器并將它們存儲到ElasticSearch中。

          Logstash安裝

          有幾種選擇來安裝Logstash。

          一種是訪問網(wǎng)站下載你平臺的存檔,然后解壓到一個(gè)文件夾。

          你也可以使用你的平臺的包管理器來安裝,比如yum、apt-get或homebrew,或者作為docker鏡像來安裝。

          確保你已經(jīng)定義了一個(gè)環(huán)境變量JAVA_HOME,指向JDK 8或11或14的安裝(Logstash自帶嵌入式AdoptJDK)。

          Logstash工作流

          一旦你安裝了它,讓我們通過運(yùn)行最基本的Logstash工作流來測試你的Logstash安裝情況。

          bin/logstash -e 'input { stdin { } } output { stdout {} }'

          上面的工作流接受來自stdin(即你的鍵盤)的輸入,并將其輸出到stdout(即你的屏幕)。

          上面的工作流中沒有定義任何過濾器。一旦你看到logstash被成功啟動(dòng)的消息,輸入一些東西(我輸入的是Hello world),按ENTER鍵,你應(yīng)該看到產(chǎn)生的消息的結(jié)構(gòu)格式,像下面這樣。

          [2021-02-11T21:52:57,120][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
          Hello world
          {
          "message" => "Hello world",
          "@version" => "1",
          "@timestamp" => 2021-02-11T19:57:46.208Z,
          "host" => "MacBook-Pro.local"
          }

          然而,通常Logstash是通過配置文件來工作的,配置文件告訴它該做什么,即在哪里找到它的輸入,如何轉(zhuǎn)換它,在哪里存儲它。Logstash配置文件的結(jié)構(gòu)基本上包括三個(gè)部分:輸入、過濾和輸出。

          你在輸入部分指定數(shù)據(jù)的來源,在輸出部分指定目的地。在過濾器部分,你可以使用支持的過濾器插件來操作、測量和創(chuàng)建事件。

          配置文件的結(jié)構(gòu)如下面的代碼示例所示。

          input {...}
          filter {...}
          output{...}

          你需要?jiǎng)?chuàng)建一個(gè)配置文件,指定你要使用的組件和每個(gè)組件的設(shè)置。在config文件夾中已經(jīng)存在一個(gè)配置文件樣本,logstash-sample.conf。

          其內(nèi)容如下所示。

          # Sample Logstash configuration for creating a simple
          # Beats -> Logstash -> Elasticsearch pipeline.

          input {
          beats {
          port => 5044
          }
          }

          output {
          elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          #user => "elastic"
          #password => "changeme"
          }
          }

          這里input部分定義了Logstash應(yīng)該從哪里獲取數(shù)據(jù)。這里有一個(gè)可用的輸入插件列表。

          我們的輸入不是來自Beats組件,而是來自文件系統(tǒng),所以我們使用文件輸入組件。

          input {
          file {
          start_position => "beginning"
          path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"
          codec => "json"
          }
          }

          我們使用start_position參數(shù)來告訴插件從頭開始讀取文件。

          需要注意,數(shù)據(jù)路徑必須是絕對的。

          我們使用的是json編解碼器,除了json,還可以使用純文本形式。

          在下載的數(shù)據(jù)中,可以找到一個(gè)名為test.json的文件。它只由2條航班數(shù)據(jù)組成的文件。

          輸出塊定義了Logstash應(yīng)該在哪里存儲數(shù)據(jù)。我們將使用ElasticSearch來存儲我們的數(shù)據(jù)。

          我們添加了第二個(gè)輸出作為我們的控制臺,并使用rubydebugger格式化輸出,第三個(gè)輸出作為文件系統(tǒng),最后兩個(gè)用于測試我們的輸出。我們將輸出存儲在output.json中。

          output {
          elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "testflight"
          }

          file {
          path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"
          }

          stdout {
          codec => rubydebug
          }
          }

          此外,還可以定義過濾器來對數(shù)據(jù)進(jìn)行轉(zhuǎn)換。

          Logstash提供了大量的過濾器,下面介紹一些非常常用的的過濾器:

          • grok:解析任何任意文本并添加結(jié)構(gòu),它包含120種內(nèi)置模式
          • mutate:對字段進(jìn)行一般的轉(zhuǎn)換,例如重命名、刪除、替換和修改字段
          • drop:丟棄一個(gè)數(shù)據(jù)
          • clone:復(fù)制一個(gè)數(shù)據(jù),可能增加或刪除字段
          • geoip:添加IP地址的地理位置信息
          • split:將多行消息、字符串或數(shù)組分割成不同的數(shù)據(jù)

          可以通過執(zhí)行下方命令查看 Logstash 安裝中安裝的全部插件列表。

          $ bin/logstash-plugin list

          你會(huì)注意到,有一個(gè)JSON過濾器插件。這個(gè)插件可以解析.json文件并創(chuàng)建相應(yīng)的JSON數(shù)據(jù)結(jié)構(gòu)。

          正確地選擇和配置過濾器是非常重要的,否則,你最終的輸出中沒有數(shù)據(jù)。

          所以,在我們的過濾塊中,我們啟用json插件,并告訴它我們的數(shù)據(jù)在消息字段中。

          filter {
          json {
          source => "message"
          }
          }

          到此為止,完成的配置文件config/testflight.conf內(nèi)容如下:

          input {
          file {
          start_position => "beginning"
          path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"
          codec => "json"
          }
          }

          filter {
          json {
          source => "message"
          }
          }

          output {
          # elasticsearch {
          # hosts => ["http://localhost:9200/"]
          # index => "testflight"
          # }
          file {
          path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"
          }
          stdout {
          codec => rubydebug
          }
          }

          你可以通過如下命令進(jìn)行一下測試:

          bin/logstash -f config/testflight.conf --config.test_and_exit
          ...
          Configuration OK
          [2021-02-11T23:15:38,997][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

          如果配置文件通過了配置測試,用以下命令啟動(dòng)Logstash。

          bin/logstash -f config/testflight.conf --config.reload.automatic
          ...

          --config.reload.automatic配置選項(xiàng)可以實(shí)現(xiàn)自動(dòng)重載配置,這樣你就不必每次修改配置文件時(shí)都要停止并重新啟動(dòng)Logstash。

          如果一切順利,你應(yīng)該會(huì)看到如下的輸出結(jié)果。

          {
          "CMsgs" => 1,
          "@version" => "1",
          "PosTime" => 1467378028852,
          "Rcvr" => 1,
          "EngMount" => 0,
          "Tisb" => false,
          "Mil" => false,
          "Trt" => 2,
          "Icao" => "A0835D",
          "Long" => -82.925616,
          "InHg" => 29.9409447,
          "VsiT" => 1,
          "ResetTrail" => true,
          "CallSus" => false,
          "@timestamp" => 2021-02-14T18:32:16.337Z,
          "host" => "MacBook-Pro.local",
          "OpIcao" => "RPA",
          "Man" => "Embraer",
          "GAlt" => 2421,
          "TT" => "a",
          "Bad" => false,
          "HasSig" => true,
          "TSecs" => 1,
          "Vsi" => 2176,
          "EngType" => 3,
          "Reg" => "N132HQ",
          "Alt" => 2400,
          "Species" => 1,
          "FlightsCount" => 0,
          "WTC" => 2,
          "Cos" => [
          [0] 39.984322,
          [1] -82.925616,
          [2] 1467378028852.0,
          [3] nil
          ],"message" => "{\"Id\":10519389,\"Rcvr\":1,\"HasSig\":true,\"Sig\":0,\"Icao\":\"A0835D\",\"Bad\":false,\"Reg\":\"N132HQ\",\"FSeen\":\"\\/Date(1467378028852)\\/\",\"TSecs\":1,\"CMsgs\":1,\"Alt\":2400,\"GAlt\":2421,\"InHg\":29.9409447,\"AltT\":0,\"Lat\":39.984322,\"Long\":-82.925616,\"PosTime\":1467378028852,\"Mlat\":true,\"Tisb\":false,\"Spd\":135.8,\"Trak\":223.2,\"TrkH\":false,\"Type\":\"E170\",\"Mdl\":\"2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR\",\"Man\":\"Embraer\",\"CNum\":\"17000216\",\"Op\":\"REPUBLIC AIRLINE INC - INDIANAPOLIS, IN\",\"OpIcao\":\"RPA\",\"Sqk\":\"\",\"Vsi\":2176,\"VsiT\":1,\"WTC\":2,\"Species\":1,\"Engines\":\"2\",\"EngType\":3,\"EngMount\":0,\"Mil\":false,\"Cou\":\"United States\",\"HasPic\":false,\"Interested\":false,\"FlightsCount\":0,\"Gnd\":false,\"SpdTyp\":0,\"CallSus\":false,\"ResetTrail\":true,\"TT\":\"a\",\"Trt\":2,\"Year\":\"2008\",\"Cos\":[39.984322,-82.925616,1467378028852.0,null]}",
          "Lat" => 39.984322,
          "TrkH" => false,
          "Op" => "REPUBLIC AIRLINE INC - INDIANAPOLIS, IN",
          "Engines" => "2",
          "Sqk" => "",
          "Id" => 10519389,
          "Gnd" => false,
          "CNum" => "17000216",
          "path" => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json",
          "Cou" => "United States",
          "HasPic" => false,
          "FSeen" => "/Date(1467378028852)/",
          "Interested" => false,
          "Mdl" => "2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR",
          "Spd" => 135.8,
          "Sig" => 0,
          "Trak" => 223.2,
          "Year" => "2008",
          "SpdTyp" => 0,
          "AltT" => 0,
          "Type" => "E170",
          "Mlat" => true
          }

          數(shù)據(jù)轉(zhuǎn)換

          首先,讓我們從輸出中刪除path, @version, @timestamp, host和message,這些都是logstash添加的。

          filter {
          json {
          source => "message"
          }
          mutate {
          remove_field => ["path", "@version", "@timestamp", "host", "message"]
          }
          }

          mutate過濾器組件可以刪除不需要的字段。

          重新運(yùn)行:

          bin/logstash -f config/flightdata-logstash.conf –-config.test_and_exit
          bin/logstash -f config/flightdata-logstash.conf --config.reload.automatic

          接下來,我們將_id設(shè)置為Id。

          output {
          elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "testflight"
          document_id => "%{Id}"
          }

          我們在輸出組件中通過設(shè)置document_id來實(shí)現(xiàn)。

          然而,如果你重新運(yùn)行l(wèi)ogstash,你會(huì)發(fā)現(xiàn)Id字段仍然存在。

          有一個(gè)竅門,在過濾插件中把它改名為[@metadata][Id],然后在輸出中使用,@metadata字段被自動(dòng)刪除。

          filter {
          json {
          source => "message"
          }
          mutate {
          remove_field => ["path", "@version", "@timestamp", "host", "message"]
          rename => { "[Id]" => "[@metadata][Id]" }
          }
          }

          output {
          elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "flight-logstash"
          document_id => "%{[@metadata][Id]}"
          }
          ...

          現(xiàn)在讓我們嘗試解析日期。如果你還記得,這是我們在上一篇文章中沒有做的事情,我們需要將日期轉(zhuǎn)換為更適合人們熟悉的格式。

          例如:

          "FSeen" => "\/Date(1467378028852)\/"

          需要將時(shí)間1467378028852轉(zhuǎn)化成容易閱讀的格式,并且去掉前后多余的字符串,通過gsub組件可以實(shí)現(xiàn)這項(xiàng)功能:

          gsub => [
          # get rid of /Date(
          "FSeen", "\/Date\(", "",
          # get rid of )/
          "FSeen", "\)\/", ""
          ]

          這里通過gsub去掉了數(shù)據(jù)中/Date()\等多余部分,輸出結(jié)果為:

          "FSeen" : "1467378028852"

          然后把時(shí)間戳轉(zhuǎn)換成熟悉的格式:

          date {
          timezone => "UTC"
          match => ["FSeen", "UNIX_MS"]
          target => "FSeen"
          }

          UNIX_MS是UNIX時(shí)間戳,單位是毫秒。我們匹配字段FSeen并將結(jié)果存儲在同一字段中,輸出結(jié)果為:

          "FSeen" : "2016-07-01T13:00:28.852Z",

          上述轉(zhuǎn)換的完整代碼如下:

          mutate {
          gsub => [
          # get rid of /Date(
          "FSeen", "\/Date\(", "",
          # get rid of )/
          "FSeen", "\)\/", ""
          ]
          }
          date {
          timezone => "UTC"
          match => ["FSeen", "UNIX_MS"]
          target => "FSeen"
          }

          在這部分中,我們學(xué)習(xí)了如何使用Logstash將.json航班數(shù)據(jù)批量文件導(dǎo)入到ElasticSearch中。Logstash是一個(gè)非常方便的方式,它有很多過濾器,支持很多數(shù)據(jù)類型,你只需要學(xué)習(xí)如何編寫一個(gè)配置文件就可以了!

          Logstash是否適合實(shí)時(shí)數(shù)據(jù)處理?

          答案是:要看情況

          Logstash主要是為批處理數(shù)據(jù)而設(shè)計(jì)的,比如日志數(shù)據(jù),也許不適合處理來自傳感器的實(shí)時(shí)航班數(shù)據(jù)。

          不過,你可以參考一些參考資料,這些資料描述了如何創(chuàng)建可以擴(kuò)展的Logstash部署,并使用Redis作為Logstash代理和Logstash中央服務(wù)器之間的中介,以便處理許多事件并實(shí)時(shí)處理它們。


          作者:Jackpop 

          鏈接:https://www.zhihu.com/question/469207536/answer/2550197723

           往期推薦 

          ??

          講一篇通俗易懂的 C 函數(shù)。

          6 分鐘看完 BGP 協(xié)議。

          《On Java》值得讀嗎?

          萬字長文爆肝路由協(xié)議!

          DNS,給你安排明白了!

          Java 中的語法糖,真甜。

          10 分鐘聊聊計(jì)算機(jī)

          計(jì)算機(jī)的本質(zhì)是哲學(xué)。



          瀏覽 49
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产婷婷在线视频 | 欧美一级A片免费看视频小说 | 成年人黄色视频在线观看 | 日韩中文足交系列 | 北条麻妃人妻上门在线播放 |