<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          深入淺出 RxJS 核心原理(源碼實(shí)現(xiàn))

          共 23953字,需瀏覽 48分鐘

           ·

          2021-07-21 09:39

           大廠技術(shù)  堅(jiān)持周更  精選好文

          基礎(chǔ)概念

          • js事件庫,通過observable進(jìn)行異步事件管理
          • 基于觀察者模式的響應(yīng)式編程,生產(chǎn)者主動(dòng)推送多個(gè)數(shù)據(jù)給訂閱的消費(fèi)者處理
          • 使用純函數(shù)保證應(yīng)用狀態(tài)的隔離,保證數(shù)據(jù)純凈性
          • 通過observable、subject、observer的鏈?zhǔn)接嗛?,以及operators提供的數(shù)據(jù)轉(zhuǎn)換,保證數(shù)據(jù)在observable中的流動(dòng)性

          核心模塊和基本原理

          下面通過實(shí)現(xiàn)一個(gè)簡易的rxjs來解析下核心原理

          Observable

          概念:可觀察對象,一個(gè)可調(diào)用的未來值或事件的集合

          基本用法

          // 創(chuàng)建observable

          let observable = new Observable(function publish(observer{

            observer.next("hello");

            var id = setTimeout(() => {

              observer.next("world");

              observer.complete();

            }, 1000);

          });

          // 訂閱observable

          observable.subscribe({

            next(value) => console.log(value),

            error(err) => console.log(err),

            complete() => console.log("done"),

          });

          // 輸出:hello->world->done

          實(shí)現(xiàn)原理

          根據(jù)基本用法,Observable可以執(zhí)行同步或異步任務(wù),并向observer推送數(shù)據(jù),要實(shí)現(xiàn)核心功能,只需要如下兩個(gè)步驟:

          1. 創(chuàng)建:作為發(fā)布者,observable需要設(shè)置一個(gè)可執(zhí)行的publish方法,其入?yún)⑹莖bserver對象,該方法在構(gòu)造實(shí)例的時(shí)候傳入,在執(zhí)行該方法的時(shí)候就可以調(diào)用observer對象的回調(diào)方法進(jìn)行傳值;
          2. 訂閱:publish方法執(zhí)行的時(shí)機(jī)是在observable被subscribe的時(shí)候,因此observable是惰性推送值,且對于每個(gè)觀察者來說是獨(dú)立執(zhí)行的;
          class Observable {

            constructor(publishFn) {

              this.publish = publishFn;

            }

            subscribe(observer) {

              this.publish(observer);

              return observer;

            }

          }

          靜態(tài)構(gòu)造方法實(shí)現(xiàn)

          1. 為了方便創(chuàng)建一些既定publish任務(wù)的Observable實(shí)例,Observable類提供了一些靜態(tài)構(gòu)造方法,常用方法包括of/from/fromEvent/interval等;
          // 每隔200ms推送由0開始遞增的number

          const observable = Observable.interval(200);

          observable.subscribe(value => console.log(value));

          // 輸出:0->1->2->....



          // 監(jiān)聽document的click事件,推送事件回調(diào)的event對象

          const observable = Observable.fromEvent(document"click");

          observable.subscribe(event => console.log(event));

          // 輸出:MouseEvent {isTrusted: true, screenX: 435, screenY: 386, clientX: 435, clientY: 275, …}
          1. 實(shí)現(xiàn)原理:通過調(diào)用構(gòu)造函數(shù)返回一個(gè)既定publish方法的observable實(shí)例;
          • fromEvent返回的observable在被訂閱時(shí),就會調(diào)用target.addEventListener開始事件監(jiān)聽,然后將回調(diào)返回的event對象傳遞給observer
          • interval返回的observable在被訂閱時(shí),就會調(diào)用window.setInterval 開始定時(shí)任務(wù),累加number,并傳遞給observer


          Observable.fromEvent = function (target, eventName{

            return new Observable(function (observer{

              const handler = function (e{

                observer.next(e);

              };

              target.addEventListener(eventName, handler);

              return () => {

                target.removeEventListener(eventName, handler);

              };

            });

          };



          Observable.interval = function (delay{

            return new Observable(function (observer{

              let index = 0;

              const id = window.setInterval(() => {

                observer.next(index++);

              }, delay);

              return () => {

                clearInterval(id);

              };

            });

          };

          Observer

          概念:觀察者, 一個(gè)回調(diào)函數(shù)的集合,它知道如何去監(jiān)聽由 Observable 提供的值

          實(shí)現(xiàn)原理

          1. 作為觀察者,需要包含next/error/complete回調(diào)方法,用于監(jiān)聽成功/失敗/完成返回的值,最簡單的observer就是包含回調(diào)方法的object
          2. 為了維護(hù)observer的訂閱狀態(tài),我們可以封裝一個(gè)observer類,isStopped屬性代表當(dāng)前是否停止訂閱,傳入回調(diào)方法,并對外提供封裝過的回調(diào);
          3. Observer類對外提供unsubscribe方法,用于解除訂閱;調(diào)用該方法后isStopped為true,數(shù)據(jù)推送停止,并執(zhí)行unsubscribe的回調(diào)函數(shù)unsubscribeCb,該回調(diào)函數(shù)由對外方法onUnsubscribe傳入;
          class Observer {

            isStopped = false;

            unsubscribeCb;

            constructor(next, error, complete) {

              this._next = next || noop;

              this._error = error || noop;

              this._complete = complete || noop;

            }

            next(value) {

              if (!this.isStopped) {

                this._next(value);

              }

            }

            error(err) {

              if (!this.isStopped) {

                this._error(err);

                this.unsubscribe();

              }

            }

            complete() {

              if (!this.isStopped) {

                this._complete();

                this.unsubscribe();

              }

            }

            onUnsubscribe(unsubscribeCb) {

              this.unsubscribeCb = unsubscribeCb;

            }

            unsubscribe() {

              this.isStopped = true;

              this.unsubscribeCb && this.unsubscribeCb();

            }

          }

          根據(jù)封裝的Observer類,可以進(jìn)一步優(yōu)化Observable類的實(shí)現(xiàn)

          1. 改造subscribe方法,支持傳入observer對象或回調(diào)函數(shù)
          2. 將publish方法返回的清理函數(shù)傳遞給observer的onUnsubscribe方法
          class Observable {

            constructor(publishFn) {

              this.publish = publishFn;

            }

           subscribe(observerOrNext, error, complete) {

              // 封裝observer

              let observer;

              if (

                observerOrNext instanceof Observer ||

                observerOrNext instanceof Subject

              ) {

                observer = observerOrNext;

              } else if (typeof observerOrNext === "function") {

                observer = new Observer(observerOrNext, error, complete);

              } else {

                observer = new Observer(

                  observerOrNext.next,

                  observerOrNext.error,

                  observerOrNext.complete

                );

              }

              // 傳遞unsubscribe回調(diào)清理函數(shù)

                const unsubscribeCb = this.publish(observer);

                observer.onUnsubscribe(unsubscribeCb);

                return observer;

              }

            }

          }

          // 示例

          let observable = new Observable(function publish(observer{

            var id = setTimeout(() => {

              observer.next("helloworld");

              observer.complete();

            }, 1000);

            return () => {

              console.log("clear");

              clearInterval(id);

            };

          });



          const observer = observable.subscribe(value => console.log(value));

          setTimeout(() => observer.unsubscribe(), 2000);

          // 輸出:helloworld->done->clear

          Subject

          概念:相當(dāng)于 EventEmitter,并且是將值或事件多路推送給多個(gè) Observer 的唯一方式

          上面說到Observable對于每個(gè)觀察者都會執(zhí)行一遍publish方法,訂閱的數(shù)據(jù)是獨(dú)立的,因此它是單播的;subject可以作為observable和observer的中介,通過訂閱observable的數(shù)據(jù)然后分發(fā)給observer實(shí)現(xiàn)多播

          無法復(fù)制加載中的內(nèi)容

          • 下面是observable單播和subject多播的簡單示例
          // 每隔200ms推送從0開始遞增的num,取前6個(gè)推送

          const observable = Observable.interval(200).pipe(take(6));

          const observerA = new Observer((x) => console.log(`A next ${x}`)),

          const observerB = new Observer((x) => console.log(`B next ${x}`)),



          // observable單播模式,500ms后observerB訂閱,重新執(zhí)行一遍publish

           observable.subscribe(observerA);

           setTimeout(() => {

             observable.subscribe(observerB);

           }, 500);

          // 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5

          //      B next 0 -> 1 -> 2 -> 3 -> 4 -> 5





          // subject 多播模式,500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),錯(cuò)過了前2個(gè)數(shù)據(jù)

          const subject = new Subject();

          observable.subscribe(subject);

          subject.subscribe(observerA);

          setTimeout(() => {

            subject.subscribe(observerB);

          }, 500);

          // 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5

          //      B next 2 -> 3 -> 4 -> 5

          實(shí)現(xiàn)原理

          1. Subject繼承自O(shè)bservable,同時(shí)又實(shí)現(xiàn)了Observer的回調(diào)方法(next/complete/error)
          2. Subject類維護(hù)一個(gè)subscribers數(shù)組,當(dāng)Subject被observer訂閱時(shí),會執(zhí)行publish方法將observer push到subscribers數(shù)組中;
          3. Subject訂閱Observable后,Observable向Subject推送數(shù)據(jù),Subject再分發(fā)給數(shù)組中每個(gè)observer
          class Subject extends Observable {

            subscribers = [];

            isStopped = false;

            publish(observer) {

              if (this.isStopped) {

                observer.complete();

              }

              // 添加訂閱item

              this.subscribers.push(observer);

            }

            next(value) {

              if (this.isStopped) return;

              // 分發(fā)數(shù)據(jù)

              this.subscribers.forEach((observer) => {

                observer.next(value);

              });

            }

            error(error) {

              this.subscribers.forEach((observer) => {

                observer.error(error);

              });

              this.isStopped = true;

              this.subscribers = [];

            }

            complete() {

              this.subscribers.forEach((observer) => {

                observer.complete();

              });

              this.isStopped = true;

              this.subscribers = [];

            }

          }

          BehaviorSubject

          繼承Subject,維護(hù)當(dāng)前最新值lastValue,observer訂閱時(shí)立即傳遞最新值,防止訂閱過晚引起的狀態(tài)丟失;

          • 示例
           // 示例

          const observable = Observable.interval(200).pipe(take(6));

          const observerA = new Observer((x) => console.log(`A next ${x}`)),

          const observerB = new Observer((x) => console.log(`B next ${x}`)),

          // 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新數(shù)據(jù)1

          const subject = new BehaviorSubject();

          observable.subscribe(subject);

          subject.subscribe(observerA);

          setTimeout(() => {

            subject.subscribe(observerB); 

          }, 500);

          // 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5

          //      B next 1 -> 2 -> 3 -> 4 -> 5
          • 原理
          class BehaviorSubject extends Subject {

            lastValue;

            constructor(value) {

              super();

              this.lastValue = value;

            }

            publish(observer) {

              if (!observer.isStopped) {

                // 被訂閱時(shí)立即推送最新值

                observer.next(this.lastValue);

              }

              super.publish(observer);

            }

            next(value) {

              this.lastValue = value;

              super.next(value);

            }

          }

          ReplaySubject

          和BehaviorSubject類似,根據(jù)bufferSize和windowSize,緩存某個(gè)時(shí)間段內(nèi)多個(gè)最新值;若windowSize缺省,則最多緩存bufferSize個(gè)最近值;若windowSize存在,則緩存最近的windowSize時(shí)間窗口內(nèi)的不超過bufferSize個(gè)值;

          • 示例
          const observable = Observable.interval(200).pipe(take(6));

          const observerA = new Observer((x) => console.log(`A next ${x}`)),

          const observerB = new Observer((x) => console.log(`B next ${x}`)),

          // 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新的3個(gè)緩存值

          const subject = new ReplaySubject(3);

          observable.subscribe(subject);

          subject.subscribe(observerA);

          setTimeout(() => {

            subject.subscribe(observerB); 

          }, 500);

          // 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5

          //      B next 0 -> 1 -> 2 -> 3 -> 4 -> 5



          // 500ms后observerB開始接收subject分發(fā)的數(shù)據(jù),能獲取到最新的200ms內(nèi)的緩存值

          const subject = new ReplaySubject(100200);

          observable.subscribe(subject);

          subject.subscribe(observerA);

          setTimeout(() => {

            subject.subscribe(observerB); 

          }, 500);

          // 輸出:A next 0 -> 1 -> 2 -> 3 -> 4 -> 5

          //      B next 1 -> 2 -> 3 -> 4 -> 5
          • 原理
          class ReplaySubject extends Subject {

            bufferSize = 1;

            windowSize;

            events = []; // 緩存數(shù)組,格式為[[time, value], ....]

            constructor(bufferSize, windowSize) {

              super();

              this.bufferSize = Math.max(1, bufferSize);

              this.windowSize = windowSize || 0;

            }

            // 計(jì)算緩存數(shù)組

            getEvents() {

              let spliceIndex = 0;

              let len = this.events.length;

              if (this.windowSize > 0) {

                let beginTime = Date.now() - this.windowSize;

                while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) {

                  spliceIndex++;

                }

              }

              spliceIndex = Math.max(spliceIndex, len - this.bufferSize);

              spliceIndex > 0 && this.events.splice(0, spliceIndex);

            }

            publish(observer) {

              this.getEvents();

              // 被訂閱后立即推送當(dāng)前所有緩存值

              this.events.forEach((event) => {

                !observer.isStopped && observer.next(event[1]);

              });

              super.publish(observer);

            }

            next(value) {

              // 緩存推送值和時(shí)間戳

              this.events.push([Date.now(), value]);

              // 更新緩存數(shù)組

              this.getEvents();

              super.next(value);

            }

          }

          AsyncSubject

          只有在事件完成時(shí),才會廣播最終的值

          • 示例
          const observable = Observable.interval(200).pipe(take(6));

          const observerA = new Observer((x) => console.log(`A next ${x}`)),

          const observerB = new Observer((x) => console.log(`B next ${x}`)),

          // observerA和observerB 接收最終數(shù)據(jù)5

          const subject = new AsyncSubject();

          observable.subscribe(subject);

          subject.subscribe(observerA);

          setTimeout(() => {

            subject.subscribe(observerB); 

          }, 500);

          // 輸出:A next 5

          //      B next 5
          • 原理
          class AsyncSubject extends Subject {

            hasNext = false;

            hasComplete = false;

            value;

            publish(observer) {

              if (this.hasComplete && this.hasNext) {

                observer.next(this.value);

              }

              super.publish(observer);

            }

            next(value) {

              // 還未結(jié)束就不推送,僅保存值

              if (!this.hasComplete) {

                this.value = value;

                this.hasNext = true;

              }

            }

            error(err) {

              if (!this.hasComplete) {

                super.error(err);

              }

            }

            complete() {

              this.hasComplete = true;

              if (this.hasNext) {

                 // 任務(wù)完成則推送最終值

                super.next(this.value);

              }

              super.complete();

            }

          }

          比較

          根據(jù)以上Observable單播和Subject多播,以及Subject子類BehaviorSubject 、AsyncSubject、ReplaySubject的多播示例,可以對比ObserverA和ObserverB接收的數(shù)據(jù)流;

          • 對于ObserverA,因?yàn)樗峭接嗛?,因此除了AsyncSubject僅接收最后一個(gè)值之外,其他方式訂閱的數(shù)據(jù)流表現(xiàn)一致,即在200ms接收到第一個(gè)數(shù)據(jù)0,每隔200ms以此類推;
          • 對于ObserverB,因?yàn)樗钱惒接嗛啠?00ms后開始訂閱),對于不同的訂閱方式數(shù)據(jù)的流動(dòng)也表現(xiàn)不同,具體的表現(xiàn)可結(jié)合上面的示例和以下數(shù)據(jù)流動(dòng)圖進(jìn)行對比;

          數(shù)據(jù)流動(dòng)圖如下

          無法復(fù)制加載中的內(nèi)容

          Operator

          采用函數(shù)式編程風(fēng)格的純函數(shù) (pure function),使用像 map、filter、concat、flatMap 等這樣的操作符來處理集合

          基本用法

          使用pipe方法,傳入operator函數(shù),可以對原始推送值進(jìn)行一定的轉(zhuǎn)換、攔截等處理;如下示例中,take operator實(shí)現(xiàn)獲取前幾個(gè)原始值的功能,map operator實(shí)現(xiàn)對原始值進(jìn)行轉(zhuǎn)換映射的功能;

          const observable = Observable.interval(200).pipe(

            take(6),

            map((item) => item * 2)

          );



          observable.subscribe(value => console.log(value));

          // 輸出:0 -> 2 -> 4 -> 6 -> 8 -> 10

          實(shí)現(xiàn)原理

          1. 以map為例,map((item) => item * 2) 返回的是一個(gè)帶source入?yún)⒌膐peration function,operation function將調(diào)用source.lift 返回一個(gè)新的source指向原observable,帶operator的observable實(shí)例;
          2. 通過Observable 的pipe方法傳入operation function,pipe方法使用reduce完成多個(gè)operation function的鏈?zhǔn)秸{(diào)用,初始source值是當(dāng)前Observable,最終pipe返回的一個(gè)新的Observable實(shí)例;
          3. 當(dāng)創(chuàng)建的帶operator的Observable實(shí)例被subscribe時(shí),會調(diào)用operator.call 方法;
          4. operator.call方法中會對observer的next回調(diào)進(jìn)行封裝,返回新的轉(zhuǎn)換值給原observer,最后將新封裝的observer傳遞給source.subscribe;
          class Observable {

            source;

            operator;

            ....

            subscribe(observerOrNext, error, complete) {

            ....

              if (this.operator) {

                return this.operator.call(observer, this.source);

              }

              ....

            }

            lift(operator) {

              const observable = new Observable();

              observable.source = this;

              observable.operator = operator;

              return observable;

            }

            pipe(...args) {

              const operations = args.slice(0);

              if (operations.length === 0) {

                return this;

              } else if (operations.length === 1) {

                return operations[0](this);

              } else {

                return operations.reduce((source, func) => func(source), this);

              }

            }

          }



          // map operator

          function map(mapFn{

            return function mapOperation(source{

             // 返回帶operator的新的observable實(shí)例

              return source.lift(new mapOperator(mapFn, thisArg));

            };

          }

          // operator類

          class mapOperator {

            constructor(mapFn) {

              this.mapFn = mapFn;

            }

            // call 方法最終調(diào)用的是source observable的 subscribe方法

            // 對傳入的observer進(jìn)行一層封裝

            call(observer, source) {

              return source.subscribe(

                new mapObserver(observer, this.mapFn);

              );

            }

          }

          // 對原始o(jì)bserver進(jìn)行數(shù)據(jù)攔截處理

          class mapObserver extends Observer {

            constructor(destination, mapFn, thisArg) {

              super();

              this.destination = destination;

              this.mapFn = mapFn;

            }



            next(value) {

              const result = this.mapFn(value);

              this.destination.next(result);

            }

            complete() {

              this.destination.complete();

            }

          }

          同理可實(shí)現(xiàn)filter、take、scan等常用operator;

          takeUtil的實(shí)現(xiàn)稍有不同,需要傳入一個(gè)notifyObservable,當(dāng)notifyObservable首次發(fā)出值或complete的時(shí),提示當(dāng)前訂閱結(jié)束

          • 示例
          // takeUntil示例,當(dāng)點(diǎn)擊了document后,停止每秒數(shù)據(jù)推送

          const notifier = Observable.fromEvent(document"click");

          const observable = Observable.interval(1000).pipe(takeUntil(notifier));
          • 原理

          新增notifierObserver類,訂閱notifyObservable,當(dāng)notifyObservable數(shù)據(jù)到達(dá)時(shí),notifierObserver就會通知outerObserver(原observer),這樣原來的observer就可以知道notifyObservable的狀態(tài);

          function takeUntil(notifier{

            return function takeUntilOperation(source{

              return source.lift(new takeUntilOperator(notifier));

            };

          }



          class takeUntilOperator {

            constructor(notifier) {

              this.notifier = notifier;

            }



          // notifierObserver訂閱notifyObservable

          //當(dāng)notifyObservable推送第一個(gè)值時(shí),notifierObserver將調(diào)用outerObserver.notifyNext

            call(observer, source) {

              const outerObserver = new takeUntilObserver(observer, this.notifier);

              const notifierObserver = new NotifierObserver(outerObserver);

              this.notifier.subscribe(notifierObserver);

              if (!outerObserver.seenValue) {

                return source.subscribe(outerObserver);

              }

            }

          }

          class NotifierObserver extends Observer {

            constructor(outerObserver) {

              super();

              this.outerObserver = outerObserver;

            }

            // 接受到值就通知outerObserver

            next(value) {

              this.outerObserver.notifyNext(value);

            }

            error(err) {

              this.outerObserver.notifyError(err);

              this.unsubscribe();

            }

            complete() {

              this.outerObserver.notifyComplete();

              this.unsubscribe();

            }

          }

          class takeUntilObserver extends Observer {

            constructor(destination) {

              super();

              this.destination = destination;

              this.seenValue = false;

            }

            // 接收到notifyNext的值或notifyComplete時(shí)就完成訂閱

            notifyNext(value) {

              this.seenValue = true;

              this.destination.complete();

            }

            notifyComplete() {

              this.seenValue = true;

              this.destination.complete();

            }

            next(value) {

              if (!this.seenValue) {

                this.destination.next(value);

              }

            }

          }

          參考

          RxJS中文文檔

          ?? 謝謝支持

          以上便是本次分享的全部內(nèi)容,希望對你有所幫助^_^

          喜歡的話別忘了 分享、點(diǎn)贊、收藏 三連哦~。

          歡迎關(guān)注公眾號 前端Sharing 持續(xù)分享高質(zhì)量文章~


          瀏覽 123
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  91视频做爱 | 四虎成人在线网址 | 91人妻人人澡人人爽 | 狠狠色综合网站 | 日本人妻与黑人XXX |