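Redis (Standalone & Cluster) Pipeline Utility Class
Note: this article first presents the test code and test results (which double as usage examples), then lists the utility class code.

Simple performance comparison (with usage examples)
Normal operations vs. pipeline operations on standalone Redis:
Test code:

Test results:

Normal operations vs. pipeline operations on a Redis cluster:
Test code:

Test results:

Normal operations vs. pipeline operations on hashes in a Redis cluster:
Test code:

Test results: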


Pipeline utility class:
Relevant (core) dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>
Pipeline utility class:
// The original listing omitted its imports; the ones below assume Jedis 3.x, Spring Data Redis 2.x and Lombok.
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.RedisClusterConnection;
import org.springframework.data.redis.connection.RedisCommands;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisClusterConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import redis.clients.jedis.util.JedisClusterCRC16;

/**
 * Redis pipeline utility class.
 *
 * @author {@link JustryDeng}
 * @since 2020/11/13 2:41:40
 */
@Slf4j
@Component // assumed: the class must be a Spring bean for setApplicationContext to be invoked
@SuppressWarnings("unused")
public final class RedisPipelineUtil implements ApplicationContextAware {

    private static RedisTemplate<Object, Object> defaultRedisTemplate;

    /**
     * Returns the key serializer.
     *
     * @return the key serializer
     */
    public static RedisSerializer<Object> getKeySerializer() {
        //noinspection unchecked
        return (RedisSerializer<Object>) defaultRedisTemplate.getKeySerializer();
    }

    /**
     * Returns the value serializer.
     *
     * @return the value serializer
     */
    public static RedisSerializer<Object> getValueSerializer() {
        //noinspection unchecked
        return (RedisSerializer<Object>) defaultRedisTemplate.getValueSerializer();
    }

    /**
     * Returns the hash-key serializer.
     *
     * @return the hash-key serializer
     */
    public static RedisSerializer<Object> getHashKeySerializer() {
        //noinspection unchecked
        return (RedisSerializer<Object>) defaultRedisTemplate.getHashKeySerializer();
    }

    /**
     * Returns the hash-value serializer.
     *
     * @return the hash-value serializer
     */
    public static RedisSerializer<Object> getHashValueSerializer() {
        //noinspection unchecked
        return (RedisSerializer<Object>) defaultRedisTemplate.getHashValueSerializer();
    }

    /**
     * Pipelined batch operation (standalone node), using the default RedisTemplate.
     * <p>
     * Note: SessionCallback is a further wrapper around RedisCallback; since we are
     * already pipelining, we simply use RedisCallback directly.
     *
     * @param biConsumer
     *            the batch operation logic
     * @param paramList
     *            the parameters consumed by biConsumer
     * @return the result list
     */
    public static <R, P> List<R> pipeline4Standalone(BiConsumer<RedisCommands, P> biConsumer, final List<P> paramList) {
        //noinspection unchecked
        return (List<R>) defaultRedisTemplate.executePipelined((RedisCallback<R>) connection -> {
            for (P p : paramList) {
                biConsumer.accept(connection, p);
            }
            return null;
        }, defaultRedisTemplate.getValueSerializer());
    }

    /**
     * Pipelined batch operation (standalone node).
     * <p>
     * Note: SessionCallback is a further wrapper around RedisCallback; since we are
     * already pipelining, we simply use RedisCallback directly.
     *
     * @param redisTemplate
     *            the template to operate with
     * @param biConsumer
     *            the batch operation logic
     * @param paramList
     *            the parameters consumed by biConsumer
     * @return the result list
     */
    public static <R, P> List<R> pipeline4Standalone(RedisTemplate<?, ?> redisTemplate,
                                                     BiConsumer<RedisCommands, P> biConsumer,
                                                     final List<P> paramList) {
        //noinspection unchecked
        return (List<R>) redisTemplate.executePipelined((RedisCallback<R>) connection -> {
            for (P p : paramList) {
                biConsumer.accept(connection, p);
            }
            return null;
        }, redisTemplate.getValueSerializer());
    }

    /**
     * Strings are used relatively often, so the author provides a dedicated
     * string operation here.
     *
     * @see RedisPipelineUtil#pipeline4ClusterSimpleStr(RedisTemplate, BiFunction, List)
     */
    public static <R> List<R> pipeline4ClusterSimpleStr(
            BiFunction<Pipeline, PipelineParamSupplier<String>, Response<R>> biFunction,
            List<String> paramList) throws JedisMovedDataException {
        return RedisPipelineUtil.pipeline4ClusterSimpleStr(defaultRedisTemplate, biFunction, paramList);
    }

    /**
     * @see RedisPipelineUtil#pipeline4Cluster(JedisCluster, BiFunction, List)
     */
    @SuppressWarnings("rawtypes")
    public static <R> List<R> pipeline4ClusterSimpleStr(
            @NonNull RedisTemplate redisTemplate,
            BiFunction<Pipeline, PipelineParamSupplier<String>, Response<R>> biFunction,
            List<String> paramList) throws JedisMovedDataException {
        RedisConnectionFactory redisConnectionFactory = redisTemplate.getConnectionFactory();
        Assert.notNull(redisConnectionFactory, "redisConnectionFactory cannot be null");
        RedisClusterConnection clusterConnection = redisConnectionFactory.getClusterConnection();
        if (!(clusterConnection instanceof JedisClusterConnection)) {
            throw new UnsupportedOperationException("cannot support RedisClusterConnection ["
                    + clusterConnection.getClass().getName() + "]");
        }
        JedisCluster jedisCluster = ((JedisClusterConnection) clusterConnection).getNativeConnection();
        @SuppressWarnings("unchecked")
        RedisSerializer<Object> keySerializer = (RedisSerializer<Object>) redisTemplate.getKeySerializer();
        return RedisPipelineUtil.pipeline4ClusterSimpleStr(jedisCluster, keySerializer, biFunction, paramList);
    }

    /**
     * Kept private so that keySerializer and jedisCluster are guaranteed to belong together.
     *
     * @see RedisPipelineUtil#pipeline4Cluster(JedisCluster, BiFunction, List)
     */
    private static <R> List<R> pipeline4ClusterSimpleStr(
            @NonNull JedisCluster jedisCluster,
            RedisSerializer<Object> keySerializer,
            BiFunction<Pipeline, PipelineParamSupplier<String>, Response<R>> biFunction,
            List<String> paramList) throws JedisMovedDataException {
        List<StringSelfSupplier> supplierParamList = paramList.stream()
                .map(x -> new StringSelfSupplier(x, keySerializer))
                .collect(Collectors.toList());
        return RedisPipelineUtil.pipeline4Cluster(jedisCluster, biFunction, supplierParamList);
    }

    /**
     * @see RedisPipelineUtil#pipeline4Cluster(RedisTemplate, BiFunction, List)
     */
    public static <P extends PipelineParamSupplier<T>, T, R> List<R> pipeline4Cluster(
            BiFunction<Pipeline, PipelineParamSupplier<T>, Response<R>> biFunction,
            List<P> paramList) throws JedisMovedDataException {
        return RedisPipelineUtil.pipeline4Cluster(defaultRedisTemplate, biFunction, paramList);
    }

    /**
     * @see RedisPipelineUtil#pipeline4Cluster(JedisCluster, BiFunction, List)
     */
    @SuppressWarnings("rawtypes")
    public static <P extends PipelineParamSupplier<T>, T, R> List<R> pipeline4Cluster(
            @NonNull RedisTemplate redisTemplate,
            BiFunction<Pipeline, PipelineParamSupplier<T>, Response<R>> biFunction,
            List<P> paramList) throws JedisMovedDataException {
        RedisConnectionFactory redisConnectionFactory = redisTemplate.getConnectionFactory();
        Assert.notNull(redisConnectionFactory, "redisConnectionFactory cannot be null");
        RedisClusterConnection clusterConnection = redisConnectionFactory.getClusterConnection();
        if (!(clusterConnection instanceof JedisClusterConnection)) {
            throw new UnsupportedOperationException("cannot support RedisClusterConnection ["
                    + clusterConnection.getClass().getName() + "]");
        }
        return RedisPipelineUtil.pipeline4Cluster(
                ((JedisClusterConnection) clusterConnection).getNativeConnection(), biFunction, paramList);
    }

    /**
     * Pipelined batch operation (cluster), implemented on top of JedisCluster.
     *
     * @param jedisCluster
     *            the JedisCluster instance; see the linked method for the other parameters
     * @see JedisClusterPipeline#pipeline(BiFunction, List)
     */
    public static <P extends PipelineParamSupplier<T>, T, R> List<R> pipeline4Cluster(
            @NonNull JedisCluster jedisCluster,
            BiFunction<Pipeline, PipelineParamSupplier<T>, Response<R>> biFunction,
            List<P> paramList) throws JedisMovedDataException {
        JedisClusterPipeline jedisClusterPipeline = new JedisClusterPipeline(jedisCluster);
        return jedisClusterPipeline.pipeline(biFunction, paramList);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Initialization: pick the first RedisTemplate bean found as the default template
        Map<String, RedisTemplate> beansOfType = applicationContext.getBeansOfType(RedisTemplate.class);
        Map.Entry<String, RedisTemplate> redisTemplateEntry = beansOfType.entrySet().stream()
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(" cannot find any RedisTemplate"));
        //noinspection unchecked
        RedisPipelineUtil.defaultRedisTemplate = redisTemplateEntry.getValue();
        log.info(" use [{}] as the default RedisPipelineUtil's RedisTemplate", redisTemplateEntry.getKey());
    }

    /**
     * Helper class for running pipelines against redis-cluster with Jedis.
     * <p>
     * Adapted and consolidated from online references.
     *
     * @author {@link JustryDeng}
     * @since 2020/11/27 16:29:59
     */
    public static class JedisClusterPipeline {

        private final JedisClusterConnectionHandler connectionHandler;

        private final JedisClusterInfoCache infoCache;

        public JedisClusterPipeline(JedisCluster jedisCluster) {
            try {
                // Both fields are non-public in Jedis, so they are read via reflection
                Field connectionHandlerField = BinaryJedisCluster.class.getDeclaredField("connectionHandler");
                boolean accessible = connectionHandlerField.isAccessible();
                connectionHandlerField.setAccessible(true);
                this.connectionHandler = (JedisClusterConnectionHandler) connectionHandlerField.get(jedisCluster);
                connectionHandlerField.setAccessible(accessible);

                Field cacheField = JedisClusterConnectionHandler.class.getDeclaredField("cache");
                accessible = cacheField.isAccessible();
                cacheField.setAccessible(true);
                this.infoCache = (JedisClusterInfoCache) cacheField.get(connectionHandler);
                cacheField.setAccessible(accessible);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Pipelined batch operation (cluster), implemented on top of JedisCluster.
         * <p>
         * Note: [reportedly] Jedis is faster than Lettuce when pipelining against a Redis cluster.
         * <p>
         * Note: responses are collected node by node, so the order of the returned list
         * is not guaranteed to match the order of paramList.
         * <p>
         * Type parameters:
         * <ul>
         * <li>P: the operation parameter; it must implement {@link PipelineParamSupplier}
         * to provide 1. the redis key and 2. the parameter finally used in the pipeline</li>
         * <li>T: the parameter finally used in the pipeline</li>
         * <li>R: the element type of the returned list</li>
         * </ul>
         *
         * @param biFunction
         *            the batch operation logic
         * @param paramList
         *            the parameters consumed by biFunction
         * @throws JedisMovedDataException
         *             if the slot of a key has moved
         * @return the result list
         */
        public <P extends PipelineParamSupplier<T>, T, R> List<R> pipeline(
                BiFunction<Pipeline, PipelineParamSupplier<T>, Response<R>> biFunction,
                List<P> paramList) throws JedisMovedDataException {
            // Extract the redis keys from paramList.
            // byte[] hashes by identity, so the lookups below rely on reusing the very same array instances.
            Map<byte[], P> redisKeyParamMap = paramList.stream()
                    .collect(Collectors.toMap(P::getRedisKey, Function.identity()));
            Set<byte[]> allKeys = redisKeyParamMap.keySet();
            Map<JedisPool, List<byte[]>> poolKeys = new HashMap<>(8);
            // Refresh the cluster slot information
            connectionHandler.renewSlotCache();
            // Group the keys by the node (JedisPool) that owns their slot
            for (byte[] key : allKeys) {
                int slot = JedisClusterCRC16.getSlot(key);
                JedisPool jedisPool = getJedisPoolFromSlot(slot);
                poolKeys.computeIfAbsent(jedisPool, pool -> new ArrayList<>()).add(key);
            }
            int size = allKeys.size();
            List<R> result = new ArrayList<>(size);
            List<Response<R>> responseList = new ArrayList<>(size);
            poolKeys.forEach((JedisPool jedisPool, List<byte[]> keys) -> {
                Jedis jedis = jedisPool.getResource();
                Pipeline pipeline = jedis.pipelined();
                try {
                    keys.forEach(key -> responseList.add(biFunction.apply(pipeline, redisKeyParamMap.get(key))));
                } finally {
                    // In Jedis 3.x, Pipeline#close() also syncs, which is what fills the queued responses
                    try {
                        pipeline.close();
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                    try {
                        jedis.close();
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    }
                }
            });
            responseList.forEach(response -> result.add(response.get()));
            return result;
        }

        /**
         * Returns the JedisPool to use for the given slot.
         */
        private JedisPool getJedisPoolFromSlot(int slot) {
            JedisPool connectionPool = infoCache.getSlotPool(slot);
            if (connectionPool != null) {
                // A valid connection cannot be guaranteed because of node assignment
                return connectionPool;
            } else {
                // Abnormal situation in cluster mode: nothing is known for this slot,
                // so refresh the cluster information and try again
                connectionHandler.renewSlotCache();
                connectionPool = infoCache.getSlotPool(slot);
                if (connectionPool != null) {
                    return connectionPool;
                } else {
                    throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
                }
            }
        }
    }

    /**
     * Supplier of pipeline operation parameters when Jedis operates on redis-cluster.
     *
     * @author {@link JustryDeng}
     * @since 2020/11/28 16:45:40
     */
    public interface PipelineParamSupplier<T> {

        /**
         * Returns the (serialized) redis key.
         * <p>
         * P.S. When pipelining against a cluster, the redis key is obtained through this method.
         * <p>
         * Note: obtaining the redis key is extracted into one method because several places
         * in the logic need it. If those places serialized the key object to byte[] in
         * different ways, the computed slot could differ between them, which in turn
         * (because of the inconsistent code) could cause a JedisMovedDataException.
         * Centralizing the key lookup here avoids that problem.
         *
         * @return the redis key
         */
        byte[] getRedisKey();

        /**
         * Returns the parameter needed by the pipeline operation.
         *
         * @return the pipeline operation parameter
         */
        T getParam();
    }

    /**
     * PipelineParamSupplier implementation provided by the author for the common string case.
     *
     * @author {@link JustryDeng}
     * @since 2020/11/25 22:06:23
     */
    public static class StringSelfSupplier implements PipelineParamSupplier<String> {

        private final String str;

        private final RedisSerializer<Object> keySerializer;

        public StringSelfSupplier(String str, RedisSerializer<Object> keySerializer) {
            this.str = str;
            this.keySerializer = keySerializer;
        }

        @Override
        public byte[] getRedisKey() {
            return keySerializer.serialize(getParam());
        }

        @Override
        public String getParam() {
            return this.str;
        }
    }
}
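The Javadoc of getRedisKey explains that the serialized key bytes decide which slot, and therefore which node, a command is sent to. The utility delegates that computation to JedisClusterCRC16.getSlot. As a rough illustration only, the sketch below re-implements the published Redis Cluster key-slot rule (CRC16/XMODEM of the key, or of its {hash tag}, modulo 16384) using nothing but the JDK. The class name ClusterSlotDemo and its methods are hypothetical and not part of the utility.

```java
import java.nio.charset.StandardCharsets;

final class ClusterSlotDemo {

    /** CRC16/XMODEM over data[from, to): polynomial 0x1021, initial value 0. */
    static int crc16(byte[] data, int from, int to) {
        int crc = 0;
        for (int i = from; i < to; i++) {
            crc ^= (data[i] & 0xFF) << 8;
            for (int bit = 0; bit < 8; bit++) {
                crc = ((crc & 0x8000) != 0) ? (crc << 1) ^ 0x1021 : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Index of the first occurrence of target at or after start, or -1. */
    static int indexOf(byte[] data, byte target, int start) {
        for (int i = start; i < data.length; i++) {
            if (data[i] == target) {
                return i;
            }
        }
        return -1;
    }

    /** Slot of a serialized key; only a non-empty {hash tag} is hashed when present. */
    static int slot(byte[] key) {
        int open = indexOf(key, (byte) '{', 0);
        if (open >= 0) {
            int close = indexOf(key, (byte) '}', open + 1);
            if (close > open + 1) {
                return crc16(key, open + 1, close) & 16383;
            }
        }
        return crc16(key, 0, key.length) & 16383;
    }

    /** Convenience overload that assumes the key is serialized as plain UTF-8. */
    static int slot(String key) {
        return slot(key.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // The same logical key serialized in two ways yields two different byte arrays,
        // and the slot is computed from the bytes, not from the logical key.
        System.out.println(slot("user:1"));
        System.out.println(slot("\"user:1\""));
        // Keys sharing a hash tag always land in the same slot.
        System.out.println(slot("{user1000}.following") == slot("{user1000}.followers"));
    }
}
```

This is why the utility derives the key bytes in exactly one place (getRedisKey): the bytes used for slot lookup and the bytes sent in the command need to be identical.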

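One consequence of grouping keys by node, as JedisClusterPipeline.pipeline does, is that responses are collected node by node, so the returned list need not follow the order of paramList. If callers need results aligned with their inputs, one possible approach is to remember each parameter's original index while grouping and write every response back to that position. The following is a minimal sketch of that idea with no Redis involved: the class OrderedBatchDemo, the method executeGrouped, and the batch executor passed to it are all hypothetical stand-ins, not part of the utility or of Jedis.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

final class OrderedBatchDemo {

    /**
     * Groups parameters by node, runs one batch per node, and returns the
     * results in the same order as the input parameters.
     *
     * @param params        input parameters, in caller order
     * @param nodeOf        maps a parameter to the node that should handle it
     * @param batchExecutor runs one batch on one node and returns one result per
     *                      batch element, in batch order (stands in for a pipeline)
     */
    static <P, N, R> List<R> executeGrouped(List<P> params,
                                            Function<P, N> nodeOf,
                                            BiFunction<N, List<P>, List<R>> batchExecutor) {
        // node -> original indexes of the parameters routed to that node
        Map<N, List<Integer>> indexesByNode = new LinkedHashMap<>();
        for (int i = 0; i < params.size(); i++) {
            indexesByNode.computeIfAbsent(nodeOf.apply(params.get(i)), n -> new ArrayList<>()).add(i);
        }
        // Pre-size the result list so that every slot can be set by index
        List<R> results = new ArrayList<>(Collections.<R>nCopies(params.size(), null));
        indexesByNode.forEach((node, indexes) -> {
            List<P> batch = new ArrayList<>(indexes.size());
            for (int index : indexes) {
                batch.add(params.get(index));
            }
            List<R> batchResults = batchExecutor.apply(node, batch);
            // Write each response back to the position of the parameter that produced it
            for (int j = 0; j < indexes.size(); j++) {
                results.set(indexes.get(j), batchResults.get(j));
            }
        });
        return results;
    }

    public static void main(String[] args) {
        // Fake routing: keys alternate between two "nodes"; the fake executor tags each key with its node.
        List<String> out = OrderedBatchDemo.<String, Integer, String>executeGrouped(
                Arrays.asList("a", "b", "c", "d"),
                key -> (key.charAt(0) - 'a') % 2,
                (node, batch) -> {
                    List<String> replies = new ArrayList<>();
                    for (String key : batch) {
                        replies.add(key.toUpperCase() + node);
                    }
                    return replies;
                });
        System.out.println(out);
    }
}
```

Tracking indexes instead of using the key bytes as map keys also sidesteps the fact that byte[] hashes by identity, and it tolerates the same key appearing more than once in the input.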
^_^ Corrections are welcome if anything here is wrong.
^_^ Reference links
https://blog.csdn.net/youaremoon/article/details/51751991?utm_medium=distribute.pc_relevant.none-task-blog-searchFromBaidu-2.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-searchFromBaidu-2.control
https://www.cnblogs.com/xiaodf/p/11002184.html
https://blog.csdn.net/xiaoliu598906167/article/details/82218525?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~first_rank_v2~rank_v25-10-82218525.nonecase&utm_term=pipeline%20redis%20%E8%BF%94%E5%9B%9E%E5%80%BC&spm=1000.2123.3001.4430
^_^ Test code repository
https://github.com/JustryDeng/CommonRepository/tree/master/Abc_RedisPipeline_Demo
^_^ This article has been included in 《程序員成長筆記(一)》 (Programmer Growth Notes, Part 1), by JustryDeng
