Redis 批量操作 pipeline 模式
點擊上方“程序員大白”,選擇“星標”公眾號
重磅干貨,第一時間送達
業(yè)務(wù)場景
? ? ? ?項目中場景需要get一批key的value,因為redis的get操作(不單單是get命令)是阻塞的,如果循環(huán)取值的話,就算是內(nèi)網(wǎng),耗時也是巨大的。所以想到了redis的pipeline命令。
pipeline簡介
非pipeline:client一個請求,redis server一個響應(yīng),期間client阻塞

Pipeline:redis的管道命令,允許client將多個請求依次發(fā)給服務(wù)器(redis的客戶端,如jedisCluster,lettuce等都實現(xiàn)了對pipeline的封裝),過程中而不需要等待請求的回復(fù),在最后再一并讀取結(jié)果即可。

單機版
單機版比較簡單,批量獲取
1//換成真實的redis實例
2Jedis?jedis?=?new?Jedis();
3//獲取管道
4Pipeline?p?=?jedis.pipelined();
5for?(int?i?=?0;?i?10000;?i++)?{
6????p.get("key_"?+?i);
7}
8//獲取結(jié)果
9List<Object>?results?=?p.syncAndReturnAll();
批量插入
1String?key?=?"key";
2Jedis?jedis?=?new?Jedis();
3Pipeline?p?=?jedis.pipelined();
4List<String>?cacheData?=?....?//要插入的數(shù)據(jù)列表
5for(String?data:?cacheData?){
6????p.hset(key,?data);
7}
8p.sync();
9jedis.close();
集群版
實際上遇到的問題是,項目上所用到的Redis是集群,初始化的時候使用的類是 JedisCluster 而不是 Jedis。去查了 JedisCluster 的文檔,并沒有發(fā)現(xiàn)提供有像 Jedis 一樣的獲取 Pipeline對象的 pipelined() 方法。解決方案:
Redis 集群規(guī)范有說: Redis 集群的鍵空間被分割為 16384 個槽(slot), 集群的最大節(jié)點數(shù)量也是 16384 個。每個主節(jié)點都負責處理 16384 個哈希槽的其中一部分。當我們說一個集群處于“穩(wěn)定”(stable)狀態(tài)時, 指的是集群沒有在執(zhí)行重配置(reconfiguration)操作, 每個哈希槽都只由一個節(jié)點進行處理。所以可以根據(jù)要插入的 key 知道這個 key 所對應(yīng)的槽的號碼,再通過這個槽的號碼從集群中找到對應(yīng) Jedis。具體實現(xiàn)如下:
1//初始化得到了jedis?cluster,?如何獲取HostAndPort集合代碼就不寫了
2Set?nodes?=?.....
3JedisCluster?jedisCluster?=?new?JedisCluster(nodes);
4
5Map<String,?JedisPool>?nodeMap?=?jedisCluster.getClusterNodes();
6String?anyHost?=?nodeMap.keySet().iterator().next();
7
8//getSlotHostMap方法在下面有
9TreeMapString >?slotHostMap?=?getSlotHostMap(anyHost);?
?
1private?static?TreeMapString >?getSlotHostMap(String?anyHostAndPortStr)?{
2????????TreeMapString>?tree?=?new?TreeMapString>();
3????????String?parts[]?=?anyHostAndPortStr.split(":");
4????????HostAndPort?anyHostAndPort?=?new?HostAndPort(parts[0],?Integer.parseInt(parts[1]));
5????????try{
6????????????Jedis?jedis?=?new?Jedis(anyHostAndPort.getHost(),?anyHostAndPort.getPort());
7????????????List<Object>?list?=?jedis.clusterSlots();
8????????????for?(Object?object?:?list)?{
9????????????????List<Object>?list1?=?(List<Object>)?object;
10????????????????List<Object>?master?=?(List<Object>)?list1.get(2);
11????????????????String?hostAndPort?=?new?String((byte[])?master.get(0))?+?":"?+?master.get(1);
12????????????????tree.put((Long)?list1.get(0),?hostAndPort);
13????????????????tree.put((Long)?list1.get(1),?hostAndPort);
14????????????}
15????????????jedis.close();
16????????}catch(Exception?e){
17
18????????}
19????????return?tree;
20}
?上面這幾步可以在初始化的時候就完成。不需要每次都調(diào)用,?把nodeMap和slotHostMap都定義為靜態(tài)變量。
1//獲取槽號
2int?slot?=?JedisClusterCRC16.getSlot(key);?
3//獲取到對應(yīng)的Jedis對象
4Map.EntryString >?entry?=?slotHostMap.lowerEntry(Long.valueOf(slot));
5Jedis?jedis?=?nodeMap.get(entry.getValue()).getResource();
建議上面這步操作可以封裝成一個靜態(tài)方法。比如命名為 public static Jedis getJedisByKey(String key) 之類的。意思就是在集群中, 通過key獲取到這個key所對應(yīng)的Jedis對象。這樣再通過上面的 jedis.pipelined(); 來就可以進行批量插入了。以下是一個比較完整的封裝
1import?redis.clients.jedis.*;
3import?redis.clients.jedis.exceptions.JedisMovedDataException;
4import?redis.clients.jedis.exceptions.JedisRedirectionException;
5import?redis.clients.util.JedisClusterCRC16;
6import?redis.clients.util.SafeEncoder;
7
8import?java.io.Closeable;
9import?java.lang.reflect.Field;
10import?java.util.*;
11import?java.util.function.BiConsumer;
12
14public?class?JedisClusterPipeline?extends?PipelineBase?implements?Closeable?{
15
16????/**
17?????*?用于獲取?JedisClusterInfoCache
18?????*/
19????private?JedisSlotBasedConnectionHandler?connectionHandler;
20????/**
21?????*?根據(jù)hash值獲取連接
22?????*/
23????private?JedisClusterInfoCache?clusterInfoCache;
24
25????/**
26?????*?也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問接口
27?????*?JedisCluster繼承于BinaryJedisCluster
28?????*?在BinaryJedisCluster,connectionHandler屬性protected修飾的,所以需要反射
29?????*
30?????*
31?????*?而?JedisClusterInfoCache?屬性在JedisClusterConnectionHandler中,但是這個類是抽象類,
32?????*?但它有一個實現(xiàn)類JedisSlotBasedConnectionHandler
33?????*/
34????private?static?final?Field?FIELD_CONNECTION_HANDLER;
35????private?static?final?Field?FIELD_CACHE;
36????static?{
37????????FIELD_CONNECTION_HANDLER?=?getField(BinaryJedisCluster.class,?"connectionHandler");
38????????FIELD_CACHE?=?getField(JedisClusterConnectionHandler.class,?"cache");
39????}
40
41????/**
42?????*?根據(jù)順序存儲每個命令對應(yīng)的Client
43?????*/
44????private?Queue?clients?=?new?LinkedList<>();
45????/**
46?????*?用于緩存連接
47?????*?一次pipeline過程中使用到的jedis緩存
48?????*/
49????private?Map?jedisMap?=?new?HashMap<>();
50????/**
51?????*?是否有數(shù)據(jù)在緩存區(qū)
52?????*/
53????private?boolean?hasDataInBuf?=?false;
54
55????/**
56?????*?根據(jù)jedisCluster實例生成對應(yīng)的JedisClusterPipeline
57?????*?通過此方式獲取pipeline進行操作的話必須調(diào)用close()關(guān)閉管道
58?????*?調(diào)用本類里pipelineXX方法則不用close(),但建議最好還是在finally里調(diào)用一下close()
59?????*?@param
60?????*?@return
61?????*/
62????public?static?JedisClusterPipeline?pipelined(JedisCluster?jedisCluster)?{
63????????JedisClusterPipeline?pipeline?=?new?JedisClusterPipeline();
64????????pipeline.setJedisCluster(jedisCluster);
65????????return?pipeline;
66????}
67
68????public?JedisClusterPipeline()?{
69????}
70
71????public?void?setJedisCluster(JedisCluster?jedis)?{
72????????connectionHandler?=?getValue(jedis,?FIELD_CONNECTION_HANDLER);
73????????clusterInfoCache?=?getValue(connectionHandler,?FIELD_CACHE);
74????}
75
76????/**
77?????*?刷新集群信息,當集群信息發(fā)生變更時調(diào)用
78?????*?@param
79?????*?@return
80?????*/
81????public?void?refreshCluster()?{
82????????connectionHandler.renewSlotCache();
83????}
84
85????/**
86?????*?同步讀取所有數(shù)據(jù).?與syncAndReturnAll()相比,sync()只是沒有對數(shù)據(jù)做反序列化
87?????*/
88????public?void?sync()?{
89????????innerSync(null);
90????}
91
92????/**
93?????*?同步讀取所有數(shù)據(jù)?并按命令順序返回一個列表
94?????*
95?????*?@return?按照命令的順序返回所有的數(shù)據(jù)
96?????*/
97????public?List{
98????????List 使用例子
1????public?Object?testPipelineOperate()?{
2????????//????????String[]?keys?=?{"dylan1","dylan2"};
3????????//????????String[]?values?=?{"dylan1-v1","dylan2-v2"};
4????????//????????int[]?exps?=?{100,200};
5????????//????????JedisClusterPipeline.pipelineSetEx(keys,?values,?exps,?jedisCluster);
6????????long?start?=?System.currentTimeMillis();
7
8????????List<String>?keyList?=?new?ArrayList<>();
9????????for?(int?i?=?0;?i?1000;?i++)?{
10????????????keyList.add(i?+?"");
11????????}
12????????//????????List?pipeline?=?JedisClusterPipeline.pipeline(this::getValue,?keyList,?jedisCluster);
13????????//????????List?pipeline?=?JedisClusterPipeline.pipeline(this::getHashValue,?keyList,?jedisCluster);
14????????String[]?keys?=?{"dylan-test1",?"dylan-test2"};
15
16????????List<Map<String,?String>>?all?=?JedisClusterPipeline.pipelineHgetAll(keys,?jedisCluster);
17????????long?end?=?System.currentTimeMillis();
18????????System.out.println("testPipelineOperate?cost:"?+?(end-start));
19
20????????return?Response.success(all);
21????}推薦閱讀
關(guān)于程序員大白
程序員大白是一群哈工大,東北大學(xué),西湖大學(xué)和上海交通大學(xué)的碩士博士運營維護的號,大家樂于分享高質(zhì)量文章,喜歡總結(jié)知識,歡迎關(guān)注[程序員大白],大家一起學(xué)習進步!


