<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 批量操作 pipeline 模式

          共 537字,需瀏覽 2分鐘

           ·

          2021-12-12 23:28

          點擊上方“程序員大白”,選擇“星標”公眾號

          重磅干貨,第一時間送達


          業(yè)務(wù)場景

          ? ? ? ?項目中場景需要get一批key的value,因為redis的get操作(不單單是get命令)是阻塞的,如果循環(huán)取值的話,就算是內(nèi)網(wǎng),耗時也是巨大的。所以想到了redis的pipeline命令。


          pipeline簡介

          • 非pipeline:client一個請求,redis server一個響應(yīng),期間client阻塞

          • Pipeline:redis的管道命令,允許client將多個請求依次發(fā)給服務(wù)器(redis的客戶端,如jedisCluster,lettuce等都實現(xiàn)了對pipeline的封裝),過程中而不需要等待請求的回復(fù),在最后再一并讀取結(jié)果即可。


          單機版

          單機版比較簡單,批量獲取

          1//換成真實的redis實例
          2Jedis?jedis?=?new?Jedis();
          3//獲取管道
          4Pipeline?p?=?jedis.pipelined();
          5for?(int?i?=?0;?i?10000
          ;?i++)?{
          6????p.get("key_"?+?i);
          7}
          8//獲取結(jié)果
          9List<Object>?results?=?p.syncAndReturnAll();

          批量插入

          1String?key?=?"key";
          2Jedis?jedis?=?new?Jedis();
          3Pipeline?p?=?jedis.pipelined();
          4List<String>?cacheData?=?....?//要插入的數(shù)據(jù)列表
          5for(String?data:?cacheData?){
          6????p.hset(key,?data);
          7}
          8p.sync();
          9jedis.close();


          集群版

          實際上遇到的問題是,項目上所用到的Redis是集群,初始化的時候使用的類是 JedisCluster 而不是 Jedis。去查了 JedisCluster 的文檔,并沒有發(fā)現(xiàn)提供有像 Jedis 一樣的獲取 Pipeline對象的 pipelined() 方法。解決方案:

          Redis 集群規(guī)范有說: Redis 集群的鍵空間被分割為 16384 個槽(slot), 集群的最大節(jié)點數(shù)量也是 16384 個。每個主節(jié)點都負責處理 16384 個哈希槽的其中一部分。當我們說一個集群處于“穩(wěn)定”(stable)狀態(tài)時, 指的是集群沒有在執(zhí)行重配置(reconfiguration)操作, 每個哈希槽都只由一個節(jié)點進行處理。所以可以根據(jù)要插入的 key 知道這個 key 所對應(yīng)的槽的號碼,再通過這個槽的號碼從集群中找到對應(yīng) Jedis。具體實現(xiàn)如下:

          1//初始化得到了jedis?cluster,?如何獲取HostAndPort集合代碼就不寫了
          2Set?nodes?=?.....
          3JedisCluster?jedisCluster?=?new?JedisCluster(nodes);
          4
          5Map<String,?JedisPool>?nodeMap?=?jedisCluster.getClusterNodes();
          6String?anyHost?=?nodeMap.keySet().iterator().next();
          7
          8//getSlotHostMap方法在下面有
          9TreeMapString
          >?slotHostMap?=?getSlotHostMap(anyHost);?

          ?

           1private?static?TreeMapString>?getSlotHostMap(String?anyHostAndPortStr)?{
          2????????TreeMapString>?tree?=?new?TreeMapString>();
          3????????String?parts[]?=?anyHostAndPortStr.split(":");
          4????????HostAndPort?anyHostAndPort?=?new?HostAndPort(parts[0],?Integer.parseInt(parts[1]));
          5????????try{
          6????????????Jedis?jedis?=?new?Jedis(anyHostAndPort.getHost(),?anyHostAndPort.getPort());
          7????????????List<Object>?list?=?jedis.clusterSlots();
          8????????????for?(Object?object?:?list)?{
          9????????????????List<Object>?list1?=?(List<Object>)?object;
          10????????????????List<Object>?master?=?(List<Object>)?list1.get(2);
          11????????????????String?hostAndPort?=?new?String((byte[])?master.get(0))?+?":"?+?master.get(1);
          12????????????????tree.put((Long)?list1.get(0),?hostAndPort);
          13????????????????tree.put((Long)?list1.get(1),?hostAndPort);
          14????????????}
          15????????????jedis.close();
          16????????}catch(Exception?e){
          17
          18????????}
          19????????return?tree;
          20}

          ?上面這幾步可以在初始化的時候就完成。不需要每次都調(diào)用,?把nodeMap和slotHostMap都定義為靜態(tài)變量。

          1//獲取槽號
          2int?slot?=?JedisClusterCRC16.getSlot(key);?
          3//獲取到對應(yīng)的Jedis對象
          4Map.EntryString
          >?entry?=?slotHostMap.lowerEntry(Long.valueOf(slot));
          5Jedis?jedis?=?nodeMap.get(entry.getValue()).getResource();

          建議上面這步操作可以封裝成一個靜態(tài)方法。比如命名為 public static Jedis getJedisByKey(String key) 之類的。意思就是在集群中, 通過key獲取到這個key所對應(yīng)的Jedis對象。這樣再通過上面的 jedis.pipelined(); 來就可以進行批量插入了。以下是一個比較完整的封裝

            1import?redis.clients.jedis.*;
          3import?redis.clients.jedis.exceptions.JedisMovedDataException;
          4import?redis.clients.jedis.exceptions.JedisRedirectionException;
          5import?redis.clients.util.JedisClusterCRC16;
          6import?redis.clients.util.SafeEncoder;
          7
          8import?java.io.Closeable;
          9import?java.lang.reflect.Field;
          10import?java.util.*;
          11import?java.util.function.BiConsumer;
          12
          14public?class?JedisClusterPipeline?extends?PipelineBase?implements?Closeable?{
          15
          16????/**
          17?????*?用于獲取?JedisClusterInfoCache
          18?????*/

          19????private?JedisSlotBasedConnectionHandler?connectionHandler;
          20????/**
          21?????*?根據(jù)hash值獲取連接
          22?????*/

          23????private?JedisClusterInfoCache?clusterInfoCache;
          24
          25????/**
          26?????*?也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口
          27?????*?JedisCluster繼承于BinaryJedisCluster
          28?????*?在BinaryJedisCluster,connectionHandler屬性protected修飾的,所以需要反射
          29?????*
          30?????*
          31?????*?而?JedisClusterInfoCache?屬性在JedisClusterConnectionHandler中,但是這個類是抽象類,
          32?????*?但它有一個實現(xiàn)類JedisSlotBasedConnectionHandler
          33?????*/

          34????private?static?final?Field?FIELD_CONNECTION_HANDLER;
          35????private?static?final?Field?FIELD_CACHE;
          36????static?{
          37????????FIELD_CONNECTION_HANDLER?=?getField(BinaryJedisCluster.class,?"connectionHandler");
          38????????FIELD_CACHE?=?getField(JedisClusterConnectionHandler.class,?"cache");
          39????}
          40
          41????/**
          42?????*?根據(jù)順序存儲每個命令對應(yīng)的Client
          43?????*/

          44????private?Queue?clients?=?new?LinkedList<>();
          45????/**
          46?????*?用于緩存連接
          47?????*?一次pipeline過程中使用到的jedis緩存
          48?????*/

          49????private?Map?jedisMap?=?new?HashMap<>();
          50????/**
          51?????*?是否有數(shù)據(jù)在緩存區(qū)
          52?????*/

          53????private?boolean?hasDataInBuf?=?false;
          54
          55????/**
          56?????*?根據(jù)jedisCluster實例生成對應(yīng)的JedisClusterPipeline
          57?????*?通過此方式獲取pipeline進行操作的話必須調(diào)用close()關(guān)閉管道
          58?????*?調(diào)用本類里pipelineXX方法則不用close(),但建議最好還是在finally里調(diào)用一下close()
          59?????*?@param
          60?????*?@return
          61?????*/

          62????public?static?JedisClusterPipeline?pipelined(JedisCluster?jedisCluster)?{
          63????????JedisClusterPipeline?pipeline?=?new?JedisClusterPipeline();
          64????????pipeline.setJedisCluster(jedisCluster);
          65????????return?pipeline;
          66????}
          67
          68????public?JedisClusterPipeline()?{
          69????}
          70
          71????public?void?setJedisCluster(JedisCluster?jedis)?{
          72????????connectionHandler?=?getValue(jedis,?FIELD_CONNECTION_HANDLER);
          73????????clusterInfoCache?=?getValue(connectionHandler,?FIELD_CACHE);
          74????}
          75
          76????/**
          77?????*?刷新集群信息,當集群信息發(fā)生變更時調(diào)用
          78?????*?@param
          79?????*?@return
          80?????*/

          81????public?void?refreshCluster()?{
          82????????connectionHandler.renewSlotCache();
          83????}
          84
          85????/**
          86?????*?同步讀取所有數(shù)據(jù).?與syncAndReturnAll()相比,sync()只是沒有對數(shù)據(jù)做反序列化
          87?????*/

          88????public?void?sync()?{
          89????????innerSync(null);
          90????}
          91
          92????/**
          93?????*?同步讀取所有數(shù)據(jù)?并按命令順序返回一個列表
          94?????*
          95?????*?@return?按照命令的順序返回所有的數(shù)據(jù)
          96?????*/

          97????public?List?syncAndReturnAll()?{
          98????????List?responseList?=?new?ArrayList<>();
          99
          100????????innerSync(responseList);
          101
          102????????return?responseList;
          103????}
          104
          105????@Override
          106????public?void?close()?{
          107????????clean();
          108????????clients.clear();
          109????????for?(Jedis?jedis?:?jedisMap.values())?{
          110????????????if?(hasDataInBuf)?{
          111????????????????flushCachedData(jedis);
          112????????????}
          113????????????jedis.close();
          114????????}
          115????????jedisMap.clear();
          116????????hasDataInBuf?=?false;
          117????}
          118
          119????private?void?flushCachedData(Jedis?jedis)?{
          120????????try?{
          121????????????jedis.getClient().getAll();
          122????????}?catch?(RuntimeException?ex)?{
          123????????}
          124????}
          125
          126????@Override
          127????protected?Client?getClient(String?key)?{
          128????????byte[]?bKey?=?SafeEncoder.encode(key);
          129????????return?getClient(bKey);
          130????}
          131
          132????@Override
          133????protected?Client?getClient(byte[]?key)?{
          134????????Jedis?jedis?=?getJedis(JedisClusterCRC16.getSlot(key));
          135????????Client?client?=?jedis.getClient();
          136????????clients.add(client);
          137????????return?client;
          138????}
          139
          140????private?Jedis?getJedis(int?slot)?{
          141????????JedisPool?pool?=?clusterInfoCache.getSlotPool(slot);
          142????????//?根據(jù)pool從緩存中獲取Jedis
          143????????Jedis?jedis?=?jedisMap.get(pool);
          144????????if?(null?==?jedis)?{
          145????????????jedis?=?pool.getResource();
          146????????????jedisMap.put(pool,?jedis);
          147????????}
          148????????hasDataInBuf?=?true;
          149????????return?jedis;
          150????}
          151
          152????public?static?void?pipelineSetEx(String[]?keys,?String[]?values,?int[]?exps,JedisCluster?jedisCluster)?{
          153????????operate(new?Command()?{
          154????????????@Override
          155????????????public?List?execute()?{
          156????????????????JedisClusterPipeline?p?=?pipelined(jedisCluster);
          157????????????????for?(int?i?=?0,?len?=?keys.length;?i?158????????????????????p.setex(keys[i],?exps[i],?values[i]);
          159????????????????}
          160????????????????return?p.syncAndReturnAll();
          161????????????}
          162????????});
          163????}
          164
          165????public?static?List>?pipelineHgetAll(String[]?keys,JedisCluster?jedisCluster)?{
          166????????return?operate(new?Command()?{
          167????????????@Override
          168????????????public?List?execute()?{
          169????????????????JedisClusterPipeline?p?=?pipelined(jedisCluster);
          170????????????????for?(int?i?=?0,?len?=?keys.length;?i?171????????????????????p.hgetAll(keys[i]);
          172????????????????}
          173????????????????return?p.syncAndReturnAll();
          174????????????}
          175????????});
          176????}
          177
          178????public?static?List?pipelineSismember(String[]?keys,?String?members,JedisCluster?jedisCluster)?{
          179????????return?operate(new?Command()?{
          180????????????@Override
          181????????????public?List?execute()?{
          182????????????????JedisClusterPipeline?p?=?pipelined(jedisCluster);
          183????????????????for?(int?i?=?0,?len?=?keys.length;?i?184????????????????????p.sismember(keys[i],?members);
          185????????????????}
          186????????????????return?p.syncAndReturnAll();
          187????????????}
          188????????});
          189????}
          190
          191????public?static??List?pipeline(BiConsumer?function,?O?obj,JedisCluster?jedisCluster)?{
          192????????return?operate(new?Command()?{
          193????????????@Override
          194????????????public?List?execute()?{
          195????????????????JedisClusterPipeline?jcp?=?JedisClusterPipeline.pipelined(jedisCluster);
          196????????????????function.accept(obj,?jcp);
          197????????????????return?jcp.syncAndReturnAll();
          198????????????}
          199????????});
          200????}
          201
          202????private?void?innerSync(List?formatted)?{
          203????????HashSet?clientSet?=?new?HashSet<>();
          204????????try?{
          205????????????for?(Client?client?:?clients)?{
          206????????????????//?在sync()調(diào)用時其實是不需要解析結(jié)果數(shù)據(jù)的,但是如果不調(diào)用get方法,發(fā)生了JedisMovedDataException這樣的錯誤應(yīng)用是不知道的,因此需要調(diào)用get()來觸發(fā)錯誤。
          207????????????????//?其實如果Response的data屬性可以直接獲取,可以省掉解析數(shù)據(jù)的時間,然而它并沒有提供對應(yīng)方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了
          208????????????????Object?data?=?generateResponse(client.getOne()).get();
          209????????????????if?(null?!=?formatted)?{
          210????????????????????formatted.add(data);
          211????????????????}
          212????????????????//?size相同說明所有的client都已經(jīng)添加,就不用再調(diào)用add方法了
          213????????????????if?(clientSet.size()?!=?jedisMap.size())?{
          214????????????????????clientSet.add(client);
          215????????????????}
          216????????????}
          217????????}?catch?(JedisRedirectionException?jre)?{
          218????????????if?(jre?instanceof?JedisMovedDataException)?{
          219????????????????//?if?MOVED?redirection?occurred,?rebuilds?cluster's?slot?cache,
          220????????????????//?recommended?by?Redis?cluster?specification
          221????????????????refreshCluster();
          222????????????}
          223
          224????????????throw?jre;
          225????????}?finally?{
          226????????????if?(clientSet.size()?!=?jedisMap.size())?{
          227????????????????//?所有還沒有執(zhí)行過的client要保證執(zhí)行(flush),防止放回連接池后后面的命令被污染
          228????????????????for?(Jedis?jedis?:?jedisMap.values())?{
          229????????????????????if?(clientSet.contains(jedis.getClient()))?{
          230????????????????????????continue;
          231????????????????????}
          232????????????????????flushCachedData(jedis);
          233????????????????}
          234????????????}
          235????????????hasDataInBuf?=?false;
          236????????????close();
          237????????}
          238????}
          239
          240????private?static?Field?getField(Class?cls,?String?fieldName)?{
          241????????try?{
          242????????????Field?field?=?cls.getDeclaredField(fieldName);
          243????????????field.setAccessible(true);
          244????????????return?field;
          245????????}?catch?(NoSuchFieldException?|?SecurityException?e)?{
          246????????????throw?new?RuntimeException("cannot?find?or?access?field?'"?+?fieldName?+?"'?from?"?+?cls.getName(),?e);
          247????????}
          248????}
          249
          250????@SuppressWarnings({"unchecked"?})
          251????private?static??T?getValue(Object?obj,?Field?field)?{
          252????????try?{
          253????????????return?(T)field.get(obj);
          254????????}?catch?(IllegalArgumentException?|?IllegalAccessException?e)?{
          257????????????throw?new?RuntimeException(e);
          258????????}
          259????}
          260
          261????private?static??T?operate(Command?command)?{
          262????????try??{
          263????????????return?command.execute();
          264????????}?catch?(Exception?e)?{
          266????????????throw?new?RuntimeException(e);
          267????????}
          268????}
          269
          270????interface?Command?{
          271????????/**
          272?????????*?具體執(zhí)行命令
          273?????????*
          274?????????*?@param?
          275?????????*?@return
          276?????????*/

          277?????????T?execute();
          278????}
          279}

          使用例子

           1????public?Object?testPipelineOperate()?{
          2????????//????????String[]?keys?=?{"dylan1","dylan2"};
          3????????//????????String[]?values?=?{"dylan1-v1","dylan2-v2"};
          4????????//????????int[]?exps?=?{100,200};
          5????????//????????JedisClusterPipeline.pipelineSetEx(keys,?values,?exps,?jedisCluster);
          6????????long?start?=?System.currentTimeMillis();
          7
          8????????List<String>?keyList?=?new?ArrayList<>();
          9????????for?(int?i?=?0;?i?1000
          ;?i++)?{
          10????????????keyList.add(i?+?"");
          11????????}
          12????????//????????List?pipeline?=?JedisClusterPipeline.pipeline(this::getValue,?keyList,?jedisCluster);
          13????????//????????List?pipeline?=?JedisClusterPipeline.pipeline(this::getHashValue,?keyList,?jedisCluster);
          14????????String[]?keys?=?{"dylan-test1",?"dylan-test2"};
          15
          16????????List<Map<String,?String>>?all?=?JedisClusterPipeline.pipelineHgetAll(keys,?jedisCluster);
          17????????long?end?=?System.currentTimeMillis();
          18????????System.out.println("testPipelineOperate?cost:"?+?(end-start));
          19
          20????????return?Response.success(all);
          21????}


          13個你一定要知道的PyTorch特性

          解讀:為什么要做特征歸一化/標準化?

          一文搞懂 PyTorch 內(nèi)部機制

          張一鳴:每個逆襲的年輕人,都具備的底層能力


          關(guān)


          ,學(xué),西學(xué)學(xué),質(zhì),結(jié),關(guān)[],學(xué)!


          瀏覽 269
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    操B小说 操X操X | 黄色a网站| 国产精品后入 | 91 国产 爽 黄 在线 | 中文字幕在线观看免费视频 |