GoRxReactiveX 的 Golang 實(shí)現(xiàn)
- 代碼精簡(jiǎn)、可讀性強(qiáng)(盡可能用最少的代碼實(shí)現(xiàn))
- 設(shè)計(jì)精妙、實(shí)現(xiàn)優(yōu)雅(盡可能利用golang的語言特點(diǎn)和優(yōu)勢(shì))
- 可擴(kuò)展性強(qiáng)(可自定義Observable以及Operator)
- 占用系統(tǒng)資源低(盡一切可能減少創(chuàng)建goroutine和其他對(duì)象)
- 性能強(qiáng)(盡一切可能減少計(jì)算量)
每一行代碼都是深思熟慮……
已實(shí)現(xiàn)的功能
Observable
FromSlice FromChan Of Range Subject Timeout Interval Merge Concat Race CombineLatest Empty Never Throw
Operator
Do Take TakeWhile TakeUntil Skip SkipWhile SkipUntil IgnoreElements Share StartWith Zip Filter Distinct DistinctUntilChanged Debounce DebounceTime Throttle ThrottleTime First Last Count Max Min Reduce Map MapTo MergeMap MergeMapTo SwitchMap SwitchMapTo
使用方法
鏈?zhǔn)秸{(diào)用方式
import (
. "github.com/langhuihui/RxGo/rx"
)
func main(){
err := Of(1, 2, 3, 4).Take(2).Subscribe(ObserverFunc(func(event *Event) {
}))
}
管道模式
import (
. "github.com/langhuihui/RxGo/rx"
. "github.com/langhuihui/RxGo/pipe"
)
func main(){
err := Of(1, 2, 3, 4).Pipe(Skip(1),Take(2)).Subscribe(ObserverFunc(func(event *Event) {
}))
}
管道模式相比鏈?zhǔn)侥J剑哂胁僮鞣?strong>可擴(kuò)展性,用戶可以按照規(guī)則創(chuàng)建屬于自己的操作符
type Operator func(Observable) Observable
操作符只需要返回Operator這個(gè)類型即可,例如 實(shí)現(xiàn)一個(gè)happy為false就立即完成的操作符
func MyOperator(happy bool) Operator {
return func(source Observable) Observable {
return func (sink *Observer) error {
if happy{
return source(sink)
}
return nil
}
}
}
創(chuàng)建自定義Observable
在任何時(shí)候,您都可以創(chuàng)建自定義的Observable,用來發(fā)送任何事件
import (
. "github.com/langhuihui/RxGo/rx"
)
func MyObservable (sink *Control) error {
sink.Next("hello")
return nil
}
func main(){
ob := Observable(MyObservable)
ob.Subscribe(ObserverFunc(func(event *Event) {
}))
}
設(shè)計(jì)思想
基本知識(shí)
所謂Observable,就是一個(gè)可以被訂閱,然后不斷發(fā)送事件的事件源,見如下示意圖
time --> (*)-------------(o)--------------(o)---------------(x)----------------|> | | | | | Start value value error Done
該示意圖代表了,事件被訂閱后(Start)開始不停發(fā)送事件的過程,直到發(fā)出error或者Done(完成)為止
有的Observable并不會(huì)發(fā)出完成事件,比如Never
參考網(wǎng)站: rxmarbles
總體方案
實(shí)現(xiàn)Rx的關(guān)鍵要素,是要問幾個(gè)問題
- 如何定義
Observable,?(一個(gè)結(jié)構(gòu)體?一個(gè)函數(shù)?一個(gè)接口?一個(gè)Channel?) - 如何實(shí)現(xiàn)訂閱邏輯?(調(diào)用函數(shù)?發(fā)送數(shù)據(jù)?)
- 如何實(shí)現(xiàn)接受數(shù)據(jù)?(如何實(shí)現(xiàn)
Observer?) - 如何實(shí)現(xiàn)完成/錯(cuò)誤的傳遞?
- 如何實(shí)現(xiàn)取消訂閱?(難點(diǎn):在事件響應(yīng)中取消,以及在任何其他goroutine中取消訂閱)
- 如何實(shí)現(xiàn)操作符?
- 操作符如何處理連鎖反應(yīng),比如后面描述的情況
- 如何實(shí)現(xiàn)鏈?zhǔn)胶凸艿纼煞N編程模式
- 如何讓用戶擴(kuò)展(自定義)
Observable和操作符 - 如何向普通用戶解釋復(fù)雜的概念
- 當(dāng)用戶需要訂閱或者終止事件流,則進(jìn)行鏈路傳遞,訂閱或者終止所有中間過程中的事件源
Observable---------Operator----------Operator-----------Observer
<| <| <|
訂閱/取消 訂閱/取消 訂閱/取消
- 當(dāng)事件流完成或者報(bào)錯(cuò)時(shí),需要通知下游事件流的完成或者報(bào)錯(cuò)
Observable---------Operator----------Operator-----------Observer
|> |> |>
完成/錯(cuò)誤 完成/錯(cuò)誤 完成/錯(cuò)誤
實(shí)際情況遠(yuǎn)比這個(gè)復(fù)雜,后面會(huì)進(jìn)行分析
可觀察對(duì)象(事件源)Observable
Observable 被定義成為一個(gè)函數(shù),該函數(shù)含有一個(gè)類型為*Observer的參數(shù)。
type Observable func(*Observer) error
任何事件源都是這樣的一個(gè)函數(shù),當(dāng)調(diào)用該函數(shù)即意味著訂閱了該事件源,入?yún)橐粋€(gè)Observer,具體功能見下面
如果該函數(shù)返回nil,即意味著事件流完成
否則意味著事件流異常
觀察者對(duì)象Observer
type Stop chan bool
type Observer struct {
next NextHandler //緩存當(dāng)前的NextHandler,后續(xù)可以被替換
dispose Stop //取消訂閱的信號(hào),只用來close
complete Stop //用于發(fā)出完成信號(hào)
err error //緩存當(dāng)前的錯(cuò)誤
}
該控制器為一個(gè)結(jié)構(gòu)體,其中next記錄了當(dāng)前的NextHandler,
在任何時(shí)候,如果關(guān)閉了dispose這個(gè)channel,就意味著取消訂閱。
//Dispose 取消訂閱
func (c *Observer) Dispose() {
select {
case <-c.dispose:
default:
close(c.dispose)
}
}
//Aborted 判斷是否已經(jīng)取消訂閱或者已完成
func (c *Observer) Aborted() bool {
select {
case <-c.dispose:
return true
case <-c.complete:
return true
default:
return false
}
}
由于Channel的close可以引發(fā)所有讀取該Channel的阻塞行為喚醒,所以可以在不同層級(jí)復(fù)用該channel
并且,由于已經(jīng)close的channel可以反復(fù)讀取以取得是否close的狀態(tài)信息,所以不需要再額外記錄
Observer對(duì)象為Observable和事件處理邏輯共同持有,是二者溝通的橋梁
NextHandler
type Event struct {
Data interface{}
Target *Observer
}
NextHandler interface {
OnNext(*Event)
}
NextHandler是一個(gè)接口,實(shí)現(xiàn)OnNext函數(shù),當(dāng)Observable數(shù)據(jù)推送到Observer中時(shí),即調(diào)用了該函數(shù)
Target屬性用于存儲(chǔ)當(dāng)前發(fā)送事件的Observer對(duì)象,有兩大重要使命
- 更換NextHandler,用于減少數(shù)據(jù)傳遞過程
- 在NextHandler過程中終止事件流
這樣做的好處是可以實(shí)現(xiàn)不同的觀察者,比如函數(shù)或者channel
type(
NextFunc func(*Event)
NextChan chan *Event
)
func (next NextFunc) OnNext(event *Event) {
next(event)
}
func (next NextChan) OnNext(event *Event) {
next <- event
}
實(shí)現(xiàn)案例TakeUntil
//TakeUntil 一直獲取事件直到unitl傳來事件為止
func (ob Observable) TakeUntil(until Observable) Observable {
return func(sink *Observer) error {
go until(sink.New3(NextFunc(func(event *Event) {
//獲取到任何數(shù)據(jù)就讓下游完成
sink.Complete() //由于復(fù)用了complete信號(hào),所以會(huì)導(dǎo)致所有復(fù)用complete的事件流完成
})))
return ob(sink)
}
}
TakeUnitl的用途是,傳入一個(gè)until事件源,當(dāng)這個(gè)until事件源接受到事件時(shí),就會(huì)導(dǎo)致當(dāng)前的事件源"完成”。相當(dāng)于某種中斷信號(hào)。
看似簡(jiǎn)短的代碼,確考慮各種不同的情形
幾大實(shí)現(xiàn)細(xì)節(jié):
- 訂閱until事件源,通過go關(guān)鍵字創(chuàng)建goroutine防止阻塞當(dāng)前goroutine
- 使用函數(shù)式NextHandler,用戶接受來自u(píng)ntil事件源的事件,一旦接受任何事件,就調(diào)用sink.Complete()來使得當(dāng)前事件流完成
- 訂閱until事件源的Observer(sink.New3創(chuàng)建)復(fù)用了sink.dispose和sink.complete兩個(gè)信號(hào),當(dāng)用戶在代碼中取消了訂閱,就會(huì)引發(fā)該until事件源的取消訂閱行為
- 最后一步是訂閱上游事件源,我們不創(chuàng)建新的Observer,而直接把下游的Observer傳入,避免了不必要的轉(zhuǎn)發(fā)邏輯
- 任何情況取消訂閱,或者上游事件源完成都可以使得事件源函數(shù)返回,接著TakeUntil函數(shù)也會(huì)返回,即意味著完成
- until事件源的完成或者錯(cuò)誤,都將忽略,所以我們沒有去獲取until函數(shù)返回值
