<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Kindling 之 Dubbo2 協(xié)議開發(fā)流程

          共 17181字,需瀏覽 35分鐘

           ·

          2022-06-13 13:58

          1 項目概覽

          Kindling collector 項目作為 Go 端分析器,使用類似 opentelmetry 的 pinpeline 進行數(shù)據(jù)分析。其中涉及 5 個組件:

          • UdsReceiver - Unix Domain Socket 接收器,接收探針事件并傳遞給后續(xù)的網(wǎng)絡分析器。
          • NetworkAnalyzer - 網(wǎng)絡事件分析器,將接收的 Read / Write 事件匹配成單次請求后,根據(jù)協(xié)議解析請求內(nèi)容生成關(guān)鍵指標,傳遞給后續(xù)的分析器。
          • K8sMetadataProcessor - K8S 指標處理器,補充 K8S 指標并傳遞給后續(xù)的聚合處理器
          • AggregateProcessor - 數(shù)據(jù)聚合處理器,將接收的指標數(shù)據(jù)生成 Trace 和 Metric,傳遞給給后續(xù)的轉(zhuǎn)發(fā)器。
          • OtelExporter - Opentelmetry 轉(zhuǎn)發(fā)器,將 Trace / Metric 數(shù)據(jù)轉(zhuǎn)發(fā)給 Prometheus 進行展示。

          其中協(xié)議解析流程主要在 NetworkAnalyzer 組件中進行,將接收的請求 / 響應事件成對匹配后,交由 parseProtocols() 函數(shù)解析出協(xié)議指標。

          1.1 協(xié)議解析流程

          NetworkAnnalyzer.parseProtocols() 方法定義了整體解析流程,根據(jù)協(xié)議解析器分別解析請求和響應,當最終都成功時輸出指標。

          1.2 協(xié)議解析設計思路

          正常的協(xié)議解析只負責逐幀解析指標功能。

          現(xiàn)已支持 5 種協(xié)議解析,當協(xié)議越來越多時,遍歷引起的解析會越來越耗時,那么需引入 fastfail 快速識別協(xié)議

          對于復雜的多報文協(xié)議,如 Kafka 有不同的 API 報文,而相同 API 也有不同的版本報文。將所有報文解析邏輯都寫在一起會使整個類過于臃腫且不易維護。為此引入樹形多報文結(jié)構(gòu)用于快速且低耦合地實現(xiàn)開發(fā)。

          1.2.1 報文解析結(jié)構(gòu)體

          在樹形報文解析過程中,有如下 2 個場景需要考慮

          • 當父協(xié)議解析了指標 A,子協(xié)議解析可能會用到 A 指標,那么父子協(xié)議解析的指標需要透傳。
          • 父協(xié)議已解析了部分報文長度的內(nèi)容,那么子協(xié)議在開始解析時可直接跳過相應長度的內(nèi)容進行解析,此處引入偏移量用于下一個協(xié)議跳過解析。

          定義 PayloadMessage,封裝報文內(nèi)容、讀取偏移量和指標存儲的 Map。

          type PayloadMessage struct {
              Data         []byte
              Offset       int
              attributeMap *model.AttributeMap
          }

          1.2.2 報文解析 API

          由于引入?yún)f(xié)議樹,協(xié)議解析過程 parse() (ok bool) 將不再適用。協(xié)議樹中的個協(xié)議的解析成功不表示整個協(xié)議解析成功,需解析整顆樹的協(xié)議是否成功,將 API 擴展為 parse() (ok bool, complete bool)。

          • 對于單層協(xié)議 (HTTP),返回為 (ok, true)

          基于以上幾點需求,設計樹形結(jié)構(gòu)的報文解析器 PkgParser。PkgParser 定義了 fastFail(快速識別失敗) 和 parser(解析單個報文) 函數(shù);每個協(xié)議只需注冊自身的 PkgParser 即可接入整套流程。

          fastFail(message *PayloadMessage) (fail bool)

            - 聲明協(xié)議是否識別失敗,用于快速識別協(xié)議

          parser(message *PayloadMessage) (ok bool, complete bool)

            - 解析協(xié)議,將解析出的指標存儲到 message 的 Attributes 中
          - 返回是 2 個參數(shù):
          - 是否解析成功
          - 是否解析完成 (默認為 true,當為 false 主要是用于嵌套解析過程,例如先定義一個頭解析,再定義不同的消息體解析)。

          1.3 請求 / 響應解析

          ProtocolParser 定義了請求和響應的解析器,并提供 ParseRequest() 和 ParseResponse() API 用于解析請求和響應。其中 response 的 message 攜帶了 request 解析出的 attributes 指標,用于匹配。eg. kafka 的 correlationId request 和 response 報文需一致,且 response 報文解析用到了 request 的 key。

          2 開發(fā)流程

          2.1 添加協(xié)議名

          const (
           HTTP      = "http"
            ...
           XX        = "xx" // 此處替換具體協(xié)議名
           ...
          )

          2.2 創(chuàng)建協(xié)議

          analyzer/network/protocol 目錄下創(chuàng)建文件夾 xx,xx 替換為具體協(xié)議,并創(chuàng)建 3 個文件 xx_parser.go、xx_request.go 和 xx_response.go

          analyzer/network/protocol/xx
          ├── xx_parser.go           協(xié)議解析器
          ├── xx_request.go          實現(xiàn)請求解析流程
          └── xx_response.go          實現(xiàn)響應解析流程

          2.2.1 xx_request.go

          實現(xiàn) fastfail() 和 parser() 函數(shù)

          func fastfailXXRequest() protocol.FastFailFn {
           return func(message *protocol.PayloadMessage) bool {
                  // 根據(jù)報文實現(xiàn)具體的fastFail()函數(shù)
            return false
           }
          }

          func parseXXRequest() protocol.ParsePkgFn {
           return func(message *protocol.PayloadMessage) (boolbool) {
                  // 解析報文內(nèi)容
            contentKey := getContentKey(message.Data)
            if contentKey == "" {
                      // 第一個參數(shù)false 表示解析失敗,第二個參數(shù)表示報文解析完成
             return falsetrue
            }

                  // 通過AddStringAttribute() 或 AttIntAttribute() 存儲解析出的屬性
            message.AddStringAttribute(constlabels.ContentKey, contentKey)
                  // 解析成功
            return truetrue
           }
          }

          2.2.2 xx_response.go

          實現(xiàn) fastfail() 和 parser() 函數(shù)

          func fastfailXXResponse() protocol.FastFailFn {
           return func(message *protocol.PayloadMessage) bool {
                  // 根據(jù)報文實現(xiàn)具體的fastFail()函數(shù)
            return false
           }
          }

          func parseXXResponse() protocol.ParsePkgFn {
           return func(message *protocol.PayloadMessage) (boolbool) {
                  // 通過GetStringAttribute() 或 GetIntAttribute() 讀取request解析后的參數(shù)
            contentKey := message.GetStringAttribute(constlabels.ContentKey)
            
                  // 解析響應報文
                  errorCode := getErrorCode(message)
                  if errorCode > 20 {
                      // 有errorCode或errorMsg等常見,需定義IsError為true用于后續(xù)processor生成Metric
             message.AddBoolAttribute(constlabels.IsError, true)
             message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
            }
            message.AddStringAttribute(constlabels.XXErrorCode, errorCode)
                  
                  // 解析成功
            return truetrue
           }
          }

          2.2.3 xx_parser.go

          定義協(xié)議解析器

          func NewXXParser() *protocol.ProtocolParser {
           requestParser := protocol.CreatePkgParser(fastfailXXRequest(), parseXXRequest())
              // 當存在嵌套的協(xié)議解析 eg. 解析頭 + 解析各類不同報文
              // 可通過 Add()添加子協(xié)議,生成一顆協(xié)議樹解析,頂部是公共部分解析,分叉是各個不同報文解析
              //             Header
              //             / | \
              //         API1 API2 API3
              //         /|\
              //       v1 v2 v3
           responseParser := protocol.CreatePkgParser(fastfailXXResponse(), parseXXResponse())
           return protocol.NewProtocolParser(protocol.XX, requestParser, responseParser, nil)
          }

          2.2.4 factory.go

          注冊協(xié)議解析器

          var (
              ...
              xx_parser   *protocol.ProtocolParser = xx.NewXXParser()
          )

          func GetParser(key string) *protocol.ProtocolParser {
              switch key {
                  ...
                  case protocol.XX:
                      return xx_parser
                  ...
                  default:
                      return nil
              }
          }

          2.2.5 kindling-collector-config.yml

          配置新增協(xié)議

          analyzers:
            networkanalyzer:
              protocol_parser: [ http, mysql, dns, redis, kafka, xx ]

          3 開發(fā)案例 - Dubbo2 協(xié)議

          3.1 dubbo2 協(xié)議分析

          根據(jù)官網(wǎng)提供的協(xié)議規(guī)范,解析網(wǎng)絡抓包的數(shù)據(jù)。

          • 前 2 個 byte 為魔數(shù),可用于 fastfail() 方法
          • 第 3 個 byte 包含 Req/Resp、序列化方式等信息,可用于解析協(xié)議中判斷是否合法報文。
          • 第 4 個 byte 用于返回報文的錯誤碼
          • 第 16 個 byte 開始需通過指定的序列化方式解析報文內(nèi)容,service name + method name 可用于 contentKey 標識該請求的 URL

          3.2 聲明協(xié)議名

          const (
           ...
           DUBBO2    = "dubbo2"
           ...
          )

          3.3 實現(xiàn) dubbo2 解析

          創(chuàng)建協(xié)議相關(guān)文件

          kindling/collector/analyzer/network/protocol/dubbo2
          ├── dubbo2_parser.go            Dubbo2解析器
          ├── dubbo2_request.go           實現(xiàn)請求解析流程
          ├── dubbo2_response.go          實現(xiàn)響應解析流程
          └── dubbo2_serialize.go         Dubbo2反序列化器

          3.3.1 dubbo2_request.go

          聲明 request 請求的 fastFail 函數(shù)

          • dubbo2 有魔數(shù) 0xdabb 可快速識別
          func fastfailDubbo2Request() protocol.FastFailFn {
           return func(message *protocol.PayloadMessage) bool {
            return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
           }
          }

          聲明 request 請求的解析函數(shù)

          • 將解析出 服務 / 方法作為 類似于 URL 的 Key
          • 存儲報文內(nèi)容
          func parseDubbo2Request() protocol.ParsePkgFn {
           return func(message *protocol.PayloadMessage) (boolbool) {
            contentKey := getContentKey(message.Data)
            if contentKey == "" {
             return falsetrue
            }

            message.AddStringAttribute(constlabels.ContentKey, contentKey)
            message.AddStringAttribute(constlabels.Dubbo2RequestPayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
            return truetrue
           }
          }

          解析 Dubbo2 請求

          • 過濾非法請求
          • 考慮到 dubbo2 存在單向和心跳請求,這些請求不做解析
          • 根據(jù)報文結(jié)構(gòu)解析相應指標
          func getContentKey(requestData []byte) string {
           serialID := requestData[2] & SerialMask
           if serialID == Zero {
            return ""
           }
           if (requestData[2] & FlagEvent) != Zero {
            return "Heartbeat"
           }
           if (requestData[2] & FlagRequest) == Zero {
            // Invalid Data
            return ""
           }
           if (requestData[2] & FlagTwoWay) == Zero {
            // Ignore Oneway Data
            return "Oneway"
           }

           serializer := GetSerializer(serialID)
           if serializer == serialUnsupport {
            // Unsupport Serial. only support hessian and fastjson.
            return "UnSupportSerialFormat"
           }

           var (
            service string
            method  string
           )
           // version
           offset := serializer.eatString(requestData, 16)

           // service name
           offset, service = serializer.getStringValue(requestData, offset)

           // service version
           offset = serializer.eatString(requestData, offset)

           // method name
           _, method = serializer.getStringValue(requestData, offset)

           return service + "#" + method
          }

          3.3.2 dubbo2_serialize.go

          由于 dubbo2 內(nèi)置了多套序列化方式,先定義接口 dubbo2Serializer

          type dubbo2Serializer interface {
           eatString(data []byte, offset intint

           getStringValue(data []byte, offset int) (intstring)
          }

          dubbo2 默認的序列化方式是 hessian2,此處實現(xiàn) hessian2 方式

          type dubbo2Hessian struct{}

          func (dh *dubbo2Hessian) eatString(data []byte, offset int) int {
           dataLength := len(data)
           if offset >= dataLength {
            return dataLength
           }

           tag := data[offset]
           if tag >= 0x30 && tag <= 0x33 {
            if offset+1 == dataLength {
             return dataLength
            }
            // [x30-x34] <utf8-data>
            return offset + 2 + int(tag-0x30)<<8 + int(data[offset+1])
           } else {
            return offset + 1 + int(tag)
           }
          }

          func (dh *dubbo2Hessian) getStringValue(data []byte, offset int) (intstring) {
           dataLength := len(data)
           if offset >= dataLength {
            return dataLength, ""
           }

           var stringValueLength int
           tag := data[offset]
           if tag >= 0x30 && tag <= 0x33 {
            if offset+1 == dataLength {
             return dataLength, ""
            }
            // [x30-x34] <utf8-data>
            stringValueLength = int(tag-0x30)<<8 + int(data[offset+1])
            offset += 2
           } else {
            stringValueLength = int(tag)
            offset += 1
           }

           if offset+stringValueLength >= len(data) {
            return dataLength, string(data[offset:])
           }
           return offset + stringValueLength, string(data[offset : offset+stringValueLength])
          }

          對外暴露公共方法,用于獲取序列化方式

          var (
           serialHessian2  = &dubbo2Hessian{}
           serialUnsupport = &dubbo2Unsupport{}
          )

          func GetSerializer(serialID byte) dubbo2Serializer {
           switch serialID {
           case SerialHessian2:
            return serialHessian2
           default:
            return serialUnsupport
           }
          }

          3.3.3 dubbo2_response.go

          聲明 response 響應的 fastFail 函數(shù)

          • 與 request 類似,采用魔數(shù) 0xdabb 可快速識別
          func fastfailDubbo2Response() protocol.FastFailFn {
           return func(message *protocol.PayloadMessage) bool {
            return len(message.Data) < 16 || message.Data[0] != MagicHigh || message.Data[1] != MagicLow
           }
          }

          聲明 response 響應的解析函數(shù)

          • 根據(jù) status 解析出對應的 errorCode
          • 存儲報文內(nèi)容
          func parseDubbo2Response() protocol.ParsePkgFn {
           return func(message *protocol.PayloadMessage) (boolbool) {
            errorCode := getErrorCode(message.Data)
            if errorCode == -1 {
             return falsetrue
            }

            message.AddIntAttribute(constlabels.Dubbo2ErrorCode, errorCode)
            if errorCode > 20 {
                      // 有errorCode或errorMsg等常見,需定義IsError為true用于后續(xù)processor生成Metric
             message.AddBoolAttribute(constlabels.IsError, true)
             message.AddIntAttribute(constlabels.ErrorType, int64(constlabels.ProtocolError))
            }
            message.AddStringAttribute(constlabels.Dubbo2ResponsePayload, getAsciiString(message.GetData(16, protocol.GetDubbo2PayLoadLength())))
            return truetrue
           }
          }

          解析 Dubbo2 響應

          • 過濾非法響應
          • 根據(jù)報文結(jié)構(gòu)解析相應指標
          func getErrorCode(responseData []byte) int64 {
           SerialID := responseData[2] & SerialMask
           if SerialID == Zero {
            return -1
           }
           if (responseData[2] & FlagEvent) != Zero {
            return 20
           }
           if (responseData[2] & FlagRequest) != Zero {
            // Invalid Data
            return -1
           }

           return int64(responseData[3])
          }

          3.3.4 dubbo2_parser.go

          聲明 dubbo2 解析器

          • 通過 CreatePkgParser() 分別定義 Reques / Response 解析器
          • 通過 NewProtocolParser() 將 Request / Response 解析器生成 Dubbo2 解析器
          func NewDubbo2Parser() *protocol.ProtocolParser {
           requestParser := protocol.CreatePkgParser(fastfailDubbo2Request(), parseDubbo2Request())
           responseParser := protocol.CreatePkgParser(fastfailDubbo2Response(), parseDubbo2Response())
           return protocol.NewProtocolParser(protocol.DUBBO2, requestParser, responseParser, nil)
          }

          3.4 注冊 dubbo2 解析器

          在 factory.go 中注冊 dubbo2 協(xié)議的解析器

          var (
              ...
              dubbo2_parser   *protocol.ProtocolParser = dubbo2.NewDubbo2Parser()
          )

          func GetParser(key string) *protocol.ProtocolParser {
              switch key {
                  ...
                  case protocol.DUBBO2:
                      return dubbo2_parser
                  ...
                  default:
                      return nil
              }
          }

          3.5 聲明支持協(xié)議

          在 deploy/kindling-collector-config.yml 中聲明 dubbo2 協(xié)議

          analyzers:
            networkanalyzer:
              protocol_parser: [ http, mysql, dns, redis, kafka, dubbo2 ]
              protocol_config:
                - key: "dubbo2"
                  payload_length: 200



          你可能還喜歡

          點擊下方圖片即可閱讀

          理解 Kubernetes 中的 NUMA 架構(gòu)

          2022-06-08

          讓 M1 芯片的 MacBook Pro 同時支持兩個 4k 顯示器

          2022-06-07

          K8s 的核心是 API 而非容器:從理論到 CRD 實踐

          2022-06-06

          使用自動化工作流聚合信息,實現(xiàn)個人數(shù)字生活的信息聚合

          2022-06-01


          云原生是一種信仰 ??

          關(guān)注公眾號

          后臺回復?k8s?獲取史上最方便快捷的 Kubernetes 高可用部署工具,只需一條命令,連 ssh 都不需要!



          點擊 "閱讀原文" 獲取更好的閱讀體驗!


          發(fā)現(xiàn)朋友圈變“安靜”了嗎?

          瀏覽 45
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大荫蒂视频另类XX | 免费观看日本一级A片 | 人妻互换一区二区三区 | 日本中文不卡视频 | 成人av导航观看 成人不卡在线观看 |