<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

Analyzing the Event Mechanism in Kubernetes

2020-12-04 16:22

By running the kubectl describe [resource] command we can see Event output, and we often rely on events when troubleshooting: from events we can trace a Pod's entire run history, and they provide a data source for service observability. Clearly, events play an important role in Kubernetes.

(Figure: sample event output)

Events are not something only the kubelet produces. The event-handling logic is encapsulated in the client-go/tools/record package, so we can write custom events of our own.
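As a quick illustration, here is a minimal sketch of recording a custom event with client-go, following the same broadcaster/recorder pattern the kubelet uses (analyzed below). The kubeconfig path, the component name, and the Pod name (reused from the example YAML below) are illustrative assumptions; the Get call uses the two-argument signature of the client-go version that matches Kubernetes 1.17.3, while newer versions also take a context.

package main

import (
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/record"
    "k8s.io/klog"
)

func main() {
    // Assumption: running outside the cluster with a local kubeconfig.
    config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
    if err != nil {
        klog.Fatal(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // Same pattern as the kubelet: a broadcaster plus a recorder.
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{
        Interface: clientset.CoreV1().Events(""),
    })
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
        corev1.EventSource{Component: "my-demo-component"})

    // The object the event is about; the Pod name here is hypothetical.
    pod, err := clientset.CoreV1().Pods("default").Get("example-foo-d75d8587c-xsf64", metav1.GetOptions{})
    if err != nil {
        klog.Fatal(err)
    }

    // Record a custom event against that Pod.
    recorder.Eventf(pod, corev1.EventTypeNormal, "CustomReason", "my custom event: %s", "hello")

    // Event delivery is asynchronous; give the broadcaster a moment before exiting.
    time.Sleep(time.Second)
    eventBroadcaster.Shutdown()
}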

Now let's lift the veil on events, step by step.

Event Definition

An event is itself a resource object; it is stored in etcd through the apiserver, so we can also view event objects with the kubectl get event command.

Here is the YAML of an example event:

apiVersion: v1
count: 1
eventTime: null
firstTimestamp: "2020-03-02T13:08:22Z"
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: "429837"
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: "2020-03-02T13:08:22Z"
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: "2020-03-02T13:08:30Z"
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: "479466"
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ""
reportingInstance: ""
source:
  component: kubelet
  host: minikube
type: Normal


Key fields:

• involvedObject: the resource object the event refers to
• lastTimestamp: the time the event last occurred
• message: the event description
• metadata: the event's metadata, such as name and namespace
• reason: the reason for the event
• source: the component reporting the event, e.g. the kubelet on a particular node
• type: the event type, Normal or Warning

The full definition of the Event fields can be found here: types.go#L5078

Next, let's look at how events get written.

Writing Events

1. We take the kubelet as an example of how events are written.

2. The analysis is based on the Kubernetes 1.17.3 code.

The overall flow, in one sentence: the EventRecorder produces events, the EventBroadcaster distributes them over a channel, and the registered handlers write them to the log and to the apiserver. Let's walk through it.

Creating the client that operates on events: kubelet/app/server.go#L461

// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
    if kubeDeps.Recorder != nil {
        return
    }
    // Event broadcaster
    eventBroadcaster := record.NewBroadcaster()
    // Create the EventRecorder
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
    // Send events to the log output
    eventBroadcaster.StartLogging(klog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        klog.V(4).Infof("Sending events to api server.")
        // Send events to the apiserver
        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        klog.Warning("No api server defined - no events will be sent to API server.")
    }
}

makeEventRecorder creates the EventBroadcaster, the event broadcaster, whose StartLogging and StartRecordingToSink handlers send events to the log and to the apiserver respectively.
NewRecorder then creates an EventRecorder instance, which exposes the Event, Eventf and related methods for recording events.


          EventBroadcaster

Let's look at the EventBroadcaster interface definition: event.go#L113

// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
    StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
    StartRecordingToSink(sink EventSink) watch.Interface
    StartLogging(logf func(format string, args ...interface{})) watch.Interface
    NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder

    Shutdown()
}

Each of these methods is implemented by the eventBroadcasterImpl struct.

StartLogging and StartRecordingToSink are what consume events, while the EventRecorder writes them; a channel sits in between, giving us a producer-consumer model. A toy version of that model is sketched below.
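To make that relationship concrete, here is a deliberately simplified toy sketch (not the real client-go code): Record plays the role of EventRecorder.Event writing into a channel, and StartHandler plays the role of StartLogging / StartRecordingToSink draining it in a goroutine.

package main

import (
    "fmt"
    "time"
)

type toyEvent struct{ Reason, Message string }

type toyBroadcaster struct {
    incoming chan toyEvent // the channel between the producer and the consumers
}

func newToyBroadcaster() *toyBroadcaster {
    return &toyBroadcaster{incoming: make(chan toyEvent, 25)}
}

// StartHandler is the consumer side: it drains the channel in a goroutine,
// analogous to StartLogging / StartRecordingToSink registering a handler.
func (b *toyBroadcaster) StartHandler(handler func(toyEvent)) {
    go func() {
        for ev := range b.incoming {
            handler(ev)
        }
    }()
}

// Record is the producer side, analogous to EventRecorder.Event.
func (b *toyBroadcaster) Record(reason, message string) {
    b.incoming <- toyEvent{Reason: reason, Message: message}
}

func main() {
    b := newToyBroadcaster()
    b.StartHandler(func(ev toyEvent) { fmt.Println("log handler:", ev.Reason, ev.Message) })
    b.Record("SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")
    time.Sleep(100 * time.Millisecond) // crude wait for the async handler; demo only
}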

          EventRecorder

Let's first look at the EventRecorder interface definition: event.go#L88. It provides the following four methods:

// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
    // Event constructs an event from the given information and puts it in the queue for sending.
    // 'object' is the object this event is about. Event will make a reference-- or you may also
    // pass a reference to the object directly.
    // 'type' of this event, and can be one of Normal, Warning. New types could be added in future
    // 'reason' is the reason this event is generated. 'reason' should be short and unique; it
    // should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
    // to automate handling of events, so imagine people writing switch statements to handle them.
    // You want to make that easy.
    // 'message' is intended to be human readable.
    //
    // The resulting event will be created in the same namespace as the reference object.
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
    PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

Key parameters:

• object: corresponds to involvedObject in the event resource
• eventtype: corresponds to type in the event resource, either Normal or Warning
• reason: the reason for the event
• message: the event message

Let's trace what happens when we call Event(object runtime.Object, eventtype, reason, message string).
All of these methods eventually call the generateEvent method: event.go#L316

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
    .....
    event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
    event.Source = recorder.source
    go func() {
        // NOTE: events should be a non-blocking operation
        defer utilruntime.HandleCrash()
        recorder.Action(watch.Added, event)
    }()
}

The event is ultimately handed to recorder.Action inside a goroutine, which guarantees that every call to one of the event methods is non-blocking.
makeEvent constructs the event object; the event name is generated from the InvolvedObject's name plus a timestamp.

Note: for events produced by non-namespaced resources, the event's namespace is default.

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
    t := metav1.Time{Time: recorder.clock.Now()}
    namespace := ref.Namespace
    if namespace == "" {
        namespace = metav1.NamespaceDefault
    }
    return &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
            Namespace:   namespace,
            Annotations: annotations,
        },
        InvolvedObject: *ref,
        Reason:         reason,
        Message:        message,
        FirstTimestamp: t,
        LastTimestamp:  t,
        Count:          1,
        Type:           eventtype,
    }
}

Digging further into the Action method: apimachinery/blob/master/pkg/watch/mux.go#L188:23

// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj}
}

So the event is written into a channel.
Note: this Action method belongs to the apimachinery package, because the implementing struct recorderImpl embeds *watch.Broadcaster as an anonymous field; the Broadcaster is assigned in NewRecorder, and it is the very same Broadcaster held by eventBroadcasterImpl.
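For reference, the recorderImpl struct in client-go 1.17 looks roughly like the sketch below (simplified; field order and the exact clock type may differ). It shows the embedded *watch.Broadcaster that makes recorder.Action available:

type recorderImpl struct {
    scheme *runtime.Scheme
    source v1.EventSource
    *watch.Broadcaster // embedded: Action() comes from here
    clock clock.Clock
}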

At this point it's clear that events end up in the Broadcaster's incoming channel. Next, let's see how they are consumed.


Consuming Events

The StartLogging and StartRecordingToSink calls made in makeEventRecorder are what actually consume events.

• StartLogging writes events directly to the log
• StartRecordingToSink writes events to the apiserver

Both methods internally call StartEventWatcher, passing in an eventHandler function that processes each event:

func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := e.Watch()
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() {
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)
        }
    }()
    return watcher
}

watcher.ResultChan is where the events come from: in a goroutine, the call chain func (m *Broadcaster) loop() ==> func (m *Broadcaster) distribute(event Event) writes each event into broadcasterWatcher.result.
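The distribution logic in apimachinery's watch.Broadcaster looks roughly like the simplified sketch below (edge cases such as the internal run-function marker and the DropIfChannelFull behaviour are omitted): loop drains the incoming channel, and distribute fans each event out to every registered watcher's result channel.

// Simplified sketch of apimachinery's watch.Broadcaster fan-out.
func (m *Broadcaster) loop() {
    for event := range m.incoming {
        m.distribute(event)
    }
    m.closeAll()
}

// distribute sends the event to every watcher's result channel.
func (m *Broadcaster) distribute(event Event) {
    for _, w := range m.watchers {
        select {
        case w.result <- event:
        case <-w.stopped:
        }
    }
}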

Let's focus on the eventHandler that StartRecordingToSink provides, the recordToSink method:

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
    // Make a copy before modification, because there could be multiple listeners.
    // Events are safe to copy like this.
    eventCopy := *event
    event = &eventCopy
    result, err := eventCorrelator.EventCorrelate(event)
    if err != nil {
        utilruntime.HandleError(err)
    }
    if result.Skip {
        return
    }
    tries := 0
    for {
        if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
            break
        }
        tries++
        if tries >= maxTriesPerEvent {
            klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
            break
        }
        // Randomize the first sleep so that various clients won't all be
        // synced up if the master goes down.
        if tries == 1 {
            time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
        } else {
            time.Sleep(sleepDuration)
        }
    }
}

The event is first preprocessed by eventCorrelator.EventCorrelate(event), which mainly aggregates similar events (to avoid producing too many events, which would put pressure on etcd and the apiserver and also make a pod's events hard to read).

The for loop that follows is the retry logic, with a maximum of 12 attempts (maxTriesPerEvent); recordEvent is what actually writes the event to the apiserver.
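recordEvent itself is not reproduced here; roughly speaking (a paraphrased sketch, not the verbatim source, with simplified error handling), it patches the existing event when the correlator produced a patch for a repeated event, otherwise creates a new one, and returns true on success so the retry loop stops:

// Rough sketch of recordEvent's behaviour (paraphrased from client-go).
func recordEvent(sink EventSink, event *v1.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
    var newEvent *v1.Event
    var err error
    if updateExistingEvent {
        // Repeated event: PATCH the existing object (bumps count/lastTimestamp).
        newEvent, err = sink.Patch(event, patch)
    }
    if !updateExistingEvent || (updateExistingEvent && err != nil) {
        // New event, or the patch target no longer exists: create it.
        event.ResourceVersion = ""
        newEvent, err = sink.Create(event)
    }
    if err == nil {
        eventCorrelator.UpdateState(newEvent)
        return true
    }
    return false // caller will retry
}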


Event Processing

Let's look at the EventCorrelate method:

// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
    if newEvent == nil {
        return nil, fmt.Errorf("event is nil")
    }
    aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
    observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
    if c.filterFunc(observedEvent) {
        return &EventCorrelateResult{Skip: true}, nil
    }
    return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}

It calls three methods in turn, aggregator.EventAggregate, logger.eventObserve, and filterFunc, which do the following:

1. aggregator.EventAggregate: aggregates events. If 10 similar events (events whose key fields are all identical except for message and timestamps) have occurred within the last 10 minutes, the aggregator sets their message to (combined from similar events) + event.Message.
2. logger.eventObserve: for identical events, as well as similar events that the aggregator has combined, it records how many times the event occurred by incrementing the Count field.
3. filterFunc: implements token-bucket rate limiting; events that exceed the configured rate are dropped, protecting the apiserver (see the sketch after this list).
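As an illustration of the token-bucket idea behind filterFunc, here is a minimal sketch built on client-go's flowcontrol package; the qps and burst values are illustrative assumptions, not necessarily the defaults used by the real event spam filter, which also keeps a separate bucket per event source and involved object rather than one global limiter.

package eventdemo

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/util/flowcontrol"
)

type spamFilter struct {
    limiter flowcontrol.RateLimiter
}

func newSpamFilter() *spamFilter {
    // A slowly refilling bucket with a burst allowance:
    // here one token every ~5 minutes, with a burst of 25 (illustrative values).
    return &spamFilter{limiter: flowcontrol.NewTokenBucketRateLimiter(1.0/300, 25)}
}

// Filter returns true when the event should be dropped (no token available),
// mirroring the Skip semantics of filterFunc.
func (f *spamFilter) Filter(event *v1.Event) bool {
    return !f.limiter.TryAccept()
}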

Let's focus on the aggregator.EventAggregate method:

func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
    now := metav1.NewTime(e.clock.Now())
    var record aggregateRecord
    // eventKey is the full cache key for this event
    // (all fields except the timestamps combined together)
    eventKey := getEventKey(newEvent)
    // aggregateKey is for the aggregate event, if one is needed.
    // (all fields except message and timestamps; localKey is the message)
    aggregateKey, localKey := e.keyFunc(newEvent)

    // Do we have a record of similar events in our cache?
    e.Lock()
    defer e.Unlock()
    // Look up aggregateKey in the cache; identical or similar events end up in the cache
    value, found := e.cache.Get(aggregateKey)
    if found {
        record = value.(aggregateRecord)
    }

    // If the last similar event is older than 10 minutes,
    // start a fresh localKeys set (the set stores the messages)
    maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
    interval := now.Time.Sub(record.lastTimestamp.Time)
    if interval > maxInterval {
        record = aggregateRecord{localKeys: sets.NewString()}
    }

    // Write the new event into the aggregation record and put it on the cache
    // (insert localKey, i.e. the message, into the set; an identical message simply overwrites)
    record.localKeys.Insert(localKey)
    record.lastTimestamp = now
    e.cache.Add(aggregateKey, record)

    // If we are not yet over the threshold for unique events, don't correlate them
    // (i.e. fewer than 10 similar events in localKeys)
    if uint(record.localKeys.Len()) < e.maxEvents {
        return newEvent, eventKey
    }

    // do not grow our local key set any larger than max
    record.localKeys.PopAny()

    // create a new aggregate event, and return the aggregateKey as the cache key
    // (so that it can be overwritten.)
    eventCopy := &v1.Event{
        ObjectMeta: metav1.ObjectMeta{
            Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
            Namespace: newEvent.Namespace,
        },
        Count:          1,
        FirstTimestamp: now,
        InvolvedObject: newEvent.InvolvedObject,
        LastTimestamp:  now,
        // The message gets the prefix "(combined from similar events)"
        Message: e.messageFunc(newEvent),
        Type:    newEvent.Type,
        Reason:  newEvent.Reason,
        Source:  newEvent.Source,
    }
    return eventCopy, aggregateKey
}

So aggregator.EventAggregate uses the cache and the localKeys set to decide whether events are similar. If 10 similar events have appeared in the last 10 minutes, they are combined and the message gets the prefix; logger.eventObserve then accumulates the Count field, and if the messages are also identical it simply does count++.


Summary

That is the whole event-handling flow. To recap (compare with the flow outline given earlier):

1. An EventRecorder is created; its Event and related methods build the event object.
2. The event object is sent into the EventBroadcaster's channel.
3. A goroutine running inside the EventBroadcaster takes events off the channel and broadcasts them to the pre-registered handlers.
4. The logging handler simply prints each event it receives.
5. The EventSink handler preprocesses each event and then sends it to the apiserver.
6. The preprocessing consists of three steps: rate limiting, aggregation, and counting.
7. The apiserver receives the event and stores it in etcd.

Looking back over the flow, events are not guaranteed to be written 100% of the time (as the preprocessing shows). This trade-off protects the availability of the backing etcd: events are produced very frequently across a cluster, especially when services are unstable, and compared with resources such as Deployments and Pods they are less critical, so some loss is acceptable.

References:

          • https://cizixs.com/2017/06/22/kubelet-source-code-analysis-part4-event/
          • https://github.com/kubernetes/kubernetes/blob/v1.17.3/staging/src/k8s.io/client-go/tools/record


          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  多人日穴无码 | 变态另类天堂 | 成人国产精品蜜臀 | 婷婷色视频 | 四季AV之日韩人妻无码 |