SpringCloudRPC核心原理:RxJava響應式編程框架Scheduler調度器
RxJava的Scheduler調度器
顧名思義,Scheduler是一種用來對RxJava流操作進行調度的類,從Scheduler的工廠方法可以獲取現(xiàn)有調度器的實現(xiàn),如下:
(1)Schedulers.io():用于獲取內部的ioScheduler調度器實例。
(2)Schedulers.newThread():用于獲取內部的newThreadScheduler調度器實例,該調度器為RxJava流操作創(chuàng)建一個新線程。
(3)Schedulers.computation():用于獲取內部的computationScheduler調度器實例。
(4)Schedulers.trampoline():使用當前線程立即執(zhí)行RxJava流操作。
(5)Schedulers.single():使用RxJava內置的單例線程執(zhí)行RxJava流操作。
關于以上5個獲取調度器的方法具體介紹如下:
(1)Schedulers.io():獲取內部的ioScheduler調度器實例主要用于IO密集型的流操作,例如讀寫SD卡文件、查詢數(shù)據(jù)庫、訪問網(wǎng)絡等。此調度器具有線程緩存機制,在接收到任務后,先檢查線程緩存池中是否有空閑的線程,如果有就復用,如果沒有就創(chuàng)建新的線程,并加入IO專用線程池中,如果專用線程池每次都沒有空閑線程可用,就可以無上限地創(chuàng)建新線程。
(2)Schedulers.newThread():每執(zhí)行一個RxJava流操作創(chuàng)建一個新的線程,不具有線程緩存機制,因為創(chuàng)建一個新的線程比復用一個線程更耗時耗力,Schedulers.newThread()的效率沒有Schedulers.io()的效率高。
(3)Schedulers.computation():獲取內部的具有固定線程池的內部computationScheduler調度器實例,用于執(zhí)行CPU密集型的流操作,線程數(shù)大小為CPU的核數(shù)。不可以用于I/O操作,例如不能用于XML/JSON文件的解析、Bitmap圖片的壓縮取樣等,因為I/O操作會浪費CPU時間。
(4)Schedulers.trampoline():如果要在當前線程執(zhí)行流操作,而當前線程有任務在執(zhí)行,就會等當前任務執(zhí)行完之后再接著執(zhí)行流操作。
(5)Schedulers.single():RxJava擁有一個專用的線程單例,此調度器負責的所有流操作都在這個線程中執(zhí)行,當此線程中有任務執(zhí)行時,其他任務將會按照先進先出的順序依次排隊。

一個簡單的調度器使用實例的代碼如下:
package com.crazymaker.demo.rxJava.basic;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import rx.Observable;import rx.Subscriber;import rx.schedulers.Schedulers;@Slf4jpublic class SchedulerDemo { /** *演示Schedulers的基本使用 */ @Test public void testScheduler() throws InterruptedException { //被觀察者 Observable observable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { for (int i = 0; i < 5; i++) { log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); } subscriber.onCompleted(); } }); //訂閱Observable與Subscriber之間依然通過subscribe()進行關聯(lián) observable //使用具有線程緩存機制的可復用線程 .subscribeOn(Schedulers.io()) //每執(zhí)行一個任務創(chuàng)建一個新的線程 .observeOn(Schedulers.newThread()) .subscribe(s -> { log.info("consumer ->" + s); }); Thread.sleep(Integer.MAX_VALUE); }}運行這個演示程序,輸出的部分結果如下:
17:04:17.922 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->017:04:17.932 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->117:04:17.932 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->017:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->217:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->117:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->217:04:17.933 [RxIoScheduler-2] INFO c.c.d.r.b.SchedulerDemo - produce ->417:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->317:04:17.933 [RxNewThreadScheduler-1] INFO c.c.d.r.b.SchedulerDemo - consumer ->4通過上面的代碼可以看出,RxJava提供了兩個方法來改變流操作的調度器:
(1)subscribeOn():主要改變的是彈射的線程。
(2)observeOn():主要改變的是訂閱的線程。
在RxJava中,創(chuàng)建操作符創(chuàng)建的Observable主題的彈射任務,將由其后最近的subscribeOn()所設置的調度器負責執(zhí)行。
在RxJava中,Observable主題的下游消費型操作(如流轉換等)的線程調度,將由其前面最近的observeOn()所設置的調度器負責。observeOn()可以多次設置,每一次設置都對下一次observeOn()設置之前的流操作產(chǎn)生作用。

本文給大家講解的內容是SpringCloudRPC遠程調用核心原理:RxJava響應式編程框架,RxJava的Scheduler調度器
下篇文章給大家講解的是SpringCloudRPC遠程調用核心原理:RxJava響應式編程框架,背壓問題的幾種應對模式;
覺得文章不錯的朋友可以轉發(fā)此文關注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內容,大家有收獲的話可以分享下,想學習更多的話可以到微信公眾號里找我,我等你哦。
