圖解 | Elasticsearch 獲取兩個(gè)索引數(shù)據(jù)不同之處的四種方案
1、實(shí)戰(zhàn)項(xiàng)目問題
......我有2個(gè)index,假設(shè)其中index1中數(shù)據(jù)是 id1,id2,id3,index2 中是 id1,id3。我的目的是能找出缺失的 id2 的數(shù)據(jù),并且后續(xù)進(jìn)去的 id4,id5 如果有缺失的也能發(fā)現(xiàn)?!獑栴}來源:死磕 Elasticsearch 知識(shí)星球
2、問題解讀
假定有兩個(gè)索引 index1、index2,這兩個(gè)索引中有大量相同數(shù)據(jù)。
這個(gè)問題的本質(zhì)是實(shí)現(xiàn)類似:linux 下的 diff 命令的操作,找出一個(gè)索引中存在而在另外一個(gè)索引不存在的數(shù)據(jù)。
3、方案探討
Elasticsearch 沒有直接實(shí)現(xiàn)找索引數(shù)據(jù)差異的類 diff 命令可用。
但,redis 中有 sdiff 命令可以一鍵搞定一個(gè)集合中有而另外一個(gè)集合中沒有的數(shù)據(jù)。

這就引申出方案一:借助 redis 實(shí)現(xiàn)。
那么問題來了,不用 redis, Elasticsearch 自身能否搞定呢?
其實(shí)是可以搞定的。我們通過組合索引檢索,然后對(duì)索引中公有相同主鍵字段進(jìn)行聚合,然后進(jìn)行去重統(tǒng)計(jì),找出計(jì)數(shù) < 2 的就是我們想要的 id 。因?yàn)椋喝绻麅蓚€(gè)索引都有數(shù)據(jù),勢(shì)必聚合后計(jì)數(shù) >= 2。此為方案二。
還有,我們可以借助 Elasticsearch transform 實(shí)現(xiàn),此為方案三。
類似問題是個(gè)業(yè)界通用問題,有沒有開源實(shí)現(xiàn)方案呢?此為方案四。
4、方案實(shí)現(xiàn)
4.1 方案一:借助 redis sdiff 實(shí)現(xiàn)

前提:Elasticsearch 索引數(shù)據(jù)中有類似 MySQL 主鍵的字段,能唯一標(biāo)定一條記錄。如果沒有可以使用 _id 字段,但不建議使用 _id ,下文會(huì)說原因。
實(shí)施步驟如下:
步驟1:將 index1 (數(shù)據(jù)量多的,全量索引)的主鍵字段 uniq_1 導(dǎo)入 redis; 步驟2:將 index2 的主鍵字段 uniq_2 導(dǎo)入 redis; 步驟3:使用 sdiff 命令行返回結(jié)果就是期望不同 id 值。
4.2 方案二:借助 Elasticsearch 聚合實(shí)現(xiàn)
我們用 kibana 自帶的索引數(shù)據(jù)仿真一把。
4.2.1 用已有索引實(shí)現(xiàn),好理解,大家都可以復(fù)現(xiàn)。
POST _reindex
{
"source": {
"index": "kibana_sample_data_flights"
},
"dest": {
"index": "kibana_sample_data_flights_ext"
}
}
GET kibana_sample_data_flights/_count
共60個(gè),用作不同的值區(qū)分用
POST kibana_sample_data_flights_ext/_search
{
"query": {
"bool": {
"must": [
{
"term": {
"OriginCountry.keyword": {
"value": "US"
}
}
},
{
"term": {
"OriginWeather.keyword": {
"value": "Rain"
}
}
},
{
"term": {
"DestWeather.keyword": {
"value": "Rain"
}
}
}
]
}
}
}
刪除掉了60條記錄 "deleted" : 60,
POST kibana_sample_data_flights_ext/_delete_by_query
{
"query": {
"bool": {
"must": [
{
"term": {
"OriginCountry.keyword": {
"value": "US"
}
}
},
{
"term": {
"OriginWeather.keyword": {
"value": "Rain"
}
}
},
{
"term": {
"DestWeather.keyword": {
"value": "Rain"
}
}
}
]
}
}
}
這樣操作之后,_data_flights_ext 索引就比 _data_flights 索引少了 60 條數(shù)據(jù)。
如何實(shí)現(xiàn)聚合呢?
先全局設(shè)置修復(fù)可能的報(bào)錯(cuò),設(shè)置如下:
PUT _cluster/settings
{
"persistent": {
"indices.id_field_data.enabled": true
}
}
4.2.2 聚合去重實(shí)現(xiàn) DSL
POST kibana_sample_data_flights,kibana_sample_data_flights_ext/_search
{
"size": 0,
"aggs": {
"group_by_uid": {
"terms": {
"field": "_id",
"size": 1000000
},
"aggs": {
"count_indices": {
"cardinality": {
"field": "_index"
}
},
"values_bucket_filter_by_index_count": {
"bucket_selector": {
"buckets_path": {
"count": "count_indices"
},
"script": "params.count < 2"
}
}
}
}
}
}
size 值設(shè)置的比較大,是因?yàn)樘岣呔酆暇鹊脑?,否則結(jié)果會(huì)不準(zhǔn)確。
前面如果不設(shè)置的話,會(huì)報(bào)錯(cuò)如下:
"reason" : "Fielddata access on the _id field is disallowed, you can re-enable it by updating the dynamic cluster setting: indices.id_field_data.enabled"
也就是說 8.X 版本不推薦使用 id 作為聚合操作的字段,這也解釋了前文讓自己生成 uniq_id 的原因所在。
執(zhí)行結(jié)果如下:

doc_count 為 1 的結(jié)果值,就是我們期望的結(jié)果。
如果上面聚合不好理解,簡(jiǎn)化版圖解如下:

4.3 方案三:借助 Elasticsearch transform 實(shí)現(xiàn)
transform 咱們之前文章提及的少,這里簡(jiǎn)單說一下。
transform 含義如其英文釋義一致“轉(zhuǎn)換、改造”的意思。就是把已有索引“轉(zhuǎn)換、改造”為匯總索引(summarized indices),方便我們做后續(xù)的分析操作。

transform 常見的 API 如下所示:

https://www.elastic.co/guide/en/elasticsearch/reference/current/transform-apis.html
步驟1:創(chuàng)建索引
其實(shí)這一步非必須,只不過我們后面使用了 _id 字段,不先創(chuàng)建索引、指定 mapping 的話會(huì)報(bào)錯(cuò)。
PUT compare
{
"mappings": {
"_meta": {
"_transform": {
"transform": "index_compare",
"version": {
"created": "8.2.2"
},
"creation_date_in_millis": 1656279927899
},
"created_by": "transform"
},
"properties": {
"unique-id": {
"type": "keyword"
}
}
},
"settings": {
"index": {
"number_of_shards": "1",
"auto_expand_replicas": "0-1"
}
},
"aliases": {}
}
compare 就是我們目標(biāo)生成的:匯總索引 。
細(xì)心的讀者會(huì)發(fā)現(xiàn),這個(gè) compare 像是系統(tǒng)生成的索引。沒錯(cuò)的,這是借助:POST _transform/_preview ...生成然后人工做部分修改后的索引。
步驟2:創(chuàng)建 transform
PUT _transform/index_compare
{
"source": {
"index": [
"kibana_sample_data_flights",
"kibana_sample_data_flights_ext"
],
"query": {
"match_all": {}
}
},
"dest": {
"index": "compare"
},
"pivot": {
"group_by": {
"unique-id": {
"terms": {
"field": "_id"
}
}
},
"aggregations": {
"compare": {
"scripted_metric": {
"map_script": "state.doc = new HashMap(params['_source'])",
"combine_script": "return state",
"reduce_script": """
if (states.size() != 2) {
return "count_mismatch"
}
if (states.get(0).equals(states.get(1))) {
return "match"
} else {
return "mismatch"
}
"""
}
}
}
}
}
source:指定了兩個(gè)源索引,便于后續(xù)的 compare 操作。
pivot:中樞、樞紐的意思,所有的核心操作都放到這里面。執(zhí)行的核心:先以_id 做了聚合操作,然后針對(duì)聚合后的結(jié)果做了處理;聚合結(jié)果不為2(必然為1),就是我們期望的結(jié)果,返回:count_mismatch。其他,若相等返回:match。
步驟3:執(zhí)行 transform
POST _transform/index_compare/_start
步驟4:基于 transform 生成的目標(biāo)索引,執(zhí)行特定檢索。
POST compare/_search
{
"track_total_hits": true,
"size": 1000,
"query": {
"term": {
"compare.keyword": {
"value": "count_mismatch"
}
}
}
}
返回結(jié)果就是我們期望的不同值,截圖如下所示:

4.4 方案四:第三方開源實(shí)現(xiàn)
認(rèn)知前提:只要我們認(rèn)為是問題的點(diǎn),極大可能“前人”早已經(jīng)遇到過,更大可能“前人”早已經(jīng)給出了解決方案甚至已經(jīng)開源了解決方案。這是我從業(yè)10年+感觸比較深的地方,一句話:“非必要,不重復(fù)造輪子”。
開源方案 1:https://github.com/Aconex/scrutineer/
可實(shí)現(xiàn)不同數(shù)據(jù)源,如:Elasticsearch VS Elasticsearch,Elasticsearch VS Solr 之間的索引數(shù)據(jù)比較。
開源方案 2:https://github.com/olivere/esdiff
可實(shí)現(xiàn)比較不同索引之間文檔的差異。
實(shí)現(xiàn)參考如下:
$ ./esdiff -u=true -d=false 'http://localhost:19200/index01/tweet' 'http://localhost:29200/index01/_doc'
Unchanged 1
Updated 3 {*diff.Document}.Source["message"]:
-: "Playing the piano is fun as well"
+: "Playing the guitar is fun as well"
Created 4 {*diff.Document}:
-: (*diff.Document)(nil)
+: &diff.Document{ID: "4", Source: map[string]interface {}{"message": "Climbed that mountain", "user": "sandrae"}}
5、小結(jié)
只要思想不滑坡,方案總比問題多。
自己寫程序能否實(shí)現(xiàn)呢?當(dāng)然也是可以的。“index1是完整的可以作為參照物。以插入時(shí)間為主線(時(shí)間戳,應(yīng)該每條記錄都會(huì)有一條數(shù)據(jù))拿 index1 的每個(gè)id數(shù)據(jù)在 index2 中進(jìn)行檢索,如果存在,ok沒有問題;如果不存在,記錄一下id,id 存入一個(gè)集合里面,這個(gè) id 集合就是想要的 目標(biāo) id 集合?!?/p>
你的業(yè)務(wù)場(chǎng)景有沒有遇到類似問題,如何解決的呢?
歡迎留言討論。
推薦閱讀
更短時(shí)間更快習(xí)得更多干貨!
和全球 1600+ Elastic 愛好者一起精進(jìn)!

