Golang 從零到一開發(fā)實(shí)現(xiàn) RPC 框架
內(nèi)容提要
RPC 框架是分布式領(lǐng)域核心組件,也是微服務(wù)的基礎(chǔ)。今天嘗試從零擼一個(gè) RPC 框架,剖析其核心原理及代碼實(shí)現(xiàn),后續(xù)還會(huì)逐步迭代追加微服務(wù)治理等功能,將之前文章覆蓋的熔斷、限流、負(fù)載均衡、注冊(cè)發(fā)現(xiàn)等功能融合進(jìn)來(lái),打造一個(gè)五臟俱全的 RPC 框架。本文主要內(nèi)容包括:
- RPC 實(shí)現(xiàn)原理
RPC 協(xié)議設(shè)計(jì)
RPC 服務(wù)端實(shí)現(xiàn)
RPC 客戶端實(shí)現(xiàn)
實(shí)現(xiàn)原理
RPC (Remote Procedure Call)全稱是遠(yuǎn)程過(guò)程調(diào)用,相對(duì)于本地方法調(diào)用,在同一內(nèi)存空間可以直接通過(guò)方法棧實(shí)現(xiàn)調(diào)用,遠(yuǎn)程調(diào)用則跨了不同的服務(wù)終端,并不能直接調(diào)用。

RPC框架 要解決的就是遠(yuǎn)程方法調(diào)用的問(wèn)題,并且實(shí)現(xiàn)調(diào)用遠(yuǎn)程服務(wù)像調(diào)用本地服務(wù)一樣簡(jiǎn)單,框架內(nèi)部封裝實(shí)現(xiàn)了網(wǎng)絡(luò)調(diào)用的細(xì)節(jié)(透明化遠(yuǎn)程調(diào)用),其核心過(guò)程原理如下圖所示。

這個(gè)版本可以稱為 “P2P?RPC” ,而生產(chǎn)環(huán)境部署往往會(huì)將服務(wù)提供者(Server)部署多個(gè)實(shí)例(集群部署),那么客戶端就需要具備發(fā)現(xiàn)服務(wù)端的能力和負(fù)載均衡的支持,所以有了服務(wù)注冊(cè)發(fā)現(xiàn)和負(fù)載均衡。

再然后,為了保障 RPC 調(diào)用的可靠性和穩(wěn)定性,增加了服務(wù)監(jiān)控和服務(wù)容錯(cuò)治理的能力,考慮性能提升的異步化能力以及考慮可擴(kuò)展性的插件化管理,這些完善構(gòu)成了更完整的微服務(wù) RPC 框架。
RPC 協(xié)議實(shí)現(xiàn)
協(xié)議設(shè)計(jì)
協(xié)議設(shè)計(jì)算是 RPC 最重要的一部分了,它主要解決服務(wù)端與客戶端通信的問(wèn)題。一般來(lái)說(shuō)通訊要解決如下問(wèn)題:

1. 網(wǎng)絡(luò)傳輸協(xié)議
基于 TCP、UDP 還是 HTTP,UDP 要自己解決可靠性傳輸問(wèn)題,而 HTTP 又太重,包含很多沒(méi)必要的頭信息,所以一般 RPC 框架會(huì)優(yōu)先選擇 TCP 協(xié)議。
(當(dāng)然也有大名鼎鼎的 gRPC 基于 HTTP2)
2. 序列化協(xié)議
網(wǎng)絡(luò)傳輸數(shù)據(jù)必須是二進(jìn)制數(shù)據(jù),而執(zhí)行過(guò)程是編程語(yǔ)言的對(duì)象方法,那么就涉及到如何將對(duì)象序列化成可傳輸消息(二進(jìn)制),并可反序列化還原。常見(jiàn)的通用型協(xié)議如 XML、 JSON、Protobuf、Thrift 等,也有語(yǔ)言綁定的如 Python 原生支持的?pickle 協(xié)議,?Java 實(shí)現(xiàn)的 Serializbale 接口及 Hessian 協(xié)議,Golang 原生支持的 Gob 協(xié)議等。
3. 消息編碼協(xié)議
它是一種客戶端和服務(wù)端的調(diào)用約定,比如請(qǐng)求和參數(shù)如何組織,Header 放置什么內(nèi)容。這部分每個(gè)框架設(shè)計(jì)均不同,有時(shí)也稱這一層為狹義的 RPC 協(xié)議層。
另外客戶端發(fā)起調(diào)用一般來(lái)說(shuō)要知道調(diào)用的具體類方法(請(qǐng)求標(biāo)識(shí)符)以及入?yún)ⅲ≒ayload),而網(wǎng)絡(luò)傳輸?shù)氖嵌M(jìn)制字節(jié)流,如何能從這些字節(jié)中找出哪些是方法名,哪些是參數(shù)?進(jìn)一步如果客戶端不斷的發(fā)送消息,如何將每一條消息分割?(解決 TCP 粘包問(wèn)題)
采用定長(zhǎng)消息很容易解決,但事先并不能確定要固定多長(zhǎng),所以這種方式并不可行。消息加分隔符可以實(shí)現(xiàn),但要確保分隔符不會(huì)與正文沖突。而最常用的實(shí)現(xiàn)方案就是用定長(zhǎng)的頭標(biāo)識(shí)出不定長(zhǎng)的體,比如用 int32 (定長(zhǎng) 4 字節(jié))標(biāo)識(shí)后面的內(nèi)容長(zhǎng)度,這樣就能較優(yōu)雅實(shí)現(xiàn)消息分割了。
(注:這個(gè)方案中如果消息體的長(zhǎng)度大于 2^32 會(huì)發(fā)生溢出而導(dǎo)致解析失敗,可以換更長(zhǎng)類型,但理論上總會(huì)有溢出風(fēng)險(xiǎn),設(shè)計(jì)使用時(shí)應(yīng)該限制避免傳輸過(guò)大數(shù)據(jù)體)
協(xié)議實(shí)現(xiàn)
網(wǎng)絡(luò)傳輸協(xié)議,這里使用 TCP 協(xié)議即可,沒(méi)有太多爭(zhēng)議,可預(yù)留接口支持。
序列化協(xié)議,這里使用 Golang 專有的 Gob 協(xié)議,保留接口后期可以擴(kuò)展支持 JSON、Protobuf 等協(xié)議。
type?Codec?interface?{
????Encode(i?interface{})?([]byte,?error)
????Decode(data?[]byte,?i?interface{})?error
}
type?GobCodec?struct{}
func?(c?GobCodec)?Encode(i?interface{})?([]byte,?error)?{
????var?buffer?bytes.Buffer
????encoder?:=?gob.NewEncoder(&buffer)
????if?err?:=?encoder.Encode(i);?err?!=?nil?{
????????return?nil,?err?
????}???
????return?buffer.Bytes(),?nil?
}
func?(c?GobCodec)?Decode(data?[]byte,?i?interface{})?error?{
????buffer?:=?bytes.NewBuffer(data)
????decoder?:=?gob.NewDecoder(buffer)
????return?decoder.Decode(i)
}
codec/codec.go
RPC 消息格式編碼設(shè)計(jì)如下,協(xié)議消息頭定義定長(zhǎng) 5 字節(jié)(byte),依次放置魔術(shù)數(shù)(用于校驗(yàn)),協(xié)議版本,消息類型(區(qū)分請(qǐng)求/響應(yīng)),壓縮類型,序列化協(xié)議類型,每個(gè)占 1 個(gè)字節(jié)(8 個(gè) bit)。可擴(kuò)展追加 消息 ID 以及 元數(shù)據(jù) 等信息用于做服務(wù)治理。
const?(
????HEADER_LEN?=?5
)
const?(
????magicNumber?byte?=?0x06
)
type?MsgType?byte
const?(
????Request?MsgType?=?iota
????Response
)
type?CompressType?byte
const?(
????None?CompressType?=?iota
????Gzip
)
type?SerializeType?byte
const?(
????Gob?SerializeType?=?iota
????JSON
)
type?Header?[HEADER_LEN]byte
func?(h?*Header)?CheckMagicNumber()?bool?{
????return?h[0]?==?magicNumber
}
func?(h?*Header)?Version()?byte?{
????return?h[1]
}
func?(h?*Header)?SetVersion(version?byte)?{
????h[1]?=?version
}
//省略 MsgType,CompressType,SerializeType
protocol/header.go
定義協(xié)議消息格式,除了協(xié)議頭,還包括調(diào)用的服務(wù)類名、方法名以及參數(shù)(Payload)。type?RPCMsg?struct?{
????*Header
????ServiceClass??string
????ServiceMethod?string
????Payload???????[]byte
}
func?NewRPCMsg()?*RPCMsg?{
????header?:=?Header([HEADER_LEN]byte{})
????header[0]?=?magicNumber
????return?&RPCMsg{
????????Header:?&header,
????}
}
protocol/msg.go
實(shí)現(xiàn)傳輸
定義好協(xié)議后,要解決的問(wèn)題就是如何通過(guò)網(wǎng)絡(luò)(IO)發(fā)送和接收,實(shí)現(xiàn)通信的目的。
func?(msg?*RPCMsg)?Send(writer?io.Writer)?error?{
????//send?header
????_,?err?:=?writer.Write(msg.Header[:])
????if?err?!=?nil?{
????????return?err
????}
????//寫入消息體總長(zhǎng)度,方便一次性解析
????dataLen?:=?SPLIT_LEN?+?len(msg.ServiceClass)?+?SPLIT_LEN?+?len(msg.ServiceMethod)?+?SPLIT_L
EN?+?len(msg.Payload)
????err?=?binary.Write(writer,?binary.BigEndian,?uint32(dataLen))?//4
????if?err?!=?nil?{
????????return?err
????}
????//write?service.class?len
????err?=?binary.Write(writer,?binary.BigEndian,?uint32(len(msg.ServiceClass)))
????if?err?!=?nil?{
????????return?err
????}
????//write?service.class content
????err?=?binary.Write(writer,?binary.BigEndian,?util.StringToByte(msg.ServiceClass))
????if?err?!=?nil?{
????????return?err
????}
????//省略 service.method,payload?
}protocol/msg.go
其中類名、方法名、payload 均為不定長(zhǎng)部分,要想順利解析就需要一一對(duì)應(yīng)的長(zhǎng)度字段標(biāo)識(shí)不定長(zhǎng)的長(zhǎng)度,也就是 SPLIT_LEN 代表各部分長(zhǎng)度,是 int32 類型(32 bit),正好相當(dāng)于 4 個(gè) byte,所以 SPLIT_LEN 為 4。另外要注意網(wǎng)絡(luò)傳輸一般使用大端字節(jié)序。先理解字節(jié)序即為字節(jié)(byte)的組成順序,分為大端序(最高有效位放低地址)和小端序(最低有效位放低地址)。CPU 一般采用小端序讀寫,而 TCP 網(wǎng)絡(luò)傳輸采用大端序則更方便。對(duì)應(yīng)這里的 binary.BigEndian 代碼實(shí)現(xiàn)大端序。
func?Read(r?io.Reader)?(*RPCMsg,?error)?{
????msg?:=?NewRPCMsg()
????err?:=?msg.Decode(r)
????if?err?!=?nil?{
????????return?nil,?err
????}
????return?msg,?nil
}
func?(msg?*RPCMsg)?Decode(r?io.Reader)?error?{
????//read?header
????_,?err?:=?io.ReadFull(r,?msg.Header[:])
????if?!msg.Header.CheckMagicNumber()?{?//magicNumber
????????return?fmt.Errorf("magic?number?error:?%v",?msg.Header[0])
????}
????//total?body?len????
headerByte?:=?make([]byte,?4)
????_,?err?=?io.ReadFull(r,?headerByte)
????if?err?!=?nil?{
????????return?err
????}
????bodyLen?:=?binary.BigEndian.Uint32(headerByte)
????//一次將整個(gè)body讀取,再依次拆解
????data?:=?make([]byte,?bodyLen)
????_,?err?=?io.ReadFull(r,?data)
????//service.class?len
????start?:=?0
????end?:=?start?+?SPLIT_LEN
????classLen?:=?binary.BigEndian.Uint32(data[start:end])?//0,4
????//service.class
????start?=?end
????end?=?start?+?int(classLen)
????msg.ServiceClass?=?util.ByteToString(data[start:end])?//4,x
????//省略 method,payload}
解決了最復(fù)雜的協(xié)議部分,下面依次來(lái)看服務(wù)端和客戶端的實(shí)現(xiàn)。
服務(wù)端實(shí)現(xiàn)
服務(wù)端實(shí)現(xiàn)主要包括服務(wù)啟停(端口監(jiān)聽(tīng))、服務(wù)注冊(cè)、響應(yīng)連接和處理請(qǐng)求幾部分。
定義服務(wù)接口
服務(wù)接口提供服務(wù)啟停和處理方法注冊(cè)的能力。
type?Server?interface?{
????Register(string,?interface{})
????Run()
????Close()
}
provider/server.go
服務(wù)啟停
實(shí)現(xiàn)服務(wù)啟停,關(guān)鍵在于通過(guò) ip 和端口開啟監(jiān)聽(tīng),這里通過(guò) Listener 封裝 net 包開啟 tcp Listen。
type?RPCServer?struct?{
????listener?Listener
}
func?NewRPCServer(ip?string,?port?int)?*RPCServer?{
????return?&RPCServer{
????????listener:?NewRPCListener(ip,?port),
????}???
}
func?(svr?*RPCServer)?Run()?{
????go?svr.listener.Run()
}
func?(svr?*RPCServer)?Close()?{
????if?svr.listener?!=?nil?{
????????svr.listener.Close()
????}
}
provider/server.go
type?Listener?interface?{
????Run()
????SetHandler(string,?Handler)
????Close()
}
type?RPCListener?struct?{
????ServiceIp???string
????ServicePort?int
????Handlers????map[string]Handler
????nl??????????net.Listener
}
func?NewRPCListener(serviceIp?string,?servicePort?int)?*RPCListener?{
????return?&RPCListener{ServiceIp:?serviceIp,
????????ServicePort:?servicePort,
????????Handlers:????make(map[string]Handler)}
}
func?(l?*RPCListener)?Run()?{
????addr?:=?fmt.Sprintf("%s:%d",?l.ServiceIp,?l.ServicePort)
????nl,?err?:=?net.Listen(config.NET_TRANS_PROTOCOL,?addr) //tcp
????if?err?!=?nil?{
????????panic(err)
????}???
????l.nl?=?nl
????for?{
????????conn,?err?:=?l.nl.Accept()
????????if?err?!=?nil?{
????????????continue
????????}??
????????go?l.handleConn(conn)
????}
}
func?(l?*RPCListener)?Close()?{ if l.nl != nil {????
l.nl.Close()
}
}
provider/listener.go
這里通過(guò)為每個(gè)連接創(chuàng)建一個(gè)協(xié)程處理請(qǐng)求,得益于 Golang 的協(xié)程優(yōu)勢(shì),Thread-Per-Message 模式來(lái)滿足并發(fā)請(qǐng)求更容易實(shí)現(xiàn)(Java 線程成本太大,一般采用線程池實(shí)現(xiàn) Worker Thread 模式)。
服務(wù)注冊(cè)就是在內(nèi)存中維護(hù)一個(gè)映射關(guān)系,map (key=服務(wù)名,value=對(duì)象實(shí)例),通過(guò) interface{} 泛化,可以反射還原。服務(wù)注冊(cè)
func?(svr?*RPCServer)?Register(class?interface{})?{
????name?:=?reflect.Indirect(reflect.ValueOf(class)).Type().Name()
????svr.RegisterName(name,?class)
}
func?(svr?*RPCServer)?RegisterName(name?string,?class?interface{})?{
????handler?:=?&RPCServerHandler{class:?reflect.ValueOf(class)}
????svr.listener.SetHandler(name,?handler)
????log.Printf("%s?registered?success!\n",?name)
}
func?(l?*RPCListener)?SetHandler(name?string,?handler?Handler)?{
????if?_,?ok?:=?l.Handlers[name];?ok?{
????????log.Printf("%s?is?registered!\n",?name)
????????return
????}
????l.Handlers[name]?=?handler
}
provider/server.go
(2)這里沒(méi)有注冊(cè)到服務(wù)注冊(cè)中心,設(shè)計(jì)考慮將以應(yīng)用服務(wù)(系統(tǒng))為單位進(jìn)行注冊(cè),而具體的服務(wù)接口通過(guò)應(yīng)用內(nèi)存映射。這種注冊(cè)粒度大,優(yōu)點(diǎn)就是減少對(duì)注冊(cè)中心的依賴和注冊(cè)實(shí)例數(shù)量,提高服務(wù)發(fā)現(xiàn)資源利用率。(dubbo 3 重要改進(jìn)就是將接口級(jí)服務(wù)發(fā)現(xiàn)切換為應(yīng)用級(jí)服務(wù)發(fā)現(xiàn))
響應(yīng)連接請(qǐng)求
整個(gè)過(guò)程依次涉及從網(wǎng)絡(luò)連接讀取數(shù)據(jù),反序列化獲得請(qǐng)求結(jié)構(gòu)體 (RPCMsg),根據(jù)注冊(cè)類和方法找到目標(biāo)函數(shù)并執(zhí)行,將執(zhí)行結(jié)果序列化后封裝成 RPCMsg 通過(guò)網(wǎng)絡(luò)發(fā)送,整個(gè)過(guò)程是同步 io 模型。
func?(l?*RPCListener)?handleConn(conn?net.Conn)?{
????defer?catchPanic()
????for?{
????????msg,?err?:=?l.receiveData(conn)
????????if?err?!=?nil?||?msg?==?nil?{
????????????return
????????}
????????coder?:=?global.Codecs[msg.Header.SerializeType()]
????????if?coder?==?nil?{
????????????return
????????}
????????inArgs?:=?make([]interface{},?0)
????????err?=?coder.Decode(msg.Payload,?&inArgs)
????????if?err?!=?nil?{
????????????return
????????}
????????handler,?ok?:=?l.Handlers[msg.ServiceClass]
????????if?!ok?{
????????????return
????????}
????????result,?err?:=?handler.Handle(msg.ServiceMethod,?inArgs)
????????encodeRes,?err?:=?coder.Encode(result)?
????????if?err?!=?nil?{
????????????return
????????}
????????err?=?l.sendData(conn,?encodeRes)
????????if?err?!=?nil?{
????????????return
????????}
????}
}provider/listener.go
其中實(shí)際執(zhí)行本地方法過(guò)程如下:func?(handler?*RPCServerHandler)?Handle(method?string,?params?[]interface{})?([]interface{},?error)?{
????args?:=?make([]reflect.Value,?len(params))
????for?i?:=?range?params?{
????????args[i]?=?reflect.ValueOf(params[i])
????}?
????reflectMethod?:=?handler.class.MethodByName(method)????
????result?:=?reflectMethod.Call(args)
????resArgs?:=?make([]interface{},?len(result))
????for?i?:=?0;?i?len(result);?i++?{
????????resArgs[i]?=?result[i].Interface()
????}?
????var?err?error
????if?_,?ok?:=?result[len(result)-1].Interface().(error);?ok?{
????????err?=?result[len(result)-1].Interface().(error)
????}
????return?resArgs,?err
}provider/handler.go
收發(fā)網(wǎng)絡(luò)請(qǐng)求通過(guò)調(diào)用之前封裝的 Protocol 來(lái)完成。func?(l?*RPCListener)?receiveData(conn?net.Conn)?(*protocol.RPCMsg,?error)?{
????msg,?err?:=?protocol.Read(conn)
????if?err?!=?nil?{
????????if?err?!=?io.EOF?{?//close
????????????return?nil,?err
????????}
????}
????return?msg,?nil
}
func?(l?*RPCListener)?sendData(conn?net.Conn,?payload?[]byte)?error?{
????resMsg?:=?protocol.NewRPCMsg()
????resMsg.SetVersion(config.Protocol_MsgVersion)
????resMsg.SetMsgType(protocol.Response)
????resMsg.SetCompressType(protocol.None)
????resMsg.SetSerializeType(protocol.Gob)
????resMsg.Payload?=?payload
????return??resMsg.Send(conn)
}
provider/listener.go
測(cè)試服務(wù)端
通過(guò)環(huán)境變量注入 ip 和 port,開啟服務(wù)監(jiān)聽(tīng),依次注冊(cè)幾個(gè)服務(wù)。
func?main()?{
????flag.Parse()
????if?ip?==?""?||?port?==?0?{
????????panic("init?ip?and?port?error")
????}
????srv?:=?provider.NewRPCServer(ip,?port)
????srv.RegisterName("User",?&UserHandler{})
????srv.RegisterName("Test",?&TestHandler{})
????gob.Register(User{})
????go?srv.Run()
????quit?:=?make(chan?os.Signal)
????signal.Notify(quit,?syscall.SIGINT,?syscall.SIGTERM,?syscall.SIGHUP,?syscall.SIGQUIT)
????<-quit
????srv.Close()
}
server.go
這里注冊(cè)兩個(gè)結(jié)構(gòu)體?User 和 Test,特別注意:只有可導(dǎo)出的類方法(首字母大寫)才能被客戶端調(diào)用執(zhí)行,否則會(huì)找不到對(duì)應(yīng)類方法而失敗。此外 User 作為接口值實(shí)現(xiàn)傳輸必須注冊(cè)才行(gob.Register(User{}))。
type?TestHandler?struct{}
func?(t?*TestHandler)?Hello()?string?{
????return?"hello?world"
}
type?User?struct?{
????ID???int????`json:"id"`
????Name?string?`json:"name"`
????Age??int????`json:"age"`
}
var?userList?=?map[int]User{
????1:?User{1,?"hero",?11},
????2:?User{2,?"kavin",?12},
}
type?UserHandler?struct{}
func?(u?*UserHandler)?GetUserById(id?int)?(User,?error)?{
????if?u,?ok?:=?userList[id];?ok?{
????????return?u,?nil
????}
????return?User{},?fmt.Errorf("id?%d?not?found",?id)
}server.go

客戶端實(shí)現(xiàn)
客戶端發(fā)起 RPC 調(diào)用,就像調(diào)本地服務(wù)一樣,所以需要定義一個(gè) stub,該 stub 同請(qǐng)求服務(wù)端方法簽名一致,然后通過(guò)代理實(shí)現(xiàn)網(wǎng)絡(luò)請(qǐng)求和解析。
var?Hello?func()?string
r,?err?:=?cli.Call(ctx,?"UserService.Test.Hello",?&Hello)
var?GetUserById?func(id?int)?(User,?error)
_,?err?:=?cli.Call(ctx,?"UserService.User.GetUserById",?&GetUserById)
u,?err?:=?GetUserById(2)
定義客戶端
定義客戶端接口,其中 Invoke?代理執(zhí)行 RPC 請(qǐng)求。
type?Client?interface?{
????Connect(string)?error
????Invoke(context.Context,?*Service,?interface{},?...interface{})?(interface{},?error)
????Close()
}
consumer/client.go
定義連接參數(shù),設(shè)置重試次數(shù)、超時(shí)時(shí)間、序列化協(xié)議、壓縮類型等。type?Option?struct?{
????Retries???????????int
????ConnectionTimeout?time.Duration
????SerializeType?????protocol.SerializeType
????CompressType??????protocol.CompressType
}
var?DefaultOption?=?Option{
????Retries:???????????3,
????ConnectionTimeout:?5?*?time.Second,
????SerializeType:?????protocol.Gob,
????CompressType:??????protocol.None,
}
type?RPCClient?struct?{
????conn???net.Conn
????option?Option
}
func?NewClient(option?Option)?Client?{
????return?&RPCClient{option:?option}
}
consumer/client.go
執(zhí)行請(qǐng)求
實(shí)現(xiàn)網(wǎng)絡(luò)連接、關(guān)閉以及執(zhí)行部分。
func?(cli?*RPCClient)?Connect(addr?string)?error?{
????conn,?err?:=?net.DialTimeout(config.NET_TRANS_PROTOCOL,?addr,?cli.option.ConnectionTimeout)
????if?err?!=?nil?{
????????return?err
????}
????cli.conn?=?conn
????return?nil
}
func?(cli?*RPCClient)?Invoke(ctx?context.Context,?service?*Service,?stub?interface{},?params?...interface{})?(interface{},?error)?{
????cli.makeCall(service,?stub)
????return?cli.wrapCall(ctx,?stub,?params...)
}
func?(cli?*RPCClient)?Close()?{
????if?cli.conn?!=?nil?{
????????cli.conn.Close()
????}
}
consumer/client.go
執(zhí)行代理過(guò)程,主要依賴反射實(shí)現(xiàn)。這里 cli.makeCall()?主要是通過(guò)反射來(lái)生成代理函數(shù),在代理函數(shù)中完成網(wǎng)絡(luò)連接、請(qǐng)求數(shù)據(jù)序列化、網(wǎng)絡(luò)傳輸、響應(yīng)返回?cái)?shù)據(jù)解析的工作,然后通過(guò) cli.wrapCall() 發(fā)起實(shí)際調(diào)用。func?(cli?*RPCClient)?makeCall(service?*Service,?methodPtr?interface{})?{
????container?:=?reflect.ValueOf(methodPtr).Elem()?
????coder?:=?global.Codecs[cli.option.SerializeType]
????handler?:=?func(req?[]reflect.Value)?[]reflect.Value?{??
????????numOut?:=?container.Type().NumOut()
????????errorHandler?:=?func(err?error)?[]reflect.Value?{
????????????outArgs?:=?make([]reflect.Value,?numOut)
????????????for?i?:=?0;?i?len(outArgs)-1;?i++?{
????????????????outArgs[i]?=?reflect.Zero(container.Type().Out(i))
????????????}
????????????outArgs[len(outArgs)-1]?=?reflect.ValueOf(&err).Elem()
????????????return?outArgs
????????}
????????inArgs?:=?make([]interface{},?0,?len(req))
????????for?_,?arg?:=?range?req?{
????????????inArgs?=?append(inArgs,?arg.Interface())
????????}
????????payload,?err?:=?coder.Encode(inArgs)?//[]byte
????????if?err?!=?nil?{
????????????log.Printf("encode?err:%v\n",?err)
????????????return?errorHandler(err)
????????}
????????msg?:=?protocol.NewRPCMsg()
????????msg.SetVersion(config.Protocol_MsgVersion)
????????msg.SetMsgType(protocol.Request)
????????msg.SetCompressType(cli.option.CompressType)
????????msg.SetSerializeType(cli.option.SerializeType)
????????msg.ServiceClass?=?service.Class
????????msg.ServiceMethod?=?service.Method
????????msg.Payload?=?payload
????????err?=?msg.Send(cli.conn)
????????if?err?!=?nil?{
????????????log.Printf("send?err:%v\n",?err)
????????????return?errorHandler(err)
????????}
????????respMsg,?err?:=?protocol.Read(cli.conn)
????????if?err?!=?nil?{
????????????return?errorHandler(err)
????????}
????????respDecode?:=?make([]interface{},?0)
????????err?=?coder.Decode(respMsg.Payload,?&respDecode)
????????if?err?!=?nil?{
????????????log.Printf("decode?err:%v\n",?err)
????????????return?errorHandler(err)
????????}
????????if?len(respDecode)?==?0?{
????????????respDecode?=?make([]interface{},?numOut)
????????}
????????outArgs?:=?make([]reflect.Value,?numOut)
????????for?i?:=?0;?i?????????????if?i?!=?numOut?{
????????????????if?respDecode[i]?==?nil?{
????????????????????outArgs[i]?=?reflect.Zero(container.Type().Out(i))
????????????????}?else?{
????????????????????outArgs[i]?=?reflect.ValueOf(respDecode[i])
????????????????}
????????????}?else?{
????????????????outArgs[i]?=?reflect.Zero(container.Type().Out(i))
????????????}
????????}
????????return?outArgs
????}
????container.Set(reflect.MakeFunc(container.Type(),?handler))
}
consumer/client.go
wrapCall 執(zhí)行實(shí)際函數(shù)調(diào)用。func?(cli?*RPCClient)?wrapCall(ctx?context.Context,?stub?interface{},?params?...interface{})?(interface{},?error)?{
????f?:=?reflect.ValueOf(stub).Elem()
????if?len(params)?!=?f.Type().NumIn()?{
????????return?nil,?errors.New(fmt.Sprintf("params?not?adapted:?%d-%d",?len(params),?f.Type().NumIn()))
????}
????in?:=?make([]reflect.Value,?len(params))
????for?idx,?param?:=?range?params?{
????????in[idx]?=?reflect.ValueOf(param)
????}
????result?:=?f.Call(in)
????return?result,?nil
}
consumer/client.go
代理實(shí)現(xiàn)
到目前為止,客戶端主要實(shí)現(xiàn)邏輯有了,但是客戶端在發(fā)起調(diào)用前是需要先連接到服務(wù)端,然后執(zhí)行調(diào)用,還有長(zhǎng)連接管理、超時(shí)、重試甚至鑒權(quán)等功能沒(méi)有實(shí)現(xiàn),因此需要有一個(gè)代理類完成以上動(dòng)作。
type?RPCClientProxy?struct?{
????option?Option
}
func?(cp?*RPCClientProxy)?Call(ctx?context.Context,?servicePath?string,?stub?interface{},?params?...interface{})?(interface{},?error)?{
????service,?err?:=?NewService(servicePath)
????if?err?!=?nil?{
????????return?nil,?err
????}???????
????client?:=?NewClient(cp.option)
????addr?:=?service.SelectAddr()
????err?=?client.Connect(addr)?//TODO 長(zhǎng)連接管理
????if?err?!=?nil?{
????????return?nil,?err
????}
????retries?:=?cp.option.Retries
????for?retries?>?0?{
????????retries--
????????return?client.Invoke(ctx,?service,?stub,?params...)
????}
????return?nil,?errors.New("error")
}
consumer/client_proxy.go
這里通過(guò)服務(wù)路徑拆分依次獲取類名、方法名、服務(wù) AppId,然后根據(jù)AppId 查找服務(wù)注冊(cè)中心獲取到服務(wù)端(服務(wù)提供者)地址。由于篇幅限制,這部分將在下一篇實(shí)現(xiàn)(包括注冊(cè)發(fā)現(xiàn)、負(fù)載均衡、長(zhǎng)連接管理等),測(cè)試方便這里直接寫死服務(wù)端地址。type?Service?struct?{
????AppId??string
????Class??string
????Method?string
????Addrs??[]string
}
//demo:?UserService.user.GetUser
func?NewService(servicePath?string)?(*Service,?error)?{
????arr?:=?strings.Split(servicePath,?".")
????service?:=?&Service{}
????if?len(arr)?!=?3?{?
????????return?service,?errors.New("service?path?inlegal")
????}???
????service.AppId?=?arr[0]
????service.Class?=?arr[1]
????service.Method?=?arr[2]
????return?service,?nil?
}
func?(service?*Service)?SelectAddr()?string?{
????return?"ip:8811"
}
consumer/service.go
測(cè)試客戶端
客戶端通過(guò) stub 發(fā)起調(diào)用,執(zhí)行過(guò)程看到發(fā)起了遠(yuǎn)程執(zhí)行并從服務(wù)端獲取到了結(jié)果。
func?main()?{
????gob.Register(User{})
????cli?:=?consumer.NewClientProxy(consumer.DefaultOption)
????ctx?:=?context.Background()
????var?GetUserById?func(id?int)?(User,?error)
????cli.Call(ctx,?"UserService.User.GetUserById",?&GetUserById)
????u,?err?:=?GetUserById(2)
????log.Println("result:",?u,?err)
????var?Hello?func()?string
????r,?err?:=?cli.Call(ctx,?"UserService.Test.Hello",?&Hello)
????log.Println("result:",?r,?err)
}
client.go

總結(jié)與補(bǔ)充
至此實(shí)現(xiàn)了簡(jiǎn)單的“P2P?RPC”,后續(xù)可以迭代加入注冊(cè)發(fā)現(xiàn)能力、長(zhǎng)連接管理、異步調(diào)用、插件化擴(kuò)展、負(fù)載均衡、認(rèn)證授權(quán)、容錯(cuò)治理等能力,希望大家多多支持。
感謝您的閱讀,歡迎點(diǎn)贊、轉(zhuǎn)發(fā)
