<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Redis 發(fā)布訂閱模式,一個(gè)被嚴(yán)重低估的功能

          共 7630字,需瀏覽 16分鐘

           ·

          2020-09-23 16:15

          也許有的小伙伴對(duì)這個(gè)功能比較陌生,不太清楚這個(gè)功能是干什么的,沒(méi)關(guān)系小黑哥先來(lái)舉個(gè)例子。

          0e49b47a9ae5bcc6257ddcdb15f20f2a.webp

          假設(shè)我們有這么一個(gè)業(yè)務(wù)場(chǎng)景,在網(wǎng)站下單支付以后,需要通知庫(kù)存服務(wù)進(jìn)行發(fā)貨處理。

          上面業(yè)務(wù)實(shí)現(xiàn)不難,我們只要讓庫(kù)存服務(wù)提供給相關(guān)的給口,下單支付之后只要調(diào)用庫(kù)存服務(wù)即可。

          817b505282a3b6da23ea053d38e15811.webp

          后面如果又有新的業(yè)務(wù),比如說(shuō)積分服務(wù),他需要獲取下單支付的結(jié)果,然后增加用戶的積分。

          這個(gè)實(shí)現(xiàn)也不難,讓積分服務(wù)同樣提供一個(gè)接口,下單支付之后只要調(diào)用庫(kù)存服務(wù)即可。

          9f8a93d72bf4017eb8ab851bd001c5c0.webp

          如果就兩個(gè)業(yè)務(wù)需要獲取下單支付的結(jié)果,那也還好,程序改造也快。可是隨著業(yè)務(wù)不斷的發(fā)展,越來(lái)越多的新業(yè)務(wù)說(shuō)是要下單支付的結(jié)果。

          這時(shí)我們會(huì)發(fā)現(xiàn)上面這樣的系統(tǒng)架構(gòu)存在很多問(wèn)題:

          第一,下單支付業(yè)務(wù)與其他業(yè)務(wù)重度耦合,每當(dāng)有個(gè)新業(yè)務(wù)需要支付結(jié)果,就需要改動(dòng)下單支付的業(yè)務(wù)。

          第二,如果調(diào)用業(yè)務(wù)過(guò)多,會(huì)導(dǎo)致下單支付接口響應(yīng)時(shí)間變長(zhǎng)。另外,如果有任一下游接口響應(yīng)變慢,就會(huì)同步導(dǎo)致下單支付接口響應(yīng)也變長(zhǎng)。

          第三,如果任一下游接口失敗,可能導(dǎo)致數(shù)據(jù)不一致的情況。比如說(shuō)下圖,先調(diào)用 A,成功之后再調(diào)用 B,最后再調(diào)用 C。

          4fb1c9bbd9c3817dff2dd989b51d871e.webp

          如果在調(diào)用 B 接口的發(fā)生異常,此時(shí)可能就導(dǎo)致下單支付接口返回失敗,但是此時(shí) A 接口其實(shí)已經(jīng)調(diào)用成功,這就代表它內(nèi)部已經(jīng)處理下單支付成功的結(jié)果。

          這樣就會(huì)導(dǎo)致 A,B,C 三個(gè)下游接口,A 獲取成功獲取支付結(jié)果,但是 B,C 沒(méi)有拿到,導(dǎo)致三者系統(tǒng)數(shù)據(jù)不一致的情況。

          其實(shí)我們仔細(xì)想一下,對(duì)于下單支付業(yè)務(wù)來(lái)講,它其實(shí)不需要關(guān)心下游調(diào)用結(jié)果,只要有某種機(jī)制通知能通知到他們就可以了。

          講到這里,這就需要引入今天需要介紹發(fā)布訂閱機(jī)制。

          Redis 發(fā)布與訂閱

          Redis 提供了基于「發(fā)布/訂閱」模式的消息機(jī)制,在這種模式下,消息發(fā)布者與訂閱者不需要進(jìn)行直接通信。

          5e71ff41681b2a7390030440f09c527c.webp

          如上圖所示,消息發(fā)布者只需要想指定的頻道發(fā)布消息,訂閱該頻道的每個(gè)客戶端都可以接受到到這個(gè)消息。

          使用 Redis 發(fā)布訂閱這種機(jī)制,對(duì)于上面業(yè)務(wù),下單支付業(yè)務(wù)只需要向「支付結(jié)果」這個(gè)頻道發(fā)送消息,其他下游業(yè)務(wù)訂閱「支付結(jié)果」這個(gè)頻道,就能收相應(yīng)消息,然后做出業(yè)務(wù)處理即可。

          這樣就可以解耦系統(tǒng)上下游之間調(diào)用關(guān)系。

          接下來(lái)我們來(lái)看下,我們來(lái)看下如何使用 Redis 發(fā)布訂閱功能。

          Redis 中提供了一組命令,可以用于發(fā)布消息,訂閱頻道,取消訂閱以及按照模式訂閱。

          首先我們來(lái)看下如何發(fā)布一條消息,其實(shí)很簡(jiǎn)單只要使用 「publish」 指令:

          publish?channel?message
          c342d16f7cbe98840295fa2638df7490.webp

          上圖中,我們使用 「publish」 指令向 「pay_result」 這個(gè)頻道發(fā)送了一條消息。我們可以看到 redis 向我們返回 0 ,這其實(shí)代表當(dāng)前訂閱者個(gè)數(shù),由于此時(shí)沒(méi)有訂閱,所以返回結(jié)果為 0 。

          接下來(lái)我們使用 「subscribe」 訂閱一個(gè)或多個(gè)頻道

          subscribe?channel?[channel?...]
          85bb3c65547b5809ddcecbfc87e7900e.webp

          如上圖所示,我們訂閱 「pay_result」 這個(gè)頻道,當(dāng)有其他客戶端往這個(gè)頻道發(fā)送消息,

          2de3f3c7a790d3d626f575d40e0b4a54.webp

          當(dāng)前訂閱者就會(huì)收到消息。

          ad04efff28b6e752931500269261750b.webp

          我們子在使用訂閱命令,需要主要幾點(diǎn):

          第一,客戶端執(zhí)行訂閱指令之后,就會(huì)進(jìn)入訂閱狀態(tài),之后就只能接收 「subscribe」「psubscribe」「unsubscribe」「punsubscribe」 這四個(gè)命令。

          af94bfc481314ebbf2d6c6beb6ea042d.webp

          第二,新訂閱的客戶端,是「無(wú)法收到這個(gè)頻道之前的消息」,這是因?yàn)?Redis 并不會(huì)對(duì)發(fā)布的消息持久化的。

          ?

          相比于很多專業(yè) MQ,比如 kafka、rocketmq 來(lái)說(shuō), ?redis 發(fā)布訂閱功能就顯得有點(diǎn)簡(jiǎn)陋了。不過(guò) redis 發(fā)布訂閱功能勝在簡(jiǎn)單,如果當(dāng)前場(chǎng)景可以容忍這些缺點(diǎn),還是可以選擇使用的。

          看到這里是不是感覺(jué)這個(gè)功能挺廢材的,不要急,往下看,適合的場(chǎng)景還是有大用處的~

          ?

          除了上面的功能以外的,Redis 還支持模式匹配的訂閱方式。簡(jiǎn)單來(lái)說(shuō),客戶端可以訂閱一個(gè)帶 * 號(hào)的模式,如果某些頻道的名字與這個(gè)模式匹配,那么當(dāng)其他客戶端發(fā)送給消息給這些頻道時(shí),訂閱這個(gè)模式的客戶端也將會(huì)到收到消息。

          使用 Redis 訂閱模式,我們需要使用一個(gè)新的指令 「psubscribe」

          我們執(zhí)行下面這個(gè)指令:

          psubscribe?pay.*

          那么一旦有其他客戶端往 ?「pay」 開頭的頻道,比如 pay_resultpay_xxx,我們都可以收到消息。

          fb1fd53099365b732404b9dc20078c21.webp

          如果需要取消訂閱模式,我們需要使用相應(yīng)punsubscribe 指令,比如取消上面訂閱的模式:

          punsubscribe?pay.*

          Redis 客戶端發(fā)布訂閱使用方式

          基于 Jedis 開發(fā)發(fā)布/訂閱

          聊完 Redis 發(fā)布訂閱指令,我們來(lái)看下 Java Redis 客戶端如何使用發(fā)布訂閱。

          ?

          下面的例子主要基于 Jedis,maven 版本為:

          <dependency>
          ?<groupId>redis.clientsgroupId>
          ?<artifactId>jedisartifactId>
          ?<version>3.1.0version>
          dependency>

          其他 Redis 客戶端大同小異。

          ?

          jedis 發(fā)布代碼比較簡(jiǎn)單,只需要調(diào)用 Jedis 類的 publish 方法。

          //?生產(chǎn)環(huán)境千萬(wàn)不要這么使用哦,推薦使用?JedisPool?線程池的方式?
          Jedis?jedis?=?new?Jedis("localhost",?6379);
          jedis.auth("xxxxx");
          jedis.publish("pay_result",?"hello?world");

          訂閱的代碼就相對(duì)復(fù)雜了,我們需要繼承 JedisPubSub實(shí)現(xiàn)里面的相關(guān)方法,一旦有其他客戶端往訂閱的頻道上發(fā)送消息,將會(huì)調(diào)用 JedisPubSub 相應(yīng)的方法。

          private?static?class?MyListener?extends?JedisPubSub?{
          ????@Override
          ????public?void?onMessage(String?channel,?String?message)?{
          ????????System.out.println("收到訂閱頻道:"?+?channel?+?"?消息:"?+?message);

          ????}

          ????@Override
          ????public?void?onPMessage(String?pattern,?String?channel,?String?message)?{
          ????????System.out.println("收到具體訂閱頻道:"?+?channel?+?"訂閱模式:"?+?pattern?+?"?消息:"?+?message);
          ????}

          }

          其次我們需要調(diào)用 Jedis 類的 subscribe 方法:

          Jedis?jedis?=?new?Jedis("localhost",?6379);
          jedis.auth("xxx");
          jedis.subscribe(new?MyListener(),?"pay_result");

          當(dāng)有其他客戶端往 pay_result頻道發(fā)送消息時(shí),訂閱將會(huì)收到消息。

          13897038416a0fb56a7bbf87acb1e4f1.webp

          不過(guò)需要注意的是,jedis#subscribe 是一個(gè)阻塞方法,調(diào)用之后將會(huì)阻塞主線程的,所以如果需要在正式項(xiàng)目使用需要使用異步線程運(yùn)行,這里就不演示具體的代碼了。

          基于 Spring-Data-Redis 開發(fā)發(fā)布訂閱

          原生 jedis 發(fā)布訂閱操作,相對(duì)來(lái)說(shuō)還是有點(diǎn)復(fù)雜。現(xiàn)在我們很多應(yīng)用已經(jīng)基于 SpringBoot 開發(fā),使用 spring-boot-starter-data-redis ,可以簡(jiǎn)化發(fā)布訂閱開發(fā)。

          首先我們需要引入相應(yīng)的 startter 依賴:

          <dependency>
          ????<groupId>org.springframework.bootgroupId>
          ????<artifactId>spring-boot-starter-data-redisartifactId>
          ????<exclusions>
          ????????<exclusion>
          ????????????<artifactId>lettuce-coreartifactId>
          ????????????<groupId>io.lettucegroupId>
          ????????exclusion>
          ????exclusions>
          dependency>
          <dependency>
          ????<groupId>redis.clientsgroupId>
          ????<artifactId>jedisartifactId>
          dependency>
          ?

          這里我們使用 Jedis 當(dāng)做底層連接客戶端,所以需要排除 lettuce,然后引入 Jedis 依賴。

          ?

          然后我們需要?jiǎng)?chuàng)建一個(gè)消息接收類,里面需要有方法消費(fèi)消息:

          @Slf4j
          public?class?Receiver?{
          ????private?AtomicInteger?counter?=?new?AtomicInteger();

          ????public?void?receiveMessage(String?message)?{
          ????????log.info("Received?<"?+?message?+?">");
          ????????counter.incrementAndGet();
          ????}

          ????public?int?getCount()?{
          ????????return?counter.get();
          ????}
          }

          接著我們只需要注入 Spring- Redis 相關(guān) Bean,比如:

          • StringRedisTemplate,用來(lái)操作 Redis 命令
          • MessageListenerAdapter ,消息監(jiān)聽器,可以在這個(gè)類注入我們上面創(chuàng)建消息接受類Receiver
          • RedisConnectionFactory, 創(chuàng)建 Redis 底層連接
          @Configuration
          public?class?MessageConfiguration?{

          ????@Bean
          ????RedisMessageListenerContainer?container(RedisConnectionFactory?connectionFactory,
          ????????????????????????????????????????????MessageListenerAdapter?listenerAdapter)
          ?
          {

          ????????RedisMessageListenerContainer?container?=?new?RedisMessageListenerContainer();
          ????????container.setConnectionFactory(connectionFactory);
          ????????//?訂閱指定頻道使用?ChannelTopic
          ????????//?訂閱模式使用?PatternTopic
          ????????container.addMessageListener(listenerAdapter,?new?ChannelTopic("pay_result"));

          ????????return?container;
          ????}

          ????@Bean
          ????MessageListenerAdapter?listenerAdapter(Receiver?receiver)?{
          ????????//?注入?Receiver,指定類中的接受方法
          ????????return?new?MessageListenerAdapter(receiver,?"receiveMessage");
          ????}

          ????@Bean
          ????Receiver?receiver()?{
          ????????return?new?Receiver();
          ????}

          ????@Bean
          ????StringRedisTemplate?template(RedisConnectionFactory?connectionFactory)?{
          ????????return?new?StringRedisTemplate(connectionFactory);
          ????}

          }

          最后我們使用 StringRedisTemplate#convertAndSend 發(fā)送消息,同時(shí) Receiver 將會(huì)收到一條消息。

          @SpringBootApplication
          public?class?MessagingRedisApplication?{
          ????public?static?void?main(String[]?args)?throws?InterruptedException?{

          ????????ApplicationContext?ctx?=?SpringApplication.run(MessagingRedisApplication.class,?args);

          ????????StringRedisTemplate?template?=?ctx.getBean(StringRedisTemplate.class);
          ????????Receiver?receiver?=?ctx.getBean(Receiver.class);

          ????????while?(receiver.getCount()?==?0)?{
          ????????????template.convertAndSend("pay_result",?"Hello?from?Redis!");
          ????????????Thread.sleep(500L);
          ????????}

          ????????System.exit(0);
          ????}
          }
          ddf2aff01d20971cbf21fa784d0dd6f2.webp

          Redis 發(fā)布訂閱實(shí)際應(yīng)用

          Redis Sentinel 節(jié)點(diǎn)發(fā)現(xiàn)

          「Redis Sentinel」 是 Redis 一套高可用方案,可以在主節(jié)點(diǎn)故障的時(shí)候,自動(dòng)將從節(jié)點(diǎn)提升為主節(jié)點(diǎn),從而轉(zhuǎn)移故障。

          今天這里我們不詳細(xì)解釋 「Redis Sentinel」 詳細(xì)原理,主要來(lái)看下 「Redis Sentinel」 如何使用發(fā)布訂閱機(jī)制。

          「Redis Sentinel」 ?節(jié)點(diǎn)主要使用發(fā)布訂閱機(jī)制,實(shí)現(xiàn)新節(jié)點(diǎn)的發(fā)現(xiàn),以及交換主節(jié)點(diǎn)的之間的狀態(tài)。

          如下所示,每一個(gè) ?「Sentinel」 ?節(jié)點(diǎn)將會(huì)定時(shí)向 _sentinel_:hello 頻道發(fā)送消息,并且每個(gè) 「Sentinel」 都會(huì)訂閱這個(gè)節(jié)點(diǎn)。

          8f7696f1990b9c27dd98947a6bddc9a0.webp

          這樣一旦有節(jié)點(diǎn)往這個(gè)頻道發(fā)送消息,其他節(jié)點(diǎn)就可以立刻收到消息。

          這樣一旦有的新節(jié)點(diǎn)加入,它往這個(gè)頻道發(fā)送消息,其他節(jié)點(diǎn)收到之后,判斷本地列表并沒(méi)有這個(gè)節(jié)點(diǎn),于是就可以當(dāng)做新的節(jié)點(diǎn)加入本地節(jié)點(diǎn)列表。

          除此之外,每次往這個(gè)頻道發(fā)送消息內(nèi)容可以包含節(jié)點(diǎn)的狀態(tài)信息,這樣可以作為后面 「Sentinel」 領(lǐng)導(dǎo)者選舉的依據(jù)。

          以上都是對(duì)于 Redis 服務(wù)端來(lái)講,對(duì)于客戶端來(lái)講,我們也可以用到發(fā)布訂閱機(jī)制。

          當(dāng) ?「Redis Sentinel」 進(jìn)行主節(jié)點(diǎn)故障轉(zhuǎn)移,這個(gè)過(guò)程各個(gè)階段會(huì)通過(guò)發(fā)布訂閱對(duì)外提供。

          對(duì)于我們客戶端來(lái)講,比較關(guān)心切換之后的主節(jié)點(diǎn),這樣我們及時(shí)切換主節(jié)點(diǎn)的連接(舊節(jié)點(diǎn)此時(shí)已故障,不能再接受操作指令),

          客戶端可以訂閱 +switch-master頻道,一旦 ?「Redis Sentinel」 ?結(jié)束了對(duì)主節(jié)點(diǎn)的故障轉(zhuǎn)移就會(huì)發(fā)布主節(jié)點(diǎn)的的消息。

          redission 分布式鎖

          redission 開源框架提供一些便捷操作 Redis 的方法,其中比較出名的 redission ?基于 Redis 的實(shí)現(xiàn)分布式鎖。

          今天我們來(lái)看下 Redis 的實(shí)現(xiàn)分布式鎖中如何使用 Redis 發(fā)布訂閱機(jī)制,提高加鎖的性能。

          ?

          PS:redission ?分布式鎖實(shí)現(xiàn)原理,可以參考之前寫過(guò)的文章:

          1. 可重入分布式鎖的實(shí)現(xiàn)方式
          2. Redis 分布式鎖,看似簡(jiǎn)單,其實(shí)真不簡(jiǎn)單
          ?

          首先我們來(lái)看下 redission ? 加鎖的方法:

          Redisson?redisson?=?....
          RLock?redissonLock?=?redisson.getLock("xxxx");
          redissonLock.lock();

          RLock 繼承自 Java 標(biāo)準(zhǔn)的 Lock 接口,調(diào)用 lock 方法,如果當(dāng)前鎖已被其他客戶端獲取,那么當(dāng)前加鎖的線程將會(huì)被阻塞,直到其他客戶端釋放這把鎖。

          這里其實(shí)有個(gè)問(wèn)題,當(dāng)前阻塞的線程如何感知分布式鎖已被釋放呢?

          這里其實(shí)有兩種實(shí)現(xiàn)方法:

          第一鐘,定時(shí)查詢分布時(shí)鎖的狀態(tài),一旦查到鎖已被釋放(Redis 中不存在這個(gè)鍵值),那么就去加鎖。

          實(shí)現(xiàn)偽碼如下:

          while?(true)?{
          ??boolean?result=lock();
          ??if?(!result)?{
          ????Thread.sleep(N);
          ??}
          }

          這種方式實(shí)現(xiàn)起來(lái)起來(lái)簡(jiǎn)單,不過(guò)缺點(diǎn)也比較多。

          如果定時(shí)任務(wù)時(shí)間過(guò)短,將會(huì)導(dǎo)致查詢次數(shù)過(guò)多,其實(shí)這些都是無(wú)效查詢。

          如果定時(shí)任務(wù)休眠時(shí)間過(guò)長(zhǎng),那又會(huì)導(dǎo)致加鎖時(shí)間過(guò)長(zhǎng),導(dǎo)致加鎖性能不好。

          那么第二種實(shí)現(xiàn)方案,就是采用服務(wù)通知的機(jī)制,當(dāng)分布式鎖被釋放之后,客戶端可以收到鎖釋放的消息,然后第一時(shí)間再去加鎖。

          這個(gè)服務(wù)通知的機(jī)制我們可以使用 Redis 發(fā)布訂閱模式。

          當(dāng)線程加鎖失敗之后,線程將會(huì)訂閱 redisson_lock__channel_xxx(xx 代表鎖的名稱) 頻道,使用異步線程監(jiān)聽消息,然后利用 Java 中 Semaphore 使當(dāng)前線程進(jìn)入阻塞。

          一旦其他客戶端進(jìn)行解鎖,redission ?就會(huì)往這個(gè)redisson_lock__channel_xxx 發(fā)送解鎖消息。

          等異步線程收到消息,將會(huì)調(diào)用 Semaphore 釋放信號(hào)量,從而讓當(dāng)前被阻塞的線程喚醒去加鎖。

          ?

          ps:這里只是簡(jiǎn)單描述了 redission ?加鎖部分原理,出于篇幅,這里就不再消息解析源碼。

          感興趣的小伙伴可以自己看下 redission ? 加鎖的源碼。

          ?

          通過(guò)發(fā)布訂閱機(jī)制,被阻塞的線程可以及時(shí)被喚醒,減少無(wú)效的空轉(zhuǎn)的查詢,有效的提高的加鎖的效率。

          ?

          ps: 這種方式,性能確實(shí)提高,但是實(shí)現(xiàn)起來(lái)的復(fù)雜度也很高,這部分源碼有點(diǎn)東西,快看暈了。

          ?

          總結(jié)

          今天我們主要介紹 Redis 發(fā)布訂閱功能,主要對(duì)應(yīng)的 Redis 命令為:

          • 「subscribe channel [channel ...]」 ?訂閱一個(gè)或多個(gè)頻道
          • 「unsubscribe channel」 ?退訂指定頻道
          • 「publish channel message」 發(fā)送消息
          • 「psubscribe pattern」 ?訂閱指定模式
          • 「punsubscribe pattern」 ?退訂指定模式

          我們可以利用 Redis ?發(fā)布訂閱功能,實(shí)現(xiàn)的簡(jiǎn)單 MQ 功能,實(shí)現(xiàn)上下游的解耦。

          不過(guò)需要注意了,由于 Redis 發(fā)布的消息不會(huì)被持久化,這就會(huì)導(dǎo)致新訂閱的客戶端將不會(huì)收到歷史消息。

          所以,如果當(dāng)前的業(yè)務(wù)場(chǎng)景不能容忍這些缺點(diǎn),那還是用專業(yè) MQ 吧。

          最后介紹了兩個(gè)使用 Redis 發(fā)布訂閱功能使用場(chǎng)景供大家參考。


          60eebf4755cf8de675cb2ebd34f3cf34.webp對(duì)了,看完記得一鍵三連,這個(gè)對(duì)我真的很重要。


          如果對(duì)你有用,歡迎 在看、點(diǎn)贊、轉(zhuǎn)發(fā) ,您的認(rèn)可是我最大的動(dòng)力。
          整理了幾百本各類技術(shù)電子書,送給小伙伴們。關(guān)注公號(hào)回復(fù)【666】自行領(lǐng)取。和一些小伙伴們建了一個(gè)技術(shù)交流群,一起探討技術(shù)、分享技術(shù)資料,旨在共同學(xué)習(xí)進(jìn)步,如果感興趣就加入我們吧!


          82f84e993f3355cd4c322e88651b99e8.webp82f84e993f3355cd4c322e88651b99e8.webp關(guān)注,邁開成長(zhǎng)的第一步
          瀏覽 118
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产中文字幕在线观看 | 91n成人久久 | 美女黄频免费 | 免费在线看片黄在线观看 | 黄色搞逼视频 |