<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Go實(shí)現(xiàn)海量日志收集系統(tǒng)

          共 11495字,需瀏覽 23分鐘

           ·

          2021-07-08 10:35

          再次整理了一下這個(gè)日志收集系統(tǒng)的框,如下圖

          這次要實(shí)現(xiàn)的代碼的整體邏輯為:


          完整代碼地址為: https://github.com/pythonsite/logagent

          etcd介紹

          高可用的分布式key-value存儲(chǔ),可以用于配置共享和服務(wù)發(fā)現(xiàn)

          類似的項(xiàng)目:zookeeper和consul

          開(kāi)發(fā)語(yǔ)言:go

          接口:提供restful的接口,使用簡(jiǎn)單

          實(shí)現(xiàn)算法:基于raft算法的強(qiáng)一致性,高可用的服務(wù)存儲(chǔ)目錄

          etcd的應(yīng)用場(chǎng)景:

          • 服務(wù)發(fā)現(xiàn)和服務(wù)注冊(cè)

          • 配置中心(我們實(shí)現(xiàn)的日志收集客戶端需要用到)

          • 分布式鎖

          • master選舉

          官網(wǎng)對(duì)etcd的有一個(gè)非常簡(jiǎn)明的介紹:

          etcd搭建:
          下載地址:https://github.com/coreos/etcd/releases/
          根據(jù)自己的環(huán)境下載對(duì)應(yīng)的版本然后啟動(dòng)起來(lái)就可以了

          啟動(dòng)之后可以通過(guò)如下命令驗(yàn)證一下:

          [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

          zhaofan
          [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
          zhaofan
          [root@localhost etcd-v3.2.18-linux-amd64]#

          context 介紹和使用

          其實(shí)這個(gè)東西翻譯過(guò)來(lái)就是上下文管理,那么context的作用是做什么,主要有如下兩個(gè)作用:

          • 控制goroutine的超時(shí)

          • 保存上下文數(shù)據(jù)

          通過(guò)下面一個(gè)簡(jiǎn)單的例子進(jìn)行理解:

          package main

          import (
          "fmt"
          "time"
          "net/http"
          "context"
          "io/ioutil"
          )


          type Result struct{
          r *http.Response
          err error
          }

          func process(){
          ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
          defer cancel()
          tr := &http.Transport{}
          client := &http.Client{Transport:tr}
          c := make(chan Result,1)
          req,err := http.NewRequest("GET","http://www.google.com",nil)
          if err != nil{
          fmt.Println("http request failed,err:",err)
          return
          }
          // 如果請(qǐng)求成功了會(huì)將數(shù)據(jù)存入到管道中
          go func(){
          resp,err := client.Do(req)
          pack := Result{resp,err}
          c <- pack
          }()

          select{
          case <- ctx.Done():
          tr.CancelRequest(req)
          fmt.Println("timeout!")
          case res := <-c:
          defer res.r.Body.Close()
          out,_:= ioutil.ReadAll(res.r.Body)
          fmt.Printf("server response:%s",out)
          }
          return

          }

          func main() {
          process()
          }

          寫一個(gè)通過(guò)context保存上下文,代碼例子如:

          package main

          import (
          "github.com/Go-zh/net/context"
          "fmt"
          )

          func add(ctx context.Context,a,b int) int {
          traceId := ctx.Value("trace_id").(string)
          fmt.Printf("trace_id:%v\n",traceId)
          return a+b
          }

          func calc(ctx context.Context,a, b int) int{
          traceId := ctx.Value("trace_id").(string)
          fmt.Printf("trace_id:%v\n",traceId)
          //再將ctx傳入到add中
          return add(ctx,a,b)
          }

          func main() {
          //將ctx傳遞到calc中
          ctx := context.WithValue(context.Background(),"trace_id","123456")
          calc(ctx,20,30)

          }

          結(jié)合etcd和context使用

          關(guān)于通過(guò)go連接etcd的簡(jiǎn)單例子:(這里有個(gè)小問(wèn)題需要注意就是etcd的啟動(dòng)方式,默認(rèn)啟動(dòng)可能會(huì)連接不上,尤其你是在虛擬你安裝,所以需要通過(guò)如下命令啟動(dòng):
          ./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
          )

          package main

          import (
          etcd_client "github.com/coreos/etcd/clientv3"
          "time"
          "fmt"
          )

          func main() {
          cli, err := etcd_client.New(etcd_client.Config{
          Endpoints:[]string{"192.168.0.118:2371"},
          DialTimeout:5*time.Second,
          })
          if err != nil{
          fmt.Println("connect failed,err:",err)
          return
          }

          fmt.Println("connect success")
          defer cli.Close()
          }

          下面一個(gè)例子是通過(guò)連接etcd,存值并取值

          package main

          import (
          "github.com/coreos/etcd/clientv3"
          "time"
          "fmt"
          "context"
          )

          func main() {
          cli,err := clientv3.New(clientv3.Config{
          Endpoints:[]string{"192.168.0.118:2371"},
          DialTimeout:5*time.Second,
          })
          if err != nil{
          fmt.Println("connect failed,err:",err)
          return
          }
          fmt.Println("connect succ")
          defer cli.Close()
          ctx,cancel := context.WithTimeout(context.Background(),time.Second)
          _,err = cli.Put(ctx,"logagent/conf/","sample_value")
          cancel()
          if err != nil{
          fmt.Println("put failed,err",err)
          return
          }
          ctx, cancel = context.WithTimeout(context.Background(),time.Second)
          resp,err := cli.Get(ctx,"logagent/conf/")
          cancel()
          if err != nil{
          fmt.Println("get failed,err:",err)
          return
          }
          for _,ev := range resp.Kvs{
          fmt.Printf("%s:%s\n",ev.Key,ev.Value)
          }
          }

          關(guān)于context官網(wǎng)也有一個(gè)例子非常有用,用于控制開(kāi)啟的goroutine的退出,代碼如下:

          package main

          import (
          "context"
          "fmt"
          )

          func main() {
          // gen generates integers in a separate goroutine and
          // sends them to the returned channel.
          // The callers of gen need to cancel the context once
          // they are done consuming generated integers not to leak
          // the internal goroutine started by gen.
          gen := func(ctx context.Context) <-chan int {
          dst := make(chan int)
          n := 1
          go func() {
          for {
          select {
          case <-ctx.Done():
          return // returning not to leak the goroutine
          case dst <- n:
          n++
          }
          }
          }()
          return dst
          }

          ctx, cancel := context.WithCancel(context.Background())
          defer cancel() // cancel when we are finished consuming integers

          for n := range gen(ctx) {
          fmt.Println(n)
          if n == 5 {
          break
          }
          }
          }

          關(guān)于官網(wǎng)文檔中的WithDeadline演示的代碼例子:

          package main


          import (
          "context"
          "fmt"
          "time"
          )

          func main() {
          d := time.Now().Add(50 * time.Millisecond)
          ctx, cancel := context.WithDeadline(context.Background(), d)

          // Even though ctx will be expired, it is good practice to call its
          // cancelation function in any case. Failure to do so may keep the
          // context and its parent alive longer than necessary.
          defer cancel()

          select {
          case <-time.After(1 * time.Second):
          fmt.Println("overslept")
          case <-ctx.Done():
          fmt.Println(ctx.Err())
          }

          }

          通過(guò)上面的代碼有了一個(gè)基本的使用,那么如果我們通過(guò)etcd來(lái)做配置管理,如果配置更改之后,我們?nèi)绾瓮ㄖ獙?duì)應(yīng)的服務(wù)器配置更改,通過(guò)下面例子演示:

          package main

          import (
          "github.com/coreos/etcd/clientv3"
          "time"
          "fmt"
          "context"
          )

          func main() {
          cli,err := clientv3.New(clientv3.Config{
          Endpoints:[]string{"192.168.0.118:2371"},
          DialTimeout:5*time.Second,
          })
          if err != nil {
          fmt.Println("connect failed,err:",err)
          return
          }
          defer cli.Close()
          // 這里會(huì)阻塞
          rch := cli.Watch(context.Background(),"logagent/conf/")
          for wresp := range rch{
          for _,ev := range wresp.Events{
          fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
          }
          }
          }

          實(shí)現(xiàn)一個(gè)kafka的消費(fèi)者代碼的簡(jiǎn)單例子:

          package main

          import (
          "github.com/Shopify/sarama"
          "strings"
          "fmt"
          "time"
          )

          func main() {
          consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
          if err != nil{
          fmt.Println("failed to start consumer:",err)
          return
          }
          partitionList,err := consumer.Partitions("nginx_log")
          if err != nil {
          fmt.Println("Failed to get the list of partitions:",err)
          return
          }
          fmt.Println(partitionList)
          for partition := range partitionList{
          pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
          if err != nil {
          fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
          return
          }
          defer pc.AsyncClose()
          go func(partitionConsumer sarama.PartitionConsumer){
          for msg := range pc.Messages(){
          fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
          }
          }(pc)
          }
          time.Sleep(time.Hour)
          consumer.Close()

          }

          但是上面的代碼并不是最佳代碼,因?yàn)槲覀冏詈笫峭ㄟ^(guò)time.sleep等待goroutine的執(zhí)行,我們可以更改為通過(guò)sync.WaitGroup方式實(shí)現(xiàn)

          package main

          import (
          "github.com/Shopify/sarama"
          "strings"
          "fmt"
          "sync"
          )

          var (
          wg sync.WaitGroup
          )

          func main() {
          consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
          if err != nil{
          fmt.Println("failed to start consumer:",err)
          return
          }
          partitionList,err := consumer.Partitions("nginx_log")
          if err != nil {
          fmt.Println("Failed to get the list of partitions:",err)
          return
          }
          fmt.Println(partitionList)
          for partition := range partitionList{
          pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
          if err != nil {
          fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
          return
          }
          defer pc.AsyncClose()
          go func(partitionConsumer sarama.PartitionConsumer){
          wg.Add(1)
          for msg := range partitionConsumer.Messages(){
          fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
          }
          wg.Done()
          }(pc)
          }

          //time.Sleep(time.Hour)
          wg.Wait()
          consumer.Close()

          }

          將客戶端需要收集的日志信息放到etcd中

          關(guān)于etcd處理的代碼為:

          package main

          import (
          "github.com/coreos/etcd/clientv3"
          "time"
          "github.com/astaxie/beego/logs"
          "context"
          "fmt"
          )

          var Client *clientv3.Client
          var logConfChan chan string


          // 初始化etcd
          func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

          var keys []string
          for _,ip := range ipArrays{
          //keyfmt = /logagent/%s/log_config
          keys = append(keys,fmt.Sprintf(keyfmt,ip))
          }

          logConfChan = make(chan string,10)
          logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

          Client,err = clientv3.New(clientv3.Config{
          Endpoints:addr,
          DialTimeout: timeout,
          })
          if err != nil{
          logs.Error("connect failed,err:%v",err)
          return
          }
          logs.Debug("init etcd success")
          waitGroup.Add(1)
          for _, key := range keys{
          ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
          // 從etcd中獲取要收集日志的信息
          resp,err := Client.Get(ctx,key)
          cancel()
          if err != nil {
          logs.Warn("get key %s failed,err:%v",key,err)
          continue
          }

          for _, ev := range resp.Kvs{
          logs.Debug("%q : %q\n", ev.Key, ev.Value)
          logConfChan <- string(ev.Value)
          }
          }
          go WatchEtcd(keys)
          return
          }

          func WatchEtcd(keys []string){
          // 這里用于檢測(cè)當(dāng)需要收集的日志信息更改時(shí)及時(shí)更新
          var watchChans []clientv3.WatchChan
          for _,key := range keys{
          rch := Client.Watch(context.Background(),key)
          watchChans = append(watchChans,rch)
          }

          for {
          for _,watchC := range watchChans{
          select{
          case wresp := <-watchC:
          for _,ev:= range wresp.Events{
          logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
          logConfChan <- string(ev.Kv.Value)
          }
          default:

          }
          }
          time.Sleep(time.Second)
          }
          waitGroup.Done()
          }

          func GetLogConf()chan string{
          return logConfChan
          }

          同樣的這里增加對(duì)了限速的處理,畢竟日志收集程序不能影響了當(dāng)前業(yè)務(wù)的性能,所以增加了limit.go用于限制速度:

          package main

          import (
          "time"
          "sync/atomic"
          "github.com/astaxie/beego/logs"
          )

          type SecondLimit struct {
          unixSecond int64
          curCount int32
          limit int32
          }

          func NewSecondLimit(limit int32) *SecondLimit {
          secLimit := &SecondLimit{
          unixSecond:time.Now().Unix(),
          curCount:0,
          limit:limit,
          }
          return secLimit
          }

          func (s *SecondLimit) Add(count int) {
          sec := time.Now().Unix()
          if sec == s.unixSecond {
          atomic.AddInt32(&s.curCount,int32(count))
          return
          }
          atomic.StoreInt64(&s.unixSecond,sec)
          atomic.StoreInt32(&s.curCount, int32(count))
          }

          func (s *SecondLimit) Wait()bool {
          for {
          sec := time.Now().Unix()
          if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
          time.Sleep(time.Microsecond)
          logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
          continue
          }

          if sec != atomic.LoadInt64(&s.unixSecond) {
          atomic.StoreInt64(&s.unixSecond,sec)
          atomic.StoreInt32(&s.curCount,0)
          }
          logs.Debug("limit is exited")
          return false
          }
          }

          小結(jié)

          這次基本實(shí)現(xiàn)了日志收集的前半段的處理,后面將把日志扔到es中,并最終在頁(yè)面上呈現(xiàn)

          來(lái)源:

          https://www.toutiao.com/a6916833750924018179/

          文章轉(zhuǎn)載:IT大咖說(shuō) 
          (版權(quán)歸原作者所有,侵刪)


          點(diǎn)擊下方“閱讀原文”查看更多

          瀏覽 41
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无码映画在线 | 美女被大鸡吧操视频网站在线播放 | 成人色色在线 | 99视频在线观看中文字幕 | 18片毛片60分钟免费 |