本文主要分析 RxJava 订阅 与线程切换的内部实现原理。
一般来说,使用 RxJava 都会通过链式调用。
日常的写法👇
|
|
为了方便说明和理解,我们上述将链式调用拆为如下👇等价的逐步调用
|
|
四个关键概念
RxJava 中有四个关键概念
- 观察者:对事件进行响应的对象,在上面的代码中,我们创建一个 Observer 匿名内部类对象,即为例子中的观察者
- 被观察者
- 产生事件的对象,也可以称为生产者,上面例子中的 Observable#just 返回的是一个 ObservableFromArray 对象,ObservableFromArray 继承自 Observable 抽象类,它就是例子中的被观察者
- 事件:RxJava 中存在四种事件流:onSubscribe(订阅事件),onNext(正常事件),onError(异常事件),onComplete(完成事件:)。
- 订阅:创建观察者与被观察者之间观察关系,对应着上述代码中的subscribe()方法
订阅的流程
- 首先我们订阅的是一个内部包装了 N 层的 Observable。调用了最外层的 Observable 的 subsribe 方法之后,会触发往内层的 Observable#subscribeActual 逐层调用
- 开头的例子中,数据源是 stringObservable,第一次调用 subscribeOn 的结果赋值给了 observableSubscribeOn,然后,调用observableSubscribeOn.observeOn 得到 observableObserveOn,最终调用 observableObserveOn#subsribe 方法,并将我们的 observer 作为参数传递进去。
- 举一个例子,假设我们对数据源 Observable 进行了 10 次 subscribeOn 链式调用,那么数据源外面被 ObservableSubsribeOn 类包装了 10 层。在调用 subscribe 的时候,逐层往里面调用。
Observable#subscribe
|
|
ObservableObserveOn#subscribeActual
|
|
上游是一个 ObservableSubscribeOn
ObservableSubscribeOn#subscribeActual
|
|
数据源是 ObservableFromArray
ObservableFromArray#subscribeActual
|
|
ObservableFromArray. FromArrayDisposable#run
|
|
事件传递的流程
- 我们在 subscribe 方法传入的 Observer 在订阅的过程中,会被一层一层地进行包装,每执行一个操作符,就会被包装一次,
- 分发是,数据源的 onNext 传递给 包装的 Observer
订阅与事件传递小结
线程切换原理
observeOn 原理
传递给 observeOn 的 调度器 ,RxJava 会将下游的 observer 包装一层,上游有事件派发下来的时候,会使用 scheduler#scheduleDirect 方法,在对应的线程调用下游的 observer#onNext 方法
subscribeOn 原理
传递给 subscribeOn 的 调度器 ,RxJava 会创建一个 ObservableSubscribeOn 将上游的 observable 包装一层,在 ObservableSubscribeOn#subscribeActual 方法中(外部调用subscribe 的时候,会触发这个方法),构造一个SubscribeOnObserver 包装下游的 Observer,同时会调用 Scheduler#scheduleDirect(Runnable) 方法 ,也就是真正订阅的时候,会通过 scheduler#scheduleDirect 方法,在对应的线程调用上游的 source.subscribe(parent);方法
subscribeon 是不是只有第一次有效,会不会影响到下游的线程?
- 最终表现的结果是只有第一次生效,但是呢,中间也有生效只是没有表现出来。
- 每一轮订阅的 s.onSubscribe(parent); 还是会被影响的。
- 也就是下游的onSubscribeon 方法的调用的线程取决于,
- 线程变换后才会去调用 上游的订阅方法
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 这一句应用是 scheduler 的地方
- 在 run 方法中 会调用 ObservableSource#subscribe
- 上游传递事件给下游的时候,我们设置了 observeOn 的话,会通过 observeOn 中指定的 Scheduler 去运行 task
线程切换 subscribeOn和ObserveOn是怎么实现线程调度的? (RxJava 是如何使用我们传递的 Scheduler 参数的?)
传递给 observeOn 的 调度器 ,RxJava 会将下游的 observer 包装一层,上游有事件派发下来的时候,会使用 scheduler#scheduleDirect 方法,在对应的线程调用下游的 observer#onNext 方法
传递给 subscribeOn 的 调度器 ,RxJava 会创建一个 ObservableSubscribeOn 将上游的 observable 包装一层,在 ObservableSubscribeOn#subscribeActual 方法中(外部调用subscribe 的时候,会触发这个方法),构造一个SubscribeOnObserver 包装下游的 Observer,同时会调用 Scheduler#scheduleDirect(Runnable) 方法 ,也就是真正订阅的时候,会通过 scheduler#scheduleDirect 方法,在对应的线程调用上游的 source.subscribe(parent);方法
- 那这里的 observer 包装为 SubscribeOnObserver 似乎也不是必须要的呀?
subscribeon 是不是只有第一次有效,会不会影响到下游的线程?
- 最终表现的结果是只有第一次生效,但是呢,中间也有生效只是没有表现出来。
- 每一轮订阅的 s.onSubscribe(parent); 还是会被影响的。
- 也就是下游的onSubscribeon 方法的调用的线程取决于,
- 线程变换后才会去调用 上游的订阅方法
- parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 这一句应用是 scheduler 的地方
- 在 run 方法中 会调用 ObservableSource#subscribe
- 上游传递事件给下游的时候,我们设置了 observeOn 的话,会通过 observeOn 中指定的 Scheduler 去运行 task
其实把链式调用改为分开调用。可以发现,每一个操作符 函数返回值都是一个 Observable / Single/Flowable.
- 以 Observable 为例,调用了 subscribeOn 之后,返回的是 ObservableSubscribeOn;调用observeOn ,返回的是 ObservableObserveOn。它们内部都重写了 subscribeActual 方法
小结:以 Observable 为例,调用了 subscribeOn 之后,返回的是 ObservableSubscribeOn,这个类继承了 Observable 抽象类。重写了 subsribeActual 方法,在该方法中会实现真正的订阅逻辑,通过 scheduler 将线程切换到目标线程,然后再调用 source#subsribe 方法
真正的订阅是啥意思?还能有「假的订阅」?
订阅过程
- 其实订阅的是一个包装过 N 层的 Observable,调用了 最外层的 Observable#subsribe 方法之后,会触发一层一层往内层的 Observable#subscribeActual 调用
- 举一个例子,调用了 10 次 subscribeOn,那么数据源外面被ObservableSubsribeOn 类包装了 10 层
事件 传递
- 我们在 subscribe 方法传入的 Observer 在订阅的过程中,会被一层一层地进行包装,每执行一个操作符,就会被包装一次,
- 分发是,数据源的 onNext 传递给 包装的 Observer
我们订阅的 Observable 是一个包装了 N 层的 Observable,在调用 subscribe 方法的时候,它会一层一层往里面调用,直到最终的数据源
- 代码如下: