Golang 從零到一開發(fā)實現(xiàn) RPC 框架(二)集群實現(xiàn)
內容提要
在上一篇文章中分享了如何從零開始搭建一個 RPC 框架,并完成了 P2P 版本功能,本章繼續(xù)完善增加服務注冊發(fā)現(xiàn)和負載均衡實現(xiàn)集群能力。
傳送門:RPC框架(一)
本文主要內容包括:
RPC 接入服務注冊中心 服務端實現(xiàn)平滑啟停
客戶端實現(xiàn)服務發(fā)現(xiàn)
客戶端實現(xiàn)負載均衡
客戶端實現(xiàn)失敗策略
服務注冊發(fā)現(xiàn)
在 P2P 版本 RPC 中,客戶端要知道服務端的地址,并發(fā)起點對點連接,雖然滿足了服務調用的能力,但其弊端也顯而易見。為了保障服務高可用,通常會冗余部署多個服務端實例,而客戶端如何知道每一個服務實例的調用地址,服務端實例上下線又如何告知客戶端,這就需要引入服務自動注冊發(fā)現(xiàn)的能力。


注冊發(fā)現(xiàn)是指客戶端具備動態(tài)發(fā)現(xiàn)服務端實例的能力,一般借助服務注冊中心來實現(xiàn),開源注冊中心有“Eurake”或“Nacos”等,本人之前專門有文章講過其實現(xiàn),對應項目為 “service_discovery”,這里將以它為服務注冊中心,完成客戶端接入。
具體參閱:
首先定義客戶端接口,既要滿足服務提供者注冊/下線的能力,又要滿足服務消費者發(fā)現(xiàn)/觀察的能力。
type Registry interface {
Register(context.Context, *Instance) (context.CancelFunc, error)
Fetch(context.Context, string) ([]*Instance, bool)
Close() error
}
naming/naming.go
type Discovery struct {
once *sync.Once
conf *Config
ctx context.Context
cancelFunc context.CancelFunc
//local cache
mutex sync.RWMutex
apps map[string]*FetchData
registry map[string]struct{}
//registry center node
idx uint64 //node index
node atomic.Value //node list
}
func New(conf *Config) *Discovery {
if len(conf.Nodes) == 0 {
panic("conf nodes empty!")
}
ctx, cancel := context.WithCancel(context.Background())
dis := &Discovery{
ctx: ctx,
cancelFunc: cancel,
conf: conf,
apps: map[string]*FetchData{},
registry: map[string]struct{}{},
}
//from conf get node list
dis.node.Store(conf.Nodes)
go dis.updateNode()
return dis
}
naming/discovery.go
func (dis *Discovery) updateNode() {
ticker := time.NewTicker(NodeInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
uri := fmt.Sprintf(_nodesURL, dis.pickNode())
log.Println("discovery - request and update node, url:" + uri)
params := make(map[string]interface{})
params["env"] = dis.conf.Env
resp, err := HttpPost(uri, params)
if err != nil {
log.Println(err)
continue
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if err != nil {
log.Println(err)
continue
}
newNodes := []string{}
for _, ins := range res.Data.Instances {
for _, addr := range ins.Addrs {
newNodes = append(newNodes, strings.TrimPrefix(addr, "http://"))
}
}
if len(newNodes) == 0 {
continue
}
curNodes := dis.node.Load().([]string)
if !compareNodes(curNodes, newNodes) {
dis.node.Store(newNodes)
log.Println("nodes list changed!", newNodes)
log.Println(newNodes)
} else {
log.Println("nodes list not change:", curNodes)
}
}
}
}
naming/discovery.go
//對比兩個數據是否完全相等
func compareNodes(a, b []string) bool {
if len(a) != len(b) {
return false
}
mapB := make(map[string]struct{}, len(b))
for _, node := range b {
mapB[node] = struct{}{}
}
for _, node := range a {
if _, ok := mapB[node]; !ok {
return false
}
}
return true
}
naming/discovery.go
實現(xiàn)服務注冊能力,先檢測本地緩存查看是否已注冊,沒有則請求注冊中心并發(fā)起注冊,異步維護一個定時任務來維持心跳(續(xù)約),如果發(fā)生終止則會調用取消接口從注冊中心注銷。
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
var err error
//check local cache
dis.mutex.Lock()
if _, ok := dis.registry[instance.AppId]; ok {
err = errors.New("instance duplicate register")
} else {
dis.registry[instance.AppId] = struct{}{} //register local cache
}
dis.mutex.Unlock()
if err != nil {
return nil, err
}
//http register
ctx, cancel := context.WithCancel(dis.ctx)
if err = dis.register(instance); err != nil {
//fail
dis.mutex.Lock()
delete(dis.registry, instance.AppId)
dis.mutex.Unlock()
return cancel, err
}
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
//renew&cancel
go func() {
ticker := time.NewTicker(RenewInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := dis.renew(instance); err != nil {
dis.register(instance)
}
case <-ctx.Done():
dis.cancel(instance)
ch <- struct{}{}
}
}
}()
return cancelFunc, nil
}
naming/discovery.go
func (dis *Discovery) Fetch(ctx context.Context, appId string) ([]*Instance, bool) {
//from local
dis.mutex.RLock()
fetchData, ok := dis.apps[appId]
dis.mutex.RUnlock()
if ok {
log.Println("get data from local memory, appid:" + appId)
return fetchData.Instances, ok
}
//from remote
uri := fmt.Sprintf(_fetchURL, dis.pickNode())
params := make(map[string]interface{})
params["env"] = dis.conf.Env
params["appid"] = appId
params["status"] = 1 //up
resp, err := HttpPost(uri, params)
if err != nil {
dis.switchNode()
return nil, false
}
res := ResponseFetch{}
err = json.Unmarshal([]byte(resp), &res)
if res.Code != 200 {
return nil, false
}
if err != nil {
log.Println(err)
return nil, false
}
var result []*Instance
for _, ins := range res.Data.Instances {
result = append(result, ins)
}
if len(result) > 0 {
ok = true
dis.mutex.Lock()
dis.apps[appId] = &res.Data
dis.mutex.Unlock()
}
return result, ok
}
naming/discovery.go
服務端改造
服務端與注冊中心的交互包括服務啟動時會將自身服務信息(監(jiān)聽地址和端口)寫入注冊中心,開啟定時續(xù)約,在服務關閉退出時會注銷自身的注冊信息。

服務啟動注冊
type RPCServer struct {
listener Listener
++ registry naming.Registry
}
func NewRPCServer(option Option, registry naming.Registry) *RPCServer {
return &RPCServer{
listener: NewRPCListener(option),
++ registry: registry,
option: option,
}
}
provider/server.go
func main() {
//服務注冊中心
conf := &naming.Config{Nodes: config.RegistryAddrs, Env: config.Env}
discovery := naming.New(conf)
//注入依賴
srv := provider.NewRPCServer(option, discovery)
}
demo/server/server.go

func (svr *RPCServer) Run() {
//先啟動后暴露服務
err := svr.listener.Run()
if err != nil {
panic(err)
}
//register in discovery,注冊失敗(重試失敗)退出服務
err = svr.registerToNaming()
if err != nil {
svr.Close() //注冊失敗關閉服務
panic(err)
}
}
func (svr *RPCServer) registerToNaming() error {
instance := &naming.Instance{
Env: svr.option.Env,
AppId: svr.option.AppId,
Hostname: svr.option.Hostname,
Addrs: svr.listener.GetAddrs(),
}
retries := maxRegisterRetry
for retries > 0 {
retries--
cancel, err := svr.registry.Register(context.Background(), instance)
if err == nil {
svr.cancelFunc = cancel
return nil
}
}
return errors.New("register to naming server fail")
}
provider/server.go
做個測試,先啟動服務注冊中心(service_discovery),再運行 demo/server,通過配置不同端口和hostname,啟動兩個服務,從服務注冊中心可以看到其結果。

服務退出注銷
服務端從注冊中心注銷后,客戶端從注冊中心感知服務下線,就不再發(fā)送新連接和請求到該服務端實例。
這里也可能有些問題,由于客戶端緩存機制導致客戶端感知服務端變化滯后,仍會有少許時間新連接和請求提交到當前服務端。目前由于還未使用長鏈接管理,無法知曉有哪些客戶端連接。如果此時服務仍存活就正常處理返回,如果失敗可以返回“特殊失敗碼“,告知客戶端不要再請求了,服務端關閉了。

func (svr *RPCServer) Close() {
//從服務注冊中心注銷
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//關閉當前服務
if svr.listener != nil {
svr.listener.Close()
}
}
func (svr *RPCServer) registerToNaming() error {
++ cancel, err := svr.registry.Register(context.Background(), instance)
++ svr.cancelFunc = cancel
}
//注冊中心注冊 (naming/discovery.go)
func (dis *Discovery) Register(ctx context.Context, instance *Instance) (context.CancelFunc, error)
{
ctx, cancel := context.WithCancel(dis.ctx)
ch := make(chan struct{}, 1)
cancelFunc := context.CancelFunc(func() {
cancel()
<-ch
})
for {
select {
case <-ctx.Done():
dis.cancel(instance) //服務注銷
ch <- struct{}{}
}
}
return cancelFunc, nil
}
服務關閉時,除了不再接受新請求外,還需要考慮處理中的請求,不能因為服務關閉而強制中斷所有處理中的請求。根據請求所處階段不同,可以分別設置“擋板”,告知服務調用方當前服務處于關閉流程,不再接受請求了。
func main() {
//...
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
srv.Shutdown()
}
demo/server/server.go
func (svr *RPCServer) Shutdown() {
//從服務注冊中心注銷
if svr.cancelFunc != nil {
svr.cancelFunc()
}
//關閉當前服務
if svr.listener != nil {
svr.listener.Shutdown()
}
}
provider/server.go
(1)首先是服務端接收到客戶端連接階段。如果此時發(fā)現(xiàn)服務關閉,設置擋板不再往下執(zhí)行,直接返回。
func (l *RPCListener) Run() error {
//... listen ...
++ go l.acceptConn() //accept conn
}
func (l *RPCListener) acceptConn() {
for {
conn, err := l.nl.Accept()
if err != nil {
select {
case <-l.getDoneChan(): //擋板:server closed done
return
default:
}
return
}
go l.handleConn(conn) //處理連接
}
}
type RPCListener struct {
++ doneChan chan struct{} //控制結束
}
func (l *RPCListener) getDoneChan() <-chan struct{} {
return l.doneChan
}
//關閉時關閉通道
func (l *RPCListener) Shutdown() {
l.closeDoneChan()
}
//關閉通道
func (l *RPCListener) closeDoneChan() {
select {
case <-l.doneChan:
default:
close(l.doneChan)
}
}
provider/listener.go
func (l *RPCListener) handleConn(conn net.Conn) {
//關閉擋板
++ if l.isShutdown() {
++ return
++ }
for {
++ if l.isShutdown() {
++ return
++ }
//handle ...
}
}
type RPCListener struct {
++ shutdown int32 //關閉處理中標識位
}
//判斷是否關閉
func (l *RPCListener) isShutdown() bool {
return atomic.LoadInt32(&l.shutdown) == 1
}
//關閉邏輯
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
}
provider/listener.go
(3)最后請求已進入服務實際處理階段。此時無法簡單設置擋板了,因為已經是處理中,就應該將請求處理完成。但我們需要確認有多少處理中的請求,并且確保這些請求全部執(zhí)行完成,然后就可以安全退出了。這有點像 WaitGroup 計數器,我們也維護一個處理中任務計數來達到目的。
type RPCListener struct {
++ handlingNum int32 //處理中任務數
}
func (l *RPCListener) handleConn(conn net.Conn) {
//...
//處理中任務數+1
++ atomic.AddInt32(&l.handlingNum, 1)
//任意退出都會導致處理中任務數-1
++ defer atomic.AddInt32(&l.handlingNum, -1)
//read from network
//decode
//call local func
//encode
//send result
}
func (l *RPCListener) Shutdown() {
atomic.CompareAndSwapInt32(&l.shutdown, 0, 1)
++ for {
++ if atomic.LoadInt32(&l.handlingNum) == 0 {
++ break
++ }
++ }
l.closeDoneChan()
}
provider/listener.go

客戶端改造
實現(xiàn)服務發(fā)現(xiàn)

客戶端通過 client_proxy 接入服務發(fā)現(xiàn),首先要在初始化時增加服務端的標識(appId),通過服務注冊中心獲取該標識對應的實例列表。
func NewClientProxy(appId string, option Option, registry naming.Registry) ClientProxy {
cp := &RPCClientProxy{
option: option,
failMode: option.FailMode,
registry: registry,
}
servers, err := cp.discoveryService(context.Background(), appId)
if err != nil {
log.Fatal(err)
}
cp.servers = servers
cp.loadBalance = LoadBalanceFactory(option.LoadBalanceMode, cp.servers)
return cp
}
//獲取服務列表
func (cp *RPCClientProxy) discoveryService(ctx context.Context, appId string) ([]string, error) {
instances, ok := cp.registry.Fetch(ctx, appId)
if !ok {
return nil, errors.New("service not found")
}
var servers []string
for _, instance := range instances {
servers = append(servers, instance.Addrs...)
}
return servers, nil
}
consumer/client_proxy.go
實現(xiàn)負載均衡

type LoadBalanceMode int
const (
RandomBalance LoadBalanceMode = iota
RoundRobinBalance
WeightRoundRobinBalance
)
type LoadBalance interface {
Get() string
}
func LoadBalanceFactory(mode LoadBalanceMode, servers []string) LoadBalance {
switch mode {
case RandomBalance:
return newRandomBalance(servers)
case RoundRobinBalance:
return newRoundRobinBalance(servers)
default:
return newRandomBalance(servers)
}
}
consumer/loadbalance.go
type randomBalance struct {
servers []string
}
func newRandomBalance(servers []string) LoadBalance {
return &randomBalance{servers: servers}
}
func (b *randomBalance) Get() string {
rand.Seed(time.Now().Unix())
return b.servers[rand.Intn(len(b.servers))]
}
consumer/loadbalance.go
type roundRobinBalance struct {
servers []string
curIdx int
}
func newRoundRobinBalance(servers []string) LoadBalance {
return &roundRobinBalance{servers: servers, curIdx: 0}
}
func (b *roundRobinBalance) Get() string {
lens := len(b.servers)
if b.curIdx >= lens {
b.curIdx = 0
}
server := b.servers[b.curIdx]
b.curIdx = (b.curIdx + 1) % lens
return server
}
consumer/loadbalance.go
func (cp *RPCClientProxy) getConn() error {
addr := strings.Replace(cp.loadBalance.Get(), cp.option.NetProtocol+"://", "", -1)
err := cp.client.Connect(addr) //長連接管理
if err != nil {
return err
}
return nil
}
consumer/client_proxy.go
實現(xiàn)失敗策略
執(zhí)行調用階段還要考慮失敗策略,即在調用服務端過程中出錯后如何處理?這里出錯通常是網絡原因或是服務端程序異常產生,而非業(yè)務錯誤。
處理辦法可分為接受失敗或發(fā)起重試,接受失敗對應策略就是 Failfast (快速失敗)。而重試可以繼續(xù)對上一次服務端地址發(fā)起調用 Failtry,它可以解決臨時性網絡失敗,但如果該實例服務端掛了再重試幾次也無濟于事,所以有另一個種策略 Failover,也就是故障轉移,換個服務端實例再試。

type FailMode int
const (
Failover FailMode = iota
Failfast
Failretry
)
consumer/fail.go
func (cp *RPCClientProxy) Call(ctx context.Context, servicePath string, stub interface{}, params ..
.interface{}) (interface{}, error) {
service, err := NewService(servicePath)
if err != nil {
return nil, err
}
err := cp.getConn()
if err != nil && cp.failMode == Failfast { //快速失敗
return nil, err
}
//失敗策略
switch cp.failMode {
case Failretry:
//...
case Failover:
//...
case Failfast:
//...
}
return nil, errors.New("call error")
}
consumer/client_proxy.go
switch cp.failMode {
case Failretry:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
}
case Failover:
retries := cp.option.Retries
for retries > 0 {
retries--
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
}
err = cp.getConn()
}
case Failfast:
if client != nil {
rs, err := cp.client.Invoke(ctx, service, stub, params...)
if err == nil {
return rs, nil
}
return nil, err
}
consumer/client_proxy.go


總結與補充
這一版 RPC 框架具備了集群能力、負載均衡和簡單容錯能力,當然離一個完善的微服務框架仍有不少距離,所以后續(xù)會陸續(xù)迭代,希望大家多多支持。
