<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          流數(shù)據(jù)處理利器

          共 753字,需瀏覽 2分鐘

           ·

          2020-10-14 08:20

          流處理 (Stream processing) 是一種計算機編程范式,其允許給定一個數(shù)據(jù)序列 (流處理數(shù)據(jù)源),一系列數(shù)據(jù)操作 (函數(shù)) 被應用到流中的每個元素。同時流處理工具可以顯著提高程序員的開發(fā)效率,允許他們編寫有效、干凈和簡潔的代碼。

          流數(shù)據(jù)處理在我們的日常工作中非常常見,舉個例子,我們在業(yè)務開發(fā)中往往會記錄許多業(yè)務日志,這些日志一般是先發(fā)送到 Kafka,然后再由 Job 消費 Kafaka 寫到 elasticsearch,在進行日志流處理的過程中,往往還會對日志做一些處理,比如過濾無效的日志,做一些計算以及重新組合日志等等,示意圖如下:


          流處理工具 fx

          go-zero是一個功能完備的微服務框架,框架中內(nèi)置了很多非常實用的工具,其中就包含流數(shù)據(jù)處理工具fx,下面我們通過一個簡單的例子來認識下該工具:

          package main

          import (
          "fmt"
          "os"
          "os/signal"
          "syscall"
          "time"

          "github.com/tal-tech/go-zero/core/fx"
          )

          func main() {
          ch := make(chan int)

          go inputStream(ch)
          go outputStream(ch)

          c := make(chan os.Signal, 1)
          signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
          <-c
          }

          func inputStream(ch chan int) {
          count := 0
          for {
          ch <- count
          time.Sleep(time.Millisecond * 500)
          count++
          }
          }

          func outputStream(ch chan int) {
          fx.From(func(source chan<- interface{}) {
          for c := range ch {
          source <- c
          }
          }).Walk(func(item interface{}, pipe chan<- interface{}) {
          count := item.(int)
          pipe <- count
          }).Filter(func(item interface{}) bool {
          itemInt := item.(int)
          if itemInt%2 == 0 {
          return true
          }
          return false
          }).ForEach(func(item interface{}) {
          fmt.Println(item)
          })
          }

          inputStream 函數(shù)模擬了流數(shù)據(jù)的產(chǎn)生,outputStream 函數(shù)模擬了流數(shù)據(jù)的處理過程,其中 From 函數(shù)為流的輸入,Walk 函數(shù)并發(fā)的作用在每一個 item 上,F(xiàn)ilter 函數(shù)對 item 進行過濾為 true 保留為 false 不保留,F(xiàn)orEach 函數(shù)遍歷輸出每一個 item 元素。

          流數(shù)據(jù)處理中間操作

          一個流的數(shù)據(jù)處理可能存在許多的中間操作,每個中間操作都可以作用在流上。就像流水線上的工人一樣,每個工人操作完零件后都會返回處理完成的新零件,同理流處理中間操作完成后也會返回一個新的流。



          fx 的流處理中間操作:

          操作函數(shù)功能輸入
          Distinct去除重復的 itemKeyFunc,返回需要去重的 key
          Filter過濾不滿足條件的 itemFilterFunc,Option 控制并發(fā)量
          Group對 item 進行分組KeyFunc,以 key 進行分組
          Head取出前 n 個 item,返回新 streamint64 保留數(shù)量
          Map對象轉(zhuǎn)換MapFunc,Option 控制并發(fā)量
          Merge合并 item 到 slice 并生成新 stream
          Reverse反轉(zhuǎn) item
          Sort對 item 進行排序LessFunc 實現(xiàn)排序算法
          Tail與 Head 功能類似,取出后 n 個 item 組成新 streamint64 保留數(shù)量
          Walk作用在每個 item 上WalkFunc,Option 控制并發(fā)量

          下圖展示了每個步驟和每個步驟的結(jié)果:



          用法與原理分析

          From

          通過 From 函數(shù)構(gòu)建流并返回 Stream,流數(shù)據(jù)通過 channel 進行存儲:

          // 例子
          s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
          fx.From(func(source chan<- interface{}) {
          for _, v := range s {
          source <- v
          }
          })

          // 源碼
          func From(generate GenerateFunc) Stream {
          source := make(chan interface{})

          go func() {
          defer close(source)
          // 構(gòu)造流數(shù)據(jù)寫入channel
          generate(source)
          }()

          return Range(source)
          }

          Filter

          Filter 函數(shù)提供過濾 item 的功能,F(xiàn)ilterFunc 定義過濾邏輯 true 保留 item,false 則不保留:

          // 例子 保留偶數(shù)
          s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
          fx.From(func(source chan<- interface{}) {
          for _, v := range s {
          source <- v
          }
          }).Filter(func(item interface{}) bool {
          if item.(int)%2 == 0 {
          return true
          }
          return false
          })

          // 源碼
          func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
          return p.Walk(func(item interface{}, pipe chan<- interface{}) {
          // 執(zhí)行過濾函數(shù)true保留,false丟棄
          if fn(item) {
          pipe <- item
          }
          }, opts...)
          }

          Group

          Group 對流數(shù)據(jù)進行分組,需定義分組的 key,數(shù)據(jù)分組后以 slice 存入 channel:


          // 例子 按照首字符"g"或者"p"分組,沒有則分到另一組
          ss := []string{"golang", "google", "php", "python", "java", "c++"}
          fx.From(func(source chan<- interface{}) {
          for _, s := range ss {
          source <- s
          }
          }).Group(func(item interface{}) interface{} {
          if strings.HasPrefix(item.(string), "g") {
          return "g"
          } else if strings.HasPrefix(item.(string), "p") {
          return "p"
          }
          return ""
          }).ForEach(func(item interface{}) {
          fmt.Println(item)
          })

          // 源碼
          func (p Stream) Group(fn KeyFunc) Stream {
          // 定義分組存儲map
          groups := make(map[interface{}][]interface{})
          for item := range p.source {
          // 用戶自定義分組key
          key := fn(item)
          // key相同分到一組
          groups[key] = append(groups[key], item)
          }

          source := make(chan interface{})
          go func() {
          for _, group := range groups {
          // 相同key的一組數(shù)據(jù)寫入到channel
          source <- group
          }
          close(source)
          }()

          return Range(source)
          }

          Reverse

          reverse 可以對流中元素進行反轉(zhuǎn)處理:



          // 例子
          fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
          fmt.Println(item)
          })

          // 源碼
          func (p Stream) Reverse() Stream {
          var items []interface{}
          // 獲取流中數(shù)據(jù)
          for item := range p.source {
          items = append(items, item)
          }
          // 反轉(zhuǎn)算法
          for i := len(items)/2 - 1; i >= 0; i-- {
          opp := len(items) - 1 - i
          items[i], items[opp] = items[opp], items[i]
          }

          // 寫入流
          return Just(items...)
          }

          Distinct

          distinct 對流中元素進行去重,去重在業(yè)務開發(fā)中比較常用,經(jīng)常需要對用戶 id 等做去重操作:

          // 例子
          fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
          return item
          }).ForEach(func(item interface{}) {
          fmt.Println(item)
          })
          // 結(jié)果為 1,2,3,4,5,6

          // 源碼
          func (p Stream) Distinct(fn KeyFunc) Stream {
          source := make(chan interface{})

          threading.GoSafe(func() {
          defer close(source)
          // 通過key進行去重,相同key只保留一個
          keys := make(map[interface{}]lang.PlaceholderType)
          for item := range p.source {
          key := fn(item)
          // key存在則不保留
          if _, ok := keys[key]; !ok {
          source <- item
          keys[key] = lang.Placeholder
          }
          }
          })

          return Range(source)
          }

          Walk

          Walk 函數(shù)并發(fā)的作用在流中每一個 item 上,可以通過 WithWorkers 設(shè)置并發(fā)數(shù),默認并發(fā)數(shù)為 16,最小并發(fā)數(shù)為 1,如設(shè)置 unlimitedWorkers 為 true 則并發(fā)數(shù)無限制,但并發(fā)寫入流中的數(shù)據(jù)由 defaultWorkers 限制,WalkFunc 中用戶可以自定義后續(xù)寫入流中的元素,可以不寫入也可以寫入多個元素:

          // 例子
          fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
          newItem := strings.ToUpper(item.(string))
          pipe <- newItem
          }).ForEach(func(item interface{}) {
          fmt.Println(item)
          })

          // 源碼
          func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
          pipe := make(chan interface{}, option.workers)

          go func() {
          var wg sync.WaitGroup
          pool := make(chan lang.PlaceholderType, option.workers)

          for {
          // 控制并發(fā)數(shù)量
          pool <- lang.Placeholder
          item, ok := <-p.source
          if !ok {
          <-pool
          break
          }

          wg.Add(1)
          go func() {
          defer func() {
          wg.Done()
          <-pool
          }()
          // 作用在每個元素上
          fn(item, pipe)
          }()
          }

          // 等待處理完成
          wg.Wait()
          close(pipe)
          }()

          return Range(pipe)
          }

          并發(fā)處理

          fx 工具除了進行流數(shù)據(jù)處理以外還提供了函數(shù)并發(fā)功能,在微服務中實現(xiàn)某個功能往往需要依賴多個服務,并發(fā)的處理依賴可以有效的降低依賴耗時,提升服務的性能。



          fx.Parallel(func() {
          userRPC() // 依賴1
          }, func() {
          accountRPC() // 依賴2
          }, func() {
          orderRPC() // 依賴3
          })

          注意 fx.Parallel 進行依賴并行處理的時候不會有 error 返回,如需有 error 返回或者有一個依賴報錯需要立馬結(jié)束依賴請求請使用MapReduce工具進行處理。

          總結(jié)

          本篇文章介紹了流處理的基本概念和 go-zero 中的流處理工具 fx,在實際的生產(chǎn)中流處理場景應用也非常多,希望本篇文章能給大家?guī)硪欢ǖ膯l(fā),更好的應對工作中的流處理場景。

          項目地址

          https://github.com/tal-tech/go-zero

          組件地址

          https://github.com/tal-tech/go-zero/tree/master/core/fx

          Example

          https://github.com/tal-tech/go-zero/tree/master/example/fx

          微信交流群

          瀏覽 51
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  欧美18禁黄免费网站 | 日韩成人激情视频 | 玖玖在线视频 | 91 国产 爽 黄 | 国产又粗又黄 |