RxJS 入門及應用
大廠技術??堅持周更??精選好文
RxJS 入門及應用
RxJS為什么被稱為是函數(shù)響應式編程?
函數(shù)式編程(Functional Programming)
函數(shù)式編程的特點是允許把函數(shù)當成一個實參或返回值,主要思想是想將復雜的運算分解成一系列嵌套的函數(shù),逐層推導,不斷漸進,直至完成運算。常用的數(shù)組方法(map,filter等)就運用了函數(shù)式編程的思想。
const?arr?=?[4,1,5,2,3];
const?newArr?=?arr
??.sort((a,b)?=>?a-b)
??.filter(value?=>?value>2);
console.log(newArr);???//?[3,4,5]
函數(shù)式編程還有一個最重要的特性,那就是純凈性Purity(純函數(shù))
純函數(shù)是不會產(chǎn)生副作用的函數(shù),其中輸出完全由輸入決定,也就是說無論調(diào)用多少次,調(diào)用f(x)都會得到相同的結果。
函數(shù)副作用:指當調(diào)用函數(shù)時,除了返回函數(shù)值之外,還對主調(diào)用函數(shù)產(chǎn)生附加的影響。例如修改全局變量(函數(shù)外的變量),修改參數(shù)或改變外部存儲。
純函數(shù)滿足以下兩個條件
①函數(shù)的執(zhí)行過程完全由輸入?yún)?shù)決定,不會受除參數(shù)之外的任何數(shù)據(jù)的影響。
②函數(shù)不會修改任何外部狀態(tài),比如修改全局變量或傳入的參數(shù)對象。
通常如果創(chuàng)建一個非純函數(shù),在這個函數(shù)之外使用了共享變量的代碼,會使應用狀態(tài)混亂難以維護。
var?count?=?0;
var?button?=?document.querySelector('button');
button.addEventListener('click',?
??()?=>?console.log(`Clicked?${++count}?times`)
);
而使用RxJS,可以將應用狀態(tài)隔離出來,不會被外部環(huán)境影響也不會影響外部環(huán)境。
import?{?fromEvent,?scan?}?from?'rxjs';?//?將事件轉換成 observable 序列。
var?button?=?document.querySelector('button');
const?example?=?fromEvent(button,?'click').pipe(
??scan(count?=>?count?+?1,?0);?//工作原理與數(shù)組的?reduce?類似,隨著時間的推移進行歸并
);
example.subscribe(count?=>?console.log(`Clicked?${count}?times`));
響應式編程(Reactive Programming)
wiki百科中的解釋:
在計算中,響應式編程或反應式編程(Reactive programming)是一種面向數(shù)據(jù)流和變化傳播的聲明式編程范式。這意味著可以在編程語言中很方便地表達靜態(tài)或動態(tài)的數(shù)據(jù)流,而相關的計算模型會自動將變化的值通過數(shù)據(jù)流進行傳播。
什么是數(shù)據(jù)流?
數(shù)據(jù)流(data stream)是數(shù)據(jù)在系統(tǒng)內(nèi)傳播的路徑,表示在一定時間范圍內(nèi)發(fā)生的一系列事件。
任何東西都可以是一個 Stream:變量、用戶輸入、網(wǎng)絡響應、定時器、數(shù)據(jù)結構等等。
什么是變化傳播?
在數(shù)據(jù)流傳播的過程中,可能會有一些事件去組合、創(chuàng)建、過濾這些 Streams,從一個舊的stream映射成一個新的stream。我們不需要去輪詢變化,而是對事件進行監(jiān)聽,在執(zhí)行一個事件后,會自動做出相應的響應,這就是變化傳播。
前端框架與rxjs的結合
前端框架的職責(例如react):數(shù)據(jù)與UI視圖的同步,數(shù)據(jù)發(fā)生更新時,視圖隨之更新;
UI?=?f(data);
響應式編程的職責(例如rxjs):聚焦于數(shù)據(jù),從數(shù)據(jù)的源頭開始,到數(shù)據(jù)的處理變化,再到數(shù)據(jù)流的訂閱,數(shù)據(jù)的消費;
data?=?g(origin?data)
兩者關系看起并不沖突,并且在某些場景下結合使用可能會為我們帶來便捷,前端框架可以作為響應式編程數(shù)據(jù)的一個消費者;
UI?=?f(g(origin?data))
RxJS是用來干什么的?
RxJS是一個用于處理異步事件流的庫,通過使用 observable 序列來編寫異步和基于事件的程序,實際應用場景就是把請求封裝成observerable,通過一些基本操作符(map、filter等等)將返回的數(shù)據(jù)處理并且catch錯誤,將異步事件作為集合來處理。RxJS實際上是將開發(fā)過程中遇到的異步(多為異步,同步也可以)操作看為一個事件流,RxJS內(nèi)部封裝了對一個事件流的操作符(創(chuàng)建、轉換、組合、過濾、錯誤異常處理等),組合使用這些操作符來以更便利的方式來管理事件。
為什么用RxJS,摘自知乎回答:
思考一下,異步的本質(zhì)是什么?
異步操作和同步操作最大的區(qū)別就是異步有時序。
我們可以把同步操作理解為:數(shù)據(jù)+函數(shù)
那么異步操作就是:數(shù)據(jù)+函數(shù)+時序
Rx就是把時序抽離成一根時間軸,在這根時間軸上進行同步操作,而異步相關的時序處理就交給Rx提供的各種operator操作符。
所以問題就很簡單了,如果你的應用是一個時序密集的應用,那么使用Rx能幫你理清復雜的異步邏輯。反之,如果異步操作之間沒有太多的聯(lián)系,時序分散, 則不那么需要使用Rx。
RxJS中解決異步事件管理的基本概念
1. Observable(可觀察對象)
將一個數(shù)據(jù)流看作一個可觀察對象,表示這個數(shù)據(jù)流變化傳播過程中發(fā)生的一些事件的集合。
| 單個值 | 多個值 | |
|---|---|---|
| 拉取(pull) | Function | Iterator |
| 推送(push) | Promise | Observable |
拉取和推送是兩種不同的協(xié)議,用來描述數(shù)據(jù)生產(chǎn)者 (Producer) 與數(shù)據(jù)消費者 (Consumer) 如何通信。
拉取體系
js中每個函數(shù)function都屬于拉取體系,函數(shù)來生產(chǎn)數(shù)據(jù),消費者通過調(diào)用該函數(shù)的代碼來從函數(shù)中獲取單個返回值來對該函數(shù)進行消費,而迭代器Iterator則是消費者調(diào)用iterator.next()來獲取多個返回值進行消費。
拉取的過程中,生產(chǎn)者是一個被動的過程,在消費者請求調(diào)用自己時才產(chǎn)生數(shù)據(jù),消費者是一個主動的過程,消費者自己來決定何時調(diào)用生產(chǎn)者來獲取收據(jù)。
推送體系
在如今的js中,Promise是最常見的推送體系,Promise作為生產(chǎn)者,將解析過的
resolved值傳給消費者注冊過的一個回調(diào)函數(shù)。推送的過程中,生產(chǎn)者是一個主動的過程,在生產(chǎn)者獲取
resolved值的時候,生產(chǎn)者可以決定何時把值推送給消費者,而消費者并不知道什么時候可以從生產(chǎn)者這里獲取到值。在RxJS中,observable也屬于推送體系,并且可以推送一個或多個值。
上面這些術語有些抽象,舉個??更容易理解什么是Observable
Observable就像是一個function函數(shù)
Function
function?foo()?{
????console.log('Hello')
????return?'world';
}
const?x?=?foo();
console.log(x);
const?y?=?foo();
console.log(y);
Observable
const?foo?=?Observable.create(function?(observer)?{
????console.log('Hello');
????observer.next('world');
});
//?.subscribe()類似于調(diào)用函數(shù)
foo.subscribe(function?(x)?{
????console.log(x);
});
foo.subscribe(function?(y)?{
????console.log(y);
});
//?控制臺輸出是相同的:?
'Hello'
'world'
'Hello'
'world'
Observable和function的區(qū)別是什么?
Observable可以隨著時間推移返回(推送)多個值 ,這一點是函數(shù)做不到的。
Function
function?foo()?{
????return?'Hello';
????return?'world';?//?永遠不會執(zhí)行
}
const?a?=?foo();
console.log(a);
//控制臺輸出
'Hello'
Observable
const?foo?=?Observable.create(function?(observer)?{
????observer.next('Hello');
????observer.next('world');
});
foo.subscribe(function?(x)?{
????console.log(x);
});
//?控制臺輸出
'Hello'
'world'
//?也可以異步推送一些值
const?foo?=?Observable.create(function?(observer)?{
????observer.next('Hello');
????setTimeout(()?=>?{
????????observer.next('rxjs');
????},0)
????observer.next('world');
});
//?控制臺輸出
'Hello'
'world'
'rxjs'
冰墩墩占位

1.1 創(chuàng)建Observable
Observable可以使用Observable.create來創(chuàng)建,但通常我們使用創(chuàng)建操作符[1]來創(chuàng)建Observable。
1.2 ? 訂閱Observable
訂閱Observable像是調(diào)用函數(shù),并提供接收數(shù)據(jù)的回調(diào)函數(shù)。
observable.subscribe(value?=>?{
?//?do?something
})
不同觀察者通過subscribe調(diào)用同一observable數(shù)據(jù)不共享。
每一次調(diào)用,等于重新執(zhí)行一遍函數(shù)。
1.3 執(zhí)行Observable
Observable執(zhí)行可以傳遞三種類型的值:
Next:推送一個值,可以是任意類型;
Error:推送一個錯誤或者異常;
Complete:推送一個「已完成」的消息,表明不會再發(fā)送任何值;
next()方法中的值代表要推送給觀察者的實際數(shù)據(jù),可以執(zhí)行多次;
error()和complete()會在Observable執(zhí)行期間至多執(zhí)行一次,并且只會執(zhí)行其中一個;
Observable.create(observer?=>?{
????try?{
????????observer.next(1);
????????observer.next(2);
????????observer.complete();
????????observer.next(3);?//?前面已經(jīng)通知觀察者已經(jīng)完成,所以這個值不會發(fā)送
????}?catch?(e)?{
????????observer.error(e);?//?捕獲到異常發(fā)送一個錯誤
????}
})
1.4 ? 銷毀Observable執(zhí)行
Observable的執(zhí)行可能會是無限的,通常觀察者希望在一個有限的時間里終止Observable執(zhí)行,以避免浪費計算資源和內(nèi)存消耗。
類似于清除定時器,var timer = setInterval(() => {},1000); clearInterval(timer);
//?調(diào)用subscribe時,觀察者會被附加到新創(chuàng)建的Observable執(zhí)行中,
//?會返回一個對象,即Subscription(訂閱)
var?subscription?=?observable.subscribe();
// Subscription表示正在進行中的執(zhí)行,調(diào)用unsubscribe()來取消observable執(zhí)行;
subscription.unsubscribe();
2. Observer (觀察者)
Observer(觀察者)是一組回調(diào)函數(shù)的集合,每一個回調(diào)函數(shù)對應Observable發(fā)送通知的類型:next、error、complete。
const?observer?=?{
????next:?()?=>?{},?//?觀察者接收到next()消息執(zhí)行的回調(diào)函數(shù)
????error:?()?=>?{},?//?觀察者接收到error()消息執(zhí)行的回調(diào)函數(shù)
????complete:?()?=>?{},?//?接收到complete()消息執(zhí)行的回調(diào)函數(shù)
}
// observer中的觀察者可能是部分的,沒有提供某個回調(diào),observable還是可以執(zhí)行的。
//?方法1:將observer觀察者傳入subscribe
observable.subscribe(observer)
//?方法2:subscribe按順序(next,error,complete)傳入三個回調(diào)函數(shù)
observable.subscribe((value)?=>?{},(error)?=>?{},?()?=>?{})
3. ?Subscription (訂閱)
Subscription是一個可清理資源的對象,代表Observable的執(zhí)行。
基本用處就是使用unsubscribe來釋放資源或取消Observable的執(zhí)行。
4. Subject (主體)
引入一個新的概念,Cold Observable / Hot Observable。
Observable對象就是一個數(shù)據(jù)流,在一個時間范圍內(nèi)推送一系列數(shù)據(jù)。
在只存在一個observer的情況下很簡單,但是對于存在多個observer的場景,會變得復雜。
假設一個場景:
兩個observer觀察者A和B訂閱同一個Observable對象,但他們不是同時訂閱,第一個觀察者A訂閱N秒后,第二個觀察者B才訂閱這個Observable對象。并且在這N秒期間,Observable已經(jīng)推送了一些數(shù)據(jù),那么第二個觀察者B應不應該收到已經(jīng)被推送給第一個觀察者A的那些數(shù)據(jù)呢?
Selection 1 :已經(jīng)推送給觀察者A的值就不給B了,B只從訂閱那一時間點接收Observable推送的數(shù)據(jù)就行了。
Selection 2:已經(jīng)推送給觀察者A的值還是要給B,B訂閱時從頭開始獲取Observable推送的數(shù)據(jù)。
RxJS考慮到這兩種不同的場景,讓Observable支持這兩種不同的需求,Selection 1這樣的Observable就是Hot Observable,而Selection 2這樣的Observable就是Cold Observable。
RxJS Subject是一種特殊類型的Observable,允許將值多播給多個觀察者(每個已訂閱的觀察者從訂閱時間點開始接收當前Observable推送的值,非獨立),而普通的Observable是單播的(每個已訂閱的觀察者是獨立執(zhí)行Observable的)。
對于多個訂閱Subject的觀察者,subscribe不會重新從頭發(fā)送值,他只是將觀察者注冊到觀察者列表中,后續(xù)有新值發(fā)送的時候,將值多播給觀察者列表中的所有觀察者。
RxJS的四種不同類型Subject
| Observable | Subject | BehaviorSubject | AsyncSubject | ReplaySubject |
|---|---|---|---|---|
| 每次從源頭開始將值推送給觀察者 | 將值多播給已訂閱的該Subject的觀察者列表 | 把最后一個值(當前值)發(fā)送給觀察者(需要一個初始值) | 執(zhí)行的最后一個值發(fā)給觀察者 | 可以把之前錯過的值發(fā)給觀察者 |
4.1.1 ? BehaviorSubject
BS有一個“當前值”的概念,它保存了發(fā)送給觀察者的最后一個值(當前值),當有新的觀察者訂閱時,會立即接收到“當前值”;
而如果用Subject,在觀察者訂閱時,之前已發(fā)送的值不會再發(fā)給觀察者包括最近的一個值,后續(xù)再有值發(fā)送的時候,新注冊的觀察者才會接收到新的值。
var?subject?=?new?BehaviorSubject(0);?//?0是初始值
subject.subscribe({
??next:?(v)?=>?console.log('observerA:?'?+?v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
??next:?(v)?=>?console.log('observerB:?'?+?v)
});
subject.next(3);
//?輸出:
observerA:?0?//line3 :A訂閱時立即收到當前值(初始值)0
observerA:?1?//line7?:?BS推送新的值1,訂閱者A接收到值1
observerA:?2?//line8?:?BS推送新的值2,訂閱者A接收到值2
observerB:?2?//line?10?:?B訂閱時立即收到變化后的當前值2
observerA: 3 //line 14:BS推送新的值3,訂閱者A和B一起收到值3
observerB:?3
4.1.2 AsyncSubject
AS只有當Observable執(zhí)行完成時【執(zhí)行complete()】,才會將執(zhí)行的最后一個值發(fā)送給觀察者
var?subject?=?new?Rx.AsyncSubject();
subject.subscribe({
??next:?(v)?=>?console.log('observerA:?'?+?v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
??next:?(v)?=>?console.log('observerB:?'?+?v)
});
subject.next(5);
subject.complete();
//?輸出:
//?line?17?執(zhí)行complete()后兩個訂閱者A和B才會收到最后的一個值(5)
observerA:?5
observerB:?5
4.1.3 ReplaySubject
RS類似BS,它可以發(fā)送舊值給新的觀察者,還可以記錄Observable的執(zhí)行的一部分,將Observable執(zhí)行過程中的多個值回放給新的觀察者。
var?subject?=?new?ReplaySubject(3);?//?為新的訂閱者緩沖3個值
subject.subscribe({
??next:?(v)?=>?console.log('observerA:?'?+?v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
??next:?(v)?=>?console.log('observerB:?'?+?v)
});
subject.next(5);
//?輸出
observerA:?1?//?line?7:?RS推送值1,訂閱者A收到值1
observerA:?2?//?line?8:?RS推送值2,訂閱者A收到值2
observerA:?3?//?Line?9:?RS推送值3,訂閱者A收到值3
observerA:?4?//?line?10:?RS推送值4,訂閱者A收到值4
observerB:?2?//?line?12:?新的訂閱者訂閱RS
observerB:?3?//?訂閱時按順序收到了RS緩沖的三個值
observerB:?4
observerA: 5 // line 16:RS推送值5,觀察者A和B收到值5
observerB:?5
Tips
//RS除了可以指定緩沖數(shù)量,還可以指定時間(單位毫秒)來確定多久之前的值要記錄
var subject = new ReplaySubject(3,500)?記錄3個值,500ms前。
5. Operators (操作符)
操作符是允許復雜的異步代碼以聲明式的方式進行輕松組合的基礎代碼單元。
操作符本質(zhì)就是一個純函數(shù),當操作符被調(diào)用時,不會改變已經(jīng)存在的Observable實例,會基于當前Observable創(chuàng)建一個新的Observable。
一個Observable對象代表的是一個數(shù)據(jù)流,實際場景中,產(chǎn)生Observable對象并不是每次都通過直接調(diào)用Observable構造函數(shù)來創(chuàng)造數(shù)據(jù)流對象。于現(xiàn)實中復雜的問題,并不會創(chuàng)造一個數(shù)據(jù)流之后就直接通過 subscribe接上一個Observer,往往需要對這個數(shù)據(jù)流做一系列處理,然后才交給Observer。就像一個管道,數(shù)據(jù)從管道的一段流入,途徑管道各個環(huán)節(jié),當數(shù)據(jù)到達Observer的時候,已經(jīng)被管道操作過,有的數(shù)據(jù)已經(jīng)被中途過濾拋棄掉了,有的數(shù)據(jù)已經(jīng)被改變了原來的形態(tài),而且最后的數(shù)據(jù)可能來自多個數(shù)據(jù)源,最后Observer只需要處理能夠走到終點的數(shù)據(jù),而這個數(shù)據(jù)管道就是pipe。

而對于每一個操作符,鏈接的就是上游(upstream)和下游 (downstream)?

Marble diagrams (彈珠圖)
為了能夠解釋流是如何變化的,文字通常不足以能夠描述清楚,我們常常使用彈珠圖來對流的時序變化(操作符的運行方式)進行描述。
一個彈珠圖??:

參考資料
https://cn.rx.js.org/manual/overview.html
https://rxjs-cn.github.io/learn-rxjs-operators/
參考資料
創(chuàng)建操作符:?https://rxjs-cn.github.io/learn-rxjs-operators/operators/creation/
?? 謝謝支持
以上便是本次分享的全部內(nèi)容,希望對你有所幫助^_^
喜歡的話別忘了?分享、點贊、收藏?三連哦~。
歡迎關注公眾號?趣談前端?收獲大廠一手好文章~
LowCode可視化低代碼社區(qū)介紹
LowCode低代碼社區(qū)(http://lowcode.dooring.cn)是由在一線互聯(lián)網(wǎng)公司深耕技術多年的技術專家創(chuàng)辦,意在為企業(yè)技術人員提供低代碼可視化相關的技術交流和分享,并且鼓勵國內(nèi)擁有相關業(yè)務的企業(yè)積極推薦自身產(chǎn)品,為國內(nèi)B端技術領域積累知識資產(chǎn)。同時我們還歡迎開源大牛們分享自己的開源項目和技術視頻。
如需入駐請加下方小編微信:?lowcode-dooring
