RxGoGo 語言的 Reactive 擴(kuò)展
RxGo 是 Go 語言的 Reactive 擴(kuò)展。
安裝
go get -u github.com/jochasinga/rxgo
用法
watcher := observer.Observer{
// Register a handler function for every next available item.
NextHandler: func(item interface{}) {
fmt.Printf("Processing: %v\n", item)
},
// Register a handler for any emitted error.
ErrHandler: func(err error) {
fmt.Printf("Encountered error: %v\n", err)
},
// Register a handler when a stream is completed.
DoneHandler: func() {
fmt.Println("Done!")
},
}
it, _ := iterable.New([]interface{}{1, 2, 3, 4, errors.New("bang"), 5})
source := observable.From(it)
sub := source.Subscribe(watcher)
// wait for the async operation
<-sub
以上將:
將切片中每個(gè)數(shù)字的格式字符串 print 為4。
print 錯(cuò)誤“bang”
重要的是要記住,只有一個(gè) OnError 或 OnDone 可以在 stream 中調(diào)用。 如果 stream 中有錯(cuò)誤,處理停止,OnDone 將永遠(yuǎn)不會(huì)被調(diào)用,反之亦然。
概念是將所有“side effects”分組到這些處理程序中,讓一個(gè) Observer 或任何 EventHandler 處理它們。
package main
import (
"fmt"
"time"
"github.com/jochasinga/rx"
"github.com/jochasinga/rx/handlers"
)
func main() {
score := 9
onNext := handlers.NextFunc(func(item interface{}) {
if num, ok := item.(int); ok {
score += num
}
})
onDone := handlers.DoneFunc(func() {
score *= 2
})
watcher := observer.New(onNext, onDone)
// Create an `Observable` from a single item and subscribe to the observer.
sub := observable.Just(1).Subscribe(watcher)
<-sub
fmt.Println(score) // 20
}評(píng)論
圖片
表情
