<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          恕我直言,我也是才知道ElasticSearch條件更新是這么玩的

          共 5108字,需瀏覽 11分鐘

           ·

          2020-08-01 04:08

          背景

          ElasticSearch 的使用度越來(lái)越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做訂單搜索的。

          大部分使用場(chǎng)景都是通過程序定期去導(dǎo)入數(shù)據(jù)到 ElasticSearch 中,或者通過 CDC 的方式來(lái)構(gòu)建索引。在這種場(chǎng)景下,更新數(shù)據(jù)都是單條更新,比如 ID=1 的數(shù)據(jù)發(fā)生了修改操作,那么就會(huì)把 ElasticSearch 中 ID=1 的這條數(shù)據(jù)更新下。

          但有些場(chǎng)景下需要根據(jù)條件同時(shí)更新多條數(shù)據(jù),就像 Mysql 中我們使用 Update Table Set Name=XXX where Age=18 去更新一批數(shù)據(jù)一樣。

          正好有同學(xué)微信問我怎么批量更新,接下來(lái)就看看在 ElasticSearch 中是如何去進(jìn)行按條件更新的操作。

          單條更新

          ElasticSearch 的客戶端官方推薦使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 來(lái)構(gòu)建代碼。

          首先來(lái)回顧下單條數(shù)據(jù)的更新是怎么做的,代碼如下:

          UpdateRequest updateRequest = new UpdateRequest(index, type, id);
          updateRequest.doc(documentJson, XContentType.JSON);
          restHighLevelClient.update(updateRequest, options);

          構(gòu)建 UpdateRequest 的時(shí)候就指定了索引,類型,ID 三個(gè)字段,也就精確到了某一條數(shù)據(jù),所以更新的自然也是這一條數(shù)據(jù)。

          條件更新

          首先我們準(zhǔn)備幾條測(cè)試數(shù)據(jù),如下:

          {
          ? ? id: 1,
          ? ? title: "Java怎么學(xué)",
          ? ? type: 1,
          ? ? userId: 1,
          ? ? tags: [
          ? ? ? ? "java"
          ? ? ],
          ? ? textContent: "我要學(xué)Java",
          ? ? status: 1,
          ? ? heat: 100
          }
          {
          ? ? id: 2,
          ? ? title: "Java怎么學(xué)",
          ? ? type: 1,
          ? ? userId: 1,
          ? ? tags: [
          ? ? ? ? "java"
          ? ? ],
          ? ? textContent: "我要學(xué)Java",
          ? ? status: 1,
          ? ? heat: 100
          }

          假如我們的需求是將 userId=1 的所有文檔數(shù)據(jù)改成無(wú)效,也就是 status=0。如果不用按條件更新,你就得查詢出 userId=1 的所有數(shù)據(jù),然后一條條更新,這就太慢了。

          下面看看按條件更新是如何使用的,如下:

          POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
          {
          ? ? "script": {
          ? ? ? ? "source":"ctx._source['status']=0;"
          ? ? },
          ? ? "query": {
          ? ? ? ? "term": {
          ? ? ? ? ? ? "userId": 1
          ? ? ? ? }
          ? ? }?
          }

          按條件更新需要使用_update_by_query 來(lái)進(jìn)行,query 用于指定更新數(shù)據(jù)的匹配條件,script 用于更新的邏輯。

          詳細(xì)使用文檔:

          https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]

          https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]

          在 Java 代碼中如何實(shí)現(xiàn)條件更新呢?

          UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");
          request.setQuery(new TermQueryBuilder("userId", 1));
          request.setScript(new Script("ctx._source['status']=0;"));
          restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);

          是不是也很簡(jiǎn)單,跟單條數(shù)據(jù)更新差不多,使用 UpdateByQueryRequest 構(gòu)建更新對(duì)象,然后設(shè)置 Query 和 Script 就可以了。

          條件更新數(shù)組

          比如我們的需求是要移除 tags 中的 java,如下:

          POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
          {
          ? ? "script": {
          ? ? ? ? "source":"ctx._source['tags'].removeIf(item -> item == 'java');"
          ? ? },
          ? ? "query": {
          ? ? ? ? "term": {
          ? ? ? ? ? ? "userId": 1
          ? ? ? ? }
          ? ? }?
          }

          新增的話只需要將 removeIf 改成 add 就可以了。

          ctx._source['tags'].add('java');

          如果有特殊的業(yè)務(wù)邏輯,Script 中還可以寫判斷來(lái)判斷是否需要修改。

          POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
          {
          ? ? "script": {
          ? ? ? ? "source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}"
          ? ? },
          ? ? "query": {
          ? ? ? ? "term": {
          ? ? ? ? ? ? "userId": 1
          ? ? ? ? }
          ? ? }?
          }

          封裝通用的條件更新

          大部分場(chǎng)景下的更新都比較簡(jiǎn)單,根據(jù)某個(gè)字段去更新某個(gè)值,或者去更新多個(gè)值。在 Java 中如果每個(gè)地方都去寫腳本,就重復(fù)了,最好是抽一個(gè)比較通用的方法來(lái)更新。

          下面是簡(jiǎn)單的示列,其中還有很多需要考慮的點(diǎn),像數(shù)據(jù)類型我只處理了數(shù)字,字符串,和 List,其他的大家需要自己去擴(kuò)展。

          public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map document) {
          UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
          updateByQueryRequest.setQuery(query);
          StringBuilder script = new StringBuilder();
          Set keys = document.keySet();
          for (String key : keys) {
          String appendValue = "";
          Object value = document.get(key);
          if (value instanceof Number) {
          appendValue = value.toString();
          } else if (value instanceof String) {
          appendValue = "'" + value.toString() + "'";
          } else if (value instanceof List){
          appendValue = JsonUtils.toJson(value);
          } else {
          appendValue = value.toString();
          }
          script.append("ctx._source.").append(key).append("=").append(appendValue).append(";");
          }
          updateByQueryRequest.setScript(new Script(script.toString()));
          return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
          }
          public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {
          Map catData = new HashMap<>(1);
          catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());
          return CatTransactionManager.newTransaction(() -> {
          try {
          return restHighLevelClient.updateByQuery(updateByQueryRequest, options);
          }catch (IOException e) {
          throw new RuntimeException(e);
          }
          }, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);
          }

          如果有了這么一個(gè)方法,那么使用方式如下:

          @Test
          public void testUpdate5() {
          Map document = new HashMap<>();
          document.put("title", "Java");
          document.put("status", 0);
          document.put("tags", Lists.newArrayList("JS", "CSS"));
          kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);
          }

          關(guān)于作者:尹吉?dú)g,簡(jiǎn)單的技術(shù)愛好者,《Spring Cloud 微服務(wù)-全棧技術(shù)與案例解析》, 《Spring Cloud 微服務(wù) 入門 實(shí)戰(zhàn)與進(jìn)階》作者, 公眾號(hào) 猿天地 發(fā)起人。

          參考資料

          [1]

          docs-update-by-query.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

          [2]

          modules-scripting-using.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html



          往期推薦



          分布式ID生成服務(wù),真的有必要搞一個(gè)

          Dubbo服務(wù)調(diào)用隔離這么玩對(duì)么

          Kitty中的動(dòng)態(tài)線程池支持Nacos,Apollo多配置中心了

          噓!異步事件這樣用真的好么?


          后臺(tái)回復(fù)?學(xué)習(xí)資料?領(lǐng)取學(xué)習(xí)視頻


          如有收獲,點(diǎn)個(gè)在看,誠(chéng)摯感謝

          瀏覽 62
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  蜜桃精品网站 | 2016一级片免费看 | 伊伊人网 | 久久成人毛片 | 亚洲人成色77777在线播放 |