RxJava源碼學習
1 RxJava使用
RxJava是響應式數(shù)據(jù)流驅動框架,Retrokit提供了對RxJava的支持。
1.1 接入
在app module的build.gradle中添加依賴:
implementation "com.squareup.retrofit2:adapter-rxjava2:2.3.0"implementation "io.reactivex.rxjava2:rxjava:2.0.6"implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
接下來在Retrofit構建的時候添加RxJava CallFactory:
Retrofit retrofit = new Retrofit.Builder().baseUrl("https://api.github.com/").callbackExecutor(Executors.newSingleThreadExecutor()).addCallAdapterFactory(RxJava2CallAdapterFactory.create()).addConverterFactory(GsonConverterFactory.create()).build();
CallAdapter.Factory是典型的工廠模式,這個類的一般實現(xiàn)模式是:
public interface CallAdapter<R, T> {Type responseType();T adapt(Call<R> call);abstract class Factory {public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit);}}
先調用get方法返回CallAdapter,然后調用adapt方法獲取請求Call執(zhí)行請求。
在正式添加RxJava代碼之前需要先看看RxJava的源碼。
可以從官方的文檔中獲取相關知識。
http://reactivex.io/RxJava/3.x/javadoc/overview-summary.html
2 基本類
2.1 io.reactivex.rxjava3.core.Flowable
0..N flows, supporting Reactive-Streams and backpressure
0到多個流,支持響應流和背壓。
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html
The Flowable class that implements the Reactive Streams Publisher Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
Flowable類實現(xiàn)了響應流發(fā)布模式,并提供工廠方法,中間運算符實現(xiàn),以及有能力消費響應數(shù)據(jù)流。
他實現(xiàn)了Publisher接口:
public interface Publisher<T> {public void subscribe(Subscriber<? super T> s);}
public interface Subscriber<T> {/*** Invoked after calling {@link Publisher#subscribe(Subscriber)}.* <p>* No data will start flowing until {@link Subscription#request(long)} is invoked.* <p>* It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.* <p>* The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.** @param s* {@link Subscription} that allows requesting data via {@link Subscription#request(long)}*/public void onSubscribe(Subscription s);
/*** Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.** @param t the element signaled*/public void onNext(T t);
/*** Failed terminal state.* <p>* No further events will be sent even if {@link Subscription#request(long)} is invoked again.** @param t the throwable signaled*/public void onError(Throwable t);
/*** Successful terminal state.* <p>* No further events will be sent even if {@link Subscription#request(long)} is invoked again.*/public void onComplete();}
可以認為Flowable是一個發(fā)布者,數(shù)據(jù)流從這里開始發(fā)起請求。
subscribe是一個工廠方法,可以被調用多次,每次調用都會產生一次訂閱,每次訂閱對應一個Subscriber(訂閱者)。每個訂閱者每次只能訂閱一個發(fā)布者,就是Publisher的各種實現(xiàn)類。
如果發(fā)布者拒絕訂閱或因為其他原因導致失敗,會回調一個錯誤信號給訂閱者(Subscriber.onError)。
圖片來自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html

2.2 io.reactivex.rxjava3.core.Observable
0..N flows, no backpressure。
0到多個數(shù)據(jù)流,沒有背壓。
The Observable class is the non-backpressured, optionally multi-valued base reactive class that offers factory methods, intermediate operators and the ability to consume synchronous and/or asynchronous reactive dataflows.
Many operators in the class accept ObservableSource(s), the base reactive interface for such non-backpressured flows, which Observable itself implements as well.
Observable類是沒有背壓,可選多值的響應類,這個類能夠提供工廠方法,中間運算符實現(xiàn)以及有能力消費同步和/或異步響應數(shù)據(jù)流。
該類中的許多運算符都實現(xiàn)ObservableSource(s),它是此類非背壓流的基本響應接口,Observable本身也實現(xiàn)了該接口。
public interface ObservableSource<T> {void subscribe(@NonNull Observer<? super T> observer);}
public interface Observer<T> {
/*** Provides the Observer with the means of cancelling (disposing) the* connection (channel) with the Observable in both* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.* @param d the Disposable instance whose {@link Disposable#dispose()} can* be called anytime to cancel the connection* @since 2.0*/void onSubscribe(@NonNull Disposable d);
/*** Provides the Observer with a new item to observe.* <p>* The {@link Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or* {@link #onError}.** @param t* the item emitted by the Observable*/void onNext(@NonNull T t);
/*** Notifies the Observer that the {@link Observable} has experienced an error condition.* <p>* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onComplete}.** @param e* the exception encountered by the Observable*/void onError(@NonNull Throwable e);
/*** Notifies the Observer that the {@link Observable} has finished sending push-based notifications.* <p>* The {@link Observable} will not call this method if it calls {@link #onError}.*/void onComplete();
}
圖片來自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Observable.html

2.3 io.reactivex.rxjava3.core.Single
a flow of exactly 1 item or an error。
一個數(shù)據(jù)流正好對應一個item返回或一個錯誤回調。
The Single class implements the Reactive Pattern for a single value response.
Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no onComplete notification as there is for an Observable).
The Single class implements the SingleSource base interface and the default consumer type it interacts with is the SingleObserver via the subscribe(SingleObserver) method.
Single類實現(xiàn)了單個數(shù)據(jù)返回的響應模式。
Single類的行為類似Observable,除了他只能發(fā)送一個成功的數(shù)據(jù)或一個錯誤(也沒有一個onComplete回調通知,這在Observable中是存在的)。
Single類實現(xiàn)了SingleSource基本接口,與之交互的默認消費者類型是SingleObserver,通過SingleObserver的subscribe方法產生關聯(lián)。
public interface SingleSource<T> {
/*** Subscribes the given SingleObserver to this SingleSource instance.* @param observer the SingleObserver, not null* @throws NullPointerException if {@code observer} is null*/void subscribe(@NonNull SingleObserver<? super T> observer);}
public interface SingleObserver<T> {
/*** Provides the SingleObserver with the means of cancelling (disposing) the* connection (channel) with the Single in both* synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.* @param d the Disposable instance whose {@link Disposable#dispose()} can* be called anytime to cancel the connection* @since 2.0*/void onSubscribe(@NonNull Disposable d);
/*** Notifies the SingleObserver with a single item and that the {@link Single} has finished sending* push-based notifications.* <p>* The {@link Single} will not call this method if it calls {@link #onError}.** @param t* the item emitted by the Single*/void onSuccess(@NonNull T t);
/*** Notifies the SingleObserver that the {@link Single} has experienced an error condition.* <p>* If the {@link Single} calls this method, it will not thereafter call {@link #onSuccess}.** @param e* the exception encountered by the Single*/void onError(@NonNull Throwable e);}
圖片來自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Single.html

2.4 io.reactivex.rxjava3.core.Completable
a flow without items but only a completion or error signal。
沒有數(shù)據(jù)回調返回的數(shù)據(jù)流,但是只有完成或錯誤回調。
The Completable class represents a deferred computation without any value but only indication for completion or exception.
Completable behaves similarly to Observable except that it can only emit either a completion or error signal (there is no onNext or onSuccess as with the other reactive types).
The Completable class implements the CompletableSource base interface and the default consumer type it interacts with is the CompletableObserver via the subscribe(CompletableObserver) method.
Completable類意味著一個延遲的沒有值返回的計算,但是只有完成或異常返回回調。
Completable的表現(xiàn)與Observable類似,除了他只能傳遞一個完成或錯誤異常。
Completable類實現(xiàn)了CompletableSource基本接口,默認的消費者類型是CompletableObserver,通過CompletableObserver.subscribe方法產生交互。
public interface CompletableSource {
/*** Subscribes the given {@link CompletableObserver} to this {@code CompletableSource} instance.* @param observer the {@code CompletableObserver}, not {@code null}* @throws NullPointerException if {@code observer} is {@code null}*/void subscribe(@NonNull CompletableObserver observer);}
public interface CompletableObserver {/*** Called once by the {@link Completable} to set a {@link Disposable} on this instance which* then can be used to cancel the subscription at any time.* @param d the {@code Disposable} instance to call dispose on for cancellation, not null*/void onSubscribe(@NonNull Disposable d);
/*** Called once the deferred computation completes normally.*/void onComplete();
/*** Called once if the deferred computation 'throws' an exception.* @param e the exception, not {@code null}.*/void onError(@NonNull Throwable e);}
圖片來自:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Completable.html

2.5 io.reactivex.rxjava3.core.Maybe
a flow with no items, exactly one item or an error.
一個沒有反饋項的數(shù)據(jù)流,準確的說應該是反饋一個item或錯誤。
The Maybe class represents a deferred computation and emission of a single value, no value at all or an exception.
The Maybe class implements the MaybeSource base interface and the default consumer type it interacts with is the MaybeObserver via the subscribe(MaybeObserver) method.
Maybe類表示單個值(完全沒有值或異常)的延遲計算和發(fā)射。
Maybe類實現(xiàn)了基類借口MaybeSource,與之交互的默認消費者類型是MaybeObserver,通過MaybeObserver.subscribe()實現(xiàn)交互。
public interface MaybeSource<T> {
/*** Subscribes the given MaybeObserver to this MaybeSource instance.* @param observer the MaybeObserver, not null* @throws NullPointerException if {@code observer} is null*/void subscribe(@NonNull MaybeObserver<? super T> observer);}
public interface MaybeObserver<T> {
/*** Provides the MaybeObserver with the means of cancelling (disposing) the* connection (channel) with the Maybe in both* synchronous (from within {@code onSubscribe(Disposable)} itself) and asynchronous manner.* @param d the Disposable instance whose {@link Disposable#dispose()} can* be called anytime to cancel the connection*/void onSubscribe(@NonNull Disposable d);
/*** Notifies the MaybeObserver with one item and that the {@link Maybe} has finished sending* push-based notifications.* <p>* The {@link Maybe} will not call this method if it calls {@link #onError}.** @param t* the item emitted by the Maybe*/void onSuccess(@NonNull T t);
/*** Notifies the MaybeObserver that the {@link Maybe} has experienced an error condition.* <p>* If the {@link Maybe} calls this method, it will not thereafter call {@link #onSuccess}.** @param e* the exception encountered by the Maybe*/void onError(@NonNull Throwable e);
/*** Called once the deferred computation completes normally.*/void onComplete();}
圖片來自http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Maybe.html

3 RxJava中的線程
3.1 IO
在Schedulers類中通過靜態(tài)調用獲取IO線程,
public static Scheduler io() {return RxJavaPlugins.onIoScheduler(IO);}
io方法中,有一個IO靜態(tài)變量,在static塊中被初始化:
static {SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());}
可以看到這里不止IO線程,還有NEW_THREAD等線程,這里以IO為例說明。
Schedulers
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Supplier<Scheduler> {@Overridepublic Scheduler get() {return IoHolder.DEFAULT;}}
static final class IoHolder {static final Scheduler DEFAULT = new IoScheduler();}
IoScheduler
public final class IoScheduler extends Scheduler {}
public IoScheduler() {this(WORKER_THREAD_FACTORY);}
public IoScheduler(ThreadFactory threadFactory) {this.threadFactory = threadFactory;this.pool = new AtomicReference<>(NONE);start();}
@Overridepublic void start() {CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);if (!pool.compareAndSet(NONE, update)) {update.shutdown();}}CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;this.expiringWorkerQueue = new ConcurrentLinkedQueue<>();this.allWorkers = new CompositeDisposable();this.threadFactory = threadFactory;ScheduledExecutorService evictor = null;Future<?> task = null;if (unit != null) {evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);}evictorService = evictor;evictorTask = task;}
EVICTOR_THREAD_FACTORY是一個RxThreadFactory對象。
RxThreadFactory
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {@Overridepublic Thread newThread(@NonNull Runnable r) {Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);t.setPriority(priority);t.setDaemon(true);return t;}}}
可以看到IO其實對應的是IoScheduler,中間包含了一個線程池執(zhí)行對應的任務。
3.2 .mainThread
一般調用方法是:AndroidSchedulers.mainThread()。
AndroidSchedulers
public static Scheduler mainThread() {return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);}private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(new Callable<Scheduler>() {@Override public Scheduler call() throws Exception {return MainHolder.DEFAULT;}});private static final class MainHolder {static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);}
主線程的切換使用Handler MainLooper。
4 整體把握
Observable<String> observable = service.listRepos1("cg22983677");observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}
@Overridepublic void onNext(String s) {
}
@Overridepublic void onError(Throwable e) {
}
@Overridepublic void onComplete() {
}});
4.1 subscribeOn
這兩個方法創(chuàng)建了數(shù)據(jù)流被執(zhí)行的環(huán)境。
Observable
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {Objects.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));}
ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {super(source);this.scheduler = scheduler;}
@Overridepublic void subscribeActual(final Observer<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {this.downstream = downstream;this.upstream = new AtomicReference<>();}
@Overridepublic void onSubscribe(Disposable d) {DisposableHelper.setOnce(this.upstream, d);}
@Overridepublic void onNext(T t) {downstream.onNext(t);}
@Overridepublic void onError(Throwable t) {downstream.onError(t);}
@Overridepublic void onComplete() {downstream.onComplete();}
@Overridepublic void dispose() {DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}
@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}
void setDisposable(Disposable d) {DisposableHelper.setOnce(this, d);}}
final class SubscribeTask implements Runnable {private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {this.parent = parent;}
@Overridepublic void run() {source.subscribe(parent);}}}
4.2 observeOn
Observable
public final Observable<T> observeOn(Scheduler scheduler) {return observeOn(scheduler, false, bufferSize());}
ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {final Scheduler scheduler;final boolean delayError;final int bufferSize;public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}
@Overrideprotected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));}}static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>implements Observer<T>, Runnable {final Observer<? super T> downstream;final Scheduler.Worker worker;SimpleQueue<T> queue;
Disposable upstream;@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(T t) {queue.offer(t);schedule();}@Overridepublic void onError(Throwable t) {done = true;schedule();}@Overridepublic void onComplete() {if (done) {return;}done = true;schedule();}@Overridepublic void dispose() {if (!disposed) {disposed = true;upstream.dispose();worker.dispose();if (!outputFused && getAndIncrement() == 0) {queue.clear();}}}void schedule() {if (getAndIncrement() == 0) {worker.schedule(this);}}@Overridepublic void run() {if (outputFused) {drainFused();} else {drainNormal();}}}}
4.3 subscribe
一句話,在這里啟動數(shù)據(jù)流。
5 流程圖
Observable<String> observable = service.listRepos1("cg22983677");observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {});


6 總結
第5節(jié)中的流程圖是Observable相關的,其實另外四種基礎類都是一樣的流程。
每個基礎類中都有不同的操作符,每一個操作符就是一個Observable,意思就是可被觀察的,內部又實現(xiàn)了觀察者,用于接收上一步的數(shù)據(jù)流,處理完畢之后,將數(shù)據(jù)流往下傳遞,最終到了消費者。
處理數(shù)據(jù)的過程所處的線程,依賴于最初設置的Scheduler,他里面的Worker是具體實施者,并在最初設定的線程中運行。
