<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          一萬字詳解 Redis Cluster Gossip 協(xié)議

          共 19708字,需瀏覽 40分鐘

           ·

          2020-12-07 12:34

          點擊上方"程序員歷小冰",選擇“置頂或者星標”

          ? ?你的關注意義重大!

          大家好,我是歷小冰,今天來講一下 Reids Cluster 的 Gossip 協(xié)議和集群操作,文章的思維導圖如下所示。

          集群模式和 Gossip 簡介

          對于數(shù)據(jù)存儲領域,當數(shù)據(jù)量或者請求流量大到一定程度后,就必然會引入分布式。比如 Redis,雖然其單機性能十分優(yōu)秀,但是因為下列原因時,也不得不引入集群。

          • 單機無法保證高可用,需要引入多實例來提供高可用性
          • 單機能夠提供高達 8W 左右的QPS,再高的QPS則需要引入多實例
          • 單機能夠支持的數(shù)據(jù)量有限,處理更多的數(shù)據(jù)需要引入多實例;
          • 單機所處理的網(wǎng)絡流量已經(jīng)超過服務器的網(wǎng)卡的上限值,需要引入多實例來分流。

          有集群,集群往往需要維護一定的元數(shù)據(jù),比如實例的ip地址,緩存分片的 slots 信息等,所以需要一套分布式機制來維護元數(shù)據(jù)的一致性。這類機制一般有兩個模式:分散式和集中式

          分散式機制將元數(shù)據(jù)存儲在部分或者所有節(jié)點上,不同節(jié)點之間進行不斷的通信來維護元數(shù)據(jù)的變更和一致性。Redis Cluster,Consul 等都是該模式。

          而集中式是將集群元數(shù)據(jù)集中存儲在外部節(jié)點或者中間件上,比如 zookeeper。舊版本的 kafka 和 storm 等都是使用該模式。
          兩種模式各有優(yōu)劣,具體如下表所示:
          模式優(yōu)點缺點
          集中式數(shù)據(jù)更新及時,時效好,元數(shù)據(jù)的更新和讀取,時效性非常好,一旦元數(shù)據(jù)出現(xiàn)了變更,立即就更新到集中式的外部節(jié)點中,其他節(jié)點讀取的時候立即就可以感知到;較大數(shù)據(jù)更新壓力,更新壓力全部集中在外部節(jié)點,作為單點影響整個系統(tǒng)
          分散式數(shù)據(jù)更新壓力分散,元數(shù)據(jù)的更新比較分散,不是集中某一個節(jié)點,更新請求比較分散,而且有不同節(jié)點處理,有一定的延時,降低了并發(fā)壓力數(shù)據(jù)更新延遲,可能導致集群的感知有一定的滯后

          分散式的元數(shù)據(jù)模式有多種可選的算法進行元數(shù)據(jù)的同步,比如說 Paxos、Raft 和 Gossip。Paxos 和 Raft 等都需要全部節(jié)點或者大多數(shù)節(jié)點(超過一半)正常運行,整個集群才能穩(wěn)定運行,而 Gossip 則不需要半數(shù)以上的節(jié)點運行。

          Gossip 協(xié)議,顧名思義,就像流言蜚語一樣,利用一種隨機、帶有傳染性的方式,將信息傳播到整個網(wǎng)絡中,并在一定時間內(nèi),使得系統(tǒng)內(nèi)的所有節(jié)點數(shù)據(jù)一致。對你來說,掌握這個協(xié)議不僅能很好地理解這種最常用的,實現(xiàn)最終一致性的算法,也能在后續(xù)工作中得心應手地實現(xiàn)數(shù)據(jù)的最終一致性。

          Gossip 協(xié)議又稱 epidemic 協(xié)議(epidemic protocol),是基于流行病傳播方式的節(jié)點或者進程之間信息交換的協(xié)議,在P2P網(wǎng)絡和分布式系統(tǒng)中應用廣泛,它的方法論也特別簡單:

          在一個處于有界網(wǎng)絡的集群里,如果每個節(jié)點都隨機與其他節(jié)點交換特定信息,經(jīng)過足夠長的時間后,集群各個節(jié)點對該份信息的認知終將收斂到一致。

          這里的“特定信息”一般就是指集群狀態(tài)、各節(jié)點的狀態(tài)以及其他元數(shù)據(jù)等。Gossip協(xié)議是完全符合 BASE 原則,可以用在任何要求最終一致性的領域,比如分布式存儲和注冊中心。另外,它可以很方便地實現(xiàn)彈性集群,允許節(jié)點隨時上下線,提供快捷的失敗檢測和動態(tài)負載均衡等。

          此外,Gossip 協(xié)議的最大的好處是,即使集群節(jié)點的數(shù)量增加,每個節(jié)點的負載也不會增加很多,幾乎是恒定的。這就允許 Redis Cluster 或者 Consul 集群管理的節(jié)點規(guī)模能橫向擴展到數(shù)千個。

          Redis Cluster 的 Gossip 通信機制

          Redis Cluster 是在 3.0 版本引入集群功能。為了讓讓集群中的每個實例都知道其他所有實例的狀態(tài)信息,Redis 集群規(guī)定各個實例之間按照 Gossip 協(xié)議來通信傳遞信息。

          上圖展示了主從架構的 Redis Cluster 示意圖,其中實線表示節(jié)點間的主從復制關系,而虛線表示各個節(jié)點之間的 Gossip 通信。

          Redis Cluster 中的每個節(jié)點都維護一份自己視角下的當前整個集群的狀態(tài),主要包括:

          1. 當前集群狀態(tài)
          2. 集群中各節(jié)點所負責的 slots信息,及其migrate狀態(tài)
          3. 集群中各節(jié)點的master-slave狀態(tài)
          4. 集群中各節(jié)點的存活狀態(tài)及懷疑Fail狀態(tài)

          也就是說上面的信息,就是集群中Node相互八卦傳播流言蜚語的內(nèi)容主題,而且比較全面,既有自己的更有別人的,這么一來大家都相互傳,最終信息就全面而且一致了。

          Redis Cluster 的節(jié)點之間會相互發(fā)送多種消息,較為重要的如下所示:

          • MEET:通過「cluster meet ip port」命令,已有集群的節(jié)點會向新的節(jié)點發(fā)送邀請,加入現(xiàn)有集群,然后新節(jié)點就會開始與其他節(jié)點進行通信;
          • PING:節(jié)點按照配置的時間間隔向集群中其他節(jié)點發(fā)送 ping 消息,消息中帶有自己的狀態(tài),還有自己維護的集群元數(shù)據(jù),和部分其他節(jié)點的元數(shù)據(jù);
          • PONG: ?節(jié)點用于回應 PING 和 MEET 的消息,結構和 PING 消息類似,也包含自己的狀態(tài)和其他信息,也可以用于信息廣播和更新;
          • FAIL: 節(jié)點 PING 不通某節(jié)點后,會向集群所有節(jié)點廣播該節(jié)點掛掉的消息。其他節(jié)點收到消息后標記已下線。

          Redis 的源碼中 cluster.h 文件定義了全部的消息類型,代碼為 redis 4.0版本。

          //?注意,PING 、 PONG 和 MEET 實際上是同一種消息。
          //?PONG?是對?PING?的回復,它的實際格式也為?PING?消息,
          //?而?MEET?則是一種特殊的?PING?消息,用于強制消息的接收者將消息的發(fā)送者添加到集群中(如果節(jié)點尚未在節(jié)點列表中的話)
          #define?CLUSTERMSG_TYPE_PING?0??????????/*?Ping?消息?*/
          #define?CLUSTERMSG_TYPE_PONG?1??????????/*?Pong?用于回復Ping?*/
          #define?CLUSTERMSG_TYPE_MEET?2??????????/*?Meet?請求將某個節(jié)點添加到集群中?*/
          #define?CLUSTERMSG_TYPE_FAIL?3??????????/*?Fail?將某個節(jié)點標記為?FAIL?*/
          #define?CLUSTERMSG_TYPE_PUBLISH?4???????/*?通過發(fā)布與訂閱功能廣播消息?*/
          #define?CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST?5?/*?請求進行故障轉移操作,要求消息的接收者通過投票來支持消息的發(fā)送者?*/
          #define?CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK?6?????/*?消息的接收者同意向消息的發(fā)送者投票?*/
          #define?CLUSTERMSG_TYPE_UPDATE?7????????/*?slots?已經(jīng)發(fā)生變化,消息發(fā)送者要求消息接收者進行相應的更新?*/
          #define?CLUSTERMSG_TYPE_MFSTART?8???????/*?為了進行手動故障轉移,暫停各個客戶端?*/
          #define?CLUSTERMSG_TYPE_COUNT?9?????????/*?消息總數(shù)?*/

          通過上述這些消息,集群中的每一個實例都能獲得其它所有實例的狀態(tài)信息。這樣一來,即使有新節(jié)點加入、節(jié)點故障、Slot 變更等事件發(fā)生,實例間也可以通過 PING、PONG 消息的傳遞,完成集群狀態(tài)在每個實例上的同步。下面,我們依次來看看幾種常見的場景。

          定時 PING/PONG 消息

          Redis Cluster 中的節(jié)點都會定時地向其他節(jié)點發(fā)送 PING 消息,來交換各個節(jié)點狀態(tài)信息,檢查各個節(jié)點狀態(tài),包括在線狀態(tài)、疑似下線狀態(tài) PFAIL 和已下線狀態(tài) FAIL。

          Redis 集群的定時 PING/PONG 的工作原理可以概括成兩點:

          • 一是,每個實例之間會按照一定的頻率,從集群中隨機挑選一些實例,把 PING 消息發(fā)送給挑選出來的實例,用來檢測這些實例是否在線,并交換彼此的狀態(tài)信息。PING 消息中封裝了發(fā)送消息的實例自身的狀態(tài)信息、部分其它實例的狀態(tài)信息,以及 Slot 映射表。
          • 二是,一個實例在接收到 PING 消息后,會給發(fā)送 PING 消息的實例,發(fā)送一個 PONG 消息。PONG 消息包含的內(nèi)容和 PING 消息一樣。

          下圖顯示了兩個實例間進行 PING、PONG 消息傳遞的情況,其中實例一為發(fā)送節(jié)點,實例二是接收節(jié)點


          新節(jié)點上線

          Redis Cluster 加入新節(jié)點時,客戶端需要執(zhí)行 CLUSTER MEET 命令,如下圖所示。


          節(jié)點一在執(zhí)行 CLUSTER MEET 命令時會首先為新節(jié)點創(chuàng)建一個 clusterNode 數(shù)據(jù),并將其添加到自己維護的 clusterState 的 nodes 字典中。有關 clusterState 和 clusterNode 關系,我們在最后一節(jié)會有詳盡的示意圖和源碼來講解。

          然后節(jié)點一會根據(jù)據(jù) CLUSTER MEET 命令中的 IP 地址和端口號,向新節(jié)點發(fā)送一條 MEET 消息。新節(jié)點接收到節(jié)點一發(fā)送的MEET消息后,新節(jié)點也會為節(jié)點一創(chuàng)建一個 clusterNode 結構,并將該結構添加到自己維護的 clusterState 的 nodes 字典中。

          接著,新節(jié)點向節(jié)點一返回一條PONG消息。節(jié)點一接收到節(jié)點B返回的PONG消息后,得知新節(jié)點已經(jīng)成功的接收了自己發(fā)送的MEET消息。

          最后,節(jié)點一還會向新節(jié)點發(fā)送一條 PING 消息。新節(jié)點接收到該條 PING 消息后,可以知道節(jié)點A已經(jīng)成功的接收到了自己返回的P ONG消息,從而完成了新節(jié)點接入的握手操作。

          MEET 操作成功之后,節(jié)點一會通過稍早時講的定時 PING 機制將新節(jié)點的信息發(fā)送給集群中的其他節(jié)點,讓其他節(jié)點也與新節(jié)點進行握手,最終,經(jīng)過一段時間后,新節(jié)點會被集群中的所有節(jié)點認識。

          節(jié)點疑似下線和真正下線

          Redis Cluster 中的節(jié)點會定期檢查已經(jīng)發(fā)送 PING 消息的接收方節(jié)點是否在規(guī)定時間 ( cluster-node-timeout ) 內(nèi)返回了 PONG 消息,如果沒有則會將其標記為疑似下線狀態(tài),也就是 PFAIL 狀態(tài),如下圖所示。


          然后,節(jié)點一會通過 PING 消息,將節(jié)點二處于疑似下線狀態(tài)的信息傳遞給其他節(jié)點,例如節(jié)點三。節(jié)點三接收到節(jié)點一的 PING 消息得知節(jié)點二進入 PFAIL 狀態(tài)后,會在自己維護的 clusterState 的 nodes 字典中找到節(jié)點二所對應的 clusterNode 結構,并將主節(jié)點一的下線報告添加到 clusterNode 結構的 fail_reports 鏈表中。


          隨著時間的推移,如果節(jié)點十 (舉個例子) 也因為 PONG 超時而認為節(jié)點二疑似下線了,并且發(fā)現(xiàn)自己維護的節(jié)點二的 clusterNode 的 fail_reports 中有半數(shù)以上的主節(jié)點數(shù)量的未過時的將節(jié)點二標記為 PFAIL 狀態(tài)報告日志,那么節(jié)點十將會把節(jié)點二將被標記為已下線 FAIL 狀態(tài),并且節(jié)點十會立刻向集群其他節(jié)點廣播主節(jié)點二已經(jīng)下線的 FAIL 消息,所有收到 FAIL 消息的節(jié)點都會立即將節(jié)點二狀態(tài)標記為已下線。如下圖所示。


          需要注意的是,報告疑似下線記錄是由時效性的,如果超過 cluster-node-timeout *2 的時間,這個報告就會被忽略掉,讓節(jié)點二又恢復成正常狀態(tài)。

          Redis Cluster 通信源碼實現(xiàn)

          綜上,我們了解了 Redis Cluster 在定時 PING/PONG、新節(jié)點上線、節(jié)點疑似下線和真正下線等環(huán)節(jié)的原理和操作流程,下面我們來真正看一下 Redis 在這些環(huán)節(jié)的源碼實現(xiàn)和具體操作。

          涉及的數(shù)據(jù)結構體

          首先,我們先來講解一下其中涉及的數(shù)據(jù)結構,也就是上文提到的 ClusterNode 等結構。

          每個節(jié)點都會維護一個 clusterState 結構,表示當前集群的整體狀態(tài),它的定義如下所示。

          typedef?struct?clusterState?{
          ???clusterNode?*myself;??/*?當前節(jié)點的clusterNode信息?*/
          ???....
          ???dict?*nodes;??????????/*?name到clusterNode的字典?*/
          ???....
          ???clusterNode?*slots[CLUSTER_SLOTS];?/*?slot?和節(jié)點的對應關系*/
          ???....
          }?clusterState;

          它有三個比較關鍵的字段,具體示意圖如下所示:

          • myself 字段,是一個 clusterNode 結構,用來記錄自己的狀態(tài);
          • nodes 字典,記錄一個 name 到 clusterNode 結構的映射,以此來記錄其他節(jié)點的狀態(tài);
          • slot 數(shù)組,記錄slot 對應的節(jié)點 clusterNode結構。

          clusterNode 結構保存了一個節(jié)點的當前狀態(tài),比如節(jié)點的創(chuàng)建時間、節(jié)點的名字、節(jié)點 當前的配置紀元、節(jié)點的IP地址和端口號等等。除此之外,clusterNode結構的 link 屬性是一個clusterLink結構,該結構保存了連接節(jié)點所需的有關信息**,比如**套接字描述符,輸入緩沖區(qū)和輸出緩沖區(qū)。clusterNode 還有一個 fail_report 的列表,用來記錄疑似下線報告。具體定義如下所示。

          typedef?struct?clusterNode?{
          ????mstime_t?ctime;?/*?創(chuàng)建節(jié)點的時間?*/
          ????char?name[CLUSTER_NAMELEN];?/*?節(jié)點的名字?*/
          ????int?flags;??????/*?節(jié)點標識,標記節(jié)點角色或者狀態(tài),比如主節(jié)點從節(jié)點或者在線和下線?*/
          ????uint64_t?configEpoch;?/*?當前節(jié)點已知的集群統(tǒng)一epoch?*/
          ????unsigned?char?slots[CLUSTER_SLOTS/8];?/*?slots?handled?by?this?node?*/
          ????int?numslots;???/*?Number?of?slots?handled?by?this?node?*/
          ????int?numslaves;??/*?Number?of?slave?nodes,?if?this?is?a?master?*/
          ????struct?clusterNode?**slaves;?/*?pointers?to?slave?nodes?*/
          ????struct?clusterNode?*slaveof;?/*?pointer?to?the?master?node.?Note?that?it
          ????????????????????????????????????may?be?NULL?even?if?the?node?is?a?slave
          ????????????????????????????????????if?we?don't?have?the?master?node?in?our
          ????????????????????????????????????tables.?*/

          ????mstime_t?ping_sent;??????/*?當前節(jié)點最后一次向該節(jié)點發(fā)送?PING?消息的時間?*/
          ????mstime_t?pong_received;??/*?當前節(jié)點最后一次收到該節(jié)點?PONG?消息的時間?*/
          ????mstime_t?fail_time;??????/*?FAIL?標志位被設置的時間?*/
          ????mstime_t?voted_time;?????/*?Last?time?we?voted?for?a?slave?of?this?master?*/
          ????mstime_t?repl_offset_time;??/*?Unix?time?we?received?offset?for?this?node?*/
          ????mstime_t?orphaned_time;?????/*?Starting?time?of?orphaned?master?condition?*/
          ????long?long?repl_offset;??????/*?當前節(jié)點的repl便宜?*/
          ????char?ip[NET_IP_STR_LEN];??/*?節(jié)點的IP?地址?*/
          ????int?port;???????????????????/*?端口?*/
          ????int?cport;??????????????????/*?通信端口,一般是端口+1000?*/
          ????clusterLink?*link;??????????/*?和該節(jié)點的?tcp?連接?*/
          ????list?*fail_reports;?????????/*?下線記錄列表?*/
          }?clusterNode;

          clusterNodeFailReport 是記錄節(jié)點下線報告的結構體, node 是報告節(jié)點的信息,而 time 則代表著報告時間。

          typedef?struct?clusterNodeFailReport?{
          ????struct?clusterNode?*node;??/*?報告當前節(jié)點已經(jīng)下線的節(jié)點?*/
          ????mstime_t?time;?????????????/*?報告時間?*/
          }?clusterNodeFailReport;

          消息結構體

          了解了 Reids 節(jié)點維護的數(shù)據(jù)結構體后,我們再來看節(jié)點進行通信的消息結構體。通信消息最外側的結構體為 clusterMsg,它包括了很多消息記錄信息,包括 RCmb 標志位,消息總長度,消息協(xié)議版本,消息類型;它還包括了發(fā)送該消息節(jié)點的記錄信息,比如節(jié)點名稱,節(jié)點負責的slot信息,節(jié)點ip和端口等;最后它包含了一個 clusterMsgData 來攜帶具體類型的消息。

          typedef?struct?{
          ????char?sig[4];????????/*?標志位,"RCmb"?(Redis?Cluster?message?bus).?*/
          ????uint32_t?totlen;????/*?消息總長度?*/
          ????uint16_t?ver;???????/*?消息協(xié)議版本?*/
          ????uint16_t?port;??????/*?端口?*/
          ????uint16_t?type;??????/*?消息類型?*/
          ????uint16_t?count;?????/*??*/
          ????uint64_t?currentEpoch;??/*?表示本節(jié)點當前記錄的整個集群的統(tǒng)一的epoch,用來決策選舉投票等,與configEpoch不同的是:configEpoch表示的是master節(jié)點的唯一標志,currentEpoch是集群的唯一標志。?*/
          ????uint64_t?configEpoch;???/*?每個master節(jié)點都有一個唯一的configEpoch做標志,如果和其他master節(jié)點沖突,會強制自增使本節(jié)點在集群中唯一?*/
          ????uint64_t?offset;????/*?主從復制偏移相關信息,主節(jié)點和從節(jié)點含義不同?*/
          ????char?sender[CLUSTER_NAMELEN];?/*?發(fā)送節(jié)點的名稱?*/
          ????unsigned?char?myslots[CLUSTER_SLOTS/8];?/*?本節(jié)點負責的slots信息,16384/8個char數(shù)組,一共為16384bit?*/
          ????char?slaveof[CLUSTER_NAMELEN];?/*?master信息,假如本節(jié)點是slave節(jié)點的話,協(xié)議帶有master信息?*/
          ????char?myip[NET_IP_STR_LEN];????/*?IP?*/
          ????char?notused1[34];??/*?保留字段?*/
          ????uint16_t?cport;??????/*?集群的通信端口?*/
          ????uint16_t?flags;??????/*?本節(jié)點當前的狀態(tài),比如?CLUSTER_NODE_HANDSHAKE、CLUSTER_NODE_MEET?*/
          ????unsigned?char?state;?/*?Cluster?state?from?the?POV?of?the?sender?*/
          ????unsigned?char?mflags[3];?/*?本條消息的類型,目前只有兩類:CLUSTERMSG_FLAG0_PAUSED、CLUSTERMSG_FLAG0_FORCEACK */
          ????union?clusterMsgData?data;
          }?clusterMsg;

          clusterMsgData 是一個 union 結構體,它可以為 PING,MEET,PONG 或者 FAIL 等消息體。其中當消息為 PING、MEET 和 PONG 類型時,ping 字段是被賦值的,而是 FAIL 類型時,fail 字段是被賦值的。

          //?注意這是?union?關鍵字
          union?clusterMsgData?{
          ????/*?PING,?MEET?或者?PONG?消息時,ping?字段被賦值?*/
          ????struct?{
          ????????/*?Array?of?N?clusterMsgDataGossip?structures?*/
          ????????clusterMsgDataGossip?gossip[1];
          ????}?ping;
          ????/*??FAIL?消息時,fail?被賦值?*/
          ????struct?{
          ????????clusterMsgDataFail?about;
          ????}?fail;
          ????//?....?省略?publish?和?update?消息的字段
          };

          clusterMsgDataGossip 是 PING、PONG 和 MEET 消息的結構體,它會包括發(fā)送消息節(jié)點維護的其他節(jié)點信息,也就是上文中 clusterState 中 nodes 字段包含的信息,具體代碼如下所示,你也會發(fā)現(xiàn)二者的字段是類似的。

          typedef?struct?{
          ?/*?節(jié)點的名字,默認是隨機的,MEET消息發(fā)送并得到回復后,集群會為該節(jié)點設置正式的名稱*/
          ????char?nodename[CLUSTER_NAMELEN];?
          ????uint32_t?ping_sent;?/*?發(fā)送節(jié)點最后一次給接收節(jié)點發(fā)送?PING?消息的時間戳,收到對應?PONG?回復后會被賦值為0?*/
          ????uint32_t?pong_received;?/*?發(fā)送節(jié)點最后一次收到接收節(jié)點發(fā)送?PONG?消息的時間戳?*/
          ????char?ip[NET_IP_STR_LEN];??/*?IP?address?last?time?it?was?seen?*/
          ????uint16_t?port;???????/*?IP*/???????
          ????uint16_t?cport;??????/*?端口*/??
          ????uint16_t?flags;??????/*?標識*/?
          ????uint32_t?notused1;???/*?對齊字符*/
          }?clusterMsgDataGossip;

          typedef?struct?{
          ????char?nodename[CLUSTER_NAMELEN];?/*?下線節(jié)點的名字?*/
          }?clusterMsgDataFail;

          看完了節(jié)點維護的數(shù)據(jù)結構體和發(fā)送的消息結構體后,我們就來看看 Redis 的具體行為源碼了。

          隨機周期性發(fā)送PING消息

          Redis 的 clusterCron 函數(shù)會被定時調(diào)用,每被執(zhí)行10次,就會準備向隨機的一個節(jié)點發(fā)送 PING 消息。

          它會先隨機的選出 5 個節(jié)點,然后從中選擇最久沒有與之通信的節(jié)點,調(diào)用 clusterSendPing 函數(shù)發(fā)送類型為 CLUSTERMSG_TYPE_PING 的消息

          //?cluster.c?文件?
          //?clusterCron()?每執(zhí)行?10?次(至少間隔一秒鐘),就向一個隨機節(jié)點發(fā)送?gossip?信息
          if?(!(iteration?%?10))?{
          ????int?j;

          ????/*?隨機?5?個節(jié)點,選出其中一個?*/
          ????for?(j?=?0;?j?5;?j++)?{
          ????????de?=?dictGetRandomKey(server.cluster->nodes);
          ????????clusterNode?*this?=?dictGetVal(de);

          ????????/*?不要?PING?連接斷開的節(jié)點,也不要?PING?最近已經(jīng)?PING?過的節(jié)點?*/
          ????????if?(this->link?==?NULL?||?this->ping_sent?!=?0)?continue;
          ????????if?(this->flags?&?(CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
          ????????????continue;
          ????????/*?對比?pong_received?字段,選出更長時間未收到其?PONG?消息的節(jié)點(表示好久沒有接受到該節(jié)點的PONG消息了)?*/
          ????????if?(min_pong_node?==?NULL?||?min_pong?>?this->pong_received)?{
          ????????????min_pong_node?=?this;
          ????????????min_pong?=?this->pong_received;
          ????????}
          ????}
          ????/*?向最久沒有收到?PONG?回復的節(jié)點發(fā)送?PING?命令?*/
          ????if?(min_pong_node)?{
          ????????serverLog(LL_DEBUG,"Pinging?node?%.40s",?min_pong_node->name);
          ????????clusterSendPing(min_pong_node->link,?CLUSTERMSG_TYPE_PING);
          ????}
          }

          clusterSendPing 函數(shù)的具體行為我們后續(xù)再了解,因為該函數(shù)在其他環(huán)節(jié)也會經(jīng)常用到

          節(jié)點加入集群

          當節(jié)點執(zhí)行 CLUSTER MEET 命令后,會在自身給新節(jié)點維護一個 clusterNode 結構體,該結構體的 link 也就是TCP連接字段是 null,表示是新節(jié)點尚未建立連接。

          clusterCron 函數(shù)中也會處理這些未建立連接的新節(jié)點,調(diào)用 createClusterLink 創(chuàng)立連接,然后調(diào)用 clusterSendPing 函數(shù)來發(fā)送 MEET 消息

          /*?cluster.c?clusterCron?函數(shù)部分,為未創(chuàng)建連接的節(jié)點創(chuàng)建連接?*/
          if?(node->link?==?NULL)?{
          ????int?fd;
          ????mstime_t?old_ping_sent;
          ????clusterLink?*link;
          ????/*?和該節(jié)點建立連接?*/
          ????fd?=?anetTcpNonBlockBindConnect(server.neterr,?node->ip,
          ????????node->cport,?NET_FIRST_BIND_ADDR);
          ????/*?....?fd?為-1時的異常處理?*/
          ????/*?建立?link?*/
          ????link?=?createClusterLink(node);
          ????link->fd?=?fd;
          ????node->link?=?link;
          ????aeCreateFileEvent(server.el,link->fd,AE_READABLE,
          ????????????clusterReadHandler,link);
          ????/*?向新連接的節(jié)點發(fā)送?PING?命令,防止節(jié)點被識進入下線?*/
          ????/*?如果節(jié)點被標記為?MEET?,那么發(fā)送?MEET?命令,否則發(fā)送?PING?命令?*/
          ????old_ping_sent?=?node->ping_sent;
          ????clusterSendPing(link,?node->flags?&?CLUSTER_NODE_MEET??
          ????????????CLUSTERMSG_TYPE_MEET?:?CLUSTERMSG_TYPE_PING);
          ????/*?....?*/
          ????/*?如果當前節(jié)點(發(fā)送者)沒能收到 MEET 信息的回復,那么它將不再向目標節(jié)點發(fā)送命令。*/
          ????/*?如果接收到回復的話,那么節(jié)點將不再處于?HANDSHAKE?狀態(tài),并繼續(xù)向目標節(jié)點發(fā)送普通?PING?命令*/
          ????node->flags?&=?~CLUSTER_NODE_MEET;
          }

          防止節(jié)點假超時及狀態(tài)過期

          防止節(jié)點假超時和標記疑似下線標記也是在 clusterCron 函數(shù)中,具體如下所示。它會檢查當前所有的 nodes 節(jié)點列表,如果發(fā)現(xiàn)某個節(jié)點與自己的最后一個 PONG 通信時間超過了預定的閾值的一半時,為了防止節(jié)點是假超時,會主動釋放掉與之的 link 連接,然后會主動向它發(fā)送一個 PING 消息。

          /*?cluster.c?clusterCron?函數(shù)部分,遍歷節(jié)點來檢查?fail?的節(jié)點*/
          while((de?=?dictNext(di))?!=?NULL)?{
          ????clusterNode?*node?=?dictGetVal(de);
          ????now?=?mstime();?/*?Use?an?updated?time?at?every?iteration.?*/
          ????mstime_t?delay;

          ????/*?如果等到?PONG?到達的時間超過了?node?timeout?一半的連接?*/
          ????/*?因為盡管節(jié)點依然正常,但連接可能已經(jīng)出問題了?*/
          ????if?(node->link?&&?/*?is?connected?*/
          ????????now?-?node->link->ctime?>
          ????????server.cluster_node_timeout?&&?/*?還未重連?*/
          ????????node->ping_sent?&&?/*?已經(jīng)發(fā)過ping消息?*/
          ????????node->pong_received?ping_sent?&&?/*?還在等待pong消息?*/
          ????????/*?等待pong消息超過了?timeout/2?*/
          ????????now?-?node->ping_sent?>?server.cluster_node_timeout/2)
          ????{
          ????????/*?釋放連接,下次?clusterCron()?會自動重連?*/
          ????????freeClusterLink(node->link);
          ????}

          ????/*?如果目前沒有在?PING?節(jié)點*/
          ????/*?并且已經(jīng)有?node?timeout?一半的時間沒有從節(jié)點那里收到?PONG?回復?*/
          ????/*?那么向節(jié)點發(fā)送一個?PING?,確保節(jié)點的信息不會太舊,有可能一直沒有隨機中?*/
          ????if?(node->link?&&
          ????????node->ping_sent?==?0?&&
          ????????(now?-?node->pong_received)?>?server.cluster_node_timeout/2)
          ????{
          ????????clusterSendPing(node->link,?CLUSTERMSG_TYPE_PING);
          ????????continue;
          ????}
          ????/*?....?處理failover和標記遺失下線?*/
          }

          處理failover和標記疑似下線

          如果防止節(jié)點假超時處理后,節(jié)點依舊未收到目標節(jié)點的 PONG 消息,并且時間已經(jīng)超過了 cluster_node_timeout,那么就將該節(jié)點標記為疑似下線狀態(tài)。

          /*?如果這是一個主節(jié)點,并且有一個從服務器請求進行手動故障轉移,那么向從服務器發(fā)送?PING*/
          if?(server.cluster->mf_end?&&
          ????nodeIsMaster(myself)?&&
          ????server.cluster->mf_slave?==?node?&&
          ????node->link)
          {
          ????clusterSendPing(node->link,?CLUSTERMSG_TYPE_PING);
          ????continue;
          }

          /*?后續(xù)代碼只在節(jié)點發(fā)送了?PING?命令的情況下執(zhí)行*/
          if?(node->ping_sent?==?0)?continue;

          /*?計算等待?PONG?回復的時長?*/?
          delay?=?now?-?node->ping_sent;
          /*?等待?PONG?回復的時長超過了限制值,將目標節(jié)點標記為?PFAIL?(疑似下線)*/
          if?(delay?>?server.cluster_node_timeout)?{
          ????/*?超時了,標記為疑似下線?*/
          ????if?(!(node->flags?&?(REDIS_NODE_PFAIL|REDIS_NODE_FAIL)))?{
          ????????redisLog(REDIS_DEBUG,"***?NODE?%.40s?possibly?failing",
          ????????????node->name);
          ????????//?打開疑似下線標記
          ????????node->flags?|=?REDIS_NODE_PFAIL;
          ????????update_state?=?1;
          ????}
          }

          實際發(fā)送Gossip消息

          以下是前方多次調(diào)用過的clusterSendPing()方法的源碼,代碼中有詳細的注釋,大家可以自行閱讀。主要的操作就是將節(jié)點自身維護的 clusterState 轉換為對應的消息結構體,。

          /*?向指定節(jié)點發(fā)送一條?MEET?、?PING?或者?PONG?消息?*/
          void?clusterSendPing(clusterLink?*link,?int?type)?{
          ????unsigned?char?*buf;
          ????clusterMsg?*hdr;
          ????int?gossipcount?=?0;?/*?Number?of?gossip?sections?added?so?far.?*/
          ????int?wanted;?/*?Number?of?gossip?sections?we?want?to?append?if?possible.?*/
          ????int?totlen;?/*?Total?packet?length.?*/
          ????//?freshnodes?是用于發(fā)送?gossip?信息的計數(shù)器
          ????//?每次發(fā)送一條信息時,程序將?freshnodes?的值減一
          ????//?當?freshnodes?的數(shù)值小于等于?0?時,程序停止發(fā)送?gossip?信息
          ????//?freshnodes?的數(shù)量是節(jié)點目前的?nodes?表中的節(jié)點數(shù)量減去?2?
          ????//?這里的?2?指兩個節(jié)點,一個是?myself?節(jié)點(也即是發(fā)送信息的這個節(jié)點)
          ????//?另一個是接受?gossip?信息的節(jié)點
          ????int?freshnodes?=?dictSize(server.cluster->nodes)-2;

          ????
          ????/*?計算要攜帶多少節(jié)點的信息,最少3個,最多?1/10?集群總節(jié)點數(shù)量*/
          ????wanted?=?floor(dictSize(server.cluster->nodes)/10);
          ????if?(wanted?3)?wanted?=?3;
          ????if?(wanted?>?freshnodes)?wanted?=?freshnodes;

          ????/*?....?省略?totlen?的計算等*/

          ????/*?如果發(fā)送的信息是?PING?,那么更新最后一次發(fā)送?PING?命令的時間戳?*/
          ????if?(link->node?&&?type?==?CLUSTERMSG_TYPE_PING)
          ????????link->node->ping_sent?=?mstime();
          ????/*?將當前節(jié)點的信息(比如名字、地址、端口號、負責處理的槽)記錄到消息里面?*/
          ????clusterBuildMessageHdr(hdr,type);

          ????/*?Populate?the?gossip?fields?*/
          ????int?maxiterations?=?wanted*3;
          ????/*?每個節(jié)點有?freshnodes?次發(fā)送?gossip?信息的機會
          ???????每次向目標節(jié)點發(fā)送?2?個被選中節(jié)點的?gossip?信息(gossipcount?計數(shù))?*/

          ????while(freshnodes?>?0?&&?gossipcount?????????/*?從?nodes?字典中隨機選出一個節(jié)點(被選中節(jié)點)?*/
          ????????dictEntry?*de?=?dictGetRandomKey(server.cluster->nodes);
          ????????clusterNode?*this?=?dictGetVal(de);

          ????????/*?以下節(jié)點不能作為被選中節(jié)點:
          ?????????* Myself:節(jié)點本身。
          ?????????*?PFAIL狀態(tài)的節(jié)點
          ?????????*?處于 HANDSHAKE 狀態(tài)的節(jié)點。
          ?????????*?帶有?NOADDR?標識的節(jié)點
          ?????????*?因為不處理任何?Slot?而被斷開連接的節(jié)點?
          ?????????*/

          ????????if?(this?==?myself)?continue;
          ????????if?(this->flags?&?CLUSTER_NODE_PFAIL)?continue;
          ????????if?(this->flags?&?(CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR)?||
          ????????????(this->link?==?NULL?&&?this->numslots?==?0))
          ????????{
          ????????????freshnodes--;?/*?Tecnically?not?correct,?but?saves?CPU.?*/
          ????????????continue;
          ????????}

          ????????//?檢查被選中節(jié)點是否已經(jīng)在?hdr->data.ping.gossip?數(shù)組里面
          ????????//?如果是的話說明這個節(jié)點之前已經(jīng)被選中了
          ????????//?不要再選中它(否則就會出現(xiàn)重復)
          ????????if?(clusterNodeIsInGossipSection(hdr,gossipcount,this))?continue;

          ????????/*?這個被選中節(jié)點有效,計數(shù)器減一?*/
          ????????clusterSetGossipEntry(hdr,gossipcount,this);
          ????????freshnodes--;
          ????????gossipcount++;
          ????}

          ????/*?....?如果有?PFAIL?節(jié)點,最后添加?*/


          ????/*?計算信息長度?*/
          ????totlen?=?sizeof(clusterMsg)-sizeof(union?clusterMsgData);
          ????totlen?+=?(sizeof(clusterMsgDataGossip)*gossipcount);
          ????/*?將被選中節(jié)點的數(shù)量(gossip?信息中包含了多少個節(jié)點的信息)記錄在?count?屬性里面*/
          ????hdr->count?=?htons(gossipcount);
          ????/*?將信息的長度記錄到信息里面?*/
          ????hdr->totlen?=?htonl(totlen);
          ????/*?發(fā)送網(wǎng)絡請求?*/
          ????clusterSendMessage(link,buf,totlen);
          ????zfree(buf);
          }


          void?clusterSetGossipEntry(clusterMsg?*hdr,?int?i,?clusterNode?*n)?{
          ????clusterMsgDataGossip?*gossip;
          ????/*?指向?gossip?信息結構?*/
          ????gossip?=?&(hdr->data.ping.gossip[i]);
          ????/*?將被選中節(jié)點的名字記錄到?gossip?信息?*/???
          ????memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
          ????/*?將被選中節(jié)點的?PING?命令發(fā)送時間戳記錄到?gossip?信息?*/
          ????gossip->ping_sent?=?htonl(n->ping_sent/1000);
          ????/*?將被選中節(jié)點的?PONG?命令回復的時間戳記錄到?gossip?信息?*/
          ????gossip->pong_received?=?htonl(n->pong_received/1000);
          ????/*?將被選中節(jié)點的?IP?記錄到?gossip?信息?*/
          ????memcpy(gossip->ip,n->ip,sizeof(n->ip));
          ????/*?將被選中節(jié)點的端口號記錄到?gossip?信息?*/
          ????gossip->port?=?htons(n->port);
          ????gossip->cport?=?htons(n->cport);
          ????/*?將被選中節(jié)點的標識值記錄到?gossip?信息?*/
          ????gossip->flags?=?htons(n->flags);
          ????gossip->notused1?=?0;
          }

          下面是 clusterBuildMessageHdr 函數(shù),它主要負責填充消息結構體中的基礎信息和當前節(jié)點的狀態(tài)信息。

          /*?構建消息的?header?*/
          void?clusterBuildMessageHdr(clusterMsg?*hdr,?int?type)?{
          ????int?totlen?=?0;
          ????uint64_t?offset;
          ????clusterNode?*master;

          ????/*?如果當前節(jié)點是salve,則master為其主節(jié)點,如果當前節(jié)點是master節(jié)點,則master就是當前節(jié)點?*/
          ????master?=?(nodeIsSlave(myself)?&&?myself->slaveof)??
          ??????????????myself->slaveof?:?myself;

          ????memset(hdr,0,sizeof(*hdr));
          ????/*?初始化協(xié)議版本、標識、及類型,?*/
          ????hdr->ver?=?htons(CLUSTER_PROTO_VER);
          ????hdr->sig[0]?=?'R';
          ????hdr->sig[1]?=?'C';
          ????hdr->sig[2]?=?'m';
          ????hdr->sig[3]?=?'b';
          ????hdr->type?=?htons(type);
          ????/*?消息頭設置當前節(jié)點id?*/
          ????memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

          ????/*?消息頭設置當前節(jié)點ip?*/
          ????memset(hdr->myip,0,NET_IP_STR_LEN);
          ????if?(server.cluster_announce_ip)?{
          ????????strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
          ????????hdr->myip[NET_IP_STR_LEN-1]?=?'\0';
          ????}

          ????/*?基礎端口及集群內(nèi)節(jié)點通信端口?*/
          ????int?announced_port?=?server.cluster_announce_port??
          ?????????????????????????server.cluster_announce_port?:?server.port;
          ????int?announced_cport?=?server.cluster_announce_bus_port??
          ??????????????????????????server.cluster_announce_bus_port?:
          ??????????????????????????(server.port?+?CLUSTER_PORT_INCR);
          ????/*?設置當前節(jié)點的槽信息?*/
          ????memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
          ????memset(hdr->slaveof,0,CLUSTER_NAMELEN);
          ????if?(myself->slaveof?!=?NULL)
          ????????memcpy(hdr->slaveof,myself->slaveof->name,?CLUSTER_NAMELEN);
          ????hdr->port?=?htons(announced_port);
          ????hdr->cport?=?htons(announced_cport);
          ????hdr->flags?=?htons(myself->flags);
          ????hdr->state?=?server.cluster->state;

          ????/*?設置?currentEpoch?and?configEpochs.?*/
          ????hdr->currentEpoch?=?htonu64(server.cluster->currentEpoch);
          ????hdr->configEpoch?=?htonu64(master->configEpoch);

          ????/*?設置復制偏移量?*/
          ????if?(nodeIsSlave(myself))
          ????????offset?=?replicationGetSlaveOffset();
          ????else
          ????????offset?=?server.master_repl_offset;
          ????hdr->offset?=?htonu64(offset);

          ????/*?Set?the?message?flags.?*/
          ????if?(nodeIsMaster(myself)?&&?server.cluster->mf_end)
          ????????hdr->mflags[0]?|=?CLUSTERMSG_FLAG0_PAUSED;

          ????/*?計算并設置消息的總長度?*/
          ????if?(type?==?CLUSTERMSG_TYPE_FAIL)?{
          ????????totlen?=?sizeof(clusterMsg)-sizeof(union?clusterMsgData);
          ????????totlen?+=?sizeof(clusterMsgDataFail);
          ????}?else?if?(type?==?CLUSTERMSG_TYPE_UPDATE)?{
          ????????totlen?=?sizeof(clusterMsg)-sizeof(union?clusterMsgData);
          ????????totlen?+=?sizeof(clusterMsgDataUpdate);
          ????}
          ????hdr->totlen?=?htonl(totlen);
          }

          后記

          本來只想寫一下 Redis Cluster 的 Gossip 協(xié)議,沒想到文章越寫,內(nèi)容越多,最后源碼分析也是有點虎頭蛇尾,大家就湊合看一下,也希望大家繼續(xù)關注我后續(xù)的問題。

          -關注我


          瀏覽 44
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黑人大屌轮奸视频播放免费成人 | 中文字幕在线播放第一页 | 国产精品无码无卡无需播放器 | 一区二区豆花视频 | 台湾无码免费 |