<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          GoRxReactiveX 的 Golang 實(shí)現(xiàn)

          聯(lián)合創(chuàng)作 · 2023-09-19 10:42

          • 代碼精簡(jiǎn)、可讀性強(qiáng)(盡可能用最少的代碼實(shí)現(xiàn))
          • 設(shè)計(jì)精妙、實(shí)現(xiàn)優(yōu)雅(盡可能利用golang的語言特點(diǎn)和優(yōu)勢(shì))
          • 可擴(kuò)展性強(qiáng)(可自定義Observable以及Operator)
          • 占用系統(tǒng)資源低(盡一切可能減少創(chuàng)建goroutine和其他對(duì)象)
          • 性能強(qiáng)(盡一切可能減少計(jì)算量)

          每一行代碼都是深思熟慮……

          已實(shí)現(xiàn)的功能

          Observable

          FromSlice FromChan Of Range Subject Timeout Interval Merge Concat Race CombineLatest Empty Never Throw

          Operator

          Do Take TakeWhile TakeUntil Skip SkipWhile SkipUntil IgnoreElements Share StartWith Zip Filter Distinct DistinctUntilChanged Debounce DebounceTime Throttle ThrottleTime First Last Count Max Min Reduce Map MapTo MergeMap MergeMapTo SwitchMap SwitchMapTo

          使用方法

          鏈?zhǔn)秸{(diào)用方式

          import (
              . "github.com/langhuihui/RxGo/rx"
          )
          func main(){
              err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) {
                  
              }))
          }

          管道模式

          import (
              . "github.com/langhuihui/RxGo/rx"
              . "github.com/langhuihui/RxGo/pipe"
          )
          func main(){
              err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) {
                  
              }))
          }

          管道模式相比鏈?zhǔn)侥J剑哂胁僮鞣?strong>可擴(kuò)展性,用戶可以按照規(guī)則創(chuàng)建屬于自己的操作符

          type Operator func(Observable) Observable

          操作符只需要返回Operator這個(gè)類型即可,例如 實(shí)現(xiàn)一個(gè)happy為false就立即完成的操作符

          func MyOperator(happy bool) Operator {
          	return func(source Observable) Observable {
          		return func (sink *Observer) error {
                      if happy{
                          return source(sink)
                      }
                      return nil
          		}
          	}
          }
          

          創(chuàng)建自定義Observable

          在任何時(shí)候,您都可以創(chuàng)建自定義的Observable,用來發(fā)送任何事件

          import (
              . "github.com/langhuihui/RxGo/rx"
          )
          func MyObservable (sink *Control) error {
              sink.Next("hello")
              return nil
          }
          func main(){
              ob := Observable(MyObservable)
              ob.Subscribe(ObserverFunc(func(event *Event) {
                  
              }))
          }

          設(shè)計(jì)思想

          基本知識(shí)

          所謂Observable,就是一個(gè)可以被訂閱,然后不斷發(fā)送事件的事件源,見如下示意圖

                                          time -->
          
          (*)-------------(o)--------------(o)---------------(x)----------------|>
           |               |                |                 |                 |
          Start          value            value             error              Done

          該示意圖代表了,事件被訂閱后(Start)開始不停發(fā)送事件的過程,直到發(fā)出error或者Done(完成)為止

          有的Observable并不會(huì)發(fā)出完成事件,比如Never

          參考網(wǎng)站: rxmarbles

          總體方案

          實(shí)現(xiàn)Rx的關(guān)鍵要素,是要問幾個(gè)問題

          1. 如何定義Observable,?(一個(gè)結(jié)構(gòu)體?一個(gè)函數(shù)?一個(gè)接口?一個(gè)Channel?)
          2. 如何實(shí)現(xiàn)訂閱邏輯?(調(diào)用函數(shù)?發(fā)送數(shù)據(jù)?)
          3. 如何實(shí)現(xiàn)接受數(shù)據(jù)?(如何實(shí)現(xiàn)Observer?)
          4. 如何實(shí)現(xiàn)完成/錯(cuò)誤的傳遞?
          5. 如何實(shí)現(xiàn)取消訂閱?(難點(diǎn):在事件響應(yīng)中取消,以及在任何其他goroutine中取消訂閱)
          6. 如何實(shí)現(xiàn)操作符
          7. 操作符如何處理連鎖反應(yīng),比如后面描述的情況
          8. 如何實(shí)現(xiàn)鏈?zhǔn)胶凸艿纼煞N編程模式
          9. 如何讓用戶擴(kuò)展(自定義)Observable操作符
          10. 如何向普通用戶解釋復(fù)雜的概念
          • 當(dāng)用戶需要訂閱或者終止事件流,則進(jìn)行鏈路傳遞,訂閱或者終止所有中間過程中的事件源
          Observable---------Operator----------Operator-----------Observer
                       <|                <|                <|          
                     訂閱/取消          訂閱/取消          訂閱/取消         
          • 當(dāng)事件流完成或者報(bào)錯(cuò)時(shí),需要通知下游事件流的完成或者報(bào)錯(cuò)
          Observable---------Operator----------Operator-----------Observer
                       |>                |>                |>          
                     完成/錯(cuò)誤          完成/錯(cuò)誤          完成/錯(cuò)誤         

          實(shí)際情況遠(yuǎn)比這個(gè)復(fù)雜,后面會(huì)進(jìn)行分析

          可觀察對(duì)象(事件源)Observable

          Observable 被定義成為一個(gè)函數(shù),該函數(shù)含有一個(gè)類型為*Observer的參數(shù)。

          type Observable func(*Observer) error

          任何事件源都是這樣的一個(gè)函數(shù),當(dāng)調(diào)用該函數(shù)即意味著訂閱了該事件源,入?yún)橐粋€(gè)Observer,具體功能見下面

          如果該函數(shù)返回nil,即意味著事件流完成

          否則意味著事件流異常

          觀察者對(duì)象Observer

          type Stop chan bool
          type Observer struct {
          	next     NextHandler //緩存當(dāng)前的NextHandler,后續(xù)可以被替換
          	dispose  Stop        //取消訂閱的信號(hào),只用來close
          	complete Stop        //用于發(fā)出完成信號(hào)
          	err      error       //緩存當(dāng)前的錯(cuò)誤
          }

          該控制器為一個(gè)結(jié)構(gòu)體,其中next記錄了當(dāng)前的NextHandler,

          在任何時(shí)候,如果關(guān)閉了dispose這個(gè)channel,就意味著取消訂閱

          //Dispose 取消訂閱
          func (c *Observer) Dispose() {
          	select {
          	case <-c.dispose:
          	default:
          		close(c.dispose)
          	}
          }
          
          //Aborted 判斷是否已經(jīng)取消訂閱或者已完成
          func (c *Observer) Aborted() bool {
          	select {
          	case <-c.dispose:
          		return true
          	case <-c.complete:
          		return true
          	default:
          		return false
          	}
          }

          由于Channel的close可以引發(fā)所有讀取該Channel的阻塞行為喚醒,所以可以在不同層級(jí)復(fù)用該channel

          并且,由于已經(jīng)close的channel可以反復(fù)讀取以取得是否close的狀態(tài)信息,所以不需要再額外記錄

          Observer對(duì)象為Observable和事件處理邏輯共同持有,是二者溝通的橋梁

          NextHandler

          type Event struct {
              Data    interface{}
              Target  *Observer
          }
          NextHandler interface {
              OnNext(*Event)
          }

          NextHandler是一個(gè)接口,實(shí)現(xiàn)OnNext函數(shù),當(dāng)Observable數(shù)據(jù)推送到Observer中時(shí),即調(diào)用了該函數(shù)

          Target屬性用于存儲(chǔ)當(dāng)前發(fā)送事件的Observer對(duì)象,有兩大重要使命

          1. 更換NextHandler,用于減少數(shù)據(jù)傳遞過程
          2. 在NextHandler過程中終止事件流

          這樣做的好處是可以實(shí)現(xiàn)不同的觀察者,比如函數(shù)或者channel

          type(
              NextFunc func(*Event)
              NextChan chan *Event
          )
          func (next NextFunc) OnNext(event *Event) {
          	next(event)
          }
          func (next NextChan) OnNext(event *Event) {
          	next <- event
          }

          實(shí)現(xiàn)案例TakeUntil

          //TakeUntil 一直獲取事件直到unitl傳來事件為止
          func (ob Observable) TakeUntil(until Observable) Observable {
          	return func(sink *Observer) error {
          		go until(sink.New3(NextFunc(func(event *Event) {
          			//獲取到任何數(shù)據(jù)就讓下游完成
          			sink.Complete() //由于復(fù)用了complete信號(hào),所以會(huì)導(dǎo)致所有復(fù)用complete的事件流完成
          		})))
          		return ob(sink)
          	}
          }

          TakeUnitl的用途是,傳入一個(gè)until事件源,當(dāng)這個(gè)until事件源接受到事件時(shí),就會(huì)導(dǎo)致當(dāng)前的事件源"完成”。相當(dāng)于某種中斷信號(hào)。

          看似簡(jiǎn)短的代碼,確考慮各種不同的情形

          幾大實(shí)現(xiàn)細(xì)節(jié):

          1. 訂閱until事件源,通過go關(guān)鍵字創(chuàng)建goroutine防止阻塞當(dāng)前goroutine
          2. 使用函數(shù)式NextHandler,用戶接受來自u(píng)ntil事件源的事件,一旦接受任何事件,就調(diào)用sink.Complete()來使得當(dāng)前事件流完成
          3. 訂閱until事件源的Observer(sink.New3創(chuàng)建)復(fù)用了sink.dispose和sink.complete兩個(gè)信號(hào),當(dāng)用戶在代碼中取消了訂閱,就會(huì)引發(fā)該until事件源的取消訂閱行為
          4. 最后一步是訂閱上游事件源,我們不創(chuàng)建新的Observer,而直接把下游的Observer傳入,避免了不必要的轉(zhuǎn)發(fā)邏輯
          5. 任何情況取消訂閱,或者上游事件源完成都可以使得事件源函數(shù)返回,接著TakeUntil函數(shù)也會(huì)返回,即意味著完成
          6. until事件源的完成或者錯(cuò)誤,都將忽略,所以我們沒有去獲取until函數(shù)返回值
          瀏覽 36
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          編輯 分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  青草午夜| 成人激情五月 | 欧美性精品性爱第一页 | 水多多精品视频 | 久久视频成人 |