RxJava wiki 官方文档 (中文易懂,精简整理)

文章正文
发布时间:2025-05-14 08:58

本文是对 RxJava 官方文档的翻译,整理,让我们一起从 0 到 1 走进 RXJava 的世界。

{@see: https://github.com/ReactiveX/RxJava/wiki} {@see: https://www.jianshu.com/p/d997805b37d4}

 

0x001 首页

RxJava 是 ReactiveX (Reactive Extensions) 的 Java VM 实现:用于使用可观察序列编写异步和基于事件的程序库。

RxJava 是轻量级的:只有一个简单的 jar 包,聚焦在可观察的抽象和相关的高阶函数。

RxJava 是一个多线程实现:RxJava 支持 Java 6 或更高版本,同时支持基于 JVM  的语言,如 Groovy、Clojure、JRuby、Kotlin 和 Scala。

 

0x002 如何引入

maven:

<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.0</version> </dependency>

gradle:

compile 'io.reactivex.rxjava2:rxjava:2.2.0'

Ivy:

<dependency org="io.reactivex.rxjava2" name="rxjava" rev="2.2.0" />

当然也可以下载 RxJava 的源码自己构建:

$ git clone git@github.com:ReactiveX/RxJava.git $ cd RxJava/ $ ./gradlew build $ ./gradlew build :rxjava:compileJava :rxjava:processResources UP-TO-DATE :rxjava:classes :rxjava:jar :rxjava:sourcesJar :rxjava:signArchives SKIPPED :rxjava:assemble :rxjava:licenseMain UP-TO-DATE :rxjava:licenseTest UP-TO-DATE :rxjava:compileTestJava :rxjava:processTestResources UP-TO-DATE :rxjava:testClasses :rxjava:test :rxjava:check :rxjava:build BUILD SUCCESSFUL Total time: 30.758 secs

 

0x003 开始使用

RxJava 的 examples 中提供了多种语言的演示,如 Java, Groovy, Clojure, Scala,本文以 Java 为演示语言。

public static void main(String[] args) { String[] input = new String[] { "rxjava", "ReactiveX" }; // Java 8 lambdas Flowable.fromArray(input).subscribe(s -> { System.out.println("Hello " + s + "!"); }); // doesn't support Java 8 lambdas (yet) Flowable.fromArray(input).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("Hello " + s + "!"); } }); }

输出:

Hello rxjava! Hello ReactiveX!


要使用 RxJava,你需要创建 Observables (发出数据项),以各种方式转换这些 Observables (通过使用 Observable 运算符) 以获得你感兴趣的精确数据项。然后观察这些有趣项的序列并对其做出反应 (通过实现 Observers)  或订阅者,然后将其订阅到转换后的 Observable 中。

 

1. 从现有数据结构创建可观察对象

你可以使用 Observable 的 just() 和 from() 方法将对象,对象列表或对象数组转换为发出这些对象的 Observables.

Observable<String> o1 = Observable.fromArray("a", "b", "c"); List list = new ArrayList<>(); list.add("1"); list.add("2"); Observable<Integer> o2 = Observable.fromIterable(list); Observable<String> o3 = Observable.just("one object");

对于要由 Observable 发出的每个项目,这些转换后的 Observable 将同步调用预订它们的任何订户的 onNext() 方法,然后将调用订户的 onCompleted() 方法。

 

2. 通过 create() 方法创建一个 Observable

通过设计自己的 Observable 并使用 create() 方法实现它,你可以实现异步 IO,计算操作,甚至 "无限" 数据流。

public static void main(String[] args) { customObservableBlocking().subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { System.out.println("onNext: " + s); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { System.out.println("onComplete"); } }); } private static Observable<String> customObservableBlocking() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { for (int i = 0; i < 50; i++) { observableEmitter.onNext("value_" + i); } observableEmitter.onComplete(); } }); }

输出:

onNext: value_0 onNext: value_1 onNext: value_2 ... onNext: value_49 onComplete

 

3. 用运算符转换可观测对象

RxJava 允许你将运算符链接在一起以变换和组成 Observable,比如还是上面的例子,需要跳过前 10 项,接受下 5 项,然后对其进行转换 (映射 (...))在订阅和打印项目之前:

customObservableBlocking().skip(10).take(5) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { // 遍历的单个结果 System.out.println("map: " + s); return s; } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { System.out.println("onNext: " + s); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { System.out.println("onComplete"); } });

输出:

map: value_10 onNext: value_10 map: value_11 onNext: value_11 map: value_12 onNext: value_12 map: value_13 onNext: value_13 map: value_14 onNext: value_14 onComplete

下面是上述转换过程图:


 

 

0x004 操作符

RxJava 的操作符非常,这边对操作符做了一个分类:

创建操作符

变换操作符

过滤操作符

组合操作符

辅助操作符

错误处理操作符

布尔操作符

转换操作符

 

1. 创建操作符

create

关于 create 操作符,上面的 "开始使用" 已经讲解过了。

from

可以通过各种类型的数据源创建 Observable.

Observable<String> o1 = Observable.fromArray("a", "b", "c"); List list = new ArrayList<>(); list.add("1"); list.add("2"); Observable<Integer> o2 = Observable.fromIterable(list);

just

just 操作符,创建将逐个内容进行发送的 Observable,其内部将多个参数发送的内容转换为一个数组,然后将数组通过 from 操作符进行发送。

Observable.just("Hello", "World!").subscribe(onNextAction);

interval

创建以 %time 为时间间隔发送整数序列的 Observable, 这个序列为一个无限递增的整数序列。interval 默认在 computation 调度器上执行,你也可以传递一个可选的 Scheduler 参数来指定调度器。

Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);

range

创建以发送范围内的整数序列的 Observable:

Observable.range(0, 5).subscribe(onNextAction);

repeat

创建一个以 N 次重复发送数据的 Observable.

Observable.range(0, 5).repeat(3).subscribe(onNextAction);

 

2. 变换操作符

map

将源 Observable 发送的数据转换为一个新的 Observable 对象。Func1 和 Action 的区别在于,Func1 包装的是有返回值的方法。

Observable.just("Hello", "World!").map(new Func1<String, String>() { @Override public String call(String s) { return "Hi: " + s; } }).subscribe(onNextAction);

flatMap

源 Observable 通过 flatMap 操作符转换为包含源 Observable 发送的所有子条目的 Observable 集合,然后从 Observable 集合中逐个取出转化为单个 Observable 对象进行发送,不同于 map 操作符的一点就是一对多的转化。

它的应用场景可以体现在源 Observable 发送的内容为一个复杂的数据集,例如一个 Bean 对象,而该外层 Bean 对象中一个成员变量为另一个内层 Bean 对象,我们想要拆解外层 Bean 对象获取内层 Bean 对象,就可以用 flatMap 操作符。注意:flatMap 对这些 Observables 发射的数据做的是合并 (merge) 操作,因此它们可能是交错的。

String[] sources = {"Hello", "World"}; // flatMap 的合并运行允许交叉,允许交错的发送事件 Observable.from(sources).flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just("Hi:" + s); } }).subscribe(onNextAction);

concatMap

类似于 flatMap 操作符,不同的一点是它按次序连接。

// 解决了flatMap的交叉问题,将发送的数据连接发送 String[] sources = {"Hello", "RxJava", "World!"}; Observable.from(sources).concatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just("Hi:" + s); } }).subscribe(onNextAction);

buffer

将原有 Observable 转换为一个新的 Observable,这个新的 Observable 每次发送一组值,而不是一个个进行发送,我们可以定义这个新的 Observable 存放几个原有的 Observable 对象。

// 将原有 Observable 转换为一个新的 Observable,这个新的 Observable 每次发送一组值,而不是一个个进行发送 Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> list) { // handle list } });

 

 

3. 过滤操作符

filter

filter 默认不在任何特定的调度器上执行。

Observable.range(0, 5).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { // 自定义的条件,只有符合条件的结果才会提交给观察者 return num > 2; } }).subscribe(onNextAction);

elementAt

elementAt 操作符获取原始 Observable 发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

// elementAt 操作符,用于返回指定位置后一位的数据,即脚标 +1 的数据 // 在这里发送 0、1、2、3、4,脚标为 3 的数据为 2,发送其后一位数据 3 Observable.range(0, 5).elementAt(3).subscribe(onNextAction);

distinct

只允许还没有发射过的数据项通过。

// distinct 操作符,用于 Observable 发送的元素的去重 Observable.just(1, 1, 2, 2, 2, 3).distinct().subscribe(onNextAction);

skip

抑制 Observable 发射的前 N 项数据,只发送后 N 项数据。

//skip 操作符,用于 Observable 发送的元素前 N 项去除掉 Observable.range(0, 5).skip(2).subscribe(onNextAction);

take

用于 Observable 发送的元素只取前 N 项

Observable.range(0, 5).take(2).subscribe(onNextAction);

 

4.  组合操作符

组合操作符用于将多个 Observable 组合成一个单一的 Observable.

startWith

会在发送的数据序列前插入数据序列,并且会发送插入的数据序列。

Observable.range(3, 5).startWith(0, 10086).subscribe(onNextAction);

merge

将多个 Observable 对象进行合并,需要注意的是:merge 可能会让合并的 Observables 发射的数据交错。
在这里将 firstObservable 指定在 IO 线程中进行发送,secondObservable 没有指定线程,两者合并然后发送数据时便会产生数据交错的现象。

// merge 操作符,会将多个 Observable 对象合并到一个 Observable 对象中进行发送 Observable<Integer> firstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io()); Observable<Integer> secondObservable = Observable.just(3, 4, 5); Observable.merge(firstObservable, secondObservable).subscribe(onNextAction, onErrorAction);

concat

concat 操作符不同于 merge 操作符的区别就是:会将多个 Observable 对象合并到一个 Observable 对象中进行发送,严格按照顺序进行发送。

// concat 操作符,会将多个 Observable 对象合并到一个 Observable 对象中进行发送,严格按照顺序进行发送 Observable<Integer> firstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io()); Observable<Integer> secondObservable = Observable.just(3, 4, 5); Observable.concat(firstObservable, secondObservable) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);

zip

返回一个 Obversable,它使用这个函数按顺序结合两个或多个 Observables 发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序进行数据发送。它只发射与发射数据项最少的那个 Observable 一样多的数据。

// zip 操作符,会将多个 Observable 对象转换成一个 Observable 对象然后进行发送,转换关系可根据需求自定义 Observable<Integer> integerObservable = Observable.range(0, 4); Observable<String> stringObservable = Observable.just("a", "b", "c", "d"); Observable.zip(integerObservable, stringObservable, new Func2<Integer, String, String>() { @Override public String call(Integer num, String info) { // 在这里的转换关系为将数字与字串内容进行拼接 return "数字为:" + num + ", 字符为:" + info; } }).subscribe(onNextAction);

 

5.  辅助操作符

delay

可以让源 Observable 对象发送数据之前暂停一段指定的时间。

Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);

subscribeOn

指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。

observeOn

指定 Subscriber 所运行的线程,或者叫做事件消费的线程。

timeout

在超时的时候会将源 Observable 转换为备用的 Observable 对象进行发送。

// timeout 操作符,如果源 Observable 对象过了一段时间没有发送数据, timeout 会以 onError 通知终止这个 Observable Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { try { Thread.sleep(i * 100); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } }}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);

 

6.  错误操作符

onErrorReturn

会在遇到错误时,停止源 Observable 发送,并调用用户自定义的返回请求,实质上就是调用一次 OnNext 方法进行内容发送后,停止消息发送。

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if (i > 3) { subscriber.onError(new Throwable("User Alex Defined Error")); } subscriber.onNext(i); } }}).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 404; }}).subscribe(onNextAction, onErrorAction, onCompletedAction);

retry

不会将原始 Observable 的 onError 通知传递给观察者,它会重新订阅这个 Observable.

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if (i > 1) { subscriber.onError(new Throwable("User Alex Defined Error")); } subscriber.onNext(i); } }}).retry(2).subscribe(onNextAction,onErrorAction,onCompletedAction);

 

7.  布尔操作符

all

对源 Observable 发送的每一个数据根据给定的条件进行判断。如果全部符合条件,返回 true,否则返回 false.

Observable.just(1, 2, 3, 4).all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { return num > 3; } }).subscribe(onNextAction, onErrorAction, onCompletedAction);

contains

对源 Observable 发送的数据是否包含定义的选项进行判断。如果包含返回 true,否则返回 false.

Observable.just(1, 2, 3, 4).contains(2).subscribe(onNextAction, onErrorAction, onCompletedAction);

isEmpty

对源 Observable 发送的数据是否为空进行判断。如果源 Observable 发送的数据为空返回 true,否则返回 false.

Observable.just(1, 2, 3, 4).isEmpty().subscribe(onNextAction, onErrorAction, onCompletedAction);

exists

对源 Observable 发送的单独一个数据根据给定的条件进行判断。如果有一个数据符合条件,返回 true,否则返回 false.

Observable.just(1, 2, 3, 4).exists(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { return num > 3; } }).subscribe(onNextAction, onErrorAction, onCompletedAction);

sequenceEqual

对两个 Observable 进行判断,两个 Observable 相同时返回 true,否则返回 false. 这里包含两个 Observable 的数据,发射顺序,终止状态是否相同。

Observable.sequenceEqual(Observable.just(1, 2, 3, 4), Observable.just(1)).subscribe(onNextAction, onErrorAction, onCompletedAction);

 

8. 转换操作符

toList

发射多项数据的 Observable 会为每一项数据调用 onNext 方法。你可以用 toList 操作符改变这个行为,让 Observable 将多项数据组合成一个 List,然后调用一次o nNext 方法传递整个列表。

Observable.just(1, 2, 3).toList().subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> numList) { for (Integer i : numList) { Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show(); } } });

toSortedList

不同于 toList 操作符的是,它会对产生的列表排序,默认是自然升序。

Observable.just(40, 10, 80, 30).toSortedList().subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> numList) { for (Integer i : numList) { Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show(); } } });

toMap

源 Observable 发送的数据作为键值对中的值,可以提供一个用于生成 Map 的 Key 的函数,然后不同的键存储源 Observable 发送的不同的值。

Observable.just("Hello", "World").toMap(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.equals("Alex")?0:1; } }).subscribe(new Action1<Map<Integer, String>>() { @Override public void call(Map<Integer, String> convertMap) { for (int i = 0; i < convertMap.size(); i++) { Toast.makeText(getActivity(), convertMap.get(i), Toast.LENGTH_SHORT).show(); } } });

 

 

0x005 插件

插件允许你从多个方面修改 RxJava 的默认行为:

通过更改默认计算,I/O 和新线程调度程序的集合;

通过注册处理程序以处理 RxJava 可能遇到的异常错误;

通过注册可以记录一些常规 RxJava 活动发生的函数。

从 1.1.7 版本开始,不推荐使用常规 RxJavaPlugins 和其他挂钩类,而推荐使用 RxJavaHooks.

 

1. RxJavaHooks

新的 RxJavaHooks 允许你进入 Observable, Single 和 Completable 类型的生命周期,由 Scheduler 返回的 Scheduler 并为无法交付的错误提供全部保护。

现在,你可以在运行时更改这些 hook,并且不再需要通过系统参数准备 hook。 由于用户仍可能依赖旧的挂钩系统,因此RxJavaHooks 默认情况下将委托给那些旧的 hook。

RxJavaHooks 具有各种钩子类型的设置器和获取器:

Hook描述
onError : Action1<Throwable>   捕获所有异常的回调  
onObservableCreate : Func1<Observable.OnSubscribe, Observable.OnSubscribe>   Observable 初始化创建时回调  
onObservableStart : Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>   在订阅 observable 实际发生之前调用  
onObservableSubscribeError : Func1<Throwable, Throwable>   订阅 observable 失败时调用  
onObservableReturn : Func1<Subscription, Subscription>   订阅成功后且在返回订阅处理程序之前调用  
onObservableLift : Func1<Observable.Operator, Observable.Operator>  

当 lift 与 observable 一起操作时回调

 
onSingleCreate : Func1<Single.OnSubscribe, Single.OnSubscribe>  

在单个实例上实例化运算符和源时调用

 
onSingleStart : Func2<Single, Observable.OnSubscribe, Observable.OnSubscribe>   在单个订阅发生之前调用  
onSingleSubscribeError : Func1<Throwable, Throwable>   单个订阅失败时调用  
onSingleReturn : Func1<Subscription, Subscription>   订阅成功时调用此方法,并在返回其订阅处理程序之前调用  
onSingleLift : Func1<Observable.Operator, Observable.Operator>   在使用 lift 操作时调用  
onCompletableCreate : Func1<Completable.OnSubscribe, Completable.OnSubscribe>   在完成时实例化运算符和源时调用  
onCompletableStart : Func2<Completable, Completable.OnSubscribe, Completable.OnSubscribe>   在订阅实际完成之前调用  
onCompletableSubscribeError : Func1<Throwable, Throwable>   订阅完成失败时调用  
onCompletableLift : Func1<Completable.Operator, Completable.Operator>   当 lift 与可完成一起使用时调用  
onComputationScheduler : Func1<Scheduler, Scheduler>   当使用 Schedulers.computation()时调用  
onIOScheduler : Func1<Scheduler, Scheduler>   当使用 Schedulers.io() 时调用  
onNewThreadScheduler : Func1<Scheduler, Scheduler>   当使用 Schedulers.newThread() 是调用  
onScheduleAction : Func1<Action0, Action0>   在调度安排任务时调用  
onGenericScheduledExecutorService : Func0<ScheduledExecutorService>   返回单线程执行程序以支持 RxJava 本身的后台定时任务时调用  

比如:

RxJavaHooks.setOnObservableCreate(o -> { System.out.println("Creating " + o.getClass()); return o; }); try { Observable.range(1, 10) .map(v -> v * 2) .filter(v -> v % 4 == 0) .subscribe(System.out::println); } finally { RxJavaHooks.reset(); }

 

2. RxJavaErrorHandler

此插件让你可以注册一个函数,该函数将处理传递给 SafeSubscriber.onError (Throwable) 的错误。 (一个调用 subscribe() 时,SafeSubscriber 用于包装传入的 Subscriber)。 为此,扩展 RxJavaErrorHandler 并重写此方法:

void handleError(Throwable e)

然后,请按照下列步骤操作:

创建一个已实现的新 RxJavaErrorHandler 子类的对象。

通过 RxJavaPlugins.getInstance() 获取全局 RxJavaPlugins 实例。

将错误处理程序对象传递到该实例的 registerErrorHandler() 方法。

当你执行此操作时,RxJava 将开始使用你的错误处理程序来处理传递给 SafeSubscriber.onError(Throwable) 的错误。

RxJavaPlugins.getInstance().reset(); RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override public void handleError(Throwable e) { e.printStackTrace(); } }); Observable.error(new IOException()) .subscribe(System.out::println, e -> { });

 

3. RxJavaObservableExecutionHook

该插件允许你注册在某些常规 RxJava 活动上调用的函数,例如,用于日志记录或度量标准收集目的。 为此,请扩展RxJavaObservableExecutionHook 并重写以下任何或所有方法:

方法回调时机
onCreate( )   Observable.create( ) 创建期间  
onSubscribeStart( )   Observable.subscribe( ) 之前  
onSubscribeReturn( )   Observable.subscribe( ) 之后  
onSubscribeError( )   Observable.subscribe( ) 执行失败时  
onLift( )   Observable.lift( ) 之间  

请按照下列步骤操作:

创建已实现的新 RxJavaObservableExecutionHook 子类的对象。

通过 RxJavaPlugins.getInstance() 获取全局 RxJavaPlugins 实例。

将执行 hook 对象传递给该实例的 registerObservableExecutionHook() 方法。

首页
评论
分享
Top