本文是对 RxJava 官方文档的翻译,整理,让我们一起从 0 到 1 走进 RXJava 的世界。
{@see: https://github.com/ReactiveX/RxJava/wiki} {@see: https://www.jianshu.com/p/d997805b37d4}0x001 首页
RxJava 是 ReactiveX (Reactive Extensions) 的 Java VM 实现:用于使用可观察序列编写异步和基于事件的程序库。
RxJava 是轻量级的:只有一个简单的 jar 包,聚焦在可观察的抽象和相关的高阶函数。
RxJava 是一个多线程实现:RxJava 支持 Java 6 或更高版本,同时支持基于 JVM 的语言,如 Groovy、Clojure、JRuby、Kotlin 和 Scala。
0x002 如何引入
maven:
<dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.0</version> </dependency>gradle:
compile 'io.reactivex.rxjava2:rxjava:2.2.0'Ivy:
<dependency org="io.reactivex.rxjava2" name="rxjava" rev="2.2.0" />当然也可以下载 RxJava 的源码自己构建:
$ git clone git@github.com:ReactiveX/RxJava.git $ cd RxJava/ $ ./gradlew build $ ./gradlew build :rxjava:compileJava :rxjava:processResources UP-TO-DATE :rxjava:classes :rxjava:jar :rxjava:sourcesJar :rxjava:signArchives SKIPPED :rxjava:assemble :rxjava:licenseMain UP-TO-DATE :rxjava:licenseTest UP-TO-DATE :rxjava:compileTestJava :rxjava:processTestResources UP-TO-DATE :rxjava:testClasses :rxjava:test :rxjava:check :rxjava:build BUILD SUCCESSFUL Total time: 30.758 secs0x003 开始使用
RxJava 的 examples 中提供了多种语言的演示,如 Java, Groovy, Clojure, Scala,本文以 Java 为演示语言。
public static void main(String[] args) { String[] input = new String[] { "rxjava", "ReactiveX" }; // Java 8 lambdas Flowable.fromArray(input).subscribe(s -> { System.out.println("Hello " + s + "!"); }); // doesn't support Java 8 lambdas (yet) Flowable.fromArray(input).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("Hello " + s + "!"); } }); }输出:
Hello rxjava! Hello ReactiveX!
要使用 RxJava,你需要创建 Observables (发出数据项),以各种方式转换这些 Observables (通过使用 Observable 运算符) 以获得你感兴趣的精确数据项。然后观察这些有趣项的序列并对其做出反应 (通过实现 Observers) 或订阅者,然后将其订阅到转换后的 Observable 中。
1. 从现有数据结构创建可观察对象
你可以使用 Observable 的 just() 和 from() 方法将对象,对象列表或对象数组转换为发出这些对象的 Observables.
Observable<String> o1 = Observable.fromArray("a", "b", "c"); List list = new ArrayList<>(); list.add("1"); list.add("2"); Observable<Integer> o2 = Observable.fromIterable(list); Observable<String> o3 = Observable.just("one object");对于要由 Observable 发出的每个项目,这些转换后的 Observable 将同步调用预订它们的任何订户的 onNext() 方法,然后将调用订户的 onCompleted() 方法。
2. 通过 create() 方法创建一个 Observable
通过设计自己的 Observable 并使用 create() 方法实现它,你可以实现异步 IO,计算操作,甚至 "无限" 数据流。
public static void main(String[] args) { customObservableBlocking().subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { System.out.println("onNext: " + s); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { System.out.println("onComplete"); } }); } private static Observable<String> customObservableBlocking() { return Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { for (int i = 0; i < 50; i++) { observableEmitter.onNext("value_" + i); } observableEmitter.onComplete(); } }); }输出:
onNext: value_0 onNext: value_1 onNext: value_2 ... onNext: value_49 onComplete
3. 用运算符转换可观测对象
RxJava 允许你将运算符链接在一起以变换和组成 Observable,比如还是上面的例子,需要跳过前 10 项,接受下 5 项,然后对其进行转换 (映射 (...))在订阅和打印项目之前:
customObservableBlocking().skip(10).take(5) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { // 遍历的单个结果 System.out.println("map: " + s); return s; } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { } @Override public void onNext(String s) { System.out.println("onNext: " + s); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { System.out.println("onComplete"); } });输出:
map: value_10 onNext: value_10 map: value_11 onNext: value_11 map: value_12 onNext: value_12 map: value_13 onNext: value_13 map: value_14 onNext: value_14 onComplete下面是上述转换过程图:
0x004 操作符
RxJava 的操作符非常,这边对操作符做了一个分类:
创建操作符
变换操作符
过滤操作符
组合操作符
辅助操作符
错误处理操作符
布尔操作符
转换操作符
1. 创建操作符
create
关于 create 操作符,上面的 "开始使用" 已经讲解过了。
from
可以通过各种类型的数据源创建 Observable.
Observable<String> o1 = Observable.fromArray("a", "b", "c"); List list = new ArrayList<>(); list.add("1"); list.add("2"); Observable<Integer> o2 = Observable.fromIterable(list);just
just 操作符,创建将逐个内容进行发送的 Observable,其内部将多个参数发送的内容转换为一个数组,然后将数组通过 from 操作符进行发送。
Observable.just("Hello", "World!").subscribe(onNextAction);interval
创建以 %time 为时间间隔发送整数序列的 Observable, 这个序列为一个无限递增的整数序列。interval 默认在 computation 调度器上执行,你也可以传递一个可选的 Scheduler 参数来指定调度器。
Observable.interval(1, TimeUnit.SECONDS, AndroidSchedulers.mainThread()).subscribe(onNextAction);range
创建以发送范围内的整数序列的 Observable:
Observable.range(0, 5).subscribe(onNextAction);repeat
创建一个以 N 次重复发送数据的 Observable.
Observable.range(0, 5).repeat(3).subscribe(onNextAction);
2. 变换操作符
map
将源 Observable 发送的数据转换为一个新的 Observable 对象。Func1 和 Action 的区别在于,Func1 包装的是有返回值的方法。
Observable.just("Hello", "World!").map(new Func1<String, String>() { @Override public String call(String s) { return "Hi: " + s; } }).subscribe(onNextAction);flatMap
源 Observable 通过 flatMap 操作符转换为包含源 Observable 发送的所有子条目的 Observable 集合,然后从 Observable 集合中逐个取出转化为单个 Observable 对象进行发送,不同于 map 操作符的一点就是一对多的转化。
它的应用场景可以体现在源 Observable 发送的内容为一个复杂的数据集,例如一个 Bean 对象,而该外层 Bean 对象中一个成员变量为另一个内层 Bean 对象,我们想要拆解外层 Bean 对象获取内层 Bean 对象,就可以用 flatMap 操作符。注意:flatMap 对这些 Observables 发射的数据做的是合并 (merge) 操作,因此它们可能是交错的。
String[] sources = {"Hello", "World"}; // flatMap 的合并运行允许交叉,允许交错的发送事件 Observable.from(sources).flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just("Hi:" + s); } }).subscribe(onNextAction);concatMap
类似于 flatMap 操作符,不同的一点是它按次序连接。
// 解决了flatMap的交叉问题,将发送的数据连接发送 String[] sources = {"Hello", "RxJava", "World!"}; Observable.from(sources).concatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.just("Hi:" + s); } }).subscribe(onNextAction);buffer
将原有 Observable 转换为一个新的 Observable,这个新的 Observable 每次发送一组值,而不是一个个进行发送,我们可以定义这个新的 Observable 存放几个原有的 Observable 对象。
// 将原有 Observable 转换为一个新的 Observable,这个新的 Observable 每次发送一组值,而不是一个个进行发送 Observable.just(1, 2, 3, 4, 5, 6).buffer(3).subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> list) { // handle list } });
3. 过滤操作符
filter
filter 默认不在任何特定的调度器上执行。
Observable.range(0, 5).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { // 自定义的条件,只有符合条件的结果才会提交给观察者 return num > 2; } }).subscribe(onNextAction);elementAt
elementAt 操作符获取原始 Observable 发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。
// elementAt 操作符,用于返回指定位置后一位的数据,即脚标 +1 的数据 // 在这里发送 0、1、2、3、4,脚标为 3 的数据为 2,发送其后一位数据 3 Observable.range(0, 5).elementAt(3).subscribe(onNextAction);distinct
只允许还没有发射过的数据项通过。
// distinct 操作符,用于 Observable 发送的元素的去重 Observable.just(1, 1, 2, 2, 2, 3).distinct().subscribe(onNextAction);skip
抑制 Observable 发射的前 N 项数据,只发送后 N 项数据。
//skip 操作符,用于 Observable 发送的元素前 N 项去除掉 Observable.range(0, 5).skip(2).subscribe(onNextAction);take
用于 Observable 发送的元素只取前 N 项
Observable.range(0, 5).take(2).subscribe(onNextAction);
4. 组合操作符
组合操作符用于将多个 Observable 组合成一个单一的 Observable.
startWith
会在发送的数据序列前插入数据序列,并且会发送插入的数据序列。
Observable.range(3, 5).startWith(0, 10086).subscribe(onNextAction);merge
将多个 Observable 对象进行合并,需要注意的是:merge 可能会让合并的 Observables 发射的数据交错。
在这里将 firstObservable 指定在 IO 线程中进行发送,secondObservable 没有指定线程,两者合并然后发送数据时便会产生数据交错的现象。
concat
concat 操作符不同于 merge 操作符的区别就是:会将多个 Observable 对象合并到一个 Observable 对象中进行发送,严格按照顺序进行发送。
// concat 操作符,会将多个 Observable 对象合并到一个 Observable 对象中进行发送,严格按照顺序进行发送 Observable<Integer> firstObservable = Observable.just(0, 1, 2).subscribeOn(Schedulers.io()); Observable<Integer> secondObservable = Observable.just(3, 4, 5); Observable.concat(firstObservable, secondObservable) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);zip
返回一个 Obversable,它使用这个函数按顺序结合两个或多个 Observables 发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序进行数据发送。它只发射与发射数据项最少的那个 Observable 一样多的数据。
// zip 操作符,会将多个 Observable 对象转换成一个 Observable 对象然后进行发送,转换关系可根据需求自定义 Observable<Integer> integerObservable = Observable.range(0, 4); Observable<String> stringObservable = Observable.just("a", "b", "c", "d"); Observable.zip(integerObservable, stringObservable, new Func2<Integer, String, String>() { @Override public String call(Integer num, String info) { // 在这里的转换关系为将数字与字串内容进行拼接 return "数字为:" + num + ", 字符为:" + info; } }).subscribe(onNextAction);
5. 辅助操作符
delay
可以让源 Observable 对象发送数据之前暂停一段指定的时间。
Observable.just(1, 2, 3) .delay(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);subscribeOn
指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程,或者叫做事件产生的线程。
observeOn
指定 Subscriber 所运行的线程,或者叫做事件消费的线程。
timeout
在超时的时候会将源 Observable 转换为备用的 Observable 对象进行发送。
// timeout 操作符,如果源 Observable 对象过了一段时间没有发送数据, timeout 会以 onError 通知终止这个 Observable Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { try { Thread.sleep(i * 100); } catch (InterruptedException e) { e.printStackTrace(); } subscriber.onNext(i); } }}).timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200)) .observeOn(AndroidSchedulers.mainThread()) .subscribe(onNextAction);
6. 错误操作符
onErrorReturn
会在遇到错误时,停止源 Observable 发送,并调用用户自定义的返回请求,实质上就是调用一次 OnNext 方法进行内容发送后,停止消息发送。
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if (i > 3) { subscriber.onError(new Throwable("User Alex Defined Error")); } subscriber.onNext(i); } }}).onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 404; }}).subscribe(onNextAction, onErrorAction, onCompletedAction);retry
不会将原始 Observable 的 onError 通知传递给观察者,它会重新订阅这个 Observable.
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { if (i > 1) { subscriber.onError(new Throwable("User Alex Defined Error")); } subscriber.onNext(i); } }}).retry(2).subscribe(onNextAction,onErrorAction,onCompletedAction);
7. 布尔操作符
all
对源 Observable 发送的每一个数据根据给定的条件进行判断。如果全部符合条件,返回 true,否则返回 false.
Observable.just(1, 2, 3, 4).all(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { return num > 3; } }).subscribe(onNextAction, onErrorAction, onCompletedAction);contains
对源 Observable 发送的数据是否包含定义的选项进行判断。如果包含返回 true,否则返回 false.
Observable.just(1, 2, 3, 4).contains(2).subscribe(onNextAction, onErrorAction, onCompletedAction);isEmpty
对源 Observable 发送的数据是否为空进行判断。如果源 Observable 发送的数据为空返回 true,否则返回 false.
Observable.just(1, 2, 3, 4).isEmpty().subscribe(onNextAction, onErrorAction, onCompletedAction);exists
对源 Observable 发送的单独一个数据根据给定的条件进行判断。如果有一个数据符合条件,返回 true,否则返回 false.
Observable.just(1, 2, 3, 4).exists(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer num) { return num > 3; } }).subscribe(onNextAction, onErrorAction, onCompletedAction);sequenceEqual
对两个 Observable 进行判断,两个 Observable 相同时返回 true,否则返回 false. 这里包含两个 Observable 的数据,发射顺序,终止状态是否相同。
Observable.sequenceEqual(Observable.just(1, 2, 3, 4), Observable.just(1)).subscribe(onNextAction, onErrorAction, onCompletedAction);
8. 转换操作符
toList
发射多项数据的 Observable 会为每一项数据调用 onNext 方法。你可以用 toList 操作符改变这个行为,让 Observable 将多项数据组合成一个 List,然后调用一次o nNext 方法传递整个列表。
Observable.just(1, 2, 3).toList().subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> numList) { for (Integer i : numList) { Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show(); } } });toSortedList
不同于 toList 操作符的是,它会对产生的列表排序,默认是自然升序。
Observable.just(40, 10, 80, 30).toSortedList().subscribe(new Action1<List<Integer>>() { @Override public void call(List<Integer> numList) { for (Integer i : numList) { Toast.makeText(getActivity(), "i:" + i, Toast.LENGTH_SHORT).show(); } } });toMap
源 Observable 发送的数据作为键值对中的值,可以提供一个用于生成 Map 的 Key 的函数,然后不同的键存储源 Observable 发送的不同的值。
Observable.just("Hello", "World").toMap(new Func1<String, Integer>() { @Override public Integer call(String s) { return s.equals("Alex")?0:1; } }).subscribe(new Action1<Map<Integer, String>>() { @Override public void call(Map<Integer, String> convertMap) { for (int i = 0; i < convertMap.size(); i++) { Toast.makeText(getActivity(), convertMap.get(i), Toast.LENGTH_SHORT).show(); } } });
0x005 插件
插件允许你从多个方面修改 RxJava 的默认行为:
通过更改默认计算,I/O 和新线程调度程序的集合;
通过注册处理程序以处理 RxJava 可能遇到的异常错误;
通过注册可以记录一些常规 RxJava 活动发生的函数。
从 1.1.7 版本开始,不推荐使用常规 RxJavaPlugins 和其他挂钩类,而推荐使用 RxJavaHooks.
1. RxJavaHooks
新的 RxJavaHooks 允许你进入 Observable, Single 和 Completable 类型的生命周期,由 Scheduler 返回的 Scheduler 并为无法交付的错误提供全部保护。
现在,你可以在运行时更改这些 hook,并且不再需要通过系统参数准备 hook。 由于用户仍可能依赖旧的挂钩系统,因此RxJavaHooks 默认情况下将委托给那些旧的 hook。
RxJavaHooks 具有各种钩子类型的设置器和获取器:
Hook描述当 lift 与 observable 一起操作时回调
在单个实例上实例化运算符和源时调用
比如:
RxJavaHooks.setOnObservableCreate(o -> { System.out.println("Creating " + o.getClass()); return o; }); try { Observable.range(1, 10) .map(v -> v * 2) .filter(v -> v % 4 == 0) .subscribe(System.out::println); } finally { RxJavaHooks.reset(); }
2. RxJavaErrorHandler
此插件让你可以注册一个函数,该函数将处理传递给 SafeSubscriber.onError (Throwable) 的错误。 (一个调用 subscribe() 时,SafeSubscriber 用于包装传入的 Subscriber)。 为此,扩展 RxJavaErrorHandler 并重写此方法:
void handleError(Throwable e)
然后,请按照下列步骤操作:
创建一个已实现的新 RxJavaErrorHandler 子类的对象。
通过 RxJavaPlugins.getInstance() 获取全局 RxJavaPlugins 实例。
将错误处理程序对象传递到该实例的 registerErrorHandler() 方法。
当你执行此操作时,RxJava 将开始使用你的错误处理程序来处理传递给 SafeSubscriber.onError(Throwable) 的错误。
RxJavaPlugins.getInstance().reset(); RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() { @Override public void handleError(Throwable e) { e.printStackTrace(); } }); Observable.error(new IOException()) .subscribe(System.out::println, e -> { });
3. RxJavaObservableExecutionHook
该插件允许你注册在某些常规 RxJava 活动上调用的函数,例如,用于日志记录或度量标准收集目的。 为此,请扩展RxJavaObservableExecutionHook 并重写以下任何或所有方法:
方法回调时机请按照下列步骤操作:
创建已实现的新 RxJavaObservableExecutionHook 子类的对象。
通过 RxJavaPlugins.getInstance() 获取全局 RxJavaPlugins 实例。
将执行 hook 对象传递给该实例的 registerObservableExecutionHook() 方法。