概述
RxJava 是什么呢?根据RxJava在GitHub上给出的描述 RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java 大致意思是:一个可以在 JVM上 使用的,是由异步的基于事件编写的通过使用可观察序列构成的一个库。
关键词:异步,基于事件,可观察序列
本文主要讲述 RxJava 的订阅原理。
示例:HelloRxJava
一般学一门新的编程语言都是先从打印的「hello word」开始,我们看看如何用 Rxjava 打印出「Hello RxJava」 并以之作为分析的实例。
|
|
Observerable 是如何创建的?
首先 new 了一个 ObservableOnSubscribe 对象,并实现其中的 subscribe 方法。该对象被传递给了 Observable#create 方法以创建 Observable(当然也有其他方法可以创建 Observable ,但是原理大同小异)。
io.reactivex.Observable#create
|
|
create 方法传递进来的 Observable 又传递给了 Observable 构造方法。
|
|
io.reactivex.plugins.RxJavaPlugins#onAssembly(io.reactivex.Observable
|
|
io.reactivex.plugins.RxJavaPlugins#onAssembly 方法中只是调用了相关的 hook 函数(如果有的话),然后返回原对象。
创建的结果:ObservableOnSubsribe 外面包装了一层。如下图所示:

订阅过程
实例代码中在订阅之前我们先创建了一个 observer 对象。
然后调用 Observable#subscribe 方法,将 observer 作为参数传递给该方法。
点开看看 Observable#subscribe 方法的实现。
|
|
上面 try 块中的第一行代码调用了 RxJavaPlugins#onSubscribe 。点开看看它具体做了啥?
|
|
我们订阅的实际对象是 ObserverableCreate,因此点进去看看其中的 subscribeActual 方法实现:
ObservableCreate#subscribeActual
|
|
这里的source#subcribe 也就是:
|
|
订阅是从下游传递到上游。传递到源头之后,会触发调用 Emitter#onXxx 方法,将数据从上游发送到下游。
emitter 是如何将数据发射给 observer 的呢?
我们先看看 emitter 是什么?
上面的 emitter 的实际类型是 CreateEmitter
具体实现如下:
|
|
CreateEmitter 是 ObservableCreate的一个内部类,继承自AtomicReference<Disposable>, 实现了两个接口 ObservableEmitter<T> 和 Disposable 。
可以看到 CreateEmitter 对 observer 进行了包装(observer 依赖通过构造函数参数注入)。它在调用 observer 的相应方法的前后对状态进行判断和更新。
CreateEmitter 又是在什么时候创建的呢?
在订阅过程中调用到 ObservableCreate#subscribeActual ,该方法会利用 observer 构造一个 CreateEmmiter, 然后把它作为参数去调用 source#subcribe 方法。
source 也就是我们创建的 ObservableOnSubscribe 匿名内部类。CreateEmmiter 就是通过这样的方式作为参数传递给了我们自己实现的 subscribe 方法。
小结
订阅是从下游传递到上游。传递到源头之后,会触发调用 Emitter#onXxx 方法,将数据从上游发送到下游。
上游对数据流的控制是通过 CreateEmitter 实现的。
由于本人水平有限,可能出于误解或者笔误难免出错,如果发现有问题或者对文中内容存在疑问请在下面评论区告诉我,谢谢!