Kindling 之 Dubbo2 協(xié)議開發(fā)流程
1 項目概覽
Kindling collector 項目作為 Go 端分析器,使用類似 opentelmetry 的 pinpeline 進行數(shù)據(jù)分析。其中涉及 5 個組件:
UdsReceiver - Unix Domain Socket 接收器,接收探針事件并傳遞給后續(xù)的網(wǎng)絡分析器。 NetworkAnalyzer - 網(wǎng)絡事件分析器,將接收的 Read / Write 事件匹配成單次請求后,根據(jù)協(xié)議解析請求內(nèi)容生成關(guān)鍵指標,傳遞給后續(xù)的分析器。 K8sMetadataProcessor - K8S 指標處理器,補充 K8S 指標并傳遞給后續(xù)的聚合處理器 AggregateProcessor - 數(shù)據(jù)聚合處理器,將接收的指標數(shù)據(jù)生成 Trace 和 Metric,傳遞給給后續(xù)的轉(zhuǎn)發(fā)器。 OtelExporter - Opentelmetry 轉(zhuǎn)發(fā)器,將 Trace / Metric 數(shù)據(jù)轉(zhuǎn)發(fā)給 Prometheus 進行展示。
其中協(xié)議解析流程主要在 NetworkAnalyzer 組件中進行,將接收的請求 / 響應事件成對匹配后,交由 parseProtocols() 函數(shù)解析出協(xié)議指標。

1.1 協(xié)議解析流程
NetworkAnnalyzer.parseProtocols() 方法定義了整體解析流程,根據(jù)協(xié)議解析器分別解析請求和響應,當最終都成功時輸出指標。

1.2 協(xié)議解析設計思路
正常的協(xié)議解析只負責逐幀解析指標功能。

現(xiàn)已支持 5 種協(xié)議解析,當協(xié)議越來越多時,遍歷引起的解析會越來越耗時,那么需引入 fastfail 快速識別協(xié)議

對于復雜的多報文協(xié)議,如 Kafka 有不同的 API 報文,而相同 API 也有不同的版本報文。將所有報文解析邏輯都寫在一起會使整個類過于臃腫且不易維護。為此引入樹形多報文結(jié)構(gòu)用于快速且低耦合地實現(xiàn)開發(fā)。

1.2.1 報文解析結(jié)構(gòu)體
在樹形報文解析過程中,有如下 2 個場景需要考慮
當父協(xié)議解析了指標 A,子協(xié)議解析可能會用到 A 指標,那么父子協(xié)議解析的指標需要透傳。 父協(xié)議已解析了部分報文長度的內(nèi)容,那么子協(xié)議在開始解析時可直接跳過相應長度的內(nèi)容進行解析,此處引入偏移量用于下一個協(xié)議跳過解析。
定義 PayloadMessage,封裝報文內(nèi)容、讀取偏移量和指標存儲的 Map。
type PayloadMessage struct {
Data []byte
Offset int
attributeMap *model.AttributeMap
}
1.2.2 報文解析 API
由于引入?yún)f(xié)議樹,協(xié)議解析過程 parse() (ok bool) 將不再適用。協(xié)議樹中的個協(xié)議的解析成功不表示整個協(xié)議解析成功,需解析整顆樹的協(xié)議是否成功,將 API 擴展為 parse() (ok bool, complete bool)。
對于單層協(xié)議 (HTTP),返回為 (ok, true)

基于以上幾點需求,設計樹形結(jié)構(gòu)的報文解析器 PkgParser。PkgParser 定義了 fastFail(快速識別失敗) 和 parser(解析單個報文) 函數(shù);每個協(xié)議只需注冊自身的 PkgParser 即可接入整套流程。
fastFail(message *PayloadMessage) (fail bool)
- 聲明協(xié)議是否識別失敗,用于快速識別協(xié)議parser(message *PayloadMessage) (ok bool, complete bool)
- 解析協(xié)議,將解析出的指標存儲到 message 的 Attributes 中
- 返回是 2 個參數(shù):
- 是否解析成功
- 是否解析完成 (默認為 true,當為 false 主要是用于嵌套解析過程,例如先定義一個頭解析,再定義不同的消息體解析)。1.3 請求 / 響應解析
ProtocolParser 定義了請求和響應的解析器,并提供 ParseRequest() 和 ParseResponse() API 用于解析請求和響應。其中 response 的 message 攜帶了 request 解析出的 attributes 指標,用于匹配。eg. kafka 的 correlationId request 和 response 報文需一致,且 response 報文解析用到了 request 的 key。

2 開發(fā)流程

2.1 添加協(xié)議名
const (
HTTP = "http"
...
XX = "xx" // 此處替換具體協(xié)議名
...
)
2.2 創(chuàng)建協(xié)議
analyzer/network/protocol 目錄下創(chuàng)建文件夾 xx,xx 替換為具體協(xié)議,并創(chuàng)建 3 個文件 xx_parser.go、xx_request.go 和 xx_response.go
analyzer/network/protocol/xx
├── xx_parser.go 協(xié)議解析器
├── xx_request.go 實現(xiàn)請求解析流程
└── xx_response.go 實現(xiàn)響應解析流程
2.2.1 xx_request.go
實現(xiàn) fastfail() 和 parser() 函數(shù)
func fastfailXXRequest() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
// 根據(jù)報文實現(xiàn)具體的fastFail()函數(shù)
return false
}
}
func parseXXRequest() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
// 解析報文內(nèi)容
contentKey := getContentKey(message.Data)
if contentKey == "" {
// 第一個參數(shù)false 表示解析失敗,第二個參數(shù)表示報文解析完成
return false, true
}
// 通過AddStringAttribute() 或 AttIntAttribute() 存儲解析出的屬性
message.AddStringAttribute(constlabels.ContentKey, contentKey)
// 解析成功
return true, true
}
}
2.2.2 xx_response.go
實現(xiàn) fastfail() 和 parser() 函數(shù)
func fastfailXXResponse() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
// 根據(jù)報文實現(xiàn)具體的fastFail()函數(shù)
return false
}
}
func parseXXResponse() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
// 通過GetStringAttribute() 或 GetIntAttribute() 讀取request解析后的參數(shù)
contentKey := message.GetStringAttribute(constlabels.ContentKey)
// 解析響應報文
errorCode := getErrorCode(message)
if errorCode > 20 {
// 有errorCode或errorMsg等常見,需定義IsError為true用于后續(xù)processor生成Metric
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.XXErrorCode, errorCode)
// 解析成功
return true, true
}
}
2.2.3 xx_parser.go
定義協(xié)議解析器
func NewXXParser() *protocol.ProtocolParser {
requestParser := protocol.CreatePkgParser(fastfailXXRequest(), parseXXRequest())
// 當存在嵌套的協(xié)議解析 eg. 解析頭 + 解析各類不同報文
// 可通過 Add()添加子協(xié)議,生成一顆協(xié)議樹解析,頂部是公共部分解析,分叉是各個不同報文解析
// Header
// / | \
// API1 API2 API3
// /|\
// v1 v2 v3
responseParser := protocol.CreatePkgParser(fastfailXXResponse(), parseXXResponse())
return protocol.NewProtocolParser(protocol.XX, requestParser, responseParser, nil)
}
2.2.4 factory.go
注冊協(xié)議解析器
var (
...
xx_parser *protocol.ProtocolParser = xx.NewXXParser()
)
func GetParser(key string) *protocol.ProtocolParser {
switch key {
...
case protocol.XX:
return xx_parser
...
default:
return nil
}
}
2.2.5 kindling-collector-config.yml
配置新增協(xié)議
analyzers:
networkanalyzer:
protocol_parser: [ http, mysql, dns, redis, kafka, xx ]
3 開發(fā)案例 - Dubbo2 協(xié)議
3.1 dubbo2 協(xié)議分析

根據(jù)官網(wǎng)提供的協(xié)議規(guī)范,解析網(wǎng)絡抓包的數(shù)據(jù)。
前 2 個 byte 為魔數(shù),可用于 fastfail() 方法 第 3 個 byte 包含 Req/Resp、序列化方式等信息,可用于解析協(xié)議中判斷是否合法報文。 第 4 個 byte 用于返回報文的錯誤碼 第 16 個 byte 開始需通過指定的序列化方式解析報文內(nèi)容,service name + method name 可用于 contentKey 標識該請求的 URL

3.2 聲明協(xié)議名
const (
...
DUBBO2 = "dubbo2"
...
)
3.3 實現(xiàn) dubbo2 解析
創(chuàng)建協(xié)議相關(guān)文件
kindling/collector/analyzer/network/protocol/dubbo2
├── dubbo2_parser.go Dubbo2解析器
├── dubbo2_request.go 實現(xiàn)請求解析流程
├── dubbo2_response.go 實現(xiàn)響應解析流程
└── dubbo2_serialize.go Dubbo2反序列化器
3.3.1 dubbo2_request.go
聲明 request 請求的 fastFail 函數(shù)
dubbo2 有魔數(shù) 0xdabb 可快速識別
func fastfailDubbo2Request() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}
聲明 request 請求的解析函數(shù)
將解析出 服務 / 方法作為 類似于 URL 的 Key 存儲報文內(nèi)容
func parseDubbo2Request() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
contentKey := getContentKey(message.Data)
if contentKey == "" {
return false, true
}
message.AddStringAttribute(constlabels.ContentKey, contentKey)
message.AddStringAttribute(constlabels.Dubbo2RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
return true, true
}
}
解析 Dubbo2 請求
過濾非法請求 考慮到 dubbo2 存在單向和心跳請求,這些請求不做解析 根據(jù)報文結(jié)構(gòu)解析相應指標
func getContentKey(requestData []byte) string {
serialID := requestData[2] & SerialMask
if serialID == Zero {
return ""
}
if (requestData[2] & FlagEvent) != Zero {
return "Heartbeat"
}
if (requestData[2] & FlagRequest) == Zero {
// Invalid Data
return ""
}
if (requestData[2] & FlagTwoWay) == Zero {
// Ignore Oneway Data
return "Oneway"
}
serializer := GetSerializer(serialID)
if serializer == serialUnsupport {
// Unsupport Serial. only support hessian and fastjson.
return "UnSupportSerialFormat"
}
var (
service string
method string
)
// version
offset := serializer.eatString(requestData, 16)
// service name
offset, service = serializer.getStringValue(requestData, offset)
// service version
offset = serializer.eatString(requestData, offset)
// method name
_, method = serializer.getStringValue(requestData, offset)
return service + "#" + method
}
3.3.2 dubbo2_serialize.go
由于 dubbo2 內(nèi)置了多套序列化方式,先定義接口 dubbo2Serializer
type dubbo2Serializer interface {
eatString(data []byte, offset int) int
getStringValue(data []byte, offset int) (int, string)
}
dubbo2 默認的序列化方式是 hessian2,此處實現(xiàn) hessian2 方式
type dubbo2Hessian struct{}
func (dh *dubbo2Hessian) eatString(data []byte, offset int) int {
dataLength := len(data)
if offset >= dataLength {
return dataLength
}
tag := data[offset]
if tag >= 0x30 && tag <= 0x33 {
if offset+1 == dataLength {
return dataLength
}
// [x30-x34] <utf8-data>
return offset + 2 + int(tag-0x30)<<8 + int(data[offset+1])
} else {
return offset + 1 + int(tag)
}
}
func (dh *dubbo2Hessian) getStringValue(data []byte, offset int) (int, string) {
dataLength := len(data)
if offset >= dataLength {
return dataLength, ""
}
var stringValueLength int
tag := data[offset]
if tag >= 0x30 && tag <= 0x33 {
if offset+1 == dataLength {
return dataLength, ""
}
// [x30-x34] <utf8-data>
stringValueLength = int(tag-0x30)<<8 + int(data[offset+1])
offset += 2
} else {
stringValueLength = int(tag)
offset += 1
}
if offset+stringValueLength >= len(data) {
return dataLength, string(data[offset:])
}
return offset + stringValueLength, string(data[offset : offset+stringValueLength])
}
對外暴露公共方法,用于獲取序列化方式
var (
serialHessian2 = &dubbo2Hessian{}
serialUnsupport = &dubbo2Unsupport{}
)
func GetSerializer(serialID byte) dubbo2Serializer {
switch serialID {
case SerialHessian2:
return serialHessian2
default:
return serialUnsupport
}
}
3.3.3 dubbo2_response.go
聲明 response 響應的 fastFail 函數(shù)
與 request 類似,采用魔數(shù) 0xdabb 可快速識別
func fastfailDubbo2Response() protocol.FastFailFn {
return func(message *protocol.PayloadMessage) bool {
return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
}
}
聲明 response 響應的解析函數(shù)
根據(jù) status 解析出對應的 errorCode 存儲報文內(nèi)容
func parseDubbo2Response() protocol.ParsePkgFn {
return func(message *protocol.PayloadMessage) (bool, bool) {
errorCode := getErrorCode(message.Data)
if errorCode == -1 {
return false, true
}
message.AddIntAttribute(constlabels.Dubbo2ErrorCode, errorCode)
if errorCode > 20 {
// 有errorCode或errorMsg等常見,需定義IsError為true用于后續(xù)processor生成Metric
message.AddBoolAttribute(constlabels.IsError, true)
message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
}
message.AddStringAttribute(constlabels.Dubbo2ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
return true, true
}
}
解析 Dubbo2 響應
過濾非法響應 根據(jù)報文結(jié)構(gòu)解析相應指標
func getErrorCode(responseData []byte) int64 {
SerialID := responseData[2] & SerialMask
if SerialID == Zero {
return -1
}
if (responseData[2] & FlagEvent) != Zero {
return 20
}
if (responseData[2] & FlagRequest) != Zero {
// Invalid Data
return -1
}
return int64(responseData[3])
}
3.3.4 dubbo2_parser.go
聲明 dubbo2 解析器
通過 CreatePkgParser() 分別定義 Reques / Response 解析器 通過 NewProtocolParser() 將 Request / Response 解析器生成 Dubbo2 解析器
func NewDubbo2Parser() *protocol.ProtocolParser {
requestParser := protocol.CreatePkgParser(fastfailDubbo2Request(), parseDubbo2Request())
responseParser := protocol.CreatePkgParser(fastfailDubbo2Response(), parseDubbo2Response())
return protocol.NewProtocolParser(protocol.DUBBO2, requestParser, responseParser, nil)
}
3.4 注冊 dubbo2 解析器
在 factory.go 中注冊 dubbo2 協(xié)議的解析器
var (
...
dubbo2_parser *protocol.ProtocolParser = dubbo2.NewDubbo2Parser()
)
func GetParser(key string) *protocol.ProtocolParser {
switch key {
...
case protocol.DUBBO2:
return dubbo2_parser
...
default:
return nil
}
}
3.5 聲明支持協(xié)議
在 deploy/kindling-collector-config.yml 中聲明 dubbo2 協(xié)議
analyzers:
networkanalyzer:
protocol_parser: [ http, mysql, dns, redis, kafka, dubbo2 ]
protocol_config:
- key: "dubbo2"
payload_length: 200


你可能還喜歡
點擊下方圖片即可閱讀
2022-06-08
2022-06-07
2022-06-06
2022-06-01

云原生是一種信仰 ??
關(guān)注公眾號
后臺回復?k8s?獲取史上最方便快捷的 Kubernetes 高可用部署工具,只需一條命令,連 ssh 都不需要!


點擊 "閱讀原文" 獲取更好的閱讀體驗!
發(fā)現(xiàn)朋友圈變“安靜”了嗎?

