微服務(wù)注冊(cè)中心分布式集群設(shè)計(jì)原理與 Golang 實(shí)現(xiàn)
內(nèi)容提要
服務(wù)注冊(cè)發(fā)現(xiàn)作為微服務(wù)的基礎(chǔ)組件,它的穩(wěn)定性和可用性備受考驗(yàn)。在之前的文章中,我們介紹了服務(wù)注冊(cè)中心的基本原理和實(shí)現(xiàn),具體參閱:
分布式集群架構(gòu)原理 分布式數(shù)據(jù)復(fù)制技術(shù)
狀態(tài)一致性協(xié)同算法
Golang 代碼實(shí)現(xiàn)集群服務(wù)
集群待解決問(wèn)題
要實(shí)現(xiàn)注冊(cè)中心從單機(jī)版到分布式集群,有幾個(gè)關(guān)鍵問(wèn)題要解決:
集群成員間的關(guān)系與成員發(fā)現(xiàn)問(wèn)題 集群成員間數(shù)據(jù)復(fù)制與一致性問(wèn)題 數(shù)據(jù)副本機(jī)制和數(shù)據(jù)分區(qū)策略
針對(duì)上述問(wèn)題會(huì)有不同解決方案,而不同方案會(huì)對(duì)集群的可用性、容錯(cuò)能力和數(shù)據(jù)一致性造成不同結(jié)果,著名的 CAP 理論就是對(duì)分布式問(wèn)題的最好詮釋。架構(gòu)就是在不同的方案和結(jié)果中進(jìn)行的折中,沒(méi)有最好的方案,只有適合場(chǎng)景的最佳實(shí)踐,權(quán)衡取舍也是架構(gòu)之魅力所在。
節(jié)點(diǎn)關(guān)系與成員發(fā)現(xiàn)
架構(gòu)模型
集群中節(jié)點(diǎn)關(guān)系可以分為兩種:平等公平關(guān)系和非公平關(guān)系。
P2P (pear to pear)點(diǎn)對(duì)點(diǎn)架構(gòu)就是平等公平關(guān)系,這種關(guān)系中各節(jié)點(diǎn)沒(méi)有領(lǐng)導(dǎo)分工,大家分?jǐn)偣ぷ?,共同努力完成目?biāo)。
技術(shù)選型
針對(duì)注冊(cè)中心場(chǎng)景選擇哪種架構(gòu)呢?可以從以下幾點(diǎn)分析。
主從架構(gòu)一般是做讀寫分離,寫主讀從(當(dāng)然也有同步寫,后面會(huì)分析到),相對(duì)來(lái)說(shuō)寫性能有限,但可以通過(guò)多個(gè)從來(lái)提升讀性能。
注冊(cè)中心場(chǎng)景一般讀多寫少,這點(diǎn)上倒也沒(méi)有絕對(duì)的優(yōu)劣。
2.可用性
主從架構(gòu)中主掛了會(huì)影響寫,比如 MySQL 的 MHA,Redis 的 Sentinel 都是用來(lái)監(jiān)控并實(shí)現(xiàn)切主,來(lái)保障高可用。而像 Zookeeper 支持半數(shù)以內(nèi)的節(jié)點(diǎn)掛掉,超過(guò)半數(shù)就要觸發(fā)重新選主了,此時(shí)不能寫入。相比于點(diǎn)對(duì)點(diǎn)架構(gòu),整體可用性會(huì)差一點(diǎn)。
CAP 理論告訴我們,分布式系統(tǒng)在一致性(Consistency)、可用性(Availability) 和分區(qū)容錯(cuò)性 (Partition tolerance)三者只能選其二。在集群正常情況下,一致性和可用性都沒(méi)問(wèn)題(也就是 CA,網(wǎng)上大多數(shù)文章說(shuō) CA 模型不存在,其實(shí)說(shuō)法并不準(zhǔn)確,在正常情況下,一致性和可用性還是可以同時(shí)保障的)。但當(dāng)集群出現(xiàn)異常,分區(qū)容錯(cuò)性必須保障(想想為什么?),那么一致性和可用性就要二選一,選 AP 還是 CP?

(CAP 理論 圖片來(lái)自網(wǎng)絡(luò))
3.架構(gòu)實(shí)現(xiàn)
點(diǎn)對(duì)點(diǎn)架構(gòu)實(shí)現(xiàn)相對(duì)更簡(jiǎn)單,不用考慮選主或主從切換的問(wèn)題,節(jié)點(diǎn)狀態(tài)也只要考慮上線狀態(tài)和下線狀態(tài)即可;
而領(lǐng)導(dǎo)者協(xié)調(diào)者架構(gòu)在實(shí)現(xiàn)實(shí)現(xiàn)選主時(shí)要應(yīng)對(duì)復(fù)雜的一致性協(xié)同算法,維護(hù)更復(fù)雜的狀態(tài)機(jī)。
集群架構(gòu)設(shè)計(jì)
我們來(lái)看點(diǎn)對(duì)點(diǎn)集群架構(gòu)圖:

(注冊(cè)中心集群點(diǎn)對(duì)點(diǎn)架構(gòu)圖)

(注冊(cè)中心集群節(jié)點(diǎn)自發(fā))
代碼實(shí)現(xiàn)
下面我們通過(guò)具體代碼來(lái)展開講解實(shí)現(xiàn)原理。首先我們定義節(jié)點(diǎn)的概念和結(jié)構(gòu)體,一個(gè)節(jié)點(diǎn)就是一個(gè)獨(dú)立的注冊(cè)中心服務(wù),集群由多個(gè)節(jié)點(diǎn)組成。結(jié)構(gòu)體 Node 存儲(chǔ)節(jié)點(diǎn)地址和節(jié)點(diǎn)狀態(tài),節(jié)點(diǎn)狀態(tài)有兩種:上線狀態(tài)(可對(duì)外提供服務(wù)),下線狀態(tài)(不對(duì)外服務(wù))。
type Node struct {
config *configs.Config
addr string
status int
}
func NewNode(config *configs.GlobalConfig, addr string) *Node {
return &Node{
addr: addr,
status: configs.NodeStatusDown, //初始化設(shè)為下線狀態(tài)
}
}(代碼 model/node.go)
結(jié)構(gòu)體 Nodes 用于存放所有節(jié)點(diǎn)列表和當(dāng)前節(jié)點(diǎn)地址,方便節(jié)點(diǎn)初始化和節(jié)點(diǎn)感知。
type Nodes struct {
nodes []*Node
selfAddr string
}
//初始化默認(rèn)從配置文件中加載節(jié)點(diǎn)信息
func NewNodes(c *configs.GlobalConfig) *Nodes {
nodes := make([]*Node, 0, len(c.Nodes))
for _, addr := range c.Nodes {
n := NewNode(c, addr)
nodes = append(nodes, n)
}
return &Nodes{
nodes: nodes,
selfAddr: c.HttpServer,
}
}
(代碼 model/nodes.go)
type Discovery struct {
config *configs.GlobalConfig
protected bool
Registry *Registry
+ Nodes atomic.Value
}
func NewDiscovery(config *configs.GlobalConfig) *Discovery {
//...
+ dis.Nodes.Store(NewNodes(config))
}
(代碼 model/discovery.go)
func (dis *Discovery) regSelf() {
now := time.Now().UnixNano()
instance := &Instance{
Env: dis.config.Env,
Hostname: dis.config.Hostname,
AppId: configs.DiscoveryAppId, //Kavin.discovery
Addrs: []string{"http://" + dis.config.HttpServer},
Status: configs.NodeStatusUp,
RegTimestamp: now,
UpTimestamp: now,
LatestTimestamp: now,
RenewTimestamp: now,
DirtyTimestamp: now,
}
dis.Registry.Register(instance, now)
//注冊(cè)后同步到其他集群,下面部分會(huì)展開講解
dis.Nodes.Load().(*Nodes).Replicate(configs.Register, instance)
}
(代碼 model/discovery.go)
func (dis *Discovery) renewTask(instance *Instance) {
now := time.Now().UnixNano()
ticker := time.NewTicker(configs.RenewInterval) //30 second
defer ticker.Stop()
for {
select {
case <-ticker.C:
_, err := dis.Registry.Renew(instance.Env, instance.AppId, instance.Hostname)
if err == errcode.NotFound {
dis.Registry.Register(instance, now)
dis.Nodes.Load().(*Nodes).Replicate(configs.Register, instance)
} else {
dis.Nodes.Load().(*Nodes).Replicate(configs.Renew, instance)
}
}
}
}
(代碼 model/discovery.go)
節(jié)點(diǎn)如果要進(jìn)行下線操作,會(huì)先進(jìn)行節(jié)點(diǎn)注銷操作,在項(xiàng)目 main() 中增加注銷自己的代碼,實(shí)現(xiàn)比較簡(jiǎn)單,可直接參考代碼:Discovery.CancelSelf(),代碼可通過(guò)本公眾號(hào)“技術(shù)歲月”發(fā)送“注冊(cè)發(fā)現(xiàn)”獲取。
func main() {
//graceful restart
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
log.Println("shutdown discovery server...")
//cancel
++ global.Discovery.CancelSelf()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
}
節(jié)點(diǎn)的狀態(tài)變更感知,用于維護(hù)集群節(jié)點(diǎn)的上下線,從節(jié)點(diǎn)注冊(cè)表中拉取 AppId 為 Kavin.discovery 的數(shù)據(jù),然后通過(guò)該數(shù)據(jù)中的實(shí)例信息來(lái)維護(hù)節(jié)點(diǎn)列表。
func (dis *Discovery) nodesPerception() {
var lastTimestamp int64
ticker := time.NewTicker(configs.NodePerceptionInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fetchData, err := dis.Registry.Fetch(dis.config.Env, configs.DiscoveryAppId, configs.NodeStatusUp, lastTimest
amp)
if err != nil || fetchData == nil {
continue
}
var nodes []string
for _, instance := range fetchData.Instances {
for _, addr := range instance.Addrs {
u, err := url.Parse(addr)
if err == nil {
nodes = append(nodes, u.Host)
}
}
}
lastTimestamp = fetchData.LatestTimestamp
config := new(configs.GlobalConfig)
*config = *dis.config
config.Nodes = nodes
ns := NewNodes(config)
ns.SetUp()
dis.Nodes.Store(ns)
}
}
}
(代碼 model/discovery.go)
數(shù)據(jù)副本與數(shù)據(jù)一致性
數(shù)據(jù)模型一般會(huì)有副本和分區(qū)兩種形式,分區(qū)我們等會(huì)討論,先說(shuō)說(shuō)副本機(jī)制。
所謂副本機(jī)制 Replication,是指分布式系統(tǒng)在各節(jié)點(diǎn)上保存相同的數(shù)據(jù)拷貝,來(lái)達(dá)到備份的目的。
副本提供了幾個(gè)好處:數(shù)據(jù)冗余;可伸縮性;改善數(shù)據(jù)局部性。在點(diǎn)對(duì)點(diǎn)架構(gòu)中,每個(gè)節(jié)點(diǎn)都是一個(gè)獨(dú)立的數(shù)據(jù)副本,這樣某個(gè)節(jié)點(diǎn)出事不會(huì)影響別人,還可通過(guò)擴(kuò)充節(jié)點(diǎn)提升可用性,抗住更大并發(fā)。
多副本最大的困擾,就是數(shù)據(jù)的一致性了,上面我們分析了 CAP,明確了使用 AP 模型,成員間數(shù)據(jù)雖然不能做到強(qiáng)一致性,但怎么保障最終一致性呢?這里考慮如下幾點(diǎn):
服務(wù)啟動(dòng)時(shí)當(dāng)前節(jié)點(diǎn)數(shù)據(jù)為空,需要同步其他節(jié)點(diǎn)數(shù)據(jù)
某節(jié)點(diǎn)接收到服務(wù)最新數(shù)據(jù)變更,需要同步給其他節(jié)點(diǎn)
節(jié)點(diǎn)間數(shù)據(jù)不一致性如何“反熵”
節(jié)點(diǎn)啟動(dòng)時(shí)注冊(cè)表初始化
節(jié)點(diǎn)首次啟動(dòng)時(shí),其注冊(cè)表是空的,那么就要想辦法從其他節(jié)點(diǎn)同步數(shù)據(jù)。其邏輯就是遍歷所有節(jié)點(diǎn),獲取注冊(cè)表數(shù)據(jù),依次注冊(cè)到本地。這里注意只有當(dāng)所有數(shù)據(jù)同步完畢后,該注冊(cè)中心才可對(duì)外提供服務(wù),切換為上線狀態(tài)。
func (dis *Discovery) initSync() {
nodes := dis.Nodes.Load().(*Nodes)
for _, node := range nodes.AllNodes() {
if node.addr == nodes.selfAddr {
continue
}
uri := fmt.Sprintf("http://%s%s", node.addr, configs.FetchAllURL)
resp, err := httputil.HttpPost(uri, nil)
if err != nil {
log.Println(err)
continue
}
var res struct {
Code int `json:"code"`
Message string `json:"message"`
Data map[string][]*Instance `json:"data"`
}
err = json.Unmarshal([]byte(resp), &res)
if err != nil {
log.Printf("get from %v error : %v", uri, err)
continue
}
if res.Code != configs.StatusOK {
log.Printf("get from %v error : %v", uri, res.Message)
continue
}
dis.protected = false
for _, v := range res.Data {
for _, instance := range v {
dis.Registry.Register(instance, instance.LatestTimestamp)
}
}
}
nodes.SetUp()
}
(代碼 model/discovery.go)
這里考慮到節(jié)點(diǎn)數(shù)據(jù)可能不一致,循環(huán)同步了所有節(jié)點(diǎn)數(shù)據(jù)來(lái)提高一致性,相應(yīng)的會(huì)有網(wǎng)絡(luò) io 開銷與浪費(fèi),在一致性和資源開銷上做了取舍選擇。
注冊(cè)表數(shù)據(jù)變更時(shí)同步
Gossip 過(guò)程是由種子節(jié)點(diǎn)發(fā)起,當(dāng)一個(gè)種子節(jié)點(diǎn)有狀態(tài)需要更新到其他節(jié)點(diǎn)時(shí),它會(huì)隨機(jī)的選擇周圍幾個(gè)節(jié)點(diǎn)散播消息,收到消息的節(jié)點(diǎn)也會(huì)重復(fù)該過(guò)程,直至最終網(wǎng)絡(luò)中所有的節(jié)點(diǎn)都收到消息。

(當(dāng)前節(jié)點(diǎn)發(fā)起廣播同步數(shù)據(jù))
關(guān)于數(shù)據(jù)更新還要多做一些說(shuō)明,這里我們是更新完當(dāng)前節(jié)點(diǎn),即代表寫入完成,此時(shí)可以通過(guò)該節(jié)點(diǎn)獲取最新數(shù)據(jù),而同步其他節(jié)點(diǎn)并沒(méi)做檢查,也就是說(shuō)其他節(jié)點(diǎn)在同步完成前,獲取的數(shù)據(jù)可能不一致。如果在同步前當(dāng)前節(jié)點(diǎn)掛了,可能這次操作會(huì)丟失,我們并沒(méi)有采用同步寫模式,采用了弱一致性策略。
如果我們要強(qiáng)一致性怎么做呢?在數(shù)據(jù)寫入當(dāng)前節(jié)點(diǎn)并完成同步之前,所有節(jié)點(diǎn)數(shù)據(jù)不可讀或仍讀取之前版本數(shù)據(jù)(快照/多版本控制)。
在數(shù)據(jù)復(fù)制技術(shù)中,有同步復(fù)制、異步復(fù)制、半同步復(fù)制技術(shù),對(duì)應(yīng)的響應(yīng)延遲時(shí)間(可用性)和一致性也會(huì)有差別。
來(lái)看具體代碼實(shí)現(xiàn),以服務(wù)注冊(cè)為例,在 RegisterHandler 中,增加數(shù)據(jù)同步的邏輯,將數(shù)據(jù)變動(dòng)同步給其他節(jié)點(diǎn)。
func RegisterHandler(c *gin.Context) {
//...
+ if req.Replication {
+ global.Discovery.Nodes.Load().(*model.Nodes).Replicate(c, configs.Register, instance)
+ }
}(代碼 api/handler/register.go)
在 Replicate 方法中,遍歷所有的節(jié)點(diǎn),依次執(zhí)行注冊(cè)操作。
func (nodes *Nodes) Replicate(c *gin.Context, action configs.Action, instance *Instance) error {
if len(nodes.nodes) == 0 {
return nil
}
for _, node := range nodes.nodes {
if node.addr != nodes.selfAddr {
go nodes.action(c, node, action, instance)
}
}
return nil
}
func (nodes *Nodes) action(c *gin.Context, node *Node, action configs.Action, instance *Instance) {
switch action {
case configs.Register:
go node.Register(c, instance)
case configs.Renew:
go node.Renew(c, instance)
case configs.Cancel:
go node.Cancel(c, instance)
}
}
(代碼 model/nodes.go)
func (node *Node) Register(c *gin.Context, instance *Instance) error {
return node.call(c, node.registerURL, configs.Register, instance, nil)
}
func (node *Node) call(c *gin.Context, uri string, action configs.Action, instance *Instance, data interface{}) error {
params := make(map[string]interface{})
params["env"] = instance.Env
params["appid"] = instance.AppId
params["hostname"] = instance.Hostname
params["replication"] = true //broadcast stop here
switch action {
case configs.Register:
params["addrs"] = instance.Addrs
params["status"] = instance.Status
params["version"] = instance.Version
params["reg_timestamp"] = strconv.FormatInt(instance.RegTimestamp, 10)
params["dirty_timestamp"] = strconv.FormatInt(instance.DirtyTimestamp, 10)
params["latest_timestamp"] = strconv.FormatInt(instance.LatestTimestamp, 10)
case configs.Renew:
params["dirty_timestamp"] = strconv.FormatInt(instance.DirtyTimestamp, 10)
case configs.Cancel:
params["latest_timestamp"] = strconv.FormatInt(instance.LatestTimestamp, 10)
}
resp, err := httputil.HttpPost(uri, params)
if err != nil {
return err
}
res := Response{}
err = json.Unmarshal([]byte(resp), &res)
if err != nil {
return err
}
if res.Code != configs.StatusOK {
json.Unmarshal([]byte(res.Data), data)
return errcode.Conflict
}
return nil
}(代碼 model/node.go)
如果是續(xù)約事件丟失,可以在下一次續(xù)約時(shí)補(bǔ)上; 如果是注冊(cè)事件丟失,也可以在下次續(xù)約時(shí)發(fā)現(xiàn)并修復(fù)(NotFound 邏輯);
如果是取消事件丟失,長(zhǎng)時(shí)間不續(xù)約會(huì)有剔除。
最終一致性保障
自測(cè)方案
我們準(zhǔn)備 3 個(gè)配置,搭建 3 個(gè)節(jié)點(diǎn),通過(guò)靜態(tài)配置集群節(jié)點(diǎn)列表。
nodes: ["localhost:8881", "localhost:8882", "localhost:8883"]http_server: "localhost:8881" //其他節(jié)點(diǎn)配置8882和8883hostname: "sd1" //其他節(jié)點(diǎn)配置sd2和sd3
數(shù)據(jù)分區(qū)策略
分區(qū)有不同的描述,如 Kafka 叫分區(qū) Partitioning,而 MySQL 中叫分表,在 MongoDB、Elasticsearch 又叫分片 Shard,HBase、Tidb 中叫 Region,雖然實(shí)現(xiàn)原理可能不盡相同,但底層數(shù)據(jù)分區(qū)的思想?yún)s是一致性的。
注冊(cè)中心可以實(shí)現(xiàn)劃分區(qū)域 zone 的機(jī)制,zone1 保存服務(wù) A、B、C,zone2 保存服務(wù) D、E、F,來(lái)實(shí)現(xiàn)數(shù)據(jù)分區(qū)治理。

(多注冊(cè)中心數(shù)據(jù)分區(qū))
總結(jié)
本文實(shí)現(xiàn)了注冊(cè)中心的集群版,在集群實(shí)現(xiàn)過(guò)程中,先明確了點(diǎn)對(duì)點(diǎn)的平等架構(gòu)方式,并通過(guò)復(fù)制技術(shù)實(shí)現(xiàn)各副本間一致性問(wèn)題,也說(shuō)明了在可用性和一致性問(wèn)題上做的取舍。
感謝您的閱讀,歡迎點(diǎn)贊、轉(zhuǎn)發(fā)
