實戰(zhàn) | 使用Spring Boot + Elasticsearch + Logstash 實現(xiàn)圖書查詢檢索服務(wù)
前面我們介紹了Spring Boot 整合 Elasticsearch 實現(xiàn)數(shù)據(jù)查詢檢索的功能,在實際項目中,我們的數(shù)據(jù)一般存儲在數(shù)據(jù)庫中,而且隨著業(yè)務(wù)的發(fā)送,數(shù)據(jù)也會隨時變化。
那么如何保證數(shù)據(jù)庫中的數(shù)據(jù)與Elasticsearch存儲的索引數(shù)據(jù)保持一致呢?最原始的方案就是:當(dāng)數(shù)據(jù)發(fā)生增刪改操作時同步更新Elasticsearch。但是這樣的設(shè)計耦合太高。接下來我們介紹一種非常簡單的數(shù)據(jù)同步方式:Logstash 數(shù)據(jù)同步。
一、Logstash簡介
1.什么是Logstash
logstash是一個開源的服務(wù)器端數(shù)據(jù)處理工具。簡單來說,就是一根具備實時數(shù)據(jù)傳輸能力的管道,負(fù)責(zé)將數(shù)據(jù)信息從管道的輸入端傳輸?shù)焦艿赖妮敵龆耍慌c此同時這根管道還可以讓你根據(jù)自己的需求在中間加上濾網(wǎng),Logstash提供了很多功能強大的濾網(wǎng)以滿足你的各種應(yīng)用場景。
Logstash常用于日志系統(tǒng)中做日志采集設(shè)備,最常用于ELK中作為日志收集器使用。
2.Logstash的架構(gòu)原理
Logstash的基本流程架構(gòu):input=》 ?filter =》 output 。
input(輸入):采集各種樣式,大小和來源數(shù)據(jù),從各個服務(wù)器中收集數(shù)據(jù)。常用的有:jdbc、file、syslog、redis等。
filter(過濾器)負(fù)責(zé)數(shù)據(jù)處理與轉(zhuǎn)換。主要是將event通過output發(fā)出之前對其實現(xiàn)某些處理功能。
output(輸出):將我們過濾出的數(shù)據(jù)保存到那些數(shù)據(jù)庫和相關(guān)存儲中,。

3.Logstash如何與Elasticsearch數(shù)據(jù)同步
實際項目中,我們不可能通過手動添加的方式將數(shù)據(jù)插入索引庫,所以需要借助第三方工具,將數(shù)據(jù)庫的數(shù)據(jù)同步到索引庫。此時,Logstash出現(xiàn)了,它可以將不同數(shù)據(jù)庫的數(shù)據(jù)同步到Elasticsearch中。保證數(shù)據(jù)庫與Elasticsearch的數(shù)據(jù)保持一致。

目前支持?jǐn)?shù)據(jù)庫與ES數(shù)據(jù)同步的插件有很多,個人認(rèn)為Logstash是眾多同步mysql數(shù)據(jù)到es的插件中,最穩(wěn)定并且最容易配置的一個。
二、安裝Logstash
Logstash的使用方法也很簡單,下面講解一下,Logstash是如何使用的。需要說明的是:這里以windows 環(huán)境為例,演示Logstash的安裝和配置。
1.下載Logstash
首先,下載對應(yīng)版本的Logstash包,可以通過上面提供下載elasticsearch的地址進(jìn)行下載,完成后解壓。

上面是Logstash解壓后的目錄,我們需要關(guān)注是bin目錄中的執(zhí)行文件和config中的配置文件。一般生產(chǎn)情況下,會使用Linux服務(wù)器,并且會將Logstash配置成自啟動的服務(wù)。這里測試的話,直接啟動。
2.配置Logstash
接下來,配置Logstash。需要我們編寫配置文件,根據(jù)官網(wǎng)和網(wǎng)上提供的配置文件,將其進(jìn)行修改。
第一步:在Logstash根目錄下創(chuàng)建mysql文件夾,添加mysql.conf配置文件,配置Logstash需要的相應(yīng)信息,具體配置如下:
input {stdin {}jdbc {# mysql數(shù)據(jù)庫連接jdbc_connection_string => "jdbc:mysql://localhost:3306/book_test?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC"# mysqly用戶名和密碼jdbc_user => "root"jdbc_password => "root"# 驅(qū)動配置jdbc_driver_library => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar"# 驅(qū)動類名jdbc_driver_class => "com.mysql.cj.jdbc.Driver"#jdbc_paging_enabled => "true"#jdbc_page_size => "50000"jdbc_default_timezone => "Asia/Shanghai"# 執(zhí)行指定的sql文件statement_filepath => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\bookquery.sql"use_column_value => true# 是否將字段名轉(zhuǎn)換為小寫,默認(rèn)true(如果有數(shù)據(jù)序列化、反序列化需求,建議改為false);lowercase_column_names => false# 需要記錄的字段,用于增量同步,需是數(shù)據(jù)庫字段tracking_column => updatetime# Value can be any of: numeric,timestamp,Default value is "numeric"tracking_column_type => timestamp# record_last_run上次數(shù)據(jù)存放位置;record_last_run => true#上一個sql_last_value值的存放文件路徑, 必須要在文件中指定字段的初始值last_run_metadata_path => "C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\sql\logstash_default_last_time.log"# 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;clean_run => false# 設(shè)置監(jiān)聽 各字段含義 分 時 天 月 年 ,默認(rèn)全部為*代表含義:每分鐘都更新schedule => "* * * * *"# 索引類型type => "id"}}output {elasticsearch {#es服務(wù)器hosts => ["10.2.1.231:9200"]#ES索引名稱index => "book"#自增IDdocument_id => "%{id}"}stdout {codec => json_lines}}
第二步:將mysql-connector-java.jar 拷貝到前面配置的目錄下。上面的mysql.conf配置的是:C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql-connector-java-8.0.20.jar。那么jar包拷貝到此目錄下即可:

上面是mysql的驅(qū)動,如果是sqlserver數(shù)據(jù)庫,下載SqlServer對應(yīng)的驅(qū)動即可。放置的位置要與mysql.conf 配置文件中的jdbc_driver_library 地址保持一致。
第三步:創(chuàng)建sql目錄,創(chuàng)建bookquery.sql文件用于保存需要執(zhí)行的sql 腳本。示例代碼如下:
select * from book where updatetime >= :sql_last_valueorder by updatetime desc
這里使用的增量更新,所以使用:sql_last_value 記錄上一次記錄的最后時間。3.啟動Logstash
進(jìn)入logstash的bin目錄,執(zhí)行如下命令:
logstash.bat -f C:\Users\Administrator\Desktop\logstash-7.5.1\mysql\mysql.conf動成功之后,Logstash就會自動定時將數(shù)據(jù)寫入到Elasticsearch。如下圖所示:

同步完成后,我們使用Postman查詢Elasticsearch,驗證索引是否都創(chuàng)建成功。在postman中,發(fā)送 Get 請求:http://10.2.1.231:9200/book/_search 。返回結(jié)果如下圖所示:

可以看到,數(shù)據(jù)庫中的數(shù)據(jù)已經(jīng)通過Logstash同步至Elasticsearch。說明Logstash配置成功。
三、創(chuàng)建查詢服務(wù)
數(shù)據(jù)同步完成后,接下來我們使用Spring Boot 構(gòu)建Elasticsearch查詢服務(wù)。首先創(chuàng)建Spring Boot項目并整合Elasticsearch,這個之前都已經(jīng)介紹過,不清楚的朋友可以看我之前的文章。
接下來演示如何封裝完整的數(shù)據(jù)查詢服務(wù)。
1.數(shù)據(jù)實體
( indexName = "book" , replicas = 0)public class Book {private Long id;(analyzer = "ik_max_word",type = FieldType.Text)private String bookName;(analyzer = "ik_max_word",type = FieldType.Text)private String author;private float price;private int page;(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")private Date createTime;(type = FieldType.Date,format = DateFormat.custom,pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")private Date updateTime;(analyzer = "ik_max_word",type = FieldType.Text)private String category;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getBookName() {return bookName;}public void setBookName(String bookName) {this.bookName = bookName;}public String getAuthor() {return author;}public void setAuthor(String author) {this.author = author;}public float getPrice() {return price;}public void setPrice(float price) {this.price = price;}public int getPage() {return page;}public void setPage(int page) {this.page = page;}public String getCategory() {return category;}public void setCategory(String category) {this.category = category;}public Book(){}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public Date getUpdateTime() {return updateTime;}public void setUpdateTime(Date updateTime) {this.updateTime = updateTime;}}
2.請求封裝類
public class BookQuery {public String category;public String bookName;public String author;public int priceMin;public int priceMax;public int pageMin;public int pageMax;public String sort;public String sortType;public int page;public int limit;}
3.創(chuàng)建Controller控制器
@RestControllerpublic class ElasticSearchController {@Autowiredprivate ElasticsearchRestTemplate elasticsearchRestTemplate;/*** 查詢信息* @param* @return*/@PostMapping(value = "/book/query")public JSONResult query(@RequestBody BookQuery bookQuery){Query query= getQueryBuilder(bookQuery);SearchHitssearchHits = elasticsearchRestTemplate.search(query, Book.class); List> result = searchHits.getSearchHits(); return JSONResult.ok(result);}public Query getQueryBuilder(BookQuery query) {BoolQueryBuilder builder = boolQuery();// 匹配器 模糊查詢部分,分析器使用ik (ik_max_word)Listmust = builder.must(); if (query.getBookName()!=null && !query.getBookName().isEmpty())must.add(wildcardQuery("bookName", "*" +query.getBookName()+ "*"));if (query.getCategory()!=null && !query.getCategory().isEmpty())must.add(wildcardQuery("category", "*" +query.getCategory()+ "*"));if (query.getAuthor()!=null && !query.getAuthor().isEmpty())must.add(wildcardQuery("author", "*" +query.getAuthor()+ "*"));// 篩選器 精確查詢部分Listfilter = builder.filter(); // 范圍查詢if (query.getPriceMin()>0 && query.getPriceMax()>0) {RangeQueryBuilder price = rangeQuery("price").gte(query.getPriceMin()).lte(query.getPriceMax());filter.add(price);}// 范圍查詢if (query.getPageMin()>0 && query.getPageMax()>0) {RangeQueryBuilder page = rangeQuery("page").gte(query.getPageMin()).lte(query.getPageMax());filter.add(page);}// 分頁PageRequest pageable = PageRequest.of(query.getPage() - 1, query.getLimit());// 排序SortBuilder sort = SortBuilders.fieldSort("price").order(SortOrder.DESC);//設(shè)置高亮效果String preTag = "";//google的色值String postTag = "";HighlightBuilder.Field highlightFields = new HighlightBuilder.Field("category").preTags(preTag).postTags(postTag);Query searchQuery = new NativeSearchQueryBuilder().withQuery(builder).withHighlightFields(highlightFields).withPageable(pageable).withSort(sort).build();return searchQuery;}}
4.測試驗證
啟動項目,在Postman中,請求http://localhost:8080/book/query 接口查詢書籍信息數(shù)據(jù)。查看接口返回情況。

我們看到接口成功返回數(shù)據(jù)。說明數(shù)據(jù)查詢服務(wù)創(chuàng)建成功。
最后
以上,我們就把使用Spring Boot + Elasticsearch + Logstash 實現(xiàn)完整的數(shù)據(jù)查詢檢索服務(wù)介紹完了。
