深入淺出 RxJS 核心原理(源碼實(shí)現(xiàn))
大廠技術(shù) 堅(jiān)持周更 精選好文
基礎(chǔ)概念
js事件庫,通過observable進(jìn)行異步事件管理 基于觀察者模式的響應(yīng)式編程,生產(chǎn)者主動(dòng)推送多個(gè)數(shù)據(jù)給訂閱的消費(fèi)者處理 使用純函數(shù)保證應(yīng)用狀態(tài)的隔離,保證數(shù)據(jù)純凈性 通過observable、subject、observer的鏈?zhǔn)接嗛?,以及operators提供的數(shù)據(jù)轉(zhuǎn)換,保證數(shù)據(jù)在observable中的流動(dòng)性
核心模塊和基本原理
下面通過實(shí)現(xiàn)一個(gè)簡易的rxjs來解析下核心原理
Observable
概念:可觀察對象,一個(gè)可調(diào)用的未來值或事件的集合
基本用法
// 創(chuàng)建observable
let observable = new Observable(function publish(observer) {
observer.next("hello");
var id = setTimeout(() => {
observer.next("world");
observer.complete();
}, 1000);
});
// 訂閱observable
observable.subscribe({
next: (value) => console.log(value),
error: (err) => console.log(err),
complete: () => console.log("done"),
});
// 輸出:hello->world->done
實(shí)現(xiàn)原理
根據(jù)基本用法,Observable可以執(zhí)行同步或異步任務(wù),并向observer推送數(shù)據(jù),要實(shí)現(xiàn)核心功能,只需要如下兩個(gè)步驟:
創(chuàng)建:作為發(fā)布者,observable需要設(shè)置一個(gè)可執(zhí)行的publish方法,其入?yún)⑹莖bserver對象,該方法在構(gòu)造實(shí)例的時(shí)候傳入,在執(zhí)行該方法的時(shí)候就可以調(diào)用observer對象的回調(diào)方法進(jìn)行傳值; 訂閱:publish方法執(zhí)行的時(shí)機(jī)是在observable被subscribe的時(shí)候,因此observable是惰性推送值,且對于每個(gè)觀察者來說是獨(dú)立執(zhí)行的;
class Observable {
constructor(publishFn) {
this.publish = publishFn;
}
subscribe(observer) {
this.publish(observer);
return observer;
}
}
靜態(tài)構(gòu)造方法實(shí)現(xiàn)
為了方便創(chuàng)建一些既定publish任務(wù)的Observable實(shí)例,Observable類提供了一些靜態(tài)構(gòu)造方法,常用方法包括of/from/fromEvent/interval等;
// 每隔200ms推送由0開始遞增的number
const observable = Observable.interval(200);
observable.subscribe(value => console.log(value));
// 輸出:0->1->2->....
// 監(jiān)聽document的click事件,推送事件回調(diào)的event對象
const observable = Observable.fromEvent(document, "click");
observable.subscribe(event => console.log(event));
// 輸出:MouseEvent {isTrusted: true, screenX: 435, screenY: 386, clientX: 435, clientY: 275, …}
實(shí)現(xiàn)原理:通過調(diào)用構(gòu)造函數(shù)返回一個(gè)既定publish方法的observable實(shí)例;
fromEvent返回的observable在被訂閱時(shí),就會調(diào)用 target.addEventListener開始事件監(jiān)聽,然后將回調(diào)返回的event對象傳遞給observerinterval返回的observable在被訂閱時(shí),就會調(diào)用 window.setInterval開始定時(shí)任務(wù),累加number,并傳遞給observer
Observable.fromEvent = function (target, eventName) {
return new Observable(function (observer) {
const handler = function (e) {
observer.next(e);
};
target.addEventListener(eventName, handler);
return () => {
target.removeEventListener(eventName, handler);
};
});
};
Observable.interval = function (delay) {
return new Observable(function (observer) {
let index = 0;
const id = window.setInterval(() => {
observer.next(index++);
}, delay);
return () => {
clearInterval(id);
};
});
};
Observer
概念:觀察者, 一個(gè)回調(diào)函數(shù)的集合,它知道如何去監(jiān)聽由 Observable 提供的值
實(shí)現(xiàn)原理
作為觀察者,需要包含next/error/complete回調(diào)方法,用于監(jiān)聽成功/失敗/完成返回的值,最簡單的observer就是包含回調(diào)方法的object 為了維護(hù)observer的訂閱狀態(tài),我們可以封裝一個(gè)observer類,isStopped屬性代表當(dāng)前是否停止訂閱,傳入回調(diào)方法,并對外提供封裝過的回調(diào); Observer類對外提供unsubscribe方法,用于解除訂閱;調(diào)用該方法后isStopped為true,數(shù)據(jù)推送停止,并執(zhí)行unsubscribe的回調(diào)函數(shù) unsubscribeCb,該回調(diào)函數(shù)由對外方法onUnsubscribe傳入;
class Observer {
isStopped = false;
unsubscribeCb;
constructor(next, error, complete) {
this._next = next || noop;
this._error = error || noop;
this._complete = complete || noop;
}
next(value) {
if (!this.isStopped) {
this._next(value);
}
}
error(err) {
if (!this.isStopped) {
this._error(err);
this.unsubscribe();
}
}
complete() {
if (!this.isStopped) {
this._complete();
this.unsubscribe();
}
}
onUnsubscribe(unsubscribeCb) {
this.unsubscribeCb = unsubscribeCb;
}
unsubscribe() {
this.isStopped = true;
this.unsubscribeCb && this.unsubscribeCb();
}
}
根據(jù)封裝的Observer類,可以進(jìn)一步優(yōu)化Observable類的實(shí)現(xiàn)
改造subscribe方法,支持傳入observer對象或回調(diào)函數(shù) 將publish方法返回的清理函數(shù)傳遞給observer的 onUnsubscribe方法
class Observable {
constructor(publishFn) {
this.publish = publishFn;
}
subscribe(observerOrNext, error, complete) {
// 封裝observer
let observer;
if (
observerOrNext instanceof Observer ||
observerOrNext instanceof Subject
) {
observer = observerOrNext;
} else if (typeof observerOrNext === "function") {
observer = new Observer(observerOrNext, error, complete);
} else {
observer = new Observer(
observerOrNext.next,
observerOrNext.error,
observerOrNext.complete
);
}
// 傳遞unsubscribe回調(diào)清理函數(shù)
const unsubscribeCb = this.publish(observer);
observer.onUnsubscribe(unsubscribeCb);
return observer;
}
}
}
// 示例
let observable = new Observable(function publish(observer) {
var id = setTimeout(() => {
observer.next("helloworld");
observer.complete();
}, 1000);
return () => {
console.log("clear");
clearInterval(id);
};
});
const observer = observable.subscribe(value => console.log(value));
setTimeout(() => observer.unsubscribe(), 2000);
// 輸出:helloworld->done->clear
Subject
概念:相當(dāng)于 EventEmitter,并且是將值或事件多路推送給多個(gè) Observer 的唯一方式
上面說到Observable對于每個(gè)觀察者都會執(zhí)行一遍publish方法,訂閱的數(shù)據(jù)是獨(dú)立的,因此它是單播的;subject可以作為observable和observer的中介,通過訂閱observable的數(shù)據(jù)然后分發(fā)給observer實(shí)現(xiàn)多播
無法復(fù)制加載中的內(nèi)容
下面是observable單播和subject多播的簡單示例
// 每隔200ms推送從0開始遞增的num,取前6個(gè)推送
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer((x) => console.log(`A next ${x}`)),
const observerB = new Observer((x) => console.log(`B next ${x}`)),
// observable單播模式,500ms后observerB訂閱,重新執(zhí)行一遍publish
observable.subscribe(observerA);
setTimeout(() => {
observable.subscribe(observerB);
}, 500);
// 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// B next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// subject 多播模式,500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),錯(cuò)過了前2個(gè)數(shù)據(jù)
const subject = new Subject();
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => {
subject.subscribe(observerB);
}, 500);
// 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// B next 2 -> 3 -> 4 -> 5
實(shí)現(xiàn)原理
Subject繼承自O(shè)bservable,同時(shí)又實(shí)現(xiàn)了Observer的回調(diào)方法(next/complete/error) Subject類維護(hù)一個(gè)subscribers數(shù)組,當(dāng)Subject被observer訂閱時(shí),會執(zhí)行publish方法將observer push到subscribers數(shù)組中; Subject訂閱Observable后,Observable向Subject推送數(shù)據(jù),Subject再分發(fā)給數(shù)組中每個(gè)observer
class Subject extends Observable {
subscribers = [];
isStopped = false;
publish(observer) {
if (this.isStopped) {
observer.complete();
}
// 添加訂閱item
this.subscribers.push(observer);
}
next(value) {
if (this.isStopped) return;
// 分發(fā)數(shù)據(jù)
this.subscribers.forEach((observer) => {
observer.next(value);
});
}
error(error) {
this.subscribers.forEach((observer) => {
observer.error(error);
});
this.isStopped = true;
this.subscribers = [];
}
complete() {
this.subscribers.forEach((observer) => {
observer.complete();
});
this.isStopped = true;
this.subscribers = [];
}
}
BehaviorSubject
繼承Subject,維護(hù)當(dāng)前最新值lastValue,observer訂閱時(shí)立即傳遞最新值,防止訂閱過晚引起的狀態(tài)丟失;
示例
// 示例
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer((x) => console.log(`A next ${x}`)),
const observerB = new Observer((x) => console.log(`B next ${x}`)),
// 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新數(shù)據(jù)1
const subject = new BehaviorSubject();
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => {
subject.subscribe(observerB);
}, 500);
// 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// B next 1 -> 2 -> 3 -> 4 -> 5
原理
class BehaviorSubject extends Subject {
lastValue;
constructor(value) {
super();
this.lastValue = value;
}
publish(observer) {
if (!observer.isStopped) {
// 被訂閱時(shí)立即推送最新值
observer.next(this.lastValue);
}
super.publish(observer);
}
next(value) {
this.lastValue = value;
super.next(value);
}
}
ReplaySubject
和BehaviorSubject類似,根據(jù)bufferSize和windowSize,緩存某個(gè)時(shí)間段內(nèi)多個(gè)最新值;若windowSize缺省,則最多緩存bufferSize個(gè)最近值;若windowSize存在,則緩存最近的windowSize時(shí)間窗口內(nèi)的不超過bufferSize個(gè)值;
示例
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer((x) => console.log(`A next ${x}`)),
const observerB = new Observer((x) => console.log(`B next ${x}`)),
// 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新的3個(gè)緩存值
const subject = new ReplaySubject(3);
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => {
subject.subscribe(observerB);
}, 500);
// 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// B next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新的200ms內(nèi)的緩存值
const subject = new ReplaySubject(100, 200);
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => {
subject.subscribe(observerB);
}, 500);
// 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5
// B next 1 -> 2 -> 3 -> 4 -> 5
原理
class ReplaySubject extends Subject {
bufferSize = 1;
windowSize;
events = []; // 緩存數(shù)組,格式為[[time, value], ....]
constructor(bufferSize, windowSize) {
super();
this.bufferSize = Math.max(1, bufferSize);
this.windowSize = windowSize || 0;
}
// 計(jì)算緩存數(shù)組
getEvents() {
let spliceIndex = 0;
let len = this.events.length;
if (this.windowSize > 0) {
let beginTime = Date.now() - this.windowSize;
while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) {
spliceIndex++;
}
}
spliceIndex = Math.max(spliceIndex, len - this.bufferSize);
spliceIndex > 0 && this.events.splice(0, spliceIndex);
}
publish(observer) {
this.getEvents();
// 被訂閱后立即推送當(dāng)前所有緩存值
this.events.forEach((event) => {
!observer.isStopped && observer.next(event[1]);
});
super.publish(observer);
}
next(value) {
// 緩存推送值和時(shí)間戳
this.events.push([Date.now(), value]);
// 更新緩存數(shù)組
this.getEvents();
super.next(value);
}
}
AsyncSubject
只有在事件完成時(shí),才會廣播最終的值
示例
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer((x) => console.log(`A next ${x}`)),
const observerB = new Observer((x) => console.log(`B next ${x}`)),
// observerA和observerB 接收最終數(shù)據(jù)5
const subject = new AsyncSubject();
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => {
subject.subscribe(observerB);
}, 500);
// 輸出:A next 5
// B next 5
原理
class AsyncSubject extends Subject {
hasNext = false;
hasComplete = false;
value;
publish(observer) {
if (this.hasComplete && this.hasNext) {
observer.next(this.value);
}
super.publish(observer);
}
next(value) {
// 還未結(jié)束就不推送,僅保存值
if (!this.hasComplete) {
this.value = value;
this.hasNext = true;
}
}
error(err) {
if (!this.hasComplete) {
super.error(err);
}
}
complete() {
this.hasComplete = true;
if (this.hasNext) {
// 任務(wù)完成則推送最終值
super.next(this.value);
}
super.complete();
}
}
比較
根據(jù)以上Observable單播和Subject多播,以及Subject子類BehaviorSubject 、AsyncSubject、ReplaySubject的多播示例,可以對比ObserverA和ObserverB接收的數(shù)據(jù)流;
對于ObserverA,因?yàn)樗峭接嗛?,因此除了AsyncSubject僅接收最后一個(gè)值之外,其他方式訂閱的數(shù)據(jù)流表現(xiàn)一致,即在200ms接收到第一個(gè)數(shù)據(jù)0,每隔200ms以此類推; 對于ObserverB,因?yàn)樗钱惒接嗛啠?00ms后開始訂閱),對于不同的訂閱方式數(shù)據(jù)的流動(dòng)也表現(xiàn)不同,具體的表現(xiàn)可結(jié)合上面的示例和以下數(shù)據(jù)流動(dòng)圖進(jìn)行對比;
數(shù)據(jù)流動(dòng)圖如下
無法復(fù)制加載中的內(nèi)容
Operator
采用函數(shù)式編程風(fēng)格的純函數(shù) (pure function),使用像
map、filter、concat、flatMap等這樣的操作符來處理集合
基本用法
使用pipe方法,傳入operator函數(shù),可以對原始推送值進(jìn)行一定的轉(zhuǎn)換、攔截等處理;如下示例中,take operator實(shí)現(xiàn)獲取前幾個(gè)原始值的功能,map operator實(shí)現(xiàn)對原始值進(jìn)行轉(zhuǎn)換映射的功能;
const observable = Observable.interval(200).pipe(
take(6),
map((item) => item * 2)
);
observable.subscribe(value => console.log(value));
// 輸出:0 -> 2 -> 4 -> 6 -> 8 -> 10
實(shí)現(xiàn)原理
以map為例, map((item) => item * 2)返回的是一個(gè)帶source入?yún)⒌膐peration function,operation function將調(diào)用source.lift 返回一個(gè)新的source指向原observable,帶operator的observable實(shí)例;通過Observable 的pipe方法傳入operation function,pipe方法使用reduce完成多個(gè)operation function的鏈?zhǔn)秸{(diào)用,初始source值是當(dāng)前Observable,最終pipe返回的一個(gè)新的Observable實(shí)例; 當(dāng)創(chuàng)建的帶operator的Observable實(shí)例被subscribe時(shí),會調(diào)用operator.call 方法; operator.call方法中會對observer的next回調(diào)進(jìn)行封裝,返回新的轉(zhuǎn)換值給原observer,最后將新封裝的observer傳遞給source.subscribe;
class Observable {
source;
operator;
....
subscribe(observerOrNext, error, complete) {
....
if (this.operator) {
return this.operator.call(observer, this.source);
}
....
}
lift(operator) {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}
pipe(...args) {
const operations = args.slice(0);
if (operations.length === 0) {
return this;
} else if (operations.length === 1) {
return operations[0](this);
} else {
return operations.reduce((source, func) => func(source), this);
}
}
}
// map operator
function map(mapFn) {
return function mapOperation(source) {
// 返回帶operator的新的observable實(shí)例
return source.lift(new mapOperator(mapFn, thisArg));
};
}
// operator類
class mapOperator {
constructor(mapFn) {
this.mapFn = mapFn;
}
// call 方法最終調(diào)用的是source observable的 subscribe方法
// 對傳入的observer進(jìn)行一層封裝
call(observer, source) {
return source.subscribe(
new mapObserver(observer, this.mapFn);
);
}
}
// 對原始o(jì)bserver進(jìn)行數(shù)據(jù)攔截處理
class mapObserver extends Observer {
constructor(destination, mapFn, thisArg) {
super();
this.destination = destination;
this.mapFn = mapFn;
}
next(value) {
const result = this.mapFn(value);
this.destination.next(result);
}
complete() {
this.destination.complete();
}
}
同理可實(shí)現(xiàn)filter、take、scan等常用operator;
takeUtil的實(shí)現(xiàn)稍有不同,需要傳入一個(gè)notifyObservable,當(dāng)notifyObservable首次發(fā)出值或complete的時(shí),提示當(dāng)前訂閱結(jié)束
示例
// takeUntil示例,當(dāng)點(diǎn)擊了document后,停止每秒數(shù)據(jù)推送
const notifier = Observable.fromEvent(document, "click");
const observable = Observable.interval(1000).pipe(takeUntil(notifier));
原理
新增notifierObserver類,訂閱notifyObservable,當(dāng)notifyObservable數(shù)據(jù)到達(dá)時(shí),notifierObserver就會通知outerObserver(原observer),這樣原來的observer就可以知道notifyObservable的狀態(tài);
function takeUntil(notifier) {
return function takeUntilOperation(source) {
return source.lift(new takeUntilOperator(notifier));
};
}
class takeUntilOperator {
constructor(notifier) {
this.notifier = notifier;
}
// notifierObserver訂閱notifyObservable
//當(dāng)notifyObservable推送第一個(gè)值時(shí),notifierObserver將調(diào)用outerObserver.notifyNext
call(observer, source) {
const outerObserver = new takeUntilObserver(observer, this.notifier);
const notifierObserver = new NotifierObserver(outerObserver);
this.notifier.subscribe(notifierObserver);
if (!outerObserver.seenValue) {
return source.subscribe(outerObserver);
}
}
}
class NotifierObserver extends Observer {
constructor(outerObserver) {
super();
this.outerObserver = outerObserver;
}
// 接受到值就通知outerObserver
next(value) {
this.outerObserver.notifyNext(value);
}
error(err) {
this.outerObserver.notifyError(err);
this.unsubscribe();
}
complete() {
this.outerObserver.notifyComplete();
this.unsubscribe();
}
}
class takeUntilObserver extends Observer {
constructor(destination) {
super();
this.destination = destination;
this.seenValue = false;
}
// 接收到notifyNext的值或notifyComplete時(shí)就完成訂閱
notifyNext(value) {
this.seenValue = true;
this.destination.complete();
}
notifyComplete() {
this.seenValue = true;
this.destination.complete();
}
next(value) {
if (!this.seenValue) {
this.destination.next(value);
}
}
}
參考
RxJS中文文檔
?? 謝謝支持
以上便是本次分享的全部內(nèi)容,希望對你有所幫助^_^
喜歡的話別忘了 分享、點(diǎn)贊、收藏 三連哦~。
歡迎關(guān)注公眾號 前端Sharing 持續(xù)分享高質(zhì)量文章~
