Android RxJava:图文详解 变换操作符。Android RxJava:组合 / 合并操作符 详细教程。

前言

  • Rxjava,由于其因事件流的链式调用、逻辑简洁 &
    使用简易
    的特色,深受各大 Android开发者的迎。

Github截图

假若还免了解 RxJava,请看文章:Android:这是同样篇 清晰 &
易懂的Rxjava 入门教程

  • RxJava如此这般让欢迎之原委,在于那供了长 &
    功能强大的操作符,几乎能做到所有的效应需求
  • 今,我以为大家详细介绍RxJava操作符中最好常用的更换操作符,并附带
    Retrofit 结合 RxJava的实例Demo教学,希望你们会喜欢。
  1. 遵照系列文章主要基于 Rxjava 2.0
  2. 连着下的光阴,自我用不止推出 AndroidRxjava 2.0
    的平等系列文章,包括原理、操作符、应用场景、背压等等

    ,有趣味可以延续关注Carson_Ho的安卓开发笔记!!

示意图


前言

  • Rxjava,由于其根据事件流的链式调用、逻辑简洁 &
    使用简易
    的特征,深受各大 Android开发者的接。

Github截图

要还免了解 RxJava,请圈文章:Android:这是同一篇 清晰 &
易懂的Rxjava 入门教程

  • RxJava诸如此类受欢迎之原由,在于那供了长 &
    功能强大的操作符,几乎能好有的功能要求
  • 今,我拿为大家详细介绍RxJava操作符中极常用之 整合 /
    合并操作符
    ,并附带 Retrofit 结合
    RxJava的实例Demo教学
    ,希望你们会喜欢。
  1. 据系列文章要基于 Rxjava 2.0
  2. 连下的工夫,自己以随地推出 AndroidRxjava 2.0
    的平多级文章,包括原理、操作符、应用场景、背压等等

    ,有趣味可以继续关心Carson_Ho的安卓开发笔记!!

示意图


目录

示意图


目录

示意图


1. 作用

  • 对事件序列中的波 / 整个事件序列
    进行加工处理(即变换),使得那变化成为不同之轩然大波 / 整个事件序列
  • 实际原理如下

示意图


1. 作用

构成 多独给观察者(Observable) & 合并需要发送的事件


2. 类型

  • RxJava受到广大的转换操作符如下:

    示意图

  • 下,我以针对每种操作符进行详尽介绍

注:本文仅讲解RxJava2以出过程遭到常用之易操作符


2. 类型

  • RxJava 2 中,常见的组合 / 合并操作符 主要出:

    示意图

  • 下,我以针对每个操作符进行详尽讲解


3. 应用场景 & 对应操作符 介绍

  • 下面,我将对 RxJava2 中的转移操作符进行逐项讲解
  • 注:在使用RxJava 2操作符前,记得在路的Gradle中添加依赖:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
}

3. 应用场景 & 对应操作符 介绍

注:在使用RxJava 2操作符前,记得在品种的Gradle中添加依赖:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
}

3.1 Map()

  • 作用
    对 被观察者发送的各级1只事件还经 点名的函数
    处理,从而变换成另外一种植事件

即, 拿被观察者发送的事件转换为随意的种事件。

  • 原理

示意图

  • 使场景
    数据类型转换

  • 具体行使
    下为以 使用Map() 将事件之参数从 整型 变换成 字符串类型
    为例子说明

示意图

 // 采用RxJava基于事件流的链式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {

            // 1. 被观察者发送事件 = 参数为整型 = 1、2、3
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);

            }
            // 2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map变换操作符 将事件" + integer +"的参数从 整型"+integer + " 变换成 字符串类型" + integer ;
            }
        }).subscribe(new Consumer<String>() {

            // 3. 观察者接收事件时,是接收到变换后的事件 = 字符串类型
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
  • 测试结果

示意图

打地方可以见见,map() 将参数中之 Integer 类型对象转换成为一个
String类型 对象后赶回

同时,事件之参数类型为鉴于 Integer 类型变成了 String 类型


3.1 组合多独给观察者

拖欠品种的操作符的作用 = 组合多独让观察者

3.2 FlatMap()

  • 意:将于观察者发送的风波序列进行 拆分 &
    单独转换
    ,再统一成为一个新的风波序列,最后重复展开发送

  • 原理

  • 啊事件序列中每个事件还创造一个 Observable 对象;

  • 拿对每个 原始事件 转换后底 新事件 都放入到对应 Observable对象;
  • 用新建的每个Observable 都合并及一个 新建的、总的Observable
    对象;
  • 新建的、总的Observable 对象 将 新合并的波序列
    发送给观察者(Observer

示意图

  • 以场景
    无序的用于观察者发送的浑事件序列进行转移

  • 切实以

// 采用RxJava基于事件流的链式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }

            // 采用flatMap()变换操作符
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
                    // 最终合并,再发送给被观察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
  • 测试结果
示意图

流淌:新合并生成的波序列顺序是无序的,即 与老序列发送事件之一一无关

concat() / concatArray()

  • 作用
    做多个被观察者一起发送数据,合并后 以殡葬顺序串行执行

两边分别:组合于观察者的数码,即concat()结合被观察者数量≤4只,而concatArray()则可>4个

  • 切实使用

// concat():组合多个被观察者(≤4个)一起发送数据
        // 注:串行执行
        Observable.concat(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

// concatArray():组合多个被观察者一起发送数据(可>4个)
        // 注:串行执行
        Observable.concatArray(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12),
                           Observable.just(13, 14, 15))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
  • 测试结果

concat()情况

concatArray()情况

3.3 ConcatMap()

  • 作用:类似FlatMap()操作符

  • FlatMap()的 区别在于:拆分 & 重新合并生成的波序列 的相继 =
    被观察者旧序列生产的一一

  • 原理

示意图

  • 运场景
    不变的将为观察者发送的通事件序列进行转换

  • 具体行使

// 采用RxJava基于事件流的链式操作
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }

            // 采用concatMap()变换操作符
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通过concatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
                    // 最终合并,再发送给被观察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
  • 测试结果
示意图

横流:新合并生成的事件序列顺序是稳步的,即 严格依照原序列发送事件之依次


merge() / mergeArray()

  • 作用
    结合多只给观察者一起发送数据,合并后 依照时间线并行执行
  1. 双面分别:组合被观察者的多少,即merge()结为观察者数量≤4个,而mergeArray()则可>4个
  2. 分上述concat()操作符:同样是整合多独被观察者一起发送数据,但concat()操作符合并后是比照殡葬顺序串行执行
  • 具体运用

// merge():组合多个被观察者(<4个)一起发送数据
        // 注:合并后按照时间线并行执行
        Observable.merge(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                  .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

// mergeArray() = 组合4个以上的被观察者一起发送数据,此处不作过多演示,类似concatArray()
  • 测试结果

些微个给观察者发送事件并行执行,输出结果 = 0,2 -> 1,3 -> 2,4

33333.gif

3.4 Buffer()

  • 作用
    期从 被观察者(Obervable)需要发送的轩然大波受到 获取一定数额之风波 &
    放到缓存区中,最终发送

  • 原理

示意图

  • 采取场景
    缓存被观察者发送的事件

  • 实际应用
    那么,Buffer()历次是取多少个事件放到缓存区中的也?下面我用通过一个例子来说明

// 被观察者 需要发送5个数字
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 1) // 设置缓存区大小 & 步长
                                    // 缓存区大小 = 每次从被观察者中获取的事件数量
                                    // 步长 = 每次获取新事件的数量
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(List<Integer> stringList) {
                        //
                        Log.d(TAG, " 缓存区里的事件数量 = " +  stringList.size());
                        for (Integer value : stringList) {
                            Log.d(TAG, " 事件 = " + value);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应" );
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
  • 测试结果

示意图

  • 进程解释

脚,我用透过一个贪图来说明Buffer()原理 & 整个例子的结果

示意图

至此,关于RxJava2吃主要的转移操作符已经教结束


concatDelayError() / mergeDelayError()

  • 作用

示意图

  • 切切实实应用

a. 无使用concatDelayError()的情况

Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

测试结果:第1只为观察者发送Error事件后,第2独吃观察者则未会见持续发送事件

示意图

<-- 使用了concatDelayError()的情况 -->
Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

测试结果:第1独叫观察者的Error事件将以第2个为观察者发送完事件后重新持续发送

示意图

mergeDelayError()操作符同理,此处不发了多演示


4. 实际付出需要案例

  • 转换操作符的重点出需求场景 = 嵌套回调(Callback hell
  • 下面,我用应用一个实际应用场景实例来上课嵌套回调(Callback hell

切切实实要圈文章Android RxJava
实际利用案例教学:网络要嵌套回调


3.2 合并多个事件

欠项目的操作符主要是对几近独叫观察者中之轩然大波进行合并处理。

5. Demo地址

上述所有的Demo源代码都存放于:Carson_Ho的Github地址:RxJava2_变操作符

喜爱的麻烦点个star


Zip()

  • 作用
    联合
    多只被观察者(Observable)发送的轩然大波,生成一个新的风波序列(即成后底波序列),并最终发送

  • 原理
    实际求看下图

示意图

  • 特别注意:

  • 事件做措施 = 严格按照本事件序列 进行针对性各统一

  • 终极合并的波数量 = 多个被观察者(Observable)中数量极其少之数

就要下图

示意图

  • 现实行使

<-- 创建第1个被观察者 -->
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被观察者1发送了事件1");
                emitter.onNext(1);
                // 为了方便展示效果,所以在发送事件后加入2s的延迟
                Thread.sleep(1000);

                Log.d(TAG, "被观察者1发送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被观察者1发送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作

<-- 创建第2个被观察者 -->
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被观察者2发送了事件A");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件B");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件C");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件D");
                emitter.onNext("D");
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
        // 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送

<-- 使用zip变换操作符进行事件合并 -->
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最终接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
  • 测试结果

示意图

  • 特别注意:
    1. 尽管为观察者2底风波D从未有过事件及那联合,但要么会继续发送
    2. 如于为观察者1 &
      被观察者2之轩然大波序列最后发送onComplete()事件,则于观察者2之事件D也无见面发送,测试结果如下

示意图

  • 因为Zip()操作符较为复杂 & 难理解,此处将就此1摆设图总结

示意图

关于Zip()结合RxJavaRxtrofit的实例讲解将在第4节省吃详尽讲解


6. 总结

  • 下面,我拿就此同摆设图总结 RxJava2 中常用之变操作符

示意图

  • 连通下去的时空,本身将不断生产 AndroidRxjava 2.0
    的平等层层文章,包括原理、操作符、应用场景、背压等等

    ,有趣味可以连续关注Carson_Ho的安卓开发笔记!!

示意图


combineLatest()

  • 作用
    当两个Observables中之别一个殡葬了数量后,将先发送了数码的Observables
    的摩登(最后)一个数据 与
    另外一个Observable出殡的每个数据做,最终因该函数之结果发送数据

Zip()的区别:Zip() =
按个数合并,即1对1联;CombineLatest() =
按日统一,即在和一个时刻接触达合

  • 具体采用

Observable.combineLatest(
                    Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                    new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long o1, Long o2) throws Exception {
                    // o1 = 第1个Observable发送的最新(最后)1个数据
                    // o2 = 第2个Observable发送的每1个数据
                    Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
                    return o1 + o2;
                    // 合并的逻辑 = 相加
                    // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.e(TAG, "合并的结果是: "+s);
                }
            });
  • 测试结果

示意图

请求点赞!因为您的鞭策是自写作的极其酷动力!

连锁文章读

  • 操作符使用
    Android:这是一模一样首 清晰 & 易懂的Rxjava
    入门教程
    Android RxJava:最基础的操作符详解 –
    创建操作符
    Android RxJava:图文详解
    变换操作符
    Android RxJava:组合 / 合并操作符
    详细教程
    Android RxJava:功能性操作符
    全面授课
  • 实在运用讲解
    Android RxJava
    实际用讲解:(无条件)网络要轮询
    Android RxJava
    实际应用讲解:(有格)网络要轮询
    Android RxJava
    实际运用讲解:网络要嵌套回调
    Android RxJava
    实际用讲解:合并数据源
    Android RxJava 实际应用讲解:从磁盘 / 内存缓存着
    获取缓存数据
    Android RxJava
    实际运用讲解:联合判断
    Android RxJava:细说 线程控制(切换 / 调度
    )(含Retrofit实例讲解)
    Android RxJava
    实际利用讲解:网络要出错重连(结合Retrofit)

combineLatestDelayError()

用意类似于concatDelayError() / mergeDelayError()
,即错误处理,此处不发了多描述

迎关注Carson_Ho的简书!

非定期分享有关安卓开发的干货,追求短、平、快,但却非亏深度

reduce()

  • 作用
    管于观察者需要发送的轩然大波聚合成1单事件 & 发送

会合的逻辑依据需要撰写,但实质都是前2只数据聚合,然后与继1独数据持续拓展联谊,依次类推

  • 切实使用

Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 本次聚合的逻辑是:全部数据相乘起来
                        // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: "+s);

            }
        });
  • 测试结果

示意图

collect()

  • 作用
    用为观察者Observable出殡的数码事件采访到一个数据结构里

  • 切实用

Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 对发送的数据进行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                                // 对发送的数据进行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: "+s);

            }
        });
  • 测试结果

示意图

3.3 发送事件前增加发送事件

startWith() / startWithArray()

  • 作用
    在一个于观察者发送事件前,追加发送一些数额 / 一个初的为观察者

  • 切切实实用

<-- 在一个被观察者发送事件前,追加发送一些数据 -->
        // 注:追加数据顺序 = 后调用先追加
        Observable.just(4, 5, 6)
                  .startWith(0)  // 追加单个数据 = startWith()
                  .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });


<-- 在一个被观察者发送事件前,追加发送被观察者 & 发送数据 -->
        // 注:追加数据顺序 = 后调用先追加
        Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
  • 测试结果

示意图

示意图

3.4 统计发送事件数量

count()

  • 作用
    统计于观察者发送事件的数量

  • 切切实实运用

// 注:返回结果 = Long类型
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  "+aLong);

                    }
                });
  • 测试结果

示意图

至此,RxJava 2遇之组成 / 合并操作符讲解结束。


4. 实际上开发需要案例

下,我将执教组合 / 合并操作符的常见实际需要:

  1. 起缓存(磁盘、内存)中得到缓存数据
  2. 合数据源
  3. 一道判断

  4. 下面,我拿对每个应用场景进行实例Demo演示讲解。

4.1 获取缓存数据

  • 不怕从缓存中(磁盘缓存 &
    内存缓存)获取数据;若缓存中很多仍,才通过网要获取数据
  • 现实要圈文章:Android RxJava 实际用讲解:从磁盘 / 内存缓存着
    获取缓存数据

4.2 合并数据源 & 同时出示

  • 尽管,数据源 来自不同地方(如网 + 本地),需要从不同的地方获取数据 &
    同时出示
  • 切切实实求看文章:Android RxJava
    实际用讲解:合并数据源

4.3 联合判断

  • 即便,同时针对大多独事件展开同步判断

如,填写表单时,需要表单里有所信息(姓名、年龄、职业等)都受填后,才允点击
“提交” 按钮

  • 切切实实要看文章:Android RxJava
    实际用讲解:联合判断

5. Demo地址

上述所有的Demo源代码都存放于:Carson_Ho的Github地址:RxJava2_成 /
合并操作符


6. 总结

  • 下面,我将就此同样摆图总结 RxJava2 中常用之组合 / 合并操作符

示意图

  • 连通下的时光,自己以连生产 AndroidRxjava 2.0
    的如出一辙文山会海文章,包括原理、操作符、应用场景、背压等等

    ,有趣味可以继承关心Carson_Ho的安卓开发笔记!!

示意图


请求点赞!因为你的砥砺是本身写的极酷动力!

连锁文章读

  • 操作符使用
    Android:这是一模一样首 清晰 & 易懂的Rxjava
    入门教程
    Android RxJava:最基础的操作符详解 –
    创建操作符
    Android RxJava:图文详解
    变换操作符
    Android RxJava:组合 / 合并操作符
    详细教程
    Android RxJava:功能性操作符
    全面授课
  • 骨子里运用讲解
    Android RxJava
    实际用讲解:(无条件)网络要轮询
    Android RxJava
    实际应用讲解:(有规则)网络要轮询
    Android RxJava
    实际运用讲解:网络要嵌套回调
    Android RxJava
    实际用讲解:合并数据源
    Android RxJava 实际应用讲解:从磁盘 / 内存缓存着
    获取缓存数据
    Android RxJava
    实际运用讲解:联合判断
    Android RxJava:细说 线程控制(切换 / 调度
    )(含Retrofit实例讲解)
    Android RxJava
    实际利用讲解:网络要出错重连(结合Retrofit)

迎关注Carson_Ho的简书!

匪期分享关于安卓开发的干货,追求短、平、快,但倒无亏深度

相关文章