RxJava-订阅与线程切换原理

本文主要分析 RxJava 订阅 与线程切换的内部实现原理。

一般来说,使用 RxJava 都会通过链式调用。
日常的写法👇

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable.just("A", "B", "C")
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

为了方便说明和理解,我们上述将链式调用拆为如下👇等价的逐步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//创建一个数据源
final Observable<String> stringObservable = Observable.just("A", "B", "C");
//调用 subsribeOn,对数据源进行包装,得到一个 observableSubscribeOn
final Observable<String> observableSubscribeOn = stringObservable.subscribeOn(Schedulers.io());
//调用 observeOn,对上一步的 Observable 做一层包装,得到一个 observableObserveOn
final Observable<String> observableObserveOn = observableSubscribeOn.observeOn(Schedulers.single());
//创建一个观察者
final Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//订阅
observableObserveOn
.subscribe(observer);

四个关键概念

RxJava 中有四个关键概念

  • 观察者:对事件进行响应的对象,在上面的代码中,我们创建一个 Observer 匿名内部类对象,即为例子中的观察者
  • 被观察者
    • 产生事件的对象,也可以称为生产者,上面例子中的 Observable#just 返回的是一个 ObservableFromArray 对象,ObservableFromArray 继承自 Observable 抽象类,它就是例子中的被观察者
  • 事件:RxJava 中存在四种事件流:onSubscribe(订阅事件),onNext(正常事件),onError(异常事件),onComplete(完成事件:)。
  • 订阅:创建观察者与被观察者之间观察关系,对应着上述代码中的subscribe()方法

订阅的流程

  • 首先我们订阅的是一个内部包装了 N 层的 Observable。调用了最外层的 Observable 的 subsribe 方法之后,会触发往内层的 Observable#subscribeActual 逐层调用
  • 开头的例子中,数据源是 stringObservable,第一次调用 subscribeOn 的结果赋值给了 observableSubscribeOn,然后,调用observableSubscribeOn.observeOn 得到 observableObserveOn,最终调用 observableObserveOn#subsribe 方法,并将我们的 observer 作为参数传递进去。
  • 举一个例子,假设我们对数据源 Observable 进行了 10 次 subscribeOn 链式调用,那么数据源外面被 ObservableSubsribeOn 类包装了 10 层。在调用 subscribe 的时候,逐层往里面调用。

Observable#subscribe

1
2
3
4
5
6
7
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
//省略一些代码
subscribeActual(observer);
//省略一些代码
}

ObservableObserveOn#subscribeActual

1
2
3
4
5
6
7
8
9
10
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

上游是一个 ObservableSubscribeOn

ObservableSubscribeOn#subscribeActual

1
2
3
4
5
6
7
8
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

数据源是 ObservableFromArray

ObservableFromArray#subscribeActual

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
//回调 Observer 的方法
observer.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}

ObservableFromArray. FromArrayDisposable#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
//发送事件
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}

事件传递的流程

  • 我们在 subscribe 方法传入的 Observer 在订阅的过程中,会被一层一层地进行包装,每执行一个操作符,就会被包装一次,
  • 分发是,数据源的 onNext 传递给 包装的 Observer

订阅与事件传递小结

img

线程切换原理

observeOn 原理

传递给 observeOn 的 调度器 ,RxJava 会将下游的 observer 包装一层,上游有事件派发下来的时候,会使用 scheduler#scheduleDirect 方法,在对应的线程调用下游的 observer#onNext 方法

subscribeOn 原理

传递给 subscribeOn 的 调度器 ,RxJava 会创建一个 ObservableSubscribeOn 将上游的 observable 包装一层,在 ObservableSubscribeOn#subscribeActual 方法中(外部调用subscribe 的时候,会触发这个方法),构造一个SubscribeOnObserver 包装下游的 Observer,同时会调用 Scheduler#scheduleDirect(Runnable) 方法 ,也就是真正订阅的时候,会通过 scheduler#scheduleDirect 方法,在对应的线程调用上游的 source.subscribe(parent);方法

subscribeon 是不是只有第一次有效,会不会影响到下游的线程?

  • 最终表现的结果是只有第一次生效,但是呢,中间也有生效只是没有表现出来。
  • 每一轮订阅的 s.onSubscribe(parent); 还是会被影响的。
  • 也就是下游的onSubscribeon 方法的调用的线程取决于,
  • 线程变换后才会去调用 上游的订阅方法
  • parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 这一句应用是 scheduler 的地方
    • 在 run 方法中 会调用 ObservableSource#subscribe
  • 上游传递事件给下游的时候,我们设置了 observeOn 的话,会通过 observeOn 中指定的 Scheduler 去运行 task

线程切换 subscribeOn和ObserveOn是怎么实现线程调度的? (RxJava 是如何使用我们传递的 Scheduler 参数的?)

  • 传递给 observeOn 的 调度器 ,RxJava 会将下游的 observer 包装一层,上游有事件派发下来的时候,会使用 scheduler#scheduleDirect 方法,在对应的线程调用下游的 observer#onNext 方法

  • 传递给 subscribeOn 的 调度器 ,RxJava 会创建一个 ObservableSubscribeOn 将上游的 observable 包装一层,在 ObservableSubscribeOn#subscribeActual 方法中(外部调用subscribe 的时候,会触发这个方法),构造一个SubscribeOnObserver 包装下游的 Observer,同时会调用 Scheduler#scheduleDirect(Runnable) 方法 ,也就是真正订阅的时候,会通过 scheduler#scheduleDirect 方法,在对应的线程调用上游的 source.subscribe(parent);方法

    • 那这里的 observer 包装为 SubscribeOnObserver 似乎也不是必须要的呀?
  • subscribeon 是不是只有第一次有效,会不会影响到下游的线程?

    • 最终表现的结果是只有第一次生效,但是呢,中间也有生效只是没有表现出来。
    • 每一轮订阅的 s.onSubscribe(parent); 还是会被影响的。
    • 也就是下游的onSubscribeon 方法的调用的线程取决于,
    • 线程变换后才会去调用 上游的订阅方法
    • parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } 这一句应用是 scheduler 的地方
      • 在 run 方法中 会调用 ObservableSource#subscribe
    • 上游传递事件给下游的时候,我们设置了 observeOn 的话,会通过 observeOn 中指定的 Scheduler 去运行 task
  • 其实把链式调用改为分开调用。可以发现,每一个操作符 函数返回值都是一个 Observable / Single/Flowable.

    • 以 Observable 为例,调用了 subscribeOn 之后,返回的是 ObservableSubscribeOn;调用observeOn ,返回的是 ObservableObserveOn。它们内部都重写了 subscribeActual 方法
  • 小结:以 Observable 为例,调用了 subscribeOn 之后,返回的是 ObservableSubscribeOn,这个类继承了 Observable 抽象类。重写了 subsribeActual 方法,在该方法中会实现真正的订阅逻辑,通过 scheduler 将线程切换到目标线程,然后再调用 source#subsribe 方法

    • 真正的订阅是啥意思?还能有「假的订阅」?

    • 订阅过程

      • 其实订阅的是一个包装过 N 层的 Observable,调用了 最外层的 Observable#subsribe 方法之后,会触发一层一层往内层的 Observable#subscribeActual 调用
      • 举一个例子,调用了 10 次 subscribeOn,那么数据源外面被ObservableSubsribeOn 类包装了 10 层
    • 事件 传递

      • 我们在 subscribe 方法传入的 Observer 在订阅的过程中,会被一层一层地进行包装,每执行一个操作符,就会被包装一次,
      • 分发是,数据源的 onNext 传递给 包装的 Observer
    • 我们订阅的 Observable 是一个包装了 N 层的 Observable,在调用 subscribe 方法的时候,它会一层一层往里面调用,直到最终的数据源

- 代码如下:
Show Comments
0%