Rxjava2总结
文章目录
Rxjava2基础认知
形式正确的有限Observable
调用观察者的onCompleted正好一次或者它的onError正好一次,而且此后不能再调用观察者的任何其它方法。如果onComplete 或者 onError 走任何一个 都会 主动解除订阅关系;- 如果解除订阅关系以后在发射 onError 则会 报错;而发射onComplete则不会。
- 注意解除订阅关系 还是可以发射 onNext
Disposable类:
- dispose():主动解除订阅
- isDisposed():查询是否解除订阅 true 代表 已经解除订阅
CompositeDisposable类:可以快速解除所有添加的Disposable类
每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可快速解除.
1 | CompositeDisposable compositeDisposable=new CompositeDisposable(); |
基础概念
- Scheduler scheduler
timer() alt+点击timer可查看 关于timer的方法 可以看到时候有这个参数的变体!
- Callable bufferSupplier:自定义装载的容器
1 | Observable.range(1, 10) |
创建操作
- create : 创建一个具有发射能力的Observable
1 | Observable.create(e -> { |
- just:只是简单的原样发射,可将数组或Iterable当做单个数据。它接受一至九个参数
1 | Observable.just("Love", "For", "You!") |
- empty:创建一个不发射任何数据但是正常终止的Observable
- never:创建一个不发射数据也不终止的Observable
- error:创建一个不发射数据以一个错误终止的Observable
1 | Observable.empty(); |
- timer 在延迟一段给定的时间后发射一个简单的数字0
1 | Observable.timer(1000, TimeUnit.MILLISECONDS) |
- range:
- start:起始值
- count:一个是范 围的数据的数目。0不发送 ,负数 异常
1 | Observable.range(5, 3) |
- intervalRange
- start,count:同range
- initialDelay 发送第一个值的延迟时间
- period 每两个发射物的间隔时间
- unit,scheduler 额你懂的
1 | Observable.intervalRange(5, 100, 3000, 100, |
- interval:相当于intervalRange的start=0;
period 这个值一旦设定后是不可变化的
1 | //period 以后的美每次间隔 这个值一旦设定后是不可变化的 所以 count方法无效的! |
- defer 直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
1 | Observable.defer(() -> Observable.just("Love", "For", "You!")) |
from系列
fromArray
1
2
3Integer[] items = {0, 1, 2, 3, 4, 5};
Observable.fromArray(items).subscribe(
integer -> System.out.println(integer));fromCallable
1
2Observable.fromCallable(() -> Arrays.asList("hello", "gaga"))
.subscribe(strings -> System.out.println(strings))fromIterable
1
2Observable.fromIterable(Arrays.<String>asList("one", "two", "three"))
.subscribe(integer -> System.out.println(integer));fromFuture
1
2
3Observable.fromFuture(Observable.just(1).toFuture())
.doOnComplete(() -> System.out.println("complete"))
.subscribe();
过滤操作
- elementAt:只发射第N项数据
1 | <!-- 无默认值版本 --> |
IgnoreElements:如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知
1
2
3
4Observable.range(0, 10)
.ignoreElements()
.subscribe(() -> System.out.println("complete")
, throwable -> System.out.println("throwable"));take系列
变体 count系列:只发射前面的N项数据
1
2
3Observable.range(0,10)
.take(3)
.subscribe(o -> System.out.print(o + "\t"))变体 time系列: 发射Observable开始的那段时间发射 的数据,
1
2
3Observable.range(0,10)
.take(100, TimeUnit.MILLISECONDS)
.subscribe(o -> System.out.print(o + "\t"));
takeLast
变体 count系列:只发射后面的N项数据
1
2
3Observable.range(0,10)
.takeLast(3)
.subscribe(o -> System.out.print(o + "\t"));变体 time系列: 发射在原始Observable的生命周 期内最后一段时间内发射的数据
1
2
3Observable.range(0,10)
.takeLast(100, TimeUnit.MILLISECONDS)
.subscribe(o -> System.out.print(o + "\t"));
takeUntil:发送complete的结束条件 当然发送结束之前也会包括这个值
1 | Observable.just(2,3,4,5) |
- takeWhile:当不满足这个条件 会发送结束 不会包括这个值
1 | Observable.just(2,3,4,5) |
skip系列
变体 count系列:丢弃Observable发射的前N项数据
1
2
3Observable.range(0,5)
.skip(3)
.subscribe(o -> System.out.print(o + "\t"));变体 time系列:丢弃原始Observable开始的那段时间发 射的数据
1
2
3Observable.range(0,5)
.skip(3)
.subscribe(o -> System.out.print(o + "\t"));
skipLast
变体 count系列:丢弃Observable发射的前N项数据
1
2
3Observable.range(0,5)
.skipLast(3)
.subscribe(o -> System.out.print(o + "\t"));变体 time系列:丢弃在原始Observable的生命周 期内最后一段时间内发射的数据
1
2
3Observable.range(0,10)
.skipLast(100, TimeUnit.MILLISECONDS)
.subscribe(o -> System.out.print(o + "\t"));
distinct:去重
keySelector:这个函数根据原始Observable发射的数据项产生一个 Key,然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的
1
2
3
4
5
6
7
8Observable.just(1, 2, 1, 2, 3)
//这个函数根据原始Observable发射的数据项产生一个 Key,
// 然后,比较这些Key而不是数据本身,来判定两个数据是否是不同的
.distinct(integer -> Math.random())
.subscribe(o -> System.out.print(o + "\t"));
日志:
原因 key不同 所以当做数据不同处理
1 2 1 2 3无参版本 就是内部实现了的keySelector通过生成的key就是value本身
1
2
3
4
5Observable.just(1, 2, 1, 2, 3)
.distinct()
.subscribe(o -> System.out.print(o + "\t"));
日志:
1 2 3
distinctUntilChanged(相邻去重):它只判定一个数据和它的直接前驱是 否是不同的。
其他概念与distinct一样
throttleWithTimeout/debounce:
操作符会过滤掉发射速率过快的数据项
throttleWithTimeout/debounce: 含义相同
如果发送数据后 指定时间段内没有新数据的话 。则发送这条
如果有新数据 则以这个新数据作为将要发送的数据项,并且重置这个时间段,重新计时。
1 | Observable.create(e -> { |
filter:只发射通过了谓词测试的数据项
1
2
3
4Observable.range(0, 10)
//过滤掉false的元素
.filter(integer -> integer % 2 == 0)
.subscribe(o -> System.out.print(o + "\t"));ofType:ofType 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据
1 | Observable.just(0, "what?", 1, "String", 3) |
- first:只发射第一项(或者满足某个条件的第一项)数 感觉和take(1) elementAt(0)差不多
1 | Observable.range(0, 10) |
- last:只发射最后一项(或者满足某个条件的最后一项)数据 感觉和takeLast(1)差不多
1 | Observable.empty() |
- sample/throttleLast: 周期采样后 发送最后的数据
- throttleFirst:周期采样 的第一条数据 发送
注意: 如果是已经被发送过的 则不会继续发送
1 | Observable.create(e -> { |
辅助操作
- repeat:不是创建一个Observable,而是重复发射原始,Observable的数据序列,这个序列或者是无限的,或者通过 repeat(n) 指定重复次数
1 | Observable.just("Love", "For", "You!") |
- repeatUntil:getAsBoolean 如果返回 true则不repeat false则repeat.主要用于动态控制
1 | Observable.just("Love", "For", "You!") |
- delay:延迟一段指定的时间再发射来自Observable的发射物
注意:
delay 不会平移 onError 通知,它会立即将这个通知传递给订阅者,同时丢弃任何待 发射的 onNext 通知。
然而它会平移一个 onCompleted 通知
1 | Observable.range(0, 3) |
- delaySubscription:让你你可以延迟订阅原始Observable
1 | Observable.just(1) |
do系列
doOnEach:注册一个回调,它产生的Observable每发射一项数据就会调用它一次
1
2
3
4
5
6
7
8
9Observable.range(0, 3)
.doOnEach(integerNotification -> System.out.println(integerNotification.getValue()))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
doOnEach:
doOnEach:0===>0
doOnEach:1===>1
doOnEach:2===>2
doOnEach:nulldoOnNext:注类似doOnEach 不是接受一个 Notification 参数,而是接受发射的数据项。
1
2
3
4
5
6
7
8
9
10
11Observable.range(0, 3)
.doOnNext(integer -> {
if (integer == 2)
throw new Error("O__O");
System.out.print(integer);
})
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable")
, () -> System.out.print("===>complete"));
日志:
0===>0 1===>1 ===>throwabledoOnSubscribe:注册一个动作,在观察者订阅时使用
1
2
3
4
5Observable.range(0, 3)
.doOnSubscribe(disposable -> System.out.print("开始订阅"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
开始订阅===>0 ===>1 ===>2doOnComplete:注册一个动作,在观察者OnComplete时使用
1
2
3
4
5Observable.range(0, 3)
.doOnComplete(() -> System.out.print("doOnComplete"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
===>0 ===>1 ===>2 doOnCompletedoOnError:注册一个动作,在观察者doOnError时使用
1
2
3
4
5
6Observable.error(new Throwable("?"))
.doOnError(throwable -> System.out.print("throwable"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
异常信息....
throwabledoOnTerminate:注册一个动作,Observable终止之前会被调用,无论是正 常还是异常终止。
1
2
3
4
5Observable.range(0, 3)
.doOnTerminate(() -> System.out.print("\t doOnTerminate"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
===>0 ===>1 ===>2 doOnTerminatedoFinally:注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还 是异常终止。在doOnTerminate之后执行
1
2
3
4
5
6Observable.range(0, 3)
.doFinally(() -> System.out.print("\t doFinally"))
.doOnTerminate(() -> System.out.print("\t doOnTerminate"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
日志:
===>0 ===>1 ===>2 doOnTerminate doFinallydoOnDispose:注册一个动作,当【观察者取消】订阅它生成的Observable它就会被调
注意:貌似需要在 为出现complete和error的时候 dispose才会触发 ~
1
2
3
4
5
6
7Disposable ab = Observable.interval(1, TimeUnit.SECONDS)
.take(3)
.doOnDispose(() -> System.out.println("解除订阅"))
.subscribe(o -> System.out.print("===>" + o + "\t"));
ab.dispose();
日志:
解除订阅
materialize:将数据项和事件通知都当做数据项发射
dematerialize:materialize相反
1 | Observable.range(0, 3) |
- observeOn:指定一个观察者在哪个调度器上观察这个Observable
- subscribeOn:指定Observable自身在哪个调度器上执行
注意 遇到错误 会立即处理而不是等待下游还没观察的数据
既onError 通知会跳到(并吞掉)原始Observable发射的数据项前面
1 | Observable.range(0, 3) |
subscribe:操作来自Observable的发射物和通知
1
2
3
4
5
6
7Javadoc: subscribe()
Javadoc: subscribe(onNext)
Javadoc: subscribe(onNext,onError)
Javadoc: subscribe(onNext,onError,onComplete)
Javadoc: subscribe(onNext,onError,onComplete,onSubscribe)
Javadoc: subscribe(Observer)
Javadoc: subscribe(Subscriber)foreach:forEach 方法是简化版的 subscribe ,你同样可以传递一到三个函数给它,解释和传递给 subscribe 时一样
不同的是,你无法使用 forEach 返回的对象取消订阅。也没办法传递一个可以用于取消订阅 的参数
1 | Observable.range(0, 3) |
serialize:保证上游下游同一线程 ,防止不同线程下 onError 通知会跳到(并吞掉)原始Observable发射的数据项前面的错误行为
1
2
3Observable.range(0, 3)
.serialize()
.subscribe(o -> System.out.print("===>" + o + "\t"));Timestamp:它将一个发射T类型数据的Observable转换为一个发射类型 为Timestamped
的数据的Observable,每一项都包含数据的原始发射时间 1
2
3
4
5
6
7
8
9
10
11
12
13Observable.interval(100, TimeUnit.MILLISECONDS)
.take(3)
.timestamp()
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===> throwable")
, () -> System.out.println("===> complete")
, disposable -> System.out.println("===> 订阅"));
日志:
===> 订阅
===>Timed[time=1501224256554, unit=MILLISECONDS, value=0]
===>Timed[time=1501224256651, unit=MILLISECONDS, value=1]
===>Timed[time=1501224256751, unit=MILLISECONDS, value=2]
===> completetimeInterval:一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable
1 | Observable.interval(100, TimeUnit.MILLISECONDS) |
timeout
变体:过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
1
2
3
4
5
6
7
8
9
10Observable.interval(100, TimeUnit.MILLISECONDS)
// 过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
.timeout(50, TimeUnit.MILLISECONDS)
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>timeout throwable")
, () -> System.out.println("===>timeout complete")
, disposable -> System.out.println("===>timeout 订阅"));
timeout:
===>timeout 订阅
===>timeout throwable变体 备用Observable:过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),它会发一个错误
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15Observable<Integer> other;
Observable.empty()
// 过了一个指定的时长仍没有发射数据(不是仅仅考虑第一个),他会用备用Observable 发送数据,本身的会发送一个compelte
.timeout(50, TimeUnit.MILLISECONDS, other = Observable.just(2, 3, 4))
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>timeout2 throwable")
, () -> System.out.println("===>timeout2 complete")
, disposable -> System.out.println("===>timeout2 订阅"));
other.subscribe(o -> System.out.println("k ===>" + o + "\t"));
timeout2:
===>timeout2 订阅
===>timeout2 complete
k ===>2
k ===>3
k ===>4
变换操作
- map:对Observable发射的每一项数据应用一个函数,执行变换操作,就是方形过渡到圆形
1 | Observable.just(1,2) |
- flatMap: 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable
- mapper:根据发射数据映射成Observable
- combiner: 用来合并 的
注意:FlatMap 对这些Observables发射的数据做的是合并( merge )操作,因此它们可能是交 错的。
1 | Observable.just(1, 2, 3) |
- concatMap:类似FlatMap但是保证顺序 因为没有合并操作!
1 | Observable.just(1, 2, 3) |
- cast:在发射之前强制将Observable发射的所有数据转换为指定类型
1 | Observable.just(1, 2, "string") |
- groupBy:通过keySelector的apply的值当做key 进行分组,发射GroupedObservable(有getKey()方法)的group 通过group继续订阅取得其组内的值;
- keySelector:通过这个的返回值 当做key进行分组
- valueSelector:value转换
1 | Observable.range(0, 10) |
- window: 依照此范例 每三秒收集,Observable在此时间内发送的值。组装成Observable发送出去。
1 | Observable.interval(1, TimeUnit.SECONDS).take(7) |
- scan:连续地对数据序列的每一项应用一个函数,然后连续发射结果
感觉就是发送一个有 累加(函数) 过程序列
* initialValue(可选) 其实就是放到 原始数据之前发射。
* a 原始数据的中的值
* b 则是最后应用scan函数后发送的值
1 | Observable.just(1, 4, 2) |
buffer系列
变体 count系列
1
2
3
4* 范例:发射[1-10]
* buffer count 2 skip 1,结果 [1,2] [2,3] [3,4] 3=2*1+1
* buffer count 2 skip 2,结果 [1,2] [3,4] [5,6] 5=2*2+1
* buffer count 2 skip 3,结果 [1,2] [4,5] [7,8] 7=2*3+1;count:缓存的数量
skip:每个缓存创建的间隔数量
则代表 每次初始偏移量 每次真正的起始值=fistValue+skip*skipCount;
注意skip不能小于0
可以小于count这样就会导致每个发送的list之间的值会有重复
可以大于count这样就会导致每个发送的list之间的值和原有的值之间会有遗漏
可以等于count就你懂的了bufferSupplier:自定义缓存装载的容器;
1
2
3
4
5
6
7
8Observable.range(1, 10)
.buffer(2, 1,() -> new ArrayList<>())//有默认的装载器
<!-- 其他方法 -->
<!-- .buffer(2)//skip 默认和count一样 -->
<!-- .buffer(2, () -> new ArrayList<>())-->
.subscribe(integers -> System.out.println(integers));
解析:每发射1个。创建一个发射物list buffer,每个buffer缓存2个,收集的存入list后发送。变体 time系列
- timespan:缓存的时间
- timeskip:每个缓存创建的间隔时间 同skip 可以小于大于等于timespan
1
2
3
4
5
6
7
8
9Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
.buffer(3, 2, TimeUnit.SECONDS, Schedulers.single(),
Functions.createArrayList(16))
.subscribe(integers -> System.out.println(integers));
解析:每两秒创建一个发射物list buffer,每个buffer缓存三秒 收集的存入list后发送。
日志:
[0, 1, 2, 3, 4]
[4, 5, 6]- 变体 自定义buffer创建和收集时间
- bufferOpenings:每当 bufferOpenings 发射了一个数据时,它就 创建一个新的 List,开始装入之后的发射数据
- closingSelector:每当 closingSelector 发射了一个数据时,就结束装填数据 发射List。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21<!-- 范例和time系列的就一样了 -->
Consumer<Long> longConsumer = aLong -> System.out.println("开始创建 bufferSupplier");
Consumer<Long> longConsumer2 = aLong -> System.out.println("结束收集");
Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
// .doOnNext(aLong -> System.out.println("原始发射物:" + aLong))
.buffer(Observable.interval(2, TimeUnit.SECONDS)
.startWith(-1L)//为了刚开始就发射一次
.take(2)//多余的我就不创建了
.doOnNext(longConsumer)
, aLong -> Observable.timer(3, TimeUnit.SECONDS)
.doOnNext(longConsumer2)
, () -> new ArrayList<>())
.subscribe(integers -> System.out.println("buffer发射物" + integers));
日志:
openings:
开始创建 bufferSupplier
开始创建 bufferSupplier
结束收集
buffer发射物[0, 1, 2, 3, 4]
buffer发射物[4, 5, 6]
变体 仅仅bufer创建时间
- boundarySupplier 因为发送一个值代表上个缓存的发送 和这个缓存的创建
这个缓存是连续的, 因为发送一个值代表上个缓存的发送 和这个缓存的创建
有发射物的时候 没缓存就创建了 就是 默认第一个发射物的时候由内部创建
注意 如果不发送事件缓存 存满了 会自动发送出去的1
2
3
4
5
6
7
8
9Observable.interval(500, TimeUnit.MILLISECONDS).take(7)
.buffer(() -> Observable.timer(2, TimeUnit.SECONDS)
.doOnNext(aLong -> System.out.println("开始创建 bufferSupplier"))
, () -> new ArrayList<Object>())
.subscribe(integers -> System.out.println(integers));
日志:
开始创建 bufferSupplier
[0, 1, 2]
[3, 4, 5, 6]
合并操作符
- zip(静态方法):只有当原始的Observable中的每一个都发射了 一条数据时 zip 才发射数据。接受一到九个参数
1 | Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS) |
- zipWith:zip的非静态写法,总是接受两个参数,第一个参数是一个Observable或者一个Iterable。
1 | observable1.zipWith( observable2, (aLong, aLong2) -> { |
- merge(静态方法):根据时间线 合并多个observer
1 | Observable<Long> ob1 = Observable.interval(100, TimeUnit.MILLISECONDS) |
- mergeWith:merge非静态写法
1 | ob1.mergeWith(ob2) |
- combineLatest(静态方法):使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值,它接受二到九个Observable作为参数 或者单 个Observables列表作为参数
1 | Observable<Long> observable1 = Observable.interval(100, TimeUnit.MILLISECONDS) |
- withLatestFrom:类似zip ,但是只在单个原始Observable发射了一条数据时才发射数据,而不是两个都发
但是注意 如果没有合并元素 既辅助Observable一次都没发射的时候 是不发射数据的
1 | Observable<Long> observable2 = Observable.interval(150, TimeUnit.MILLISECONDS) |
- switchMap:和flatMap类似,不同的是当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
1 | Observable.interval(500, TimeUnit.MILLISECONDS) |
- startWith:是concat()的对应部分,在Observable开始发射他们的数据之前,startWith()通过传递一个参数来先发射一个数据序列
1 |
|
- join:任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了。一条数据,就结合两个Observable发射的数据
1 | <!-- 此demo 好使但是未让能理解透彻 仅仅想测试能结果的任用 想明白的话 此demo无效 --> |
条件操作
all:判定是否Observable发射的所有数据都满足某个条件
1
2
3
4Observable.just(2, 3, 4)
.all(integer -> integer > 3)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
日志:falseamb:给定多个Observable,只让第一个发射数据的Observable发射全部数据
ambArray(静态方法):根据测试结果这个静态方法发射的最后一个
1
2
3
4
5
6
7
8
9Observable.ambArray(
Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
, Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS)
, Observable.intervalRange(20, 3, 100, 100, TimeUnit.MILLISECONDS)
)
.doOnComplete(() -> System.out.println("Complete"))
.subscribe(aLong -> System.out.println(aLong));
日志:
20 21 22 CompleteambWith:这个发射原始的
1
2
3
4
5
6Observable.intervalRange(0, 3, 200, 100, TimeUnit.MILLISECONDS)
.ambWith(Observable.intervalRange(10, 3, 300, 100, TimeUnit.MILLISECONDS))
.doOnComplete(() -> System.out.println("Complete"))
.subscribe(aLong -> System.out.println(aLong));
日志:
0 1 2 Complete
contains:判定一个Observable是否发射一个特定的值
1
2
3Observable.just(2, 3, 4)
.contains(2)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean));switchIfEmpty:如果原始Observable正常终止后仍然没有发射任何数据,就使用备用的Observable
1
2
3Observable.empty()
.switchIfEmpty(Observable.just(2, 3, 4))
.subscribe(o -> System.out.println("===>" + o + "\t")); //2,3,4defaultIfEmpty:发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值,内部调用的switchIfEmpty。
1
2
3Observable.empty()
.defaultIfEmpty(1)
.subscribe(o -> System.out.println("===>" + o + "\t")); //1sequenceEqual:判定两个Observables是否发射相同的数据序列。(数据,发射顺序,终止状态)
1
2
3
4
5
6
7
8
9
10
11
12Observable.sequenceEqual(
Observable.just(2, 3, 4)
, Observable.just(2, 3, 4))
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean));
<!-- 它还有一个版本接受第三个参数,可以传递一个函数用于比较两个数据项是否相同。 -->
Observable.sequenceEqual(
Observable.just(2, 3, 4)
, Observable.just(2, 3, 4)
, (integer, integer2) -> integer + 1 == integer2)
.subscribe((aBoolean, throwable) -> System.out.println(aBoolean));skipUntil:丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据
1 | Observable.intervalRange(30, 20, 500, 100, TimeUnit.MILLISECONDS) |
- skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立
1 | Observable.just(1,2,3,4) |
- takeUntil:当第二个Observable发射了一项数据或者终止时,丢弃原始Observable发射的任何数据
1 | <!-- 条件变体 --> |
- takeWhile:发射Observable发射的数据,直到一个指定的条件不成立
1 | Observable.just(2,3,4,5) |
错误处理
onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23<!-- 遇到错误处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
.onErrorReturnItem("hei")
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"));
日志:
===>hei
===>complete
<!-- 遇到错误不处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
.onErrorReturn(throwable -> {
System.out.println("错误信息:" + throwable.getMessage());
return throwable;
})
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"));
日志:
错误信息:我擦 空啊
===>java.lang.Throwable: 我擦 空啊
===>completeresumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列
onErrorResumeNext:可以处理所有的错误
1
2
3
4
5
6
7
8
9
10
11Observable.error(new Throwable("我擦 空啊"))
.onErrorResumeNext(throwable -> {
System.out.println("错误信息:" + throwable.getMessage());
return Observable.range(0, 3);
})
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable"+ "\t")
, () -> System.out.print("===>complete"+ "\t"));
日志:
错误信息:我擦 空啊
===>0 ===>1 ===>2 ===>completeonExceptionResumeNext:只能处理异常。
Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用 的Observable。
1
2
3
4
5
6
7
8
9<!-- Throwable不能处理范例 -->
Observable.error(new Throwable("我擦 空啊"))
.onExceptionResumeNext(observer -> Observable.range(0, 3))
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"));
日志:
===>throwable
<!-- 正确演示范例 无效ing 求解答~ todo -->
retry:如果原始Observable遇到错误,重新订阅它期望它能正常终止
变体count 重复次数
1
2
3
4
5
6
7
8
9
10
11Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("hehe"));
})
.retry(2)
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable\t")
, () -> System.out.print("===>complete\t"));
日志:
===>1 ===>2 ===>1 ===>2 ===>1 ===>2 ===>throwable变体Predicate 条件判定 如果返回 true retry,false 放弃 retry
1
2
3
4
5
6
7
8
9
10
11Observable.create(e -> {
e.onNext(1);
e.onNext(2);
e.onError(new Throwable("hehe"));
})
.retry(throwable -> throwable.getMessage().equals("hehe1"))
.subscribe(o -> System.out.print("===>" + o + "\t")
, throwable -> System.out.print("===>throwable\t")
, () -> System.out.print("===>complete\t"));
日志:
===>1 ===>2 ===>throwable
retryWhen: 需要一个Observable 通过判断 throwableObservable,Observable发射一个数据 就重新订阅,发射的是 onError 通知,它就将这个通知传递给观察者然后终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39<!-- 正常范例 -->
Observable.just(1, "2", 3)
.cast(Integer.class)
<!-- 结果:1,1,complete 原因这个Observable发了一次数据 -->
.retryWhen(throwableObservable -> Observable.timer(1, TimeUnit.SECONDS))
<!-- 结果:1,1,1,1,complete 原因这个Observable发了三次数据 -->
.retryWhen(throwableObservable -> Observable.interval(1, TimeUnit.SECONDS)
.take(3))
.subscribe(o -> System.out.println("retryWhen 1===>" + o + "\t")
, throwable -> System.out.println("retryWhen 1===>throwable")
, () -> System.out.println("retryWhen 1===>complete"));
<!-- 通过判断throwable 进行处理范例 -->
Observable.just(1, "2", 3)
.cast(Integer.class)
.retryWhen(throwableObservable -> {
return throwableObservable.switchMap(throwable -> {
if (throwable instanceof IllegalArgumentException)
return Observable.just(throwable);
<!-- 这种方式OK -->
// else{
// PublishSubject<Object> pb = PublishSubject.create();
// pb .onError(throwable);
// return pb;
// }
else
//方法泛型
return Observable.<Object>error(throwable);
<!-- 这种方式也OK -->
// return Observable.just(1).cast(String.class);
});
})
.subscribe(o -> System.out.println("retryWhen 2===>" + o + "\t")
, throwable -> System.out.println("retryWhen 2===>throwable")
, () -> System.out.println("retryWhen 2===>complete"));
日志:
retryWhen 2===>1
retryWhen 2===>throwable
阻塞操作
toList
1
2
3Observable.just(1, 2, 3)
.toList().blockingGet()
.forEach(aLong -> System.out.println(aLong));toSortList
1
2
3
4Observable.just(5, 2, 3)
.toSortedList()
.blockingGet()
.forEach(integer -> System.out.println(integer))toMap
1
2
3
4
5
6
7Map<String, Integer> map = Observable.just(5, 2, 3)
// .toMap(integer -> integer + "_")
//key 就是5_,value就是5+10 mapSupplier map提供者
.toMap(integer -> integer + "_"
, integer -> integer + 10
, () -> new HashMap<>())
.blockingGet();toFuture
这个操作符将Observable转换为一个返 回单个数据项的 Future 带有返回值的任务
如果原始Observable发射多个数据项, Future 会收到1个 IllegalArgumentException
如果原始Observable没有发射任何数据, Future 会收到一 个 NoSuchElementException
如果你想将发射多个数据项的Observable转换为 Future ,可以这样 用: myObservable.toList().toFuture()
1 | Observable.just(1, 2, 3) |
blockingSubscribe
1
2Observable.just(1, 2, 3)
.blockingSubscribe(integer -> System.out.println(integer));blockingForEach:对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成。
1
2
3
4
5
6
7
Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(aLong -> {
if (aLong == 10)
throw new RuntimeException();
}).onErrorReturnItem(-1L)
.blockingForEach(aLong -> System.out.println(aLong));blockingIterable
1
2
3
4Observable.just(1, 2, 3)
.blockingIterable()
// .blockingIterable(5);
.forEach(aLong -> System.out.println("aLong:" + aLong));blockingFirst
1
2
3
4Observable.empty()
// .blockingFirst();
//带默认值版本
.blockingFirst(-1));blockingLast:
1
2
3
4Observable.just(1,2,3)
// .blockingLast();
//带默认值版本
.blockingLast(-1));blockingMostRecent:返回一个总是返回Observable最近发射的数据的Iterable,类似于while的感觉
1
2
3
4
5
6
7
8
9
10Iterable<Long> c = Observable.interval(100, TimeUnit.MILLISECONDS)
.doOnNext(aLong -> {
if (aLong == 10)
throw new RuntimeException();
}).onErrorReturnItem(-1L)
.blockingMostRecent(-3L);
for (Long aLong : c) {
System.out.println("aLong:" + aLong);
}
日志很长 可以自己一试变知blockingSingle:
终止时只发射了一个值,返回那个值
empty 无默认值 报错, 默认值的话显示默认值
多个值的话 有无默认值都报错
1 | System.out.println("emit 1 value:" + Observable.just(1).blockingSingle()); |
组合操作
- compose:有多个 Observable ,并且他们都需要应用
一组相同的 变换
1 | <!-- 用一个工具类去写 这样符合单一职责 --> |
ConnectableObservable:可连接的Observable在 被订阅时并不开始发射数据,只有在它的 connect() 被调用时才开始用这种方法,
你可以 等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。即使没有任何订阅者订阅它,你也可以使用 connect 让他发射replay(Observable的方法): 每次订阅 都对单个订阅的重复播放一边
- bufferSize:对源发射队列的缓存数量, 从而对新订阅的进行发射;
Observable的方法 返回是ConnectableObservable
切记要让ConnectableObservable具有重播的能力,必须Obserable的时候调用replay,而不是ConnectableObservable 的时候调用replay1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29//this is OK,too!
ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
//类似 publish直接转成 ConnectableObservable 切记要重复播放的话必须Obserable的时候调用replay
//而不是ConnectableObservable 的时候调用replay 所以 .publish().replay()则无效
.replay(3);//重复播放的 是1 2 3
// .replay(2);//重复播放的 是 2 3
co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.connect();//此时开始发射数据 不同与 refCount 只发送一次
co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
replay(3)日志:只能缓存原始队列的两个【1,2,3】
订阅1:1 2 3
订阅2:1 2 3
订阅3:1 2 3
replay(2)日志:只能缓存原始队列的两个【2,3】
订阅1:1 2 3
订阅2: 2 3
订阅3: 2 3publish(Observable的方法):将普通的Observable转换为可连接的Observable
1
2
3
4
5
6
7ConnectableObservable<Integer> co = Observable.just(1, 2, 3)
.publish();
co.subscribe(integer -> System.out.println("订阅1:" + integer));
co.subscribe(integer -> System.out.println("订阅2:" + integer));
co.subscribe(integer -> System.out.println("订阅3:" + integer));
co.connect();//此时开始发射数据- refCount(ConnectableObservable的方法): 操作符把从一个可连接的Observable连接和断开的过程自动化了, 就像reply的感觉式样 每次订阅 都对单个订阅的重复播放一边
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Observable<Integer> co = Observable.just(1, 2, 3)
.publish()
//类似于reply 跟时间线有关 订阅开始就开始发送
.refCount();
co.doOnSubscribe(disposable -> System.out.print("订阅1:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
co.doOnSubscribe(disposable -> System.out.print("订阅2:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
Observable.timer(300, TimeUnit.MILLISECONDS)
.doOnComplete(() -> {
co.doOnSubscribe(disposable -> System.out.print("订阅3:"))
.doFinally(() -> System.out.println())
.subscribe(integer -> System.out.print(integer + "\t"));
}).blockingSubscribe();
日志:
订阅1:1 2 3
订阅2:1 2 3
订阅3:1 2 3
- refCount(ConnectableObservable的方法): 操作符把从一个可连接的Observable连接和断开的过程自动化了, 就像reply的感觉式样 每次订阅 都对单个订阅的重复播放一边
Subjects
Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当 了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个 Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射 新的数据。
对我来说为什么用subjects呢?所有Subject都可以直接发射,不需要 发射器的引用 和 Observable.create()不同
- AsyncSubject:简单的说使用AsyncSubject无论输入多少参数,永远只输出最后一个参数。
但是如果因为发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
1 | AsyncSubject<Integer> source = AsyncSubject.create(); |
- BehaviorSubject:会发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。
如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何 数据,只是简单的向前传递这个错误通知。
1 | BehaviorSubject<Integer> source = BehaviorSubject.create(); |
publishSubject
(subject里最常用的):可以说是最正常的Subject,从那里订阅就从那里开始发送数据。如果原始的Observable因为发生了一个错误而终止,PublishSubject将不会发射任何数据,只 是简单的向前传递这个错误通知。
1 | PublishSubject bs = PublishSubject.create(); |
- replaySubject: 无论何时订阅,都会将所有历史订阅内容全部发出。
1 | ReplaySubject bs = ReplaySubject.create(); |
Single与Completable
参考:http://developer.51cto.com/art/201703/535298.htm
使用场景:其实这个网络请求并不是一个连续事件流,你只会发起一次 Get 请求返回数据并且只收到一个事件。我们都知道这种情况下 onComplete 会紧跟着 onNext 被调用,那为什么不把它们合二为一呢?
- Single:它总是只发射一个值,或者一个错误通知,而不是发射 一系列的值。因此,不同于Observable需要三个方法onNext, onError, onCompleted,订阅Single只需要两 个方法:
Single只会调用这两个方法中的一个,而且只会调用一次,调用了任何一个方法之后,订阅关 系终止。
* onSuccess - Single发射单个的值到这个方法
* onError - 如果无法发射需要的值,Single发射一个Throwable对象到这个方法
1 | <!-- retrofit 范例--> |
使用场景:通过 PUT 请求更新数据 我只关心 onComplete 事件。使用 Completable 时我们忽略 onNext 事件,只处理 onComplete 和 onError 事件
- Completable:本质上来说和 Observable 与 Single 不一样,因为它不发射数据。
1 | <!-- retrofit 范例--> |
andThen( Completable中的方法最常用):在这个操作符中你可以传任何Observable、Single、Flowable、Maybe或者其他Completable,它们会在原来的 Completable 结束后执行
1
2
3
4
5
6
7
8
9
10
11
12
13apiClient.updateMyData(myUpdatedData)
.andThen(performOtherOperation()) // a Single<OtherResult>
.subscribe(new Consumer<OtherResult>() {
@Override
public void accept(OtherResult result) throws Exception {
// handle otherResult
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception{
// handle error
}
});
自定义操作符
- lift 原理图
1 | @Test |
实用技巧
flatMap 与 zip 配合的实用范例:
1 | Observable.fromArray(new File("/Users/fuzhipeng/Documents")) |
map的实用范例:
1 | //有些服务几口设计,返回数据外层会包裹一些额外信息,可以使用map()吧外层格式剥掉 |
方法泛型的实用范例:
1 | Observable.just(1, "2", 3) |
BehaviorSubject的使用技巧:
cache BehaviorSubject 是桥梁 并且有 发送最近的缓存特性!
1 | BehaviorSubject<Object> cache = BehaviorSubject.create(); |
Observable 发射元素的封装范例:
1 | //创建一个Observable 可以直接发送的 原因 获取rx内部方法需要final很恶心 所以... |
Reference&Thanks:
https://www.gitbook.com/book/mcxiaoke/rxdocs/details
基本上所有的都参考此文档 很神!
http://blog.csdn.net/maplejaw_/article/details/52396175
http://developer.51cto.com/art/201703/535298.htm
http://gank.io/post/560e15be2dca930e00da1083
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples