Redis異步客戶端選型及落地實(shí)踐



Netty的單個(gè)EventLoop僅與單一線程綁定,業(yè)務(wù)端的并發(fā)請(qǐng)求均會(huì)被放入EventLoop的任務(wù)隊(duì)列中,最終被該線程順序處理。同時(shí),Lettuce自身也會(huì)維護(hù)一個(gè)隊(duì)列,當(dāng)其通過(guò)EventLoop向Redis發(fā)送指令時(shí),成功發(fā)送的指令會(huì)被放入該隊(duì)列;當(dāng)收到服務(wù)端的響應(yīng)時(shí),Lettuce又會(huì)以FIFO的方式從隊(duì)列的頭部取出對(duì)應(yīng)的指令,進(jìn)行后續(xù)處理。 Redis服務(wù)端本身也是基于NIO模型,使用單一線程處理客戶端請(qǐng)求。雖然Redis能同時(shí)維持成百上千個(gè)客戶端連接,但是在某一時(shí)刻,某個(gè)客戶端連接的請(qǐng)求均是被順序處理及響應(yīng)的。 Redis客戶端與服務(wù)端通過(guò)TCP協(xié)議連接,而TCP協(xié)議本身會(huì)保證數(shù)據(jù)傳輸?shù)捻樞蛐浴?/span>


meet:集群中節(jié)點(diǎn)通過(guò)向新加入節(jié)點(diǎn)發(fā)送meet消息,將新節(jié)點(diǎn)加入集群中。 ping:節(jié)點(diǎn)間通過(guò)ping命令交換元數(shù)據(jù)。 pong:響應(yīng)ping。 fail:某個(gè)節(jié)點(diǎn)主觀認(rèn)為某個(gè)節(jié)點(diǎn)宕機(jī),會(huì)向其他節(jié)點(diǎn)發(fā)送fail消息,進(jìn)行客觀宕機(jī)判定。

基于Redis連接信息創(chuàng)建RedisClient 基于RedisClient創(chuàng)建StatefulRedisConnection 從Connection中獲取Command,基于Command執(zhí)行Redis命令操作。
List<RedisURI> servers = new ArrayList<>();servers.add(RedisURI.create("127.0.0.1", 7000));servers.add(RedisURI.create("127.0.0.1", 7001));servers.add(RedisURI.create("127.0.0.1", 7002));servers.add(RedisURI.create("127.0.0.1", 7003));servers.add(RedisURI.create("127.0.0.1", 7004));servers.add(RedisURI.create("127.0.0.1", 7005));//創(chuàng)建客戶端RedisClusterClient client = RedisClusterClient.create(servers);//創(chuàng)建連接StatefulRedisClusterConnection<String, String> connection = client.connect();//獲取異步命令RedisAdvancedClusterAsyncCommands<String, String> commands = connection.async();//執(zhí)行GET命令RedisFuture<String> future = commands.get("test-lettuce-key");try {String result = future.get();log.info("Get命令返回:{}", result);} catch (Exception e) {log.error("Get命令執(zhí)行異常", e);}



// 設(shè)置基于事件的自適應(yīng)刷新策略ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()//開(kāi)啟自適應(yīng)拓?fù)渌⑿?/span>.enableAllAdaptiveRefreshTriggers()//自適應(yīng)拓?fù)渌⑿率录瑫r(shí)時(shí)間,超時(shí)后進(jìn)行刷新.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions)// redis命令超時(shí)時(shí)間.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(30))).build());
評(píng)論
圖片
表情
