企業(yè)級 RPC 框架 zRPC
近期比較火的開源項(xiàng)目go-zero是一個(gè)集成了各種工程實(shí)踐的包含了 Web 和 RPC 協(xié)議的功能完善的微服務(wù)框架,今天我們就一起來分析一下其中的 RPC 部分zRPC。
zRPC 底層依賴 gRPC,內(nèi)置了服務(wù)注冊、負(fù)載均衡、攔截器等模塊,其中還包括自適應(yīng)降載,自適應(yīng)熔斷,限流等微服務(wù)治理方案,是一個(gè)簡單易用的可直接用于生產(chǎn)的企業(yè)級 RPC 框架。
zRPC 初探
zRPC 支持直連和基于 etcd 服務(wù)發(fā)現(xiàn)兩種方式,我們以基于 etcd 做服務(wù)發(fā)現(xiàn)為例演示 zRPC 的基本使用:
配置
創(chuàng)建 hello.yaml 配置文件,配置如下:
Name: hello.rpc // 服務(wù)名
ListenOn: 127.0.0.1:9090 // 服務(wù)監(jiān)聽地址
Etcd:
Hosts:
- 127.0.0.1:2379 // etcd服務(wù)地址
Key: hello.rpc // 服務(wù)注冊key
創(chuàng)建 proto 文件
創(chuàng)建 hello.proto 文件,并生成對應(yīng)的 go 代碼
syntax = "proto3";
package pb;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
生成 go 代碼
protoc --go_out=plugins=grpc:. hello.proto
Server 端
package main
import (
"context"
"flag"
"log"
"example/zrpc/pb"
"github.com/tal-tech/go-zero/core/conf"
"github.com/tal-tech/go-zero/zrpc"
"google.golang.org/grpc"
)
type Config struct {
zrpc.RpcServerConf
}
var cfgFile = flag.String("f", "./hello.yaml", "cfg file")
func main() {
flag.Parse()
var cfg Config
conf.MustLoad(*cfgFile, &cfg)
srv, err := zrpc.NewServer(cfg.RpcServerConf, func(s *grpc.Server) {
pb.RegisterGreeterServer(s, &Hello{})
})
if err != nil {
log.Fatal(err)
}
srv.Start()
}
type Hello struct{}
func (h *Hello) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: "hello " + in.Name}, nil
}
Client 端
package main
import (
"context"
"log"
"example/zrpc/pb"
"github.com/tal-tech/go-zero/core/discov"
"github.com/tal-tech/go-zero/zrpc"
)
func main() {
client := zrpc.MustNewClient(zrpc.RpcClientConf{
Etcd: discov.EtcdConf{
Hosts: []string{"127.0.0.1:2379"},
Key: "hello.rpc",
},
})
conn := client.Conn()
hello := pb.NewGreeterClient(conn)
reply, err := hello.SayHello(context.Background(), &pb.HelloRequest{Name: "go-zero"})
if err != nil {
log.Fatal(err)
}
log.Println(reply.Message)
}
啟動服務(wù),查看服務(wù)是否注冊:
ETCDCTL_API=3 etcdctl get hello.rpc --prefix
顯示服務(wù)已經(jīng)注冊:
hello.rpc/7587849401504590084
127.0.0.1:9090
運(yùn)行客戶端即可看到輸出:
hello go-zero
這個(gè)例子演示了 zRPC 的基本使用,可以看到通過 zRPC 構(gòu)建 RPC 服務(wù)非常簡單,只需要很少的幾行代碼,接下來我們繼續(xù)進(jìn)行探索
zRPC 原理分析
下圖展示 zRPC 的架構(gòu)圖和主要組成部分

zRPC 主要有以下幾個(gè)模塊組成:
discov: 服務(wù)發(fā)現(xiàn)模塊,基于 etcd 實(shí)現(xiàn)服務(wù)發(fā)現(xiàn)功能
resolver: 服務(wù)注冊模塊,實(shí)現(xiàn)了 gRPC 的 resolver.Builder 接口并注冊到 gRPC
interceptor: 攔截器,對請求和響應(yīng)進(jìn)行攔截處理
balancer: 負(fù)載均衡模塊,實(shí)現(xiàn)了 p2c 負(fù)載均衡算法,并注冊到 gRPC
client: zRPC 客戶端,負(fù)責(zé)發(fā)起請求
server: zRPC 服務(wù)端,負(fù)責(zé)處理請求
這里介紹了 zRPC 的主要組成模塊和每個(gè)模塊的主要功能,其中 resolver 和 balancer 模塊實(shí)現(xiàn)了 gRPC 開放的接口,實(shí)現(xiàn)了自定義的 resolver 和 balancer,攔截器模塊是整個(gè) zRPC 的功能重點(diǎn),自適應(yīng)降載、自適應(yīng)熔斷、prometheus 服務(wù)指標(biāo)收集等功能都在這里實(shí)現(xiàn)
Interceptor 模塊
gRPC 提供了攔截器功能,主要是對請求前后進(jìn)行額外處理的攔截操作,其中攔截器包含客戶端攔截器和服務(wù)端攔截器,又分為一元 (Unary) 攔截器和流 (Stream) 攔截器,這里我們主要講解一元攔截器,流攔截器同理。

客戶端攔截器定義如下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
其中 method 為方法名,req,reply 分別為請求和響應(yīng)參數(shù),cc 為客戶端連接對象,invoker 參數(shù)是真正執(zhí)行 rpc 方法的 handler 其實(shí)在攔截器中被調(diào)用執(zhí)行
服務(wù)端攔截器定義如下:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中 req 為請求參數(shù),info 中包含了請求方法屬性,handler 為對 server 端方法的包裝,也是在攔截器中被調(diào)用執(zhí)行
zRPC 中內(nèi)置了豐富的攔截器,其中包括自適應(yīng)降載、自適應(yīng)熔斷、權(quán)限驗(yàn)證、prometheus 指標(biāo)收集等等,由于攔截器較多,篇幅有限沒法所有的攔截器給大家一一解析,這里我們主要分析兩個(gè),自適應(yīng)熔斷和 prometheus 服務(wù)監(jiān)控指標(biāo)收集:
內(nèi)置攔截器分析
自適應(yīng)熔斷 (breaker)
當(dāng)客戶端向服務(wù)端發(fā)起請求,客戶端會記錄服務(wù)端返回的錯誤,當(dāng)錯誤達(dá)到一定的比例,客戶端會自行的進(jìn)行熔斷處理,丟棄掉一定比例的請求以保護(hù)下游依賴,且可以自動恢復(fù)。zRPC 中自適應(yīng)熔斷遵循《Google SRE》中過載保護(hù)策略,算法如下:

requests: 總請求數(shù)量
accepts: 正常請求數(shù)量
K: 倍值 (Google SRE 推薦值為 2)
可以通過修改 K 的值來修改熔斷發(fā)生的激進(jìn)程度,降低 K 的值會使得自適應(yīng)熔斷算法更加激進(jìn),增加 K 的值則自適應(yīng)熔斷算法變得不再那么激進(jìn)
熔斷攔截器定義如下:
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// target + 方法名
breakerName := path.Join(cc.Target(), method)
return breaker.DoWithAcceptable(breakerName, func() error {
// 真正執(zhí)行調(diào)用
return invoker(ctx, method, req, reply, cc, opts...)
}, codes.Acceptable)
}
accept 方法實(shí)現(xiàn)了 Google SRE 過載保護(hù)算法,判斷否進(jìn)行熔斷
func (b *googleBreaker) accept() error {
// accepts為正常請求數(shù),total為總請求數(shù)
accepts, total := b.history()
weightedAccepts := b.k * float64(accepts)
// 算法實(shí)現(xiàn)
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
if dropRatio <= 0 {
return nil
}
// 是否超過比例
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
doReq 方法首先判斷是否熔斷,滿足條件直接返回 error(circuit breaker is open),不滿足條件則對請求數(shù)進(jìn)行累加
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
} else {
return err
}
}
defer func() {
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
// 此處執(zhí)行RPC請求
err := req()
// 正常請求total和accepts都會加1
if acceptable(err) {
b.markSuccess()
} else {
// 請求失敗只有total會加1
b.markFailure()
}
return err
}
prometheus 指標(biāo)收集
服務(wù)監(jiān)控是了解服務(wù)當(dāng)前運(yùn)行狀態(tài)以及變化趨勢的重要手段,監(jiān)控依賴于服務(wù)指標(biāo)的收集,通過 prometheus 進(jìn)行監(jiān)控指標(biāo)的收集是業(yè)界主流方案,zRPC 中也采用了 prometheus 來進(jìn)行指標(biāo)的收集
prometheus 攔截器定義如下:
這個(gè)攔截器主要是對服務(wù)的監(jiān)控指標(biāo)進(jìn)行收集,這里主要是對 RPC 方法的耗時(shí)和調(diào)用錯誤進(jìn)行收集,這里主要使用了 Prometheus 的 Histogram 和 Counter 數(shù)據(jù)類型
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
interface{}, error) {
// 執(zhí)行前記錄一個(gè)時(shí)間
startTime := timex.Now()
resp, err := handler(ctx, req)
// 執(zhí)行后通過Since算出執(zhí)行該調(diào)用的耗時(shí)
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
// 方法對應(yīng)的錯誤碼
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
return resp, err
}
}
添加自定義攔截器
除了內(nèi)置了豐富的攔截器之外,zRPC 同時(shí)支持添加自定義攔截器
Client 端通過 AddInterceptor 方法添加一元攔截器:
func (rc *RpcClient) AddInterceptor(interceptor grpc.UnaryClientInterceptor) {
rc.client.AddInterceptor(interceptor)
}
Server 端通過 AddUnaryInterceptors 方法添加一元攔截器:
func (rs *RpcServer) AddUnaryInterceptors(interceptors ...grpc.UnaryServerInterceptor) {
rs.server.AddUnaryInterceptors(interceptors...)
}
resolver 模塊
zRPC 服務(wù)注冊架構(gòu)圖:

zRPC 中自定義了 resolver 模塊,用來實(shí)現(xiàn)服務(wù)的注冊功能。zRPC 底層依賴 gRPC,在 gRPC 中要想自定義 resolver 需要實(shí)現(xiàn) resolver.Builder 接口:
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}
其中 Build 方法返回 Resolver,Resolver 定義如下:
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
在 zRPC 中定義了兩種 resolver,direct 和 discov,這里我們主要分析基于 etcd 做服務(wù)發(fā)現(xiàn)的 discov,自定義的 resolver 需要通過 gRPC 提供了 Register 方法進(jìn)行注冊代碼如下:
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
當(dāng)我們啟動我們的 zRPC Server 的時(shí)候,調(diào)用 Start 方法,會像 etcd 中注冊對應(yīng)的服務(wù)地址:
func (ags keepAliveServer) Start(fn RegisterFn) error {
// 注冊服務(wù)地址
if err := ags.registerEtcd(); err != nil {
return err
}
// 啟動服務(wù)
return ags.Server.Start(fn)
}
當(dāng)我們啟動 zRPC 客戶端的時(shí)候,在 gRPC 內(nèi)部會調(diào)用我們自定義 resolver 的 Build 方法,zRPC 通過在 Build 方法內(nèi)調(diào)用執(zhí)行了 resolver.ClientConn 的 UpdateState 方法,該方法會把服務(wù)地址注冊到 gRPC 客戶端內(nèi)部:
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
// 服務(wù)發(fā)現(xiàn)
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
// 向gRPC注冊服務(wù)地址
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
// 監(jiān)聽
sub.AddListener(update)
update()
// 返回自定義的resolver.Resolver
return &nopResolver{cc: cc}, nil
}
在 discov 中,通過調(diào)用 load 方法從 etcd 中獲取指定服務(wù)的所有地址:
func (c *cluster) load(cli EtcdClient, key string) {
var resp *clientv3.GetResponse
for {
var err error
ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
// 從etcd中獲取指定服務(wù)的所有地址
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
cancel()
if err == nil {
break
}
logx.Error(err)
time.Sleep(coolDownInterval)
}
var kvs []KV
c.lock.Lock()
for _, ev := range resp.Kvs {
kvs = append(kvs, KV{
Key: string(ev.Key),
Val: string(ev.Value),
})
}
c.lock.Unlock()
c.handleChanges(key, kvs)
}
并通過 watch 監(jiān)聽服務(wù)地址的變化:
func (c *cluster) watch(cli EtcdClient, key string) {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for {
select {
case wresp, ok := <-rch:
if !ok {
logx.Error("etcd monitor chan has been closed")
return
}
if wresp.Canceled {
logx.Error("etcd monitor chan has been canceled")
return
}
if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return
}
// 監(jiān)聽變化通知更新
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return
}
}
}
這部分主要介紹了 zRPC 中是如何自定義的 resolver,以及基于 etcd 的服務(wù)發(fā)現(xiàn)原理,通過這部分的介紹大家可以了解到 zRPC 內(nèi)部服務(wù)注冊發(fā)現(xiàn)的原理,源代碼比較多只是粗略的從整個(gè)流程上進(jìn)行了分析,如果大家對 zRPC 的源碼比較感興趣可以自行進(jìn)行學(xué)習(xí)
balancer 模塊
負(fù)載均衡原理圖:

避免過載是負(fù)載均衡策略的一個(gè)重要指標(biāo),好的負(fù)載均衡算法能很好的平衡服務(wù)端資源。常用的負(fù)載均衡算法有輪訓(xùn)、隨機(jī)、Hash、加權(quán)輪訓(xùn)等。但為了應(yīng)對各種復(fù)雜的場景,簡單的負(fù)載均衡算法往往表現(xiàn)的不夠好,比如輪訓(xùn)算法當(dāng)服務(wù)響應(yīng)時(shí)間變長就很容易導(dǎo)致負(fù)載不再平衡, 因此 zRPC 中自定義了默認(rèn)負(fù)載均衡算法 P2C(Power of Two Choices),和 resolver 類似,要想自定義 balancer 也需要實(shí)現(xiàn) gRPC 定義的 balancer.Builder 接口,由于和 resolver 類似這里不再帶大家一起分析如何自定義 balancer,感興趣的朋友可以查看 gRPC 相關(guān)的文檔來進(jìn)行學(xué)習(xí)
注意,zRPC 是在客戶端進(jìn)行負(fù)載均衡,常見的還有通過 nginx 中間代理的方式
zRPC 框架中默認(rèn)的負(fù)載均衡算法為 P2C,該算法的主要思想是:
從可用節(jié)點(diǎn)列表中做兩次隨機(jī)選擇操作,得到節(jié)點(diǎn) A、B
比較 A、B 兩個(gè)節(jié)點(diǎn),選出負(fù)載最低的節(jié)點(diǎn)作為被選中的節(jié)點(diǎn)
偽代碼如下:

主要算法邏輯在 Pick 方法中實(shí)現(xiàn):
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()
var chosen *subConn
switch len(p.conns) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.choose(p.conns[0], nil)
case 2:
chosen = p.choose(p.conns[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
// 隨機(jī)數(shù)
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
b++
}
// 隨機(jī)獲取所有節(jié)點(diǎn)中的兩個(gè)節(jié)點(diǎn)
node1 = p.conns[a]
node2 = p.conns[b]
// 效驗(yàn)節(jié)點(diǎn)是否健康
if node1.healthy() && node2.healthy() {
break
}
}
// 選擇其中一個(gè)節(jié)點(diǎn)
chosen = p.choose(node1, node2)
}
atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&chosen.requests, 1)
return chosen.conn, p.buildDoneFunc(chosen), nil
}
choose 方法對隨機(jī)選擇出來的節(jié)點(diǎn)進(jìn)行負(fù)載比較從而最終確定選擇哪個(gè)節(jié)點(diǎn)
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
start := int64(timex.Now())
if c2 == nil {
atomic.StoreInt64(&c1.pick, start)
return c1
}
if c1.load() > c2.load() {
c1, c2 = c2, c1
}
pick := atomic.LoadInt64(&c2.pick)
if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
return c2
} else {
atomic.StoreInt64(&c1.pick, start)
return c1
}
}
上面主要介紹了 zRPC 默認(rèn)負(fù)載均衡算法的設(shè)計(jì)思想和代碼實(shí)現(xiàn),那自定義的 balancer 是如何注冊到 gRPC 的呢,resolver 提供了 Register 方法來進(jìn)行注冊,同樣 balancer 也提供了 Register 方法來進(jìn)行注冊:
func init() {
balancer.Register(newBuilder())
}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}
注冊 balancer 之后 gRPC 怎么知道使用哪個(gè) balancer 呢?這里我們需要使用配置項(xiàng)進(jìn)行配置,在 NewClient 的時(shí)候通過 grpc.WithBalancerName 方法進(jìn)行配置:
func NewClient(target string, opts ...ClientOption) (*client, error) {
var cli client
opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
return &cli, nil
}
這部分主要介紹了 zRPC 中內(nèi)中的負(fù)載均衡算法的實(shí)現(xiàn)原理以及具體的實(shí)現(xiàn)方式,之后介紹了 zRPC 是如何注冊自定義的 balancer 以及如何選擇自定義的 balancer,通過這部分大家應(yīng)該對負(fù)載均衡有了更進(jìn)一步的認(rèn)識
總結(jié)
首先,介紹了 zRPC 的基本使用方法,可以看到 zRPC 使用非常簡單,只需要少數(shù)幾行代碼就可以構(gòu)建高性能和自帶服務(wù)治理能力的 RPC 服務(wù),當(dāng)然這里沒有面面俱到的介紹 zRPC 的基本使用,大家可以查看相關(guān)文檔進(jìn)行學(xué)習(xí)
接著,介紹了 zRPC 的幾個(gè)重要組成模塊以及其實(shí)現(xiàn)原理,并分析了部分源碼。攔截器模塊是整個(gè) zRPC 的重點(diǎn),其中內(nèi)置了豐富的功能,像熔斷、監(jiān)控、降載等等也是構(gòu)建高可用微服務(wù)必不可少的。resolver 和 balancer 模塊自定義了 gRPC 的 resolver 和 balancer,通過該部分可以了解到整個(gè)服務(wù)注冊與發(fā)現(xiàn)的原理以及如何構(gòu)建自己的服務(wù)發(fā)現(xiàn)系統(tǒng),同時(shí)自定義負(fù)載均衡算法也變得不再神秘
最后,zRPC 是一個(gè)經(jīng)歷過各種工程實(shí)踐的 RPC 框架,不論是想要用于生產(chǎn)還是學(xué)習(xí)其中的設(shè)計(jì)模式都是一個(gè)不可多得的開源項(xiàng)目。希望通過這篇文章的介紹大家能夠進(jìn)一步了解 zRPC
項(xiàng)目地址
https://github.com/tal-tech/go-zero
框架地址
https://github.com/tal-tech/go-zero/tree/master/zrpc
文檔地址
https://www.yuque.com/tal-tech/go-zero/rhakzy
