<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          使用Go語言,25秒讀取16GB文件

          共 16402字,需瀏覽 33分鐘

           ·

          2021-07-14 20:29

          原文鏈接:https://medium.com/swlh/processing-16gb-file-in-seconds-go-lang-3982c235dfa2

          當(dāng)今世界的任何計算機(jī)系統(tǒng)每天都會生成大量的日志或數(shù)據(jù)。隨著系統(tǒng)的發(fā)展,將調(diào)試數(shù)據(jù)存儲到數(shù)據(jù)庫中是不可行的,因為它們是不可變的,并且只能用于分析和解決故障。所以大部分公司傾向于將日志存儲在文件中,而這些文件通常位于本地磁盤中。

          我們將使用Go語言,從一個大小為16GB的.txt或.log文件中提取日志。

          讓我們開始編碼……

          首先,我們打開文件。對于任何文件的IO,我們都將使用標(biāo)準(zhǔn)的Go os.File。

          f, err := os.Open(fileName)
           if err != nil {
             fmt.Println("cannot able to read the file", err)
             return
           }
          // UPDATE: close after checking error
          defer file.Close()  //Do not forget to close the file

          打開文件后,我們有以下兩個選項可以選擇:

          逐行讀取文件,這有助于減少內(nèi)存緊張,但需要更多的時間。一次將整個文件讀入內(nèi)存并處理該文件,這將消耗更多內(nèi)存,但會顯著減少時間。

          由于文件太大,即16 GB,因此無法將整個文件加載到內(nèi)存中。但是第一種選擇對我們來說也是不可行的,因為我們希望在幾秒鐘內(nèi)處理文件。

          但你猜怎么著,還有第三種選擇。瞧……相比于將整個文件加載到內(nèi)存中,在Go語言中,我們還可以使用bufio.NewReader()將文件分塊加載。

          r := bufio.NewReader(f)
          for {
          buf := make([]byte,4*1024//the chunk size
          n, err := r.Read(buf) //loading chunk into buffer
             buf = buf[:n]
          if n == 0 {
             
               if err != nil {
                 fmt.Println(err)
                 break
               }
               if err == io.EOF {
                 break
               }
               return err
            }
          }

          一旦我們將文件分塊,我們就可以分叉一個線程,即Go routine,同時處理多個文件區(qū)塊。上述代碼將修改為:

          //sync pools to reuse the memory and decrease the preassure on Garbage Collector
          linesPool := sync.Pool{New: func() interface{} {
                  lines := make([]byte500*1024)
                  return lines
          }}
          stringPool := sync.Pool{New: func() interface{} {
                    lines := ""
                    return lines
          }}
          slicePool := sync.Pool{New: func() interface{} {
                     lines := make([]string100)
                     return lines
          }}
          r := bufio.NewReader(f)
          var wg sync.WaitGroup //wait group to keep track off all threads
          for {
               
               buf := linesPool.Get().([]byte)
               n, err := r.Read(buf)
               buf = buf[:n]
          if n == 0 {
                  if err != nil {
                      fmt.Println(err)
                      break
                  }
                  if err == io.EOF {
                      break
                  }
                  return err
               }
          nextUntillNewline, err := r.ReadBytes('\n')//read entire line
               
               if err != io.EOF {
                   buf = append(buf, nextUntillNewline...)
               }
               
               wg.Add(1)
               go func() { 
                
                  //process each chunk concurrently
                  //start -> log start time, end -> log end time
                  
                  ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
          wg.Done()
               
               }()
          }
          wg.Wait()
          }

          上面的代碼,引入了兩個優(yōu)化點:

          sync.Pool是一個強(qiáng)大的對象池,可以重用對象來減輕垃圾收集器的壓力。我們將重用各個分片的內(nèi)存,以減少內(nèi)存消耗,大大加快我們的工作。Go Routines幫助我們同時處理緩沖區(qū)塊,這大大提高了處理速度。

          現(xiàn)在讓我們實現(xiàn)ProcessChunk函數(shù),它將處理以下格式的日志行。

          2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

          我們將根據(jù)命令行提供的時間戳提取日志。

          func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
          //another wait group to process every chunk further                             
                var wg2 sync.WaitGroup
          logs := stringPool.Get().(string)
          logs = string(chunk)
          linesPool.Put(chunk) //put back the chunk in pool
          //split the string by "\n", so that we have slice of logs
                logsSlice := strings.Split(logs, "\n")
          stringPool.Put(logs) //put back the string pool
          chunkSize := 100 //process the bunch of 100 logs in thread
          n := len(logsSlice)
          noOfThread := n / chunkSize
          if n%chunkSize != 0 { //check for overflow 
                   noOfThread++
                }
          length := len(logsSlice)
          //traverse the chunk
               for i := 0; i < length; i += chunkSize {
                   
                   wg2.Add(1)
          //process each chunk in saperate chunk
                   go func(s int, e int) {
                      for i:= s; i<e;i++{
                         text := logsSlice[i]
          if len(text) == 0 {
                            continue
                         }
                     
                      logParts := strings.SplitN(text, ","2)
                      logCreationTimeString := logParts[0]
                      logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
          if err != nil {
                           fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                           return
                      }
          // check if log's timestamp is inbetween our desired period
                    if logCreationTime.After(start) && logCreationTime.Before(end) {
                    
                      fmt.Println(text)
                     }
                  }
                  textSlice = nil
                  wg2.Done()
               
               }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
             //passing the indexes for processing
          }  
             wg2.Wait() //wait for a chunk to finish
             logsSlice = nil
          }

          對上面的代碼進(jìn)行基準(zhǔn)測試。以16 GB的日志文件為例,提取日志所需的時間約為25秒。

          完整的代碼示例如下:

          func main() {

           s := time.Now()
           args := os.Args[1:]
           if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
            fmt.Println("Please give proper command line arguments")
            return
           }
           startTimeArg := args[1]
           finishTimeArg := args[3]
           fileName := args[5]

           file, err := os.Open(fileName)
           
           if err != nil {
            fmt.Println("cannot able to read the file", err)
            return
           }
           
           defer file.Close() //close after checking err
           
           queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
           if err != nil {
            fmt.Println("Could not able to parse the start time", startTimeArg)
            return
           }

           queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
           if err != nil {
            fmt.Println("Could not able to parse the finish time", finishTimeArg)
            return
           }

           filestat, err := file.Stat()
           if err != nil {
            fmt.Println("Could not able to get the file stat")
            return
           }

           fileSize := filestat.Size()
           offset := fileSize - 1
           lastLineSize := 0

           for {
            b := make([]byte1)
            n, err := file.ReadAt(b, offset)
            if err != nil {
             fmt.Println("Error reading file ", err)
             break
            }
            char := string(b[0])
            if char == "\n" {
             break
            }
            offset--
            lastLineSize += n
           }

           lastLine := make([]byte, lastLineSize)
           _, err = file.ReadAt(lastLine, offset+1)

           if err != nil {
            fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
            return
           }

           logSlice := strings.SplitN(string(lastLine), ","2)
           logCreationTimeString := logSlice[0]

           lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
           if err != nil {
            fmt.Println("can not able to parse time : ", err)
           }

           if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
            Process(file, queryStartTime, queryFinishTime)
           }

           fmt.Println("\nTime taken - ", time.Since(s))
          }

          func Process(f *os.File, start time.Time, end time.Time) error {

           linesPool := sync.Pool{New: func() interface{} {
            lines := make([]byte250*1024)
            return lines
           }}

           stringPool := sync.Pool{New: func() interface{} {
            lines := ""
            return lines
           }}

           r := bufio.NewReader(f)

           var wg sync.WaitGroup

           for {
            buf := linesPool.Get().([]byte)

            n, err := r.Read(buf)
            buf = buf[:n]

            if n == 0 {
             if err != nil {
              fmt.Println(err)
              break
             }
             if err == io.EOF {
              break
             }
             return err
            }

            nextUntillNewline, err := r.ReadBytes('\n')

            if err != io.EOF {
             buf = append(buf, nextUntillNewline...)
            }

            wg.Add(1)
            go func() {
             ProcessChunk(buf, &linesPool, &stringPool, start, end)
             wg.Done()
            }()

           }

           wg.Wait()
           return nil
          }

          func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

           var wg2 sync.WaitGroup

           logs := stringPool.Get().(string)
           logs = string(chunk)

           linesPool.Put(chunk)

           logsSlice := strings.Split(logs, "\n")

           stringPool.Put(logs)

           chunkSize := 300
           n := len(logsSlice)
           noOfThread := n / chunkSize

           if n%chunkSize != 0 {
            noOfThread++
           }

           for i := 0; i < (noOfThread); i++ {

            wg2.Add(1)
            go func(s int, e int) {
             defer wg2.Done() //to avaoid deadlocks
             for i := s; i < e; i++ {
              text := logsSlice[i]
              if len(text) == 0 {
               continue
              }
              logSlice := strings.SplitN(text, ","2)
              logCreationTimeString := logSlice[0]

              logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
              if err != nil {
               fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
               return
              }

              if logCreationTime.After(start) && logCreationTime.Before(end) {
               //fmt.Println(text)
              }
             }
             

            }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
           }

           wg2.Wait()
           logsSlice = nil
          }

          - END -

           推薦閱讀 

          最新Kubernetes實戰(zhàn)指南:從零到架構(gòu)師的進(jìn)階之路 
          用 Python 實現(xiàn)快速 Ping 一個 IP 網(wǎng)段地址!
          企業(yè)級日志平臺新秀Graylog,比ELK輕量~
          下一代Docker鏡像構(gòu)建神器 BuildKit
          面試數(shù)十家Linux運維工程師,總結(jié)了這些面試題(含答案)
          七年老運維實戰(zhàn)中的 Shell 開發(fā)經(jīng)驗總結(jié)
          快速入門 Ansible 自動化運維工具 | 16張圖
          最強(qiáng)整理!常用正則表達(dá)式速查手冊
          搭建一套完整的企業(yè)級 K8s 集群(v1.20,二進(jìn)制方式)
          12年資深運維老司機(jī)的成長感悟



          點亮,服務(wù)器三年不宕機(jī)

          瀏覽 75
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品久久久久久久久久中字幕 | 超碰国内自拍 | 成人影片亚洲 | 逼视频欧美 | 久久成人人人人精品欧 |