Redis Master-Slave Replication
Author | 原野漫步
Source | urlify.cn/Zfmuau
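To keep a service available, modern databases provide replication, which maintains a consistent data state across several processes at once.
Redis supports a one-master, many-slaves replication architecture, and exposes it through a single SLAVEOF command. This article uses that command to dissect how Redis replication works.
Observing with tcpdump
Start two redis-server instances on the local machine, then watch the traffic between master and slave with tcpdump: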
redis-server --port 6379 --requirepass 123456 # start the master
redis-server --port 6380 --masterauth 123456  # start the slave
tcpdump -t -i lo0 host localhost and port 6379 | awk -F ']' '{print $1"]"$3}'
# Run SLAVEOF localhost 6379 on localhost:6380 to open the replication link and enter the Full-ReSync phase
localhost.59297 > localhost.6379: Flags [S]
localhost.6379 > localhost.59297: Flags [S.]
localhost.59297 > localhost.6379: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "NOAUTH Authentication required."
localhost.59297 > localhost.6379: Flags [P.] "AUTH 123456"
localhost.6379 > localhost.59297: Flags [P.] "OK"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380"
localhost.6379 > localhost.59297: Flags [P.] "OK"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF capa eof"
localhost.6379 > localhost.59297: Flags [P.] "OK"
localhost.59297 > localhost.6379: Flags [P.] "PSYNC ? -1"
localhost.6379 > localhost.59297: Flags [P.] "FULLRESYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 1"
localhost.6379 > localhost.59297: Flags [P.] [|RESP:
localhost.6379 > localhost.59297: Flags [P.] "REDIS0007M-z^Iredis-ver^F3.2.11M-z" [|RESP
# Full-ReSync is complete; the Propagation phase begins
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "1"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "15"
localhost.6379 > localhost.59297: Flags [P.] "SELECT" "0" "SET" "KEY" "VALUE"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "85"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY2" "VALUE2"
localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY3" "VALUE3" "KEY4" "VALUE4" "KEY5" "VALUE5"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "256"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270"
localhost.59297 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "270"
# Run DEBUG SLEEP 60 on localhost:6380 to simulate a network interruption
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY6" "VALUE6"
localhost.6379 > localhost.59297: Flags [P.] "SET" "KEY7" "VALUE7"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.6379 > localhost.59297: Flags [P.] "PING"
localhost.59297 > localhost.6379: Flags [.]
localhost.59297 > localhost.6379: Flags [R.]
# The old replication link is dropped and a new one is opened, entering the Partial-ReSync phase
localhost.59313 > localhost.6379: Flags [S]
localhost.6379 > localhost.59313: Flags [S.]
localhost.59313 > localhost.6379: Flags [P.] "PING"
localhost.6379 > localhost.59313: Flags [P.] "NOAUTH Authentication required."
localhost.59313 > localhost.6379: Flags [P.] "AUTH 123456"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF listening-port 6380"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF capa eof"
localhost.6379 > localhost.59313: Flags [P.] "OK"
localhost.59313 > localhost.6379: Flags [P.] "PSYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 271"
localhost.6379 > localhost.59313: Flags [P.] "CONTINUE"
localhost.6379 > localhost.59313: Flags [P.] "PING" "PING" "SET" "KEY6" "VALUE6" "PING" "SET" "KEY7" "VALUE7" "PING" "MSET" "KEY8" "VALUE8" "KEY9" "VALUE9" "PING" "PING"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "519"
localhost.6379 > localhost.59313: Flags [P.] "PING"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "533"
localhost.59313 > localhost.6379: Flags [P.] "REPLCONF" "ACK" "533"
The whole exchange can be split into 3 phases: Full-ReSync, Command-Propagate and Partial-ReSync:
             +----------------------+                +---------------------+
             | redisServer (master) |                | redisServer (slave) |
             |    localhost:6379    |                |    localhost:6380   |
             +----------------------+                +---------------------+
             |        slaves        |                |        master       |
             +----------------------+                +---------------------+
                        |                                       |
                 +----------------+                      +-------------+
                 | redisClient[ ] |                      | redisClient |
                 +----------------+                      +-------------+
                                                                |
       ^                <<<<<<<<<<<<<<<<<< PING <<<<<<<<<<<<<<<<<    Step 1 : check the socket and the master's status
       |                >>>>>>>>>>>>> PONG / NOAUTH >>>>>>>>>>>>>
       |                                                        |
       |                <<<<<<<<<<<<<<<<<< AUTH <<<<<<<<<<<<<<<<<    Step 2 : authenticate
       |                >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
       |                                                        |
       |                <<< REPLCONF listening-port [port] <<<<<<    Step 3 : send the slave's port
   Full-ReSync          >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
       |                                                        |
       |                <<<<<<<<<<< REPLCONF capa eof <<<<<<<<<<<    Step 4 : check command compatibility
       |                >>>>>>>>>>>>>>>>>>> OK >>>>>>>>>>>>>>>>>>
       |                                                        |
       |                <<<<<<<<<<<<<<< PSYNC ? -1 <<<<<<<<<<<<<<    Step 5 : send the PSYNC request
       |                >>>>>> FULLRESYNC [replid] [offset] >>>>>    Step 6 : perform the full sync
       |                V
       |              BGSAVE
       |                V
       v                >>>>>>>>>>>>> RDB Snapshot >>>>>>>>>>>>>>
       ^                <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<<
       |                >>>>>>>>>>>>>>> COMMAND 1 >>>>>>>>>>>>>>>
       |                >>>>>>>>>>>>>>> COMMAND 2 >>>>>>>>>>>>>>>
       |                <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<<
Command-Propagate       >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>>
       |                >>>>>>>>>>>>>>> COMMAND 3 >>>>>>>>>>>>>>>
       |                <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<<
       |                <<<<<<<<< REPLCONF ACK [offset] <<<<<<<<<
       v                >>>>>>>>>>>>>>>>>> PING >>>>>>>>>>>>>>>>>
       ^                =========================================
       |                ====== The Same With Full-ReSync ========
       |                =========================================
       |                                                        |
  Partial-ReSync        <<<<<<<< PSYNC [replid] [offset] <<<<<<<<
       |                >>>>>>>>>>>>>>> CONTINUE >>>>>>>>>>>>>>>>
       |                >>>>>>>>>>>>>>> COMMAND N >>>>>>>>>>>>>>>
       v                >>>>>>>>>>>>>>> COMMAND ... >>>>>>>>>>>>>
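The PSYNC command
Redis originally synchronized with the SYNC command. On every reconnect it generated, transferred and loaded a complete RDB snapshot, which consumed a great deal of machine resources and network bandwidth. To fix this, later versions of Redis added the PSYNC command, which supports two synchronization modes:
Full resynchronization (Full-ReSync)
    the slave connects to the master for the first time
    the state of master and slave has diverged too far
Partial resynchronization (Partial-ReSync)
    a network glitch drops the replication link and the slave reconnects
    the sentinel mechanism changes which node is the master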
Data structures
Here are the fields of redisServer that relate to PSYNC:
struct redisServer {
    /*
     *  Node ID and replication offset
     *
     *     If the current node is a master,
     *     server.replid identifies its replication history
     *     (before Redis 4.0 the run ID, server.runid, played this role)
     *
     *     If the current node was a master and is turned into a slave,
     *     server.replid and server.master_repl_offset are overwritten with the new master's replication info
     *
     *     If the current node was a slave and is promoted to master,
     *     server.replid2 and server.second_replid_offset record its replication info from its time as a slave
     */
    char runid[CONFIG_RUN_ID_SIZE+1];   /* run ID of this node (changes on every restart) */
    char replid[CONFIG_RUN_ID_SIZE+1];  /* replication ID of the current master */
    char replid2[CONFIG_RUN_ID_SIZE+1]; /* replication ID of the master this node followed while it was a slave */
    long long master_repl_offset;   /* replication offset of the current master */
    long long second_replid_offset; /* replication offset this node had reached while it was a slave */
    /*
     * Replication backlog
     *
     *     The master keeps a single global server.repl_backlog, shared by all slaves
     *     To save memory, server.repl_backlog is created on demand, only when a slave exists
     */
    char *repl_backlog;             /* replication backlog (circular buffer) */
    long long repl_backlog_size;    /* size of the backlog */
    long long repl_backlog_histlen; /* number of bytes of data currently held */
    long long repl_backlog_idx;     /* tail of the backlog (next write position) */
    long long repl_backlog_off;     /* replication offset (master offset) of the first byte in the backlog */
};
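Run ID
Master or slave, every Redis server generates a 40-character hexadecimal string at startup as its run ID, runid:
    When a slave requests synchronization for the first time, it saves the ID returned by the master (server.runid) into server.replid.
    When the slave requests synchronization again, it sends the saved server.replid to the master:
        if that ID differs from the current master's server.runid, a full resynchronization is required
        if that ID matches the current master's server.runid, a partial resynchronization can be attempted
Since Redis 4.0 (PSYNC2) the ID exchanged here is a separately generated replication ID rather than the run ID itself, but the comparison works the same way.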
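The ID handling above can be sketched in a few lines of C. This is only an illustration, not the Redis implementation: `make_run_id` and `may_try_partial_resync` are hypothetical names, and `rand()` stands in for whatever random source the server really uses.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* 40 hex characters, as described above */

/* Fill `out` with RUN_ID_SIZE random hex characters plus a terminating NUL.
 * Assumption: rand() is good enough for a demo; it is not a secure source. */
void make_run_id(char out[RUN_ID_SIZE + 1]) {
    static const char hex[] = "0123456789abcdef";
    for (size_t i = 0; i < RUN_ID_SIZE; i++)
        out[i] = hex[rand() % 16];
    out[RUN_ID_SIZE] = '\0';
}

/* Returns 1 when the ID remembered by the slave matches the master's ID,
 * meaning a partial resync may be attempted; 0 means a full resync. */
int may_try_partial_resync(const char *id_sent_by_slave, const char *id_of_master) {
    return strcmp(id_sent_by_slave, id_of_master) == 0;
}
```

A slave that has never synchronized sends `?` instead of an ID (see the `PSYNC ? -1` line in the trace), which can never match a 40-character hex string, so the check falls through to a full resync.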
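Replication offset
Master and slave each maintain a replication offset, measured in bytes. Comparing the two offsets tells whether master and slave are in the same state:
    after propagating N bytes of data to its slaves, the master adds N to its replication offset
    on receiving N bytes of data from the master, the slave adds N to its replication offset
When the master receives the offset carried by REPLCONF ACK, it can tell whether data sent to that slave has gone missing; the missing range is then made up from the replication backlog.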
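Because the offset counts bytes of the serialized command stream, its steps can be checked by hand. The sketch below assumes commands are propagated as RESP arrays of bulk strings; `resp_encoded_len` is a hypothetical helper, not a Redis function.

```c
#include <stddef.h>
#include <string.h>

/* Number of decimal digits needed to print n. */
static size_t decimal_digits(size_t n) {
    size_t d = 1;
    while (n >= 10) { n /= 10; d++; }
    return d;
}

/* Bytes a command occupies once encoded as a RESP array of bulk strings:
 *   "*<argc>\r\n" followed by "$<len>\r\n<payload>\r\n" per argument. */
size_t resp_encoded_len(int argc, const char **argv) {
    size_t total = 1 + decimal_digits((size_t)argc) + 2;
    for (int i = 0; i < argc; i++) {
        size_t len = strlen(argv[i]);
        total += 1 + decimal_digits(len) + 2; /* "$<len>\r\n" */
        total += len + 2;                     /* "<payload>\r\n" */
    }
    return total;
}
```

Under that assumption a `PING` heartbeat is 14 bytes, which matches the steps from ACK 1 to ACK 15 and from ACK 256 to ACK 270 in the trace above.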
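Replication backlog
The master maintains a fixed-size backlog queue.
Whenever the master propagates a command to its slaves it also appends the command to this queue, so the buffer always holds the most recent stretch of commands.
When a slave requests synchronization, the master performs a partial resync only if the data following the slave's offset (offset+1) is still present in the backlog.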
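A minimal sketch of such a fixed-size circular backlog follows. The struct and function names are hypothetical and the 16-byte capacity is deliberately tiny; the fields loosely mirror `repl_backlog_idx`, `repl_backlog_histlen` and `repl_backlog_off` from the struct shown earlier.

```c
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16 /* tiny on purpose, so that wrap-around is easy to see */

typedef struct {
    char buf[BACKLOG_SIZE];
    long long idx;        /* next write position inside buf */
    long long histlen;    /* bytes of history currently held */
    long long off;        /* replication offset of the oldest byte held */
    long long master_off; /* total bytes propagated so far */
} backlog;

void backlog_init(backlog *b) {
    memset(b, 0, sizeof(*b));
    b->off = 1; /* the first byte ever written will have offset 1 */
}

/* Append len bytes, overwriting the oldest data when the buffer wraps. */
void backlog_feed(backlog *b, const char *p, long long len) {
    b->master_off += len;
    while (len > 0) {
        long long chunk = BACKLOG_SIZE - b->idx;
        if (chunk > len) chunk = len;
        memcpy(b->buf + b->idx, p, (size_t)chunk);
        b->idx = (b->idx + chunk) % BACKLOG_SIZE;
        b->histlen += chunk;
        p += chunk;
        len -= chunk;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    b->off = b->master_off - b->histlen + 1;
}

/* psync_offset is the next byte the slave wants (its own offset + 1).
 * A partial resync is possible only if that byte is still in the buffer,
 * or the slave is already fully caught up. */
int backlog_can_serve(const backlog *b, long long psync_offset) {
    return psync_offset >= b->off && psync_offset <= b->off + b->histlen;
}
```

After feeding 20 bytes into this 16-byte buffer, offsets 1 to 4 have been overwritten, so a slave asking to resume from offset 4 would need a full resync while one asking for offset 5 would not.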
Synchronization flow
From the slave's side
After receiving the SLAVEOF command, the slave calls replicaofCommand to start synchronizing with the master:
void replicaofCommand(client *c) {
    // ...
    if (!strcasecmp(c->argv[1]->ptr,"no") &&
        !strcasecmp(c->argv[2]->ptr,"one")) {
        if (server.masterhost) { // the command is SLAVEOF NO ONE: stop replicating
            // ...
        }
    } else {
        if (c->flags & CLIENT_SLAVE) {
            return; // the client issuing the command is itself a slave: reject the command
        }
        if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) {
            return; // already connected to the master named in SLAVEOF: return immediately
        }
        // not connected to any master yet: open a TCP connection to masterhost:masterport
        // and register syncWithMaster as the handler
    }
}
void syncWithMaster(connection *conn) {
    char *err = NULL;
    int psync_result;
    // send PING to the master
    if (server.repl_state == REPL_STATE_CONNECTING) {
        server.repl_state = REPL_STATE_RECEIVE_PONG;
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); // send the PING command
        // ...
    }
    // the master's reply to PING has arrived
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
        if (err[0] != '+' &&
            strncmp(err,"-NOAUTH",7) != 0 &&
            strncmp(err,"-NOPERM",7) != 0 &&
            strncmp(err,"-ERR operation not permitted",28) != 0)
        {
            goto error;
        }
        server.repl_state = REPL_STATE_SEND_AUTH; // only PONG, NOAUTH and NOPERM replies are accepted
    }
    // decide from the master's reply to PING whether authentication is needed
    if (server.repl_state == REPL_STATE_SEND_AUTH) {
        if (server.masteruser && server.masterauth) {
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
                                         server.masteruser,server.masterauth,NULL); // send the AUTH command
            // ...
            server.repl_state = REPL_STATE_RECEIVE_AUTH;
        } else {
            // no server.masteruser / server.masterauth credentials configured: skip AUTH
            server.repl_state = REPL_STATE_SEND_PORT;
        }
    }
    // The following steps are omitted here:
    // REPLCONF listening-port tells the master the slave's port
    // REPLCONF ip-address tells the master the slave's IP
    // REPLCONF capa eof / capa psync2 tells the master which features the slave supports
    // start sending the PSYNC command
    if (server.repl_state == REPL_STATE_SEND_PSYNC) {
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
            goto write_error;
        }
        server.repl_state = REPL_STATE_RECEIVE_PSYNC;
        return;
    }
    // read the reply to PSYNC
    psync_result = slaveTryPartialResynchronization(conn,1);
    // a CONTINUE reply means the full sync can be skipped
    if (psync_result == PSYNC_CONTINUE) return;
    // the result is PSYNC_FULLRESYNC or PSYNC_NOT_SUPPORTED:
    // start the full sync and register readSyncBulkPayload to receive the RDB file
    if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) {
        // ...
        goto error;
    }
    server.repl_state = REPL_STATE_TRANSFER;
    // ...
}
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
    if (!read_reply) {
        if (server.cached_master) { // server.cached_master holds a record: try a partial resync
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        } else {
            psync_replid = "?"; // server.cached_master is empty: only a full resync is possible
            memcpy(psync_offset,"-1",3);
        }
        // send the PSYNC command
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
        // ...
        return PSYNC_WAIT_REPLY;
    }
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); // read the reply to PSYNC
    // the master answered FULLRESYNC: go straight to a full sync
    if (!strncmp(reply,"+FULLRESYNC",11)) {
        // ...
        return PSYNC_FULLRESYNC;
    }
    // the master answered CONTINUE: proceed with a partial resync
    if (!strncmp(reply,"+CONTINUE",9)) {
        // ...
        return PSYNC_CONTINUE;
    }
    // the master cannot handle PSYNC right now -> PSYNC_TRY_LATER
    // the master does not support PSYNC -> PSYNC_NOT_SUPPORTED
}
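To make the reply handling concrete, here is a small stand-alone parser for the two positive replies seen in the trace. It is a sketch under assumptions: `parse_psync_reply` and the `psync_reply` enum are hypothetical names, and every other reply is lumped into a single "other" case rather than being split into try-later and not-supported.

```c
#include <stdio.h>
#include <string.h>

typedef enum { REPLY_FULLRESYNC, REPLY_CONTINUE, REPLY_OTHER } psync_reply;

/* Classify a reply line to PSYNC. For "+FULLRESYNC <replid> <offset>" the
 * 40-character ID and the starting offset are copied out; the caller must
 * provide room for 40 characters plus the terminating NUL.
 * Sketch only: malformed input is simply reported as REPLY_OTHER. */
psync_reply parse_psync_reply(const char *reply, char replid[41], long long *offset) {
    if (strncmp(reply, "+FULLRESYNC", 11) == 0) {
        if (sscanf(reply + 11, " %40s %lld", replid, offset) == 2 &&
            strlen(replid) == 40)
            return REPLY_FULLRESYNC;
        return REPLY_OTHER;
    }
    if (strncmp(reply, "+CONTINUE", 9) == 0)
        return REPLY_CONTINUE;
    return REPLY_OTHER;
}
```

Fed the `FULLRESYNC 8efb6ca4edf1258c05a5ced43b0c73fe4deb1908 1` reply from the trace (with its leading `+`), it yields that ID and a starting offset of 1.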
From the master's side
After receiving the PSYNC command, the master calls syncCommand to start the synchronization flow:
void syncCommand(client *c) {
    // a PSYNC command has arrived from a slave
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == C_OK) {
            return; // no full sync needed, return immediately
        }
    }
    // reaching this point means the partial resync failed and a full sync is needed
    // the master runs BGSAVE to produce a snapshot and transfers it to the slave
    // the RDB snapshot can be shipped in two ways:
    //     Disk-backed: write an RDB snapshot file to disk, then transfer it to the slave
    //     Diskless: write the RDB snapshot data straight to the slave's socket
}
int masterTryPartialResynchronization(client *c) {
    long long psync_offset; // offset the slave wants to resume from (parsed from c->argv[2])
    char *master_replid = c->argv[1]->ptr; // ID of the master that the slave's offset refers to
    /*
     *  A full sync can be avoided when:
     *  1. the master the slave last synchronized with is this instance (network glitch)
     *  2. the slave and this node used to follow the same master, and the slave's offset
     *     does not exceed this node's second_replid_offset (maintenance restart, failover) */
    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) ||
         psync_offset > server.second_replid_offset))
    {
        goto need_full_resync; // PSYNC conditions not met, a full sync is required
    }
    /*
     *  Only a full sync is possible when:
     *  1. the master has not initialized its backlog
     *  2. the slave's offset falls outside the range held in the backlog */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
    {
        goto need_full_resync; // perform a full sync
    }
    // reaching this point means a partial resync is possible
    listAddNodeTail(server.slaves,c);
    // the CONTINUE reply differs depending on whether the client supports PSYNC2
    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    }
    // the CONTINUE reply is followed directly by the contents of server.repl_backlog
    psync_len = addReplyReplicationBacklog(c,psync_offset);
    // ...
}
Heartbeat & command propagation
Once per second Redis runs the scheduled task replicationCron, which includes the heartbeats exchanged between master and slave. Note that the two sides beat at different frequencies:
void replicationCron(void) {
    // the slave periodically sends REPLCONF ACK to the master
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC)) {
        client *c = server.master;
        addReplyArrayLen(c,3);
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
    }
    // the master periodically sends PING to its slaves
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        robj *ping_argv[1];
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb, ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }
}
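The difference in frequency comes from the modulo test on the loop counter. The sketch below simply counts both kinds of heartbeat over a number of cron iterations; `count_heartbeats` is a hypothetical helper, and it assumes a slave is attached for the whole period and that `ping_period` is greater than zero.

```c
/* Simulate `seconds` iterations of a once-per-second cron job.
 * Assumptions: a slave stays attached the whole time, ping_period > 0. */
void count_heartbeats(int seconds, int ping_period, int *acks, int *pings) {
    *acks = 0;
    *pings = 0;
    for (int loop = 0; loop < seconds; loop++) {
        (*acks)++;                    /* slave side: one REPLCONF ACK per iteration */
        if (loop % ping_period == 0)  /* master side: one PING every ping_period iterations */
            (*pings)++;
    }
}
```

With a period of 10, one minute of cron iterations gives 60 ACKs from the slave but only 6 PINGs from the master.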
When the master executes a client's command through the call function, it propagates the command to its slaves and writes it into the backlog at the same time:
void call(client *c, int flags) {
    // ...
    if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        // does the current command need to be propagated?
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
}
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    // ...
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
    // if this node has neither a backlog nor any slave, return immediately
    if (server.repl_backlog == NULL && listLength(slaves) == 0) return;
    // write the command into repl_backlog
    if (server.repl_backlog) {
        char aux[LONG_STR_SIZE+3]; // scratch buffer used to serialize the redis command
        /* write the number of arguments in this command */
        aux[0] = '*';
        len = ll2string(aux+1,sizeof(aux)-1,argc);
        aux[len+1] = '\r';
        aux[len+2] = '\n';
        feedReplicationBacklog(aux,len+3);
        /* walk the arguments one by one, serializing each into repl_backlog */
        for (j = 0; j < argc; j++) {
            long objlen = stringObjectLen(argv[j]);

            aux[0] = '$';
            len = ll2string(aux+1,sizeof(aux)-1,objlen);
            aux[len+1] = '\r';
            aux[len+2] = '\n';
            feedReplicationBacklog(aux,len+3);
            feedReplicationBacklogWithObject(argv[j]);
            feedReplicationBacklog(aux+len+1,2);
        }
    }
    // propagate the command to the client of every slave
    listRewind(slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        /* write the number of arguments in this command */
        addReplyArrayLen(slave,argc);
        /* walk the arguments one by one, sending each to the slave */
        for (j = 0; j < argc; j++)
            addReplyBulk(slave,argv[j]);
    }
}
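Related parameters
slave-serve-stale-data
How a slave answers client requests while its link to the master is down or synchronization has not finished.
    yes: answer commands normally, though the data may be stale
    no: refuse commands and reply with SYNC with master in progress
repl-diskless-sync
How the master ships the RDB snapshot to slaves during a full sync.
    no: write an RDB file to disk first, then transfer it (suits low-bandwidth networks)
    yes: write the RDB snapshot straight to the slave's socket (suits slow disks with a high-bandwidth network)
repl-ping-slave-period
Interval between the PING heartbeats the master sends to its slaves. The default is one every 10 s.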
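repl-backlog-size
Size of the replication backlog. The default is 1mb.
While the master-slave link is down, every propagated command accumulates here, so if the value is too small PSYNC will not be able to perform a partial resync.
If the master handles a heavy write load, or slaves need a long time to reconnect, size the backlog from an estimate of the actual workload.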
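One way to turn that estimate into a number is write rate times expected outage, padded by a safety factor. The helper below is a back-of-the-envelope sketch: the name `estimate_backlog_bytes` is hypothetical, and the safety factor is an assumption chosen by the operator, not a Redis setting.

```c
/* Rough backlog size: bytes written per second, times the longest outage
 * the backlog should survive, times a safety factor picked by the operator.
 * Assumption: the write rate is roughly constant over the outage. */
long long estimate_backlog_bytes(long long write_bytes_per_sec,
                                 long long max_disconnect_sec,
                                 long long safety_factor) {
    return write_bytes_per_sec * max_disconnect_sec * safety_factor;
}

/* 1 if the default of 1mb (1048576 bytes) would cover the estimate. */
int default_backlog_suffices(long long estimated_bytes) {
    return estimated_bytes <= 1024LL * 1024LL;
}
```

For instance, a master writing about 100 KiB per second that should survive a 60 second outage (the length of the DEBUG SLEEP used earlier) with a factor of 2 needs 12,288,000 bytes, well beyond the 1mb default.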
min-slaves-to-write & min-slaves-max-lag
When both options are set, the master rejects write commands whenever the following conditions are not met, until they hold again:
    at least min-slaves-to-write slaves are connected to the current master with a healthy link
    among the healthy slaves, at least min-slaves-to-write have a lag of at most min-slaves-max-lag seconds
With both options enabled, a write command is very likely to be replicated to min-slaves-to-write slaves, which lowers the chance of losing commands.
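The check can be sketched as counting "good" slaves and comparing the count with the threshold. This is an illustration under assumptions: `slave_info`, `count_good_slaves` and `writes_allowed` are hypothetical names, and lag is taken to be the time since the slave's last REPLCONF ACK.

```c
#include <stddef.h>

typedef struct {
    int online;         /* 1 if the replication link is up */
    long long last_ack; /* unix time in seconds of the last REPLCONF ACK */
} slave_info;

/* Count slaves that are online and whose lag is at most max_lag seconds. */
size_t count_good_slaves(const slave_info *slaves, size_t n,
                         long long now, long long max_lag) {
    size_t good = 0;
    for (size_t i = 0; i < n; i++) {
        if (!slaves[i].online) continue;
        if (now - slaves[i].last_ack <= max_lag) good++;
    }
    return good;
}

/* Writes are accepted when the feature is off (either option is 0)
 * or when enough good slaves are available. */
int writes_allowed(const slave_info *slaves, size_t n, long long now,
                   size_t min_slaves_to_write, long long max_lag) {
    if (min_slaves_to_write == 0 || max_lag == 0) return 1;
    return count_good_slaves(slaves, n, now, max_lag) >= min_slaves_to_write;
}
```

With three slaves of which one is offline and one last acknowledged 5 seconds ago, a max lag of 3 leaves a single good slave, so a threshold of 1 accepts writes while a threshold of 2 rejects them.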
This concludes the analysis of Redis master-slave synchronization. Later posts will cover other Redis details. Thanks for reading.
