标签: RxJava

RxJava操作符之Share, Publish, Refcount

RxJava操作符之Share, Publish, Refcount

看源码知道.share()操作符是.publish().refcount()调用链的包装。

先来看ConnectedObservable

“ConnectedObservable” – This is a kind of observable which doesn’t emit items even if subscribed to.
It only starts emitting items after its .connect() method is called.

因为这个原因,在ConnectedObservable的connect这个方法被调用之前,connected obesrvable也被认为是“冷”和不活跃。

再看publish方法

.publish()– This method allows us to change an ordinary observable into a “ConnectedObservable”.
Simply call this method on an ordinary observable and it becomes a connected one.

现在我们知道了share操作符的1/2,那么为什么需要运用Connected Observable这个操作符呢?文档上是这么写的:

In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

这就意味着publish可以调用多个subscriber。当你有超过一个订阅者的时候,处理每个订阅和正确的销毁他们变得棘手。 为了使这个更方便,Rx发明了这个魔法操作符refcount():

refcount() – This operator keeps track of how many subscribers are subscribed to the resulting Observable and
refrains from disconnecting from the source ConnectedObservable until all such Observables are unsubscribed.

refcount本质上在后台维护着一个引用计数器,当一个subscription需要取消订阅或者销毁的时候,发出一个正确的动作。

我们再次看一下debouncedBuffer的例子,看一下在哪,share是怎么用的。

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
// which is really the same as:
Observable<Object> tapEventEmitter = _rxBus.toObserverable().publish().refcount();

我们现在有了一个”shareable”的名字叫”tapEventEmitter”的observable。 因为他是可以分享的,而且还不是“live”(share操作符中的publish调用使其变成一个ConnectedObservable), 我们可以用他构成我们的Observables,而且要确保我们有了一个原始的observable的引用 (这个例子中原始的observable是_rxBus.toObserverable()).

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
tapEventEmitter.buffer(debouncedEventEmitter)
//...

所有的这一切看起来都很好。然而这个实现会有一个可能的竞争条件。因为这有两个subscribers(debounce and buffer)而且会在不同的时间点发生,所以竞争条件就会发生。 记住RxBus是由hot/live Subject支持的不断的发射数据。通过使用share操作符,我们保证引用的是同一个资源。 而不是subscribers在不同的时间点订阅,他们会收到准确的相同的数据。

The race condition is when the two consumers subscribe. Often on a hot stream it doesn’t matter when subscribers come and go,and refCount is perfect for that.
The race condition refCount protects against is having only 1 active subscription upstream. However,if 2 Subscribers subscribe to a refcounted stream that emits 1, 2, 3, 4, 5, the first may get 1, 2, 3, 4, 5 and the second may get 2, 3, 4, 5.

To ensure all subscribers start at exactly the same time and get the exact same values, refCount can not be used.
Either ConnectableObservable with a manual, imperative invocation of connect needs to be done, or the variant of publish(function)which connects everything within the function before connecting the upstream.

在我们的用法中几乎立即执行所以没有什么关系。但是我们原始的意图是把debouncedBuffer方法作为一个单独的操作符。 如果相同的事件没有被发射出去,从概念上看起来是不正确的。

通过Bean后来的建议,我添加了一个更好的第三方的实现,用来处理这种竞争条件。

复制代码
复制代码
// don't start emitting items just yet by turning the observable to a connected one
ConnectableObservable<Object> tapEventEmitter = _rxBus.toObserverable().publish();

tapEventEmitter.publish((Func1) (stream) -> {

// inside `publish`, "stream" is truly multicasted

// applying the same technique for getting a debounced buffer sequence
    return stream.buffer(stream.debounce(1, TimeUnit.SECONDS));

}).subscribe((Action1) (taps) {
_showTapCount(taps.size());
});

// start listening to events now
tapEventEmitter.connect();
复制代码
复制代码

 

RxJava系列 过滤操作符

前面一篇文章中我们介绍了转换类操作符,那么这一章我们就来介绍下过滤类的操作符。顾名思义,这类operators主要用于对事件数据的筛选过滤,只返回满足我们条件的数据。过滤类操作符主要包含: FilterTake TakeLast TakeUntil Skip SkipLast ElementAt Debounce Distinct DistinctUntilChanged First Last等等。

Filter

filter(Func1)用来过滤观测序列中我们不想要的值,只返回满足条件的值,我们看下原理图:

%title插图%num
filter(Func1)

还是拿前面文章中的小区Community[] communities来举例,假设我需要赛选出所有房源数大于10个的小区,我们可以这样实现:

Observable.from(communities)
        .filter(new Func1<Community, Boolean>() {
            @Override
            public Boolean call(Community community) {
                return community.houses.size()>10;
            }
        }).subscribe(new Action1<Community>() {
    @Override
    public void call(Community community) {
        System.out.println(community.name);
    }
});

Take

take(int)用一个整数n作为一个参数,从原始的序列中发射前n个元素.

%title插图%num
take(int)

现在我们需要取小区列表communities中的前10个小区

Observable.from(communities)
        .take(10)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                System.out.println(community.name);
            }
        });

TakeLast

takeLast(int)同样用一个整数n作为参数,只不过它发射的是观测序列中后n个元素。

%title插图%num
takeLast(int)

获取小区列表communities中的后3个小区

Observable.from(communities)
        .takeLast(3)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                System.out.println(community.name);
            }
        });

TakeUntil

takeUntil(Observable)订阅并开始发射原始Observable,同时监视我们提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,takeUntil()返回的Observable会停止发射原始Observable并终止。

%title插图%num
takeUntil(Observable)
Observable<Long> observableA = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<Long> observableB = Observable.interval(800, TimeUnit.MILLISECONDS);

observableA.takeUntil(observableB)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }
            @Override
            public void onError(Throwable e) {

            }
            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }
        });

try {
    Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
    e.printStackTrace();
}

程序输出:

0
1

takeUntil(Func1)通过Func1中的call方法来判断是否需要终止发射数据。

%title插图%num
takeUntil(Func1)
Observable.just(1, 2, 3, 4, 5, 6, 7)
                .takeUntil(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer >= 5;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer);
            }
        });

程序输出:

1
2
3
4
5

Skip

skip(int)让我们可以忽略Observable发射的前n项数据。

%title插图%num
skip(int)

过滤掉小区列表communities中的前5个小区

Observable.from(communities)
        .skip(5)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                System.out.println(community.name);
            }
        });

SkipLast

skipLast(int)忽略Observable发射的后n项数据。

%title插图%num
skipLast(int)

ElementAt

elementAt(int)用来获取元素Observable发射的事件序列中的第n项数据,并当做唯一的数据发射出去。

%title插图%num
elementAt(int)

Debounce

debounce(long, TimeUnit)过滤掉了由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射*后的那个。通常我们用来结合RxBing(Jake Wharton大神使用RxJava封装的Android UI组件)使用,防止button重复点击。

%title插图%num
debounce(long, TimeUnit)

debounce(Func1)可以根据Func1的call方法中的函数来过滤,Func1中的中的call方法返回了一个临时的Observable,如果原始的Observable在发射一个新的数据时,上一个数据根据Func1的call方法生成的临时Observable还没结束,那么上一个数据就会被过滤掉。

%title插图%num
debounce(Func1)

Distinct

distinct()的过滤规则是只允许还没有发射过的数据通过,所有重复的数据项都只会发射一次。

%title插图%num
distinct()

过滤掉一段数字中的重复项:

Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
        .distinct()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.print(i + " ");
            }
        });

程序输出:

2 1 3 4 5 

distinct(Func1)参数中的Func1中的call方法会根据Observable发射的值生成一个Key,然后比较这个key来判断两个数据是不是相同;如果判定为重复则会和distinct()一样过滤掉重复的数据项。

%title插图%num
distinct(Func1)

假设我们要过滤掉一堆房源中小区名重复的小区:

List<House> houses = new ArrayList<>();
//House构造函数中的*个参数为该房源所属小区名,第二个参数为房源描述
List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("竹园新村", "一楼自带小花园"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("中粮·海景壹号", "顶级住宅,给您总统般尊贵体验"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable.from(houses)
        .distinct(new Func1<House, String>() {

            @Override
            public String call(House house) {
                return house.communityName;
            }
        }).subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }
        });            

程序输出:

小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段

DistinctUntilChanged

distinctUntilChanged()distinct()类似,只不过它判定的是Observable发射的当前数据项和前一个数据项是否相同。

%title插图%num
distinctUntilChanged()

同样还是上面过滤数字的例子:

Observable.just(2, 1, 2, 2, 3, 4, 3, 4, 5, 5)
        .distinctUntilChanged()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.print(i + " ");
            }
        });

程序输出:

2 1 2 3 4 3 4 5 

distinctUntilChanged(Func1)distinct(Func1)一样,根据Func1中call方法产生一个Key来判断两个相邻的数据项是否相同。

%title插图%num
distinctUntilChanged(Func1)

我们还是拿前面的过滤房源的例子:

Observable.from(houses)
        .distinctUntilChanged(new Func1<House, String>() {

            @Override
            public String call(House house) {
                return house.communityName;
            }
        }).subscribe(new Action1<House>() {
    @Override
    public void call(House house) {
        System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
    }
});

程序输出:

小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:竹园新村; 房源描述:顶层户型,两室一厅
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房

First

first()顾名思义,它是的Observable只发送观测序列中的*个数据项。

%title插图%num
first()

获取房源列表houses中的*套房源:

Observable.from(houses)
        .first()
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }                
        });

程序输出:

小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起

first(Func1)只发送符合条件的*个数据项。

%title插图%num
first(Func1)

现在我们要获取房源列表houses中小区名为竹园新村的*套房源。

Observable.from(houses)
        .first(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return "竹园新村".equals(house.communityName);
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }
        });

程序输出:

小区:竹园新村; 房源描述:满五唯一,黄金地段

Last

last()只发射观测序列中的*后一个数据项。

%title插图%num
last()

获取房源列表中的*后一套房源:

Observable.from(houses)
        .last()
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }
        });

程序输出:

小区:中粮·海景壹号; 房源描述:南北通透,豪华五房

last(Func1)只发射观测序列中符合条件的*后一个数据项。

%title插图%num
last(Func1)

获取房源列表houses中小区名为竹园新村的*后一套房源:

Observable.from(houses)
        .last(new Func1<House, Boolean>() {
            @Override
            public Boolean call(House house) {
                return "竹园新村".equals(house.communityName);
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:" + house.communityName + "; 房源描述:" + house.desc);
            }
        });

程序输出:

小区:竹园新村; 房源描述:顶层户型,两室一厅

这一章我们就先聊到这,更多的过滤类操作符的介绍大家可以去查阅官方文档和源码;在下一章我们将继续介绍组合类操作符。

RxJava系列 转换操作符

前面两篇文章中我们介绍了RxJava的一些基本概念和RxJava*简单的用法。从这一章开始,我们开始聊聊RxJava中的操作符Operators,RxJava中的操作符主要分成了三类:

  1. 转换类操作符(map flatMap concatMap flatMapIterable switchMap scan groupBy …);
  2. 过滤类操作符(fileter take takeLast takeUntil distinct distinctUntilChanged skip skipLast …);
  3. 组合类操作符(merge zip join combineLatest and/when/then switch startSwitch …)。

这一章我们主要讲讲转换类操作符。所有这些Operators都作用于一个可观测序列,然后变换它发射的值,*后用一种新的形式返回它们。概念实在是不好理解,下面我们结合实际的例子一一介绍。

map

map()函数接受一个Func1类型的参数(就像这样map(Func1<? super T, ? extends R> func)),然后吧这个Func1应用到每一个由Observable发射的值上,将发射的只转换为我们期望的值。这种狗屁定义我相信你也听不懂,我们来看一下官方给出的原理图:

%title插图%num
MapOperator

假设我们需要将一组数字装换成字符串,我们可以通过map这样实现:

Observable.just(1, 2, 3, 4, 5)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer i) {
                return "This is " + i;
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println(s);
            }
        });

Func1构造函数中的两个参数分别是Observable发射值当前的类型和map转换后的类型,上面这个例子中发射值当前的类型是Integer,转换后的类型是String。

flatMap

flatMap()函数同样也是做转换的,但是作用却不一样。flatMap不开好理解,我们直接看例子(我们公司是个房产平台,那我就拿房子举例):假设我们有一组小区Community[] communites,现在我们要输出每个小区的名字;我们可以这样实现:

Observable.from(communities)
        .map(new Func1<Community, String>() {
            @Override
            public String call(Community community) {
                return community.name;
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String name) {
                System.out.println("Community name : " + name);
            }
        });

现在我们需求有变化,需要打印出每个小区下面所有房子的价格。于是我可以这样实现:

Community[] communities = {};
Observable.from(communities)
        .subscribe(new Action1<Community>() {
            @Override
            public void call(Community community) {
                for (House house : community.houses) {
                    System.out.println("House price : " + house.price);
                }
            }
        });

如果我不想在Subscriber中使用for循环,而是希望Subscriber中直接传入单个的House对象呢(这对于代码复用很重要)?用map()显然是不行的,因为map()是一对一的转化,而我现在的要求是一对多的转化。那么我们可以使用flatMap()把一个Community转化成多个House。

Observable.from(communities)
        .flatMap(new Func1<Community, Observable<House>>() {
            @Override
            public Observable<House> call(Community community) {
                return Observable.from(community.houses);
            }
        })
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("House price : " + house.price);
            }
        });

从前面的例子中你坑定发现了,flatMap()和map()都是把传入的参数转化之后返回另一个对象。但和map()不同的是,flatMap()中返回的是Observable对象,并且这个Observable对象并不是被直接发送到 Subscriber的回调方法中。

flatMap()的原理是这样的:

  1. 将传入的事件对象装换成一个Observable对象;
  2. 这是不会直接发送这个Observable, 而是将这个Observable激活让它自己开始发送事件;
  3. 每一个创建出来的Observable发送的事件,都被汇入同一个Observable,这个Observable负责将这些事件统一交给Subscriber的回调方法。

这三个步骤,把事件拆成了两级,通过一组新创建的Observable将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是flatMap()所谓的flat。

*后我们来看看flatMap的原理图:

%title插图%num
FlatMapOperator

concatMap

concatMap()解决了flatMap()的交叉问题,它能够把发射的值连续在一起,就像这样:

%title插图%num
ConcatMapOperator

flatMapIterable

flatMapIterable()flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。

%title插图%num
FlatMapIterableOperator
Observable.from(communities)
        .flatMapIterable(new Func1<Community, Iterable<House>>() {
            @Override
            public Iterable<House> call(Community community) {
                return community.houses;
            }
        })
        .subscribe(new Action1<House>() {

            @Override
            public void call(House house) {

            }
        });

switchMap

switchMap()flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

%title插图%num
SwitchMapOperator

scan

scan()对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用合格函数时的*个参数使用。

%title插图%num
ScanOperator

我们来看个简单的例子:

Observable.just(1, 2, 3, 4, 5)
        .scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        System.out.print(integer+“ ”);
    }
});

输出结果为:1 3 6 10 15

groupBy

groupBy()将原始Observable发射的数据按照key来拆分成一些小的Observable,然后这些小Observable分别发射其所包含的的数据,和SQL中的groupBy类似。实际使用中,我们需要提供一个生成key的规则(也就是Func1中的call方法),所有key相同的数据会包含在同一个小的Observable中。另外我们还可以提供一个函数来对这些数据进行转化,有点类似于集成了flatMap。

%title插图%num
GroupByOperator

单纯的文字描述和图片解释可能难以理解,我们来看个例子:假设我现在有一组房源List<House> houses,每套房子都属于某一个小区,现在我们需要根据小区名来对房源进行分类,然后依次将房源信息输出。

List<House> houses = new ArrayList<>();
houses.add(new House("中粮·海景壹号", "中粮海景壹号新出大平层!总价4500W起"));
houses.add(new House("竹园新村", "满五唯一,黄金地段"));
houses.add(new House("中粮·海景壹号", "毗邻汤臣一品"));
houses.add(new House("竹园新村", "顶层户型,两室一厅"));
houses.add(new House("中粮·海景壹号", "南北通透,豪华五房"));
Observable<GroupedObservable<String, House>> groupByCommunityNameObservable = Observable.from(houses)
        .groupBy(new Func1<House, String>() {

            @Override
            public String call(House house) {
                return house.communityName;
            }
        });

通过上面的代码我们创建了一个新的Observable:groupByCommunityNameObservable,它将会发送一个带有GroupedObservable的序列(也就是指发送的数据项的类型为GroupedObservable)。GroupedObservable是一个特殊的Observable,它基于一个分组的key,在这个例子中的key就是小区名。现在我们需要将分类后的房源依次输出:

Observable.concat(groupByCommunityNameObservable)
        .subscribe(new Action1<House>() {
            @Override
            public void call(House house) {
                System.out.println("小区:"+house.communityName+"; 房源描述:"+house.desc);
            }
        });

执行结果:

小区:中粮·海景壹号; 房源描述:中粮海景壹号新出大平层!总价4500W起
小区:中粮·海景壹号; 房源描述:毗邻汤臣一品
小区:中粮·海景壹号; 房源描述:南北通透,豪华五房
小区:竹园新村; 房源描述:满五唯一,黄金地段
小区:竹园新村; 房源描述:顶层户型,两室一厅

转换类的操作符就先介绍到这,后续还会继续介绍组合、过滤类的操作符及源码分析,敬请期待!

RxJava系列 基本概念及使用介绍

前言

上一篇的示例代码中大家一定发现了Observable这个类。从纯Java的观点看,Observable类源自于经典的观察者模式。RxJava的异步实现正是基于观察者模式来实现的,而且是一种扩展的观察者模式。

观察者模式

观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。

观察者模式很适合下面这些场景中的任何一个:

  1. 当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
  2. 当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
  3. 当一个变化的对象通知那些无需推断具体类型的对象时。

通常一个观察者模式的类图是这样的:

%title插图%num
Observer

如果你对观察者模式不是很了解,那么强烈建议你先去学习下。关于观察者模式的详细介绍可以参考我之前的文章:设计模式之观察者模式

扩展的观察者模式

在RxJava中主要有4个角色:

  • Observable
  • Subject
  • Observer
  • Subscriber

Observable和Subject是两个“生产”实体,Observer和Subscriber是两个“消费”实体。说直白点Observable对应于观察者模式中的被观察者,而ObserverSubscriber对应于观察者模式中的观察者Subscriber其实是一个实现了Observer的抽象类,后面我们分析源码的时候也会介绍到。Subject比较复杂,以后再分析。

上一篇文章中我们说到RxJava中有个关键概念:事件。观察者Observer和被观察者Observable通过subscribe()方法实现订阅关系。从而Observable 可以在需要的时候发出事件来通知Observer

RxJava如何使用

我自己在学习一种新技术的时候通常喜欢先去了解它是怎么用的,掌握了使用方法后再去深挖其原理。那么我们现在就来说说RxJava到底该怎么用。

*步:创建观察者Observer

Observer<Object> observer = new Observer<Object>() {

    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(Object s) {

    }
 };

这么简单,一个观察者Observer创建了!

大兄弟你等等…,你之前那篇观察者模式中不是说观察者只提供一个update方法的吗?这特么怎么有三个?!!

少年勿急,且听我慢慢道来。在普通的观察者模式中观察者一般只会提供一个update()方法用于被观察者的状态发生变化时,用于提供给被观察者调用。而在RxJava中的观察者Observer提供了:onNext()、 onCompleted()onError()三个方法。还记得吗?开篇我们讲过RxJava是基于一种扩展的观察这模式实现,这里多出的onCompleted和onError正是对观察者模式的扩展。ps:onNext就相当于普通观察者模式中的update

RxJava中添加了普通观察者模式缺失的三个功能:

  1. RxJava中规定当不再有新的事件发出时,可以调用onCompleted()方法作为标示;
  2. 当事件处理出现异常时框架自动触发onError()方法;
  3. 同时Observables支持链式调用,从而避免了回调嵌套的问题。

第二步:创建被观察者Observable

Observable.create()方法可以创建一个Observable,使用crate()创建Observable需要一个OnSubscribe对象,这个对象继承Action1。当观察者订阅我们的Observable时,它作为一个参数传入并执行call()函数。

Observable<Object> observable = Observable.create(new               Observable.OnSubscribe<Object>() {
    @Override
    public void call(Subscriber<? super Object> subscriber) {

    }
});

除了create(),just()和from()同样可以创建Observable。看看下面两个例子:

just(T...)将传入的参数依次发送

Observable observable = Observable.just("One", "Two", "Three");
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

from(T[])/from(Iterable<? extends T>)将传入的数组或者Iterable拆分成Java对象依次发送

String[] parameters = {"One", "Two", "Three"};
Observable observable = Observable.from(parameters);
//上面这行代码会依次调用
//onNext("One");
//onNext("Two");
//onNext("Three");
//onCompleted();

第三步:被观察者Observable订阅观察者Observerps:你没看错,不同于普通的观察者模式,这里是被观察者订阅观察者

有了观察者和被观察者,我们就可以通过subscribe()来实现二者的订阅关系了。

observable.subscribe(observer);
%title插图%num
observable.subscribe(observer)

连在一起写就是这样:

Observable.create(new Observable.OnSubscribe<Integer>() {

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 5; i++) {
            subscriber.onNext(i);
        }
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(Integer item) {
        System.out.println("Item is " + item);
    }
});

至此一个完整的RxJava调用就完成了。

兄台,你叨逼叨叨逼叨的说了一大堆,可是我没搞定你特么到底在干啥啊?!!不急,我现在就来告诉你们到底发生了什么。

首先我们使用Observable.create()创建了一个新的Observable<Integer>,并为create()方法传入了一个OnSubscribe,OnSubscribe中包含一个call()方法,一旦我们调用subscribe()订阅后就会自动触发call()方法。call()方法中的参数Subscriber其实就是subscribe()方法中的观察者Observer。我们在call()方法中调用了5次onNext()和1次onCompleted()方法。一套流程周下来以后输出结果就是下面这样的:

Item is 0
Item is 1
Item is 2
Item is 3
Item is 4
onCompleted

看到这里可能你又要说了,大兄弟你别唬我啊!OnSubscribe的call()方法中的参数Subscriber怎么就变成了subscribe()方法中的观察者Observer?!!!这俩儿货明明看起来就是两个不同的类啊。

我们先看看Subscriber这个类:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    ...
}

从源码中我们可以看到,Subscriber是Observer的一个抽象实现类,所以我首先可以肯定的是Subscriber和Observer类型是一致的。接着往下我们看看subscribe()这个方法:

public final Subscription subscribe(final Observer<? super T> observer) {

    //这里的if判断对于我们要分享的问题没有关联,可以先无视
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    return subscribe(new Subscriber<T>() {

        @Override
        public void onCompleted() {
            observer.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            observer.onError(e);
        }

        @Override
        public void onNext(T t) {
            observer.onNext(t);
        }

    });
}

我们看到subscribe()方法内部首先将传进来的Observer做了一层代理,将它转换成了Subscriber。我们再看看这个方法内部的subscribe()方法:

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

进一步往下追踪看看return后面这段代码到底做了什么。精简掉其他无关代码后的subscribe(subscriber, this)方法是这样的:

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    subscriber.onStart();
    try {
        hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
        return hook.onSubscribeReturn(subscriber);
    } catch (Throwable e) {
        return Subscriptions.unsubscribed();
    }
}

我们重点看看hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber),前面这个hook.onSubscribeStart(observable, observable.onSubscribe)返回的是它自己括号内的第二个参数observable.onSubscribe,然后调用了它的call方法。而这个observable.onSubscribe正是create()方法中的Subscriber,这样整个流程就理顺了。看到这里是不是对RxJava的执行流程清晰了一点呢?这里也建议大家在学习新技术的时候多去翻一翻源码,知其然还要能知其所以然不是吗。

subscribe()的参数除了可以是Observer和Subscriber以外还可以是Action1、Action0;这是一种更简单的回调,只有一个call(T)方法;由于太简单这里就不做详细介绍了!

异步

上一篇文章中开篇就讲到RxJava就是来处理异步任务的。但是默认情况下我们在哪个线程调用subscribe()就在哪个线程生产事件,在哪个线程生产事件就在哪个线程消费事件。那怎么做到异步呢?RxJava为我们提供Scheduler用来做线程调度,我们来看看RxJava提供了哪些Scheduler。

%title插图%num

同时RxJava还为我们提供了subscribeOn()observeOn()两个方法来指定Observable和Observer运行的线程。

Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

上面这段代码大家应该有印象吧,没错正是我们上一篇文章中的例子。subscribeOn(Schedulers.io())指定了获取小区列表、处理房源信息等一系列事件都是在IO线程中运行,observeOn(AndroidSchedulers.mainThread())指定了在屏幕上展示房源的操作在UI线程执行。这就做到了在子线程获取房源,主线程展示房源。

好了,RxJava系列的入门内容我们就聊到这。下一篇我们再继续介绍更多的API以及它们内部的原理。

RxJava 简介

前言

提升开发效率,降低维护成本一直是开发团队永恒不变的宗旨。近一年来国内的技术圈子中越来越多的开始提及Rx,经过一段时间的学习和探索之后我也深深的感受到了RxJava的魅力。它能帮助我们简化代码逻辑,提升代码可读性。这对于开发效率的提升、后期维护成本的降低帮助都是巨大的。个人预测RxJava一定是2016年的一个大趋势,所以也有打算将它引入到公司现有的项目中来,写这一系列的文章主要也是为了团队内部做技术分享。

由于我本人是个Android程序猿,因此这一系列文章中的场景都是基于Android平台的。如果你是个Java Web工程师或者是其它方向的那也没关系,我会尽量用通俗的语言将问题描述清楚。

响应式编程

在介绍RxJava前,我们先聊聊响应式编程。那么什么是响应式编程呢?响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

响应式编程的一个关键概念是事件。事件可以被等待,可以触发过程,也可以触发其它事件。事件是唯一的以合适的方式将我们的现实世界映射到我们的软件中:如果屋里太热了我们就打开一扇窗户。同样的,当我们的天气app从服务端获取到新的天气数据后,我们需要更新app上展示天气信息的UI;汽车上的车道偏移系统探测到车辆偏移了正常路线就会提醒驾驶者纠正,就是是响应事件。

今天,响应式编程*通用的一个场景是UI:我们的移动App必须做出对网络调用、用户触摸输入和系统弹框的响应。在这个世界上,软件之所以是事件驱动并响应的是因为现实生活也是如此。

本章节中部分概念摘自《RxJava Essentials》一书

RxJava的来历

Rx是微软.Net的一个响应式扩展,Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。2012年Netflix为了应对不断增长的业务需求开始将.NET Rx迁移到JVM上面。并于13年二月份正式向外展示了RxJava。
从语义的角度来看,RxJava就是.NET Rx。从语法的角度来看,Netflix考虑到了对应每个Rx方法,保留了Java代码规范和基本的模式。

%title插图%num
RxJava来历

什么是RxJava

那么到底什么是RxJava呢?我对它的定义是:RxJava本质上是一个异步操作库,是一个能让你用*其简洁的逻辑去处理繁琐复杂任务的异步事件库。

RxJava好在哪

Android平台上为已经开发者提供了AsyncTask,Handler等用来做异步操作的类库,那我们为什么还要选择RxJava呢?答案是简洁!RxJava可以用非常简洁的代码逻辑来解决复杂问题;而且即使业务逻辑的越来越复杂,它依然能够保持简洁!再配合上Lambda用简单的几行代码分分钟就解决你负责的业务问题。简直逼格爆表,拿它装逼那是*好的!

多说无益,上代码!

假设我们安居客用户App上有个需求,需要从服务端拉取上海浦东新区塘桥板块的所有小区Community[] communities,每个小区下包含多套房源List<House> houses;我们需要把塘桥板块的所有总价大于500W的房源都展示在App的房源列表页。用于从服务端拉取communities需要发起网络请求,比较耗时,因此需要在后台运行。而这些房源信息需要展示到App的页面上,因此需要在UI线程上执行。(此例子思路来源于扔物线的给Android开发者的RxJava详解一文)

new Thread() {
        @Override
        public void run() {
            super.run();
            //从服务端获取小区列表
            List<Community> communities = getCommunitiesFromServer();
            for (Community community : communities) {
                List<House> houses = community.houses;
                for (House house : houses) {
                    if (house.price >= 5000000) {
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                //将房子的信息添加到屏幕上
                                addHouseInformationToScreen(house);
                            }
                        });
                    }
                }
            }
        }
    }.start();

使用RxJava的写法是这样的:

Observable.from(getCommunitiesFromServer())
            .flatMap(new Func1<Community, Observable<House>>() {
                @Override
                public Observable<House> call(Community community) {
                    return Observable.from(community.houses);
                }
            }).filter(new Func1<House, Boolean>() {
                @Override
                public Boolean call(House house) {
                    return house.price>=5000000;
                }
            }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<House>() {
                @Override
                public void call(House house) {
                    //将房子的信息添加到屏幕上
                    addHouseInformationToScreen(house);
                }
            });

从上面这段代码我们可以看到:虽然代码量看起来变复杂了,但是RxJava的实现是一条链式调用,没有任何的嵌套;整个实现逻辑看起来异常简洁清晰,这对我们的编程实现和后期维护是有巨大帮助的。特别是对于那些回调嵌套的场景。配合Lambda表达式还可以简化成这样:

  Observable.from(getCommunitiesFromServer())
            .flatMap(community -> Observable.from(community.houses))
            .filter(house -> house.price>=5000000).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::addHouseInformationToScreen);

简洁!有美感!这才是一个有情怀的程序员应该写出来的代码。

看完这篇文章大家应该能够理解RxJava为什么会越来越火了。它能*大的提高我们的开发效率和代码的可读性!当然了RxJava的学习曲线也是比较陡的,在后面的文章我会对主要的知识点做详细的介绍,敬请关注!

Android 开发者必看的 RxJava 详解

前言

我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 。而*近这几个月,我也发现国内越来越多的人开始提及 RxJava 。有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么?

鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用过程中对 RxJava 有了一些理解,我决定写下这篇文章来对 RxJava 做一个相对详细的、针对 Android 开发者的介绍。

这篇文章的目的有两个: 1. 给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

  • RxJava 到底是什么
  • RxJava 好在哪
  • API 介绍和原理简析
    • 1. 概念:扩展的观察者模式
      • 观察者模式
      • RxJava 的观察者模式
    • 2. 基本实现
      • 1) 创建 Observer
      • 2) 创建 Observable
      • 3) Subscribe (订阅)
      • 4) 场景示例
        • a. 打印字符串数组
        • b. 由 id 取得图片并显示
    • 3. 线程控制 —— Scheduler (一)
      • 1) Scheduler 的 API (一)
      • 2) Scheduler 的原理 (一)
    • 4. 变换
      • 1) API
      • 2) 变换的原理:lift()
      • 3) compose: 对 Observable 整体的变换
    • 5. 线程控制:Scheduler (二)
      • 1) Scheduler 的 API (二)
      • 2) Scheduler 的原理(二)
      • 3) 延伸:doOnSubscribe()
  • RxJava 的适用场景和使用方式
    • 1. 与 Retrofit 的结合
    • 2. RxBinding
    • 3. 各种异步操作
    • 4. RxBus
  • *后
    • 关于作者:
    • 为什么写这个?

在正文开始之前的*后,放上 GitHub 链接和引入依赖的 gradle 代码: Github:
https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
引入依赖:
compile 'io.reactivex:rxjava:1.0.14'
compile 'io.reactivex:rxandroid:1.0.1'
(版本号是文章发布时的*新稳定版)

另外,感谢 RxJava 核心成员流火枫林的技术支持和内测读者代码家、鲍永章、drakeet、马琳、有时放纵、程序亦非猿、大头鬼、XZoomEye、席德雨、TCahead、Tiiime、Ailurus、宅学长、妖孽、大大大大大臣哥、NicodeLee的帮助,以及周伯通招聘的赞助。

RxJava 到底是什么

一个词:异步

RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

然而,对于初学者来说,这太难看懂了。因为它是一个『总结』,而初学者更需要一个『引言』。

其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。

RxJava 好在哪

换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / … ?』

一个词:简洁

异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

举个例子

假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

new Thread() {
    @Override
    public void run() {
        super.run();
        for (File folder : folders) {
            File[] files = folder.listFiles();
            for (File file : files) {
                if (file.getName().endsWith(".png")) {
                    final Bitmap bitmap = getBitmapFromFile(file);
                    getActivity().runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            imageCollectorView.addImage(bitmap);
                        }
                    });
                }
            }
        }
    }
}.start();

而如果使用 RxJava ,实现方式是这样的:

Observable.from(folders)
    .flatMap(new Func1<File, Observable<File>>() {
        @Override
        public Observable<File> call(File file) {
            return Observable.from(file.listFiles());
        }
    })
    .filter(new Func1<File, Boolean>() {
        @Override
        public Boolean call(File file) {
            return file.getName().endsWith(".png");
        }
    })
    .map(new Func1<File, Bitmap>() {
        @Override
        public Bitmap call(File file) {
            return getBitmapFromFile(file);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            imageCollectorView.addImage(bitmap);
        }
    });

那位说话了:『你这代码明明变多了啊!简洁个毛啊!』大兄弟你消消气,我说的是逻辑的简洁,不是单纯的代码量少(逻辑简洁才是提升读写代码速度的必杀技对不?)。观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后需要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂,而不是对着代码重新捋一遍思路?)。

另外,如果你的 IDE 是 Android Studio ,其实每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

Observable.from(folders)
    .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
    .filter((Func1) (file) -> { file.getName().endsWith(".png") })
    .map((Func1) (file) -> { getBitmapFromFile(file) })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

如果你习惯使用 Retrolambda ,你也可以直接把代码写成上面这种简洁的形式。而如果你看到这里还不知道什么是 Retrolambda ,我不建议你现在就去学习它。原因有两点:1. Lambda 是把双刃剑,它让你的代码简洁的同时,降低了代码的可读性,因此同时学习 RxJava 和 Retrolambda 可能会让你忽略 RxJava 的一些技术细节;2. Retrolambda 是 Java 6/7 对 Lambda 表达式的非官方兼容方案,它的向后兼容性和稳定性是无法保障的,因此对于企业项目,使用 Retrolambda 是有风险的。所以,与很多 RxJava 的推广者不同,我并不推荐在学习 RxJava 的同时一起学习 Retrolambda。事实上,我个人虽然很欣赏 Retrolambda,但我从来不用它。

在Flipboard 的 Android 代码中,有一段逻辑非常复杂,包含了多次内存操作、本地文件操作和网络操作,对象分分合合,线程间相互配合相互等待,一会儿排成人字,一会儿排成一字。如果使用常规的方法来实现,肯定是要写得欲仙欲死,然而在使用 RxJava 的情况下,依然只是一条链式调用就完成了。它很长,但很清晰。

所以, RxJava 好在哪?就好在简洁,好在那把什么复杂逻辑都能穿成一条线的简洁。

API 介绍和原理简析

这个我就做不到一个词说明了……因为这一节的主要内容就是一步步地说明 RxJava 到底怎样做到了异步,怎样做到了简洁。

1. 概念:扩展的观察者模式

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

观察者模式

先简述一下观察者模式,已经熟悉的可以跳过这一段。

观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到*高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

OnClickListener 的模式大致如下图:

OnClickListener 观察者模式

如图所示,通过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出);当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法。另外,如果把这张图中的概念抽象出来(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件),就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。如下图:

通用观察者模式

而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

RxJava 的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()

  • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
  • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的*后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

RxJava 的观察者模式大致如下图:

RxJava 的观察者模式

2. 基本实现

基于以上的概念, RxJava 的基本实现主要有三点:

1) 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

  1. onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
  2. unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以*好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
2) 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各种各样的事件发送规则你都可以自己来写。至于具体怎么做,后面都会讲到,但现在不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。

create() 方法是 RxJava *基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:

  • just(T...): 将传入的参数依次发送出来。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
  • from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

3) Subscribe (订阅)

创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

有人可能会注意到, subscribe() 这个方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到,subscriber() 做了3件事:

  1. 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
  2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
  3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

整个过程中对象间的关系如下图:

关系静图

或者可以看动图:

关系静图

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用*广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2Action3) 的,它们可以被用以包装不同的无返回值的方法。

注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 过程中*终会被转换成 Subscriber 对象,因此,从这里开始,后面的描述我将用 Subscriber 来代替 Observer ,这样更加严谨。

4) 场景示例

下面举两个例子:

为了把原理用更清晰的方式表述出来,本文中挑选的都是功能尽可能简单的例子,以至于有些示例代码看起来会有『画蛇添足』『明明不用 RxJava 可以更简便地解决问题』的感觉。当你看到这种情况,不要觉得是因为 RxJava 太啰嗦,而是因为在过早的时候举出真实场景的例子并不利于原理的解析,因此我刻意挑选了简单的情景。

a. 打印字符串数组

将字符串数组 names 中的所有字符串依次打印出来:

String[] names = ...;
Observable.from(names)
    .subscribe(new Action1<String>() {
        @Override
        public void call(String name) {
            Log.d(tag, name);
        }
    });
b. 由 id 取得图片并显示

由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
}).subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

然而,

这并没有什么diao用

在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

3. 线程控制 —— Scheduler (一)

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

1) Scheduler 的 API (一)

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

文字叙述总归难理解,上代码:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1234 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
    @Override
    public void call(Subscriber<? super Drawable> subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Observer<Drawable>() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那么,加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

2) Scheduler 的原理 (一)

RxJava 的 Scheduler API 很方便,也很神奇(加了一句话就把线程切换了,怎么做到的?而且 subscribe() 不是*外层直接调用的方法吗,它竟然也能被指定线程?)。然而 Scheduler 的原理需要放在后面讲,因为它的原理是以下一节《变换》的原理作为基础的。

好吧这一节其实我屁也没说,只是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

4. 变换

终于要到牛逼的地方了,不管你激动不激动,反正我是激动了。

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的*大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。

1) API

首先看一个 map() 的例子:

Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是*常见的也*容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:

  • map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava *常用的变换。 map() 的示意图: map() 示意图
  • flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Func1<Student, String>() {
        @Override
        public String call(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);

很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
    @Override
    public void onNext(Student student) {
        List<Course> courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?

这个时候,就需要用 flatMap() 了:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

flatMap() 示意图:

flatMap() 示意图

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 处理显示消息列表
            showMessages(messages);
        }
    });

传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

  • throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。

此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。

2) 变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

  • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
  • 当含有 lift() 时:
    1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
    2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe
    3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe
    4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 SubscriberOperator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
    这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

如果你更喜欢具象思维,可以看图:

lift() 原理图

或者可以看动图:

lift 原理动图

两次和多次的 lift() 同理,如下图:

两次 lift

举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

3) compose: 对 Observable 整体的变换

除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写:

observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

你觉得这样太不软件工程了,于是你改成了这样:

private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

compose() 的原理比较简单,不附图喽。

5. 线程控制:Scheduler (二)

除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

1) Scheduler 的 API (二)

前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新线程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 线程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

2) Scheduler 的原理(二)

其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

subscribeOn() 原理图:

subscribeOn() 原理

observeOn() 原理图:

observeOn() 原理

从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

*后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

线程控制综合调用

图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受*个 subscribeOn() 影响,运行在红色线程;③和④处受*个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被*个 subscribeOn() 截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个 subscribeOn() 的时候,只有*个 subscribeOn() 起作用。

3) 延伸:doOnSubscribe()

然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart()由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它*近的 subscribeOn() 所指定的线程。

示例代码:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

RxJava 的适用场景和使用方式

1. 与 Retrofit 的结合

Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。

以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

@GET("/user")
public void getUser(@Query("userId") String userId, Callback<User> callback);

在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

而使用 RxJava 形式的 API,定义同样的请求是这样的:

@GET("/user")
public Observable<User> getUser(@Query("userId") String userId);

使用的时候是这样的:

getUser(userId)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

看到区别了吗?

当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()

对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比 Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

因为它好用啊!从这个例子看不出来是因为这只是*简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:

假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用 Callback 方式大概可以这么写:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        processUser(user); // 尝试修正 User 数据
        userView.setUser(user);
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

有问题吗?

很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下:

getUser(userId, new Callback<User>() {
    @Override
    public void success(User user) {
        new Thread() {
            @Override
            public void run() {
                processUser(user); // 尝试修正 User 数据
                runOnUiThread(new Runnable() { // 切回 UI 线程
                    @Override
                    public void run() {
                        userView.setUser(user);
                    }
                });
            }).start();
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
};

性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。

这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

后台代码和前台代码全都写在一条链中,明显清晰了很多。

再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

Callback 方式,可以使用嵌套的 Callback

@GET("/token")
public void getToken(Callback<String> callback);

@GET("/user")
public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);

...

getToken(new Callback<String>() {
    @Override
    public void success(String token) {
        getUser(token, userId, new Callback<User>() {
            @Override
            public void success(User user) {
                userView.setUser(user);
            }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }

    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
});

倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。

而使用 RxJava 的话,代码是这样的:

@GET("/token")
public Observable<String> getToken();

@GET("/user")
public Observable<User> getUser(@Query("token") String token, @Query("userId") String userId);

...

getToken()
    .flatMap(new Func1<String, Observable<User>>() {
        @Override
        public Observable<User> onNext(String token) {
            return getUser(token, userId);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

用一个 flatMap() 就搞定了逻辑,依然是一条链。看着就很爽,是吧?

2016/03/31 更新,加上我写的一个 Sample 项目:
rengwuxian RxJava Samples

好,Retrofit 部分就到这里。

2. RxBinding

RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。

举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:

Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
    .subscribe(new Action1<ViewClickEvent>() {
        @Override
        public void call(ViewClickEvent event) {
            // Click handling
        }
    });

看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。通过 RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:

RxView.clickEvents(button)
    .throttleFirst(500, TimeUnit.MILLISECONDS)
    .subscribe(clickAction);

如果想对 RxBinding 有更多了解,可以去它的 GitHub 项目 下面看看。

3. 各种异步操作

前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。

4. RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,目前为止没有不良反应。

*后

对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android 开发者来说有太多陌生的概念了。可是它真的很牛逼。因此,我写了这篇《给 Android 开发者的 RxJava 详解》,希望能给始终搞不明白什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。无论如何,只要能给各位同为 Android 工程师的你们提供一些帮助,这篇文章的目的就达到了。

 

为什么写这个?

与两三年前的境况不同,中国现在已经不缺初级 Android 工程师,但中级和高级工程师严重供不应求。因此我决定从今天开始不定期地发布我的技术分享,只希望能够和大家共同提升,通过我们的成长来解决一点点国内互联网公司人才稀缺的困境,也提升各位技术党的收入。所以,不仅要写这篇,我还会写更多。至于内容的定位,我计划只定位真正的干货,一些边边角角的小技巧和炫酷的黑科技应该都不会写,总之希望每篇文章都能帮读者提升真正的实力。

深入浅出RxJava这一篇就够了

前言:

*次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJava。RxJava是一个基于事件订阅的异步执行的一个类库。听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个Android项目启动会联系到RxJava呢?因为在RxJava使用起来得到广泛的认可,又是基于Java语言的。自然会有善于组织和总结的开发者联想到Android!没错,RxAndroid就这样在RxJava的基础上,针对Android开发的一个库。今天我们主要是来讲解一下RxJava,在接下来的几篇博客中我会陆续带大家来认识RxAndroid,Retrofit框架的使用,这些都是目前比较火的一些技术框架!

这里是Github上RxJava的项目地址:https://github.com/ReactiveX/RxJava

技术文档Api:http://reactivex.io/RxJava/javadoc/

 

官方的介绍

1.支持Java6+

2.android 2.3+

3.异步的

4.基于观察者设计模式(Observer、Observable)不懂设计模式的可以移步到此:浅谈Java设计模式(十五)观察者模式(Observer)

5.Subscribe (订阅)

 

正式使用RxJava

用框架或者库都是为了简洁、方便,RxJava也不例外它能使你的代码逻辑更加的简洁。举个例子之前我们先来引入依赖的 gradle 代码:

[java] view plain copy
  1. compile ‘io.reactivex:rxjava:1.0.14’   
  2. compile ‘io.reactivex:rxandroid:1.0.1’   

既然是基于异步,当然要在处理比较耗时的操作上才能彰显它的优势!现在我们假设有这样一个需求:
需要实现一个多个下载的图片并且显示的功能,它的作用可以添加多个下载操作,由于下载这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

[java] view plain copy
  1. new Thread() {  
  2.     @Override  
  3.     public void run() {  
  4.         super.run();  
  5.         for (File folder : folders) {  
  6.             File[] files = folder.listFiles();
  7.             for (File file : files) {  
  8.                 if (file.getName().endsWith(“.png”)) {  
  9.                     final Bitmap bitmap = getBitmapFromFile(file);  
  10.                     getActivity().runOnUiThread(new Runnable() {  
  11.                         @Override  
  12.                         public void run() {  
  13.                             imageCollectorView.addImage(bitmap);
  14.                         }
  15.                     });
  16.                 }
  17.             }
  18.         }
  19.     }
  20. }.start();

里面的判断是不是看起来有点晕晕,当然这是我自己写的,我一眼就能看清楚里面的逻辑,但是如果换做是别人来阅读你的代码,这就比较的尴尬了!
我们来看看使用RxJava的代码:

[java] view plain copy
  1. Observable.from(folders)
  2.     .flatMap(new Func1<File, Observable<File>>() {  
  3.         @Override  
  4.         public Observable<File> call(File file) {  
  5.             return Observable.from(file.listFiles());  
  6.         }
  7.     })
  8.     .filter(new Func1<File, Boolean>() {  
  9.         @Override  
  10.         public Boolean call(File file) {  
  11.             return file.getName().endsWith(“.png”);  
  12.         }
  13.     })
  14.     .map(new Func1<File, Bitmap>() {  
  15.         @Override  
  16.         public Bitmap call(File file) {  
  17.             return getBitmapFromFile(file);  
  18.         }
  19.     })
  20.     .subscribeOn(Schedulers.io())
  21.     .observeOn(AndroidSchedulers.mainThread())
  22.     .subscribe(new Action1<Bitmap>() {  
  23.         @Override  
  24.         public void call(Bitmap bitmap) {  
  25.             imageCollectorView.addImage(bitmap);
  26.         }
  27.     });

是不是明了,虽然说算不上简单,但是习惯了就一如既往了!

如果你使用的AndroidStudio的话,你打开Java文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

[java] view plain copy
  1. Observable.from(folders)
  2.     .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
  3.     .filter((Func1) (file) -> { file.getName().endsWith(“.png”) })  
  4.     .map((Func1) (file) -> { getBitmapFromFile(file) })
  5.     .subscribeOn(Schedulers.io())
  6.     .observeOn(AndroidSchedulers.mainThread())
  7.     .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

不过如果你对Java8还不是很了解的话呢这一段可以暂时忽略,但是你可以移步到这里了解一下Java8:Java8部分新特性介绍

看完代码,是不是有种相见恨晚的冲动?别急,我们来慢慢了解RxJava!

 

前面已经提到他是基于Java观察者设计模式的,这个模式上面有给大家链接,可以去看看,这里不不坐过多的介绍,我们来介绍一下RxJava中的观察者模式:
RxJava 的观察者模式

一、说明
1)RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
2)与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
3)onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
4)onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
5)在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的*后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

二、实现
1) 创建 Observer
Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

[java] view plain copy
  1. Observer<String> observer = new Observer<String>() {  
  2.     @Override  
  3.     public void onNext(String s) {  
  4.         Log.d(tag, “Item: ” + s);  
  5.     }
  6.     @Override  
  7.     public void onCompleted() {  
  8.         Log.d(tag, “Completed!”);  
  9.     }
  10.     @Override  
  11.     public void onError(Throwable e) {  
  12.         Log.d(tag, “Error!”);  
  13.     }
  14. };

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

[java] view plain copy
  1. Subscriber<String> subscriber = new Subscriber<String>() {  
  2.     @Override  
  3.     public void onNext(String s) {  
  4.         Log.d(tag, “Item: ” + s);  
  5.     }
  6.     @Override  
  7.     public void onCompleted() {  
  8.         Log.d(tag, “Completed!”);  
  9.     }
  10.     @Override  
  11.     public void onError(Throwable e) {  
  12.         Log.d(tag, “Error!”);  
  13.     }
  14. };

不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以*好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

2) 创建 Observable
Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

[java] view plain copy
  1. Observable observable = Observable.create(new Observable.OnSubscribe<String>() {  
  2.     @Override  
  3.     public void call(Subscriber<? super String> subscriber) {  
  4.         subscriber.onNext(“Hello”);  
  5.         subscriber.onNext(“Hi”);  
  6.         subscriber.onNext(“Aloha”);  
  7.         subscriber.onCompleted();
  8.     }
  9. });

可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式

create() 方法是 RxJava *基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:
just(T…): 将传入的参数依次发送出来。

[java] view plain copy
  1. Observable observable = Observable.just(“Hello”, “Hi”, “Aloha”);  
  2. // 将会依次调用:  
  3. // onNext(“Hello”);  
  4. // onNext(“Hi”);  
  5. // onNext(“Aloha”);  
  6. // onCompleted();  

from(T[]) / from(Iterable<? extends T>) : 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

[java] view plain copy
  1. String[] words = {“Hello”, “Hi”, “Aloha”};  
  2. Observable observable = Observable.from(words);
  3. // 将会依次调用:  
  4. // onNext(“Hello”);  
  5. // onNext(“Hi”);  
  6. // onNext(“Aloha”);  
  7. // onCompleted();  

上面 just(T…) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

3) Subscribe (订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

[java] view plain copy
  1. observable.subscribe(observer);
  2. // 或者:  
  3. observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

[java] view plain copy
  1. // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。  
  2. // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。  
  3. public Subscription subscribe(Subscriber subscriber) {  
  4.     subscriber.onStart();
  5.     onSubscribe.call(subscriber);
  6.     return subscriber;  
  7. }

可以看到,subscriber() 做了3件事:
1.调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
2.调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
3.将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

[java] view plain copy
  1. Action1<String> onNextAction = new Action1<String>() {  
  2.     // onNext()  
  3.     @Override  
  4.     public void call(String s) {  
  5.         Log.d(tag, s);
  6.     }
  7. };
  8. Action1<Throwable> onErrorAction = new Action1<Throwable>() {  
  9.     // onError()  
  10.     @Override  
  11.     public void call(Throwable throwable) {  
  12.         // Error handling  
  13.     }
  14. };
  15. Action0 onCompletedAction = new Action0() {  
  16.     // onCompleted()  
  17.     @Override  
  18.     public void call() {  
  19.         Log.d(tag, “completed”);  
  20.     }
  21. };
  22. // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()  
  23. observable.subscribe(onNextAction);
  24. // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()  
  25. observable.subscribe(onNextAction, onErrorAction);
  26. // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()  
  27. observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用*广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

4) 场景示例

下面举两个例子:
a. 打印字符串数组
将字符串数组 names 中的所有字符串依次打印出来:

[java] view plain copy
  1. String[] names = …;
  2. Observable.from(names)
  3.     .subscribe(new Action1<String>() {  
  4.         @Override  
  5.         public void call(String name) {  
  6.             Log.d(tag, name);
  7.         }
  8.     });

b. 由 id 取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

[java] view plain copy
  1. int drawableRes = …;  
  2. ImageView imageView = …;
  3. Observable.create(new OnSubscribe<Drawable>() {  
  4.     @Override  
  5.     public void call(Subscriber<? super Drawable> subscriber) {  
  6.         Drawable drawable = getTheme().getDrawable(drawableRes));
  7.         subscriber.onNext(drawable);
  8.         subscriber.onCompleted();
  9.     }
  10. }).subscribe(new Observer<Drawable>() {  
  11.     @Override  
  12.     public void onNext(Drawable drawable) {  
  13.         imageView.setImageDrawable(drawable);
  14.     }
  15.     @Override  
  16.     public void onCompleted() {  
  17.     }
  18.     @Override  
  19.     public void onError(Throwable e) {  
  20.         Toast.makeText(activity, “Error!”, Toast.LENGTH_SHORT).show();  
  21.     }
  22. });

正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

注意:在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

线程控制 —— Scheduler (一)
前言:

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

代码来理解上面的文字叙述:

[java] view plain copy
  1. Observable.just(1, 2, 3, 4)  
  2.     .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
  3.     .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  
  4.     .subscribe(new Action1<Integer>() {  
  5.         @Override  
  6.         public void call(Integer number) {  
  7.             Log.d(tag, “number:” + number);  
  8.         }
  9.     });

上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

[java] view plain copy
  1. int drawableRes = …;  
  2. ImageView imageView = …;
  3. Observable.create(new OnSubscribe<Drawable>() {  
  4.     @Override  
  5.     public void call(Subscriber<? super Drawable> subscriber) {  
  6.         Drawable drawable = getTheme().getDrawable(drawableRes));
  7.         subscriber.onNext(drawable);
  8.         subscriber.onCompleted();
  9.     }
  10. })
  11. .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
  12. .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  
  13. .subscribe(new Observer<Drawable>() {  
  14.     @Override  
  15.     public void onNext(Drawable drawable) {  
  16.         imageView.setImageDrawable(drawable);
  17.     }
  18.     @Override  
  19.     public void onCompleted() {  
  20.     }
  21.     @Override  
  22.     public void onError(Throwable e) {  
  23.         Toast.makeText(activity, “Error!”, Toast.LENGTH_SHORT).show();  
  24.     }
  25. });

那么,加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句话就把线程切换了,怎么做到的?而且 subscribe() 不是*外层直接调用的方法吗,它竟然也能被指定线程?)。然而 Scheduler 的原理需要放在后面讲,因为它的原理是以下一节《变换》的原理作为基础的。
好吧这一节其实我屁也没说,只是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

变换
RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的*大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。
1) API
首先看一个 map() 的例子:

[java] view plain copy
  1. Observable.just(“images/logo.png”) // 输入类型 String  
  2.     .map(new Func1<String, Bitmap>() {  
  3.         @Override  
  4.         public Bitmap call(String filePath) { // 参数类型 String  
  5.             return getBitmapFromPath(filePath); // 返回类型 Bitmap  
  6.         }
  7.     })
  8.     .subscribe(new Action1<Bitmap>() {  
  9.         @Override  
  10.         public void call(Bitmap bitmap) { // 参数类型 Bitmap  
  11.             showBitmap(bitmap);
  12.         }
  13.     });

这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是*常见的也*容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:
map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava *常用的变换。 map() 的示意图:
flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<String> subscriber = new Subscriber<String>() {  
  3.     @Override  
  4.     public void onNext(String name) {  
  5.         Log.d(tag, name);
  6.     }
  7.     …
  8. };
  9. Observable.from(students)
  10.     .map(new Func1<Student, String>() {  
  11.         @Override  
  12.         public String call(Student student) {  
  13.             return student.getName();  
  14.         }
  15.     })
  16.     .subscribe(subscriber);

很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<Student> subscriber = new Subscriber<Student>() {  
  3.     @Override  
  4.     public void onNext(Student student) {  
  5.         List<Course> courses = student.getCourses();
  6.         for (int i = 0; i < courses.size(); i++) {  
  7.             Course course = courses.get(i);
  8.             Log.d(tag, course.getName());
  9.         }
  10.     }
  11.     …
  12. };
  13. Observable.from(students)
  14.     .subscribe(subscriber);

依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
这个时候,就需要用 flatMap() 了:

[java] view plain copy
  1. Student[] students = …;
  2. Subscriber<Course> subscriber = new Subscriber<Course>() {  
  3.     @Override  
  4.     public void onNext(Course course) {  
  5.         Log.d(tag, course.getName());
  6.     }
  7.     …
  8. };
  9. Observable.from(students)
  10.     .flatMap(new Func1<Student, Observable<Course>>() {  
  11.         @Override  
  12.         public Observable<Course> call(Student student) {  
  13.             return Observable.from(student.getCourses());  
  14.         }
  15.     })
  16.     .subscribe(subscriber);

从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

[java] view plain copy
  1. networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token  
  2.     .flatMap(new Func1<String, Observable<Messages>>() {  
  3.         @Override  
  4.         public Observable<Messages> call(String token) {  
  5.             // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表  
  6.             return networkClient.messages();  
  7.         }
  8.     })
  9.     .subscribe(new Action1<Messages>() {  
  10.         @Override  
  11.         public void call(Messages messages) {  
  12.             // 处理显示消息列表  
  13.             showMessages(messages);
  14.         }
  15.     });

传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。
throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。

2) 变换的原理:lift()
这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。

[java] view plain copy
  1. public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {  
  2.     return Observable.create(new OnSubscribe<R>() {  
  3.         @Override  
  4.         public void call(Subscriber subscriber) {  
  5.             Subscriber newSubscriber = operator.call(subscriber);
  6.             newSubscriber.onStart();
  7.             onSubscribe.call(newSubscriber);
  8.         }
  9.     });
  10. }

这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——
subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
当含有 lift() 时:
1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

[java] view plain copy
  1. observable.lift(new Observable.Operator<String, Integer>() {  
  2.     @Override  
  3.     public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {  
  4.         // 将事件序列中的 Integer 对象转换为 String 对象  
  5.         return new Subscriber<Integer>() {  
  6.             @Override  
  7.             public void onNext(Integer integer) {  
  8.                 subscriber.onNext(“” + integer);  
  9.             }
  10.             @Override  
  11.             public void onCompleted() {  
  12.                 subscriber.onCompleted();
  13.             }
  14.             @Override  
  15.             public void onError(Throwable e) {  
  16.                 subscriber.onError(e);
  17.             }
  18.         };
  19.     }
  20. });

3) compose: 对 Observable 整体的变换
除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写:

[java] view plain copy
  1. observable1
  2.     .lift1()
  3.     .lift2()
  4.     .lift3()
  5.     .lift4()
  6.     .subscribe(subscriber1);
  7. observable2
  8.     .lift1()
  9.     .lift2()
  10.     .lift3()
  11.     .lift4()
  12.     .subscribe(subscriber2);
  13. observable3
  14.     .lift1()
  15.     .lift2()
  16.     .lift3()
  17.     .lift4()
  18.     .subscribe(subscriber3);
  19. observable4
  20.     .lift1()
  21.     .lift2()
  22.     .lift3()
  23.     .lift4()
  24.     .subscribe(subscriber1);

你觉得这样太不软件工程了,于是你改成了这样:

[java] view plain copy
  1. private Observable liftAll(Observable observable) {  
  2.     return observable  
  3.         .lift1()
  4.         .lift2()
  5.         .lift3()
  6.         .lift4();
  7. }
  8. liftAll(observable1).subscribe(subscriber1);
  9. liftAll(observable2).subscribe(subscriber2);
  10. liftAll(observable3).subscribe(subscriber3);
  11. liftAll(observable4).subscribe(subscriber4);

可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了:

[java] view plain copy
  1. public class LiftAllTransformer implements Observable.Transformer<Integer, String> {  
  2.     @Override  
  3.     public Observable<String> call(Observable<Integer> observable) {  
  4.         return observable  
  5.             .lift1()
  6.             .lift2()
  7.             .lift3()
  8.             .lift4();
  9.     }
  10. }
  11. Transformer liftAll = new LiftAllTransformer();  
  12. observable1.compose(liftAll).subscribe(subscriber1);
  13. observable2.compose(liftAll).subscribe(subscriber2);
  14. observable3.compose(liftAll).subscribe(subscriber3);
  15. observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。
compose() 的原理比较简单,不附图喽。

线程控制:Scheduler (二)
除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。
1) Scheduler 的 API (二)
前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?
答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

[java] view plain copy
  1. Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定  
  2.     .subscribeOn(Schedulers.io())
  3.     .observeOn(Schedulers.newThread())
  4.     .map(mapOperator) // 新线程,由 observeOn() 指定  
  5.     .observeOn(Schedulers.io())
  6.     .map(mapOperator2) // IO 线程,由 observeOn() 指定  
  7.     .observeOn(AndroidSchedulers.mainThread)
  8.     .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定  

如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。
不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。
又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?
这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。
2) Scheduler 的原理(二)
其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):
从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
3) 延伸:doOnSubscribe()
然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。
在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。
而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它*近的 subscribeOn() 所指定的线程。
示例代码:

[java] view plain copy
  1. Observable.create(onSubscribe)
  2.     .subscribeOn(Schedulers.io())
  3.     .doOnSubscribe(new Action0() {  
  4.         @Override  
  5.         public void call() {  
  6.             progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行  
  7.         }
  8.     })
  9.     .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程  
  10.     .observeOn(AndroidSchedulers.mainThread())
  11.     .subscribe(subscriber);

如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了

RxJava系列(组合操作符)

这一章我们接着介绍组合操作符,这类operators可以同时处理多个Observable来创建我们所需要的Observable。组合操作符主要包含: Merge StartWith Concat Zip CombineLatest SwitchOnNext Join等等。

Merge

merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。

%title插图%num
merge(Observable, Observable)

我们看下面的例子,一共有两个Observable:一个用来发送字母,另一个用来发送数字;现在我们需要两连个Observable发射的数据合并。

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.merge(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString()+" ");
            }
        });

程序输出:

A 0 B C 1 D E 2 F 3 G H 4 

merge(Observable[])将多个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

%title插图%num
merge(Observable[])

StartWith

startWith(T)用于在源Observable发射的数据前插入数据。使用startWith(Iterable<T>)我们还可以在源Observable发射的数据前插入Iterable。官方示意图:

%title插图%num
startWith(T) startWith(Iterable<T>)

startWith(Observable<T>)用于在源Observable发射的数据前插入另一个Observable发射的数据(这些数据会被插入到
源Observable发射数据的前面)。官方示意图:

%title插图%num
startWith(Observable<T>)

Concat

concat(Observable<? extends T>, Observable<? extends T>) concat(Observable<? extends Observable<T>>)用于将多个obserbavle发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射玩是不会发射后一个Observable的数据的。它和merge、startWitch和相似,不同之处在于:

  1. merge:合并后发射的数据是无序的;
  2. startWitch:只能在源Observable发射的数据前插入数据。
    %title插图%num
    concat(Observable<? extends T>, Observable<? extends T>)、concat(Observable<? extends Observable<T>>)

这里我们将前面Merge操作符的例子拿过来,并将操作符换成Concat,然后我们看看执行结果:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.concat(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString() + " ");
            }
        });

程序输出:

A B C D E F G H 0 1 2 3 4 

Zip

zip(Observable, Observable, Func2)用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。

%title插图%num
zip(Observable, Observable, Func2)

和前面的例子一样,我们将操作符换成了zip:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(120, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);

Observable.zip(letterSequence, numberSequence, new Func2<String, Long, String>() {
    @Override
    public String call(String letter, Long number) {
        return letter + number;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:" + e.getMessage());
    }

    @Override
    public void onNext(String result) {
        System.out.print(result + " ");
    }
});

程序输出:

A0 B1 C2 D3 E4

CombineLatest

comnineLatest(Observable, Observable, Func2)用于将两个Observale*近发射的数据已经Func2函数的规则进展组合。下面是官方提供的原理图:

%title插图%num
comnineLatest(Observable, Observable, Func2)

下面这张图应该更容易理解:

%title插图%num
comnineLatest(Observable, Observable, Func2)
List<String> communityNames = DataSimulator.getCommunityNames();
List<Location> locations = DataSimulator.getLocations();

Observable<String> communityNameSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return communityNames.get(position.intValue());
            }
        }).take(communityNames.size());
Observable<Location> locationSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, Location>() {
            @Override
            public Location call(Long position) {
                return locations.get(position.intValue());
            }
        }).take(locations.size());

Observable.combineLatest(
        communityNameSequence,
        locationSequence,
        new Func2<String, Location, String>() {
            @Override
            public String call(String communityName, Location location) {
                return "小区名:" + communityName + ", 经纬度:" + location.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

程序输出:

小区名:竹园新村, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(22.273, 53.623)
小区名:浦江名苑, 经纬度:(22.273, 53.623)
小区名:南辉小区, 经纬度:(22.273, 53.623)

SwitchOnNext

switchOnNext(Observable<? extends Observable<? extends T>>用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新
的小Observable所发射的数据。

结合下面的原理图大家应该很容易理解,我们可以看到下图中的黄色圆圈就被丢弃了。

%title插图%num
switchOnNext(Observable<? extends Observable<? extends T>>)

Join

join(Observable, Func1, Func1, Func2)我们先介绍下join操作符的4个参数:

  • Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
  • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
  • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
  • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。

所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)

join操作符的效果类似于排列组合,把*个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和*个数据源A中已经发射的数据进行一对一匹配;举例来说,如果某一时刻B发射了一个数据“B”,此时A已经发射了0,1,2,3共四个数据,那么我们的合并操作就会把“B”依次与0,1,2,3配对,得到四组数据: [0, B] [1, B] [2, B] [3, B]

再看看下面的图是不是好理解了呢?!

%title插图%num
join(Observable, Func1, Func1, Func2)

读懂了上面的文字,我们再来写段代码加深理解。

final List<House> houses = DataSimulator.getHouses();//模拟的房源数据,用于测试

//用来每秒从houses总取出一套房源并发射出去
Observable<House> houseSequence =
        Observable.interval(1, TimeUnit.SECONDS)
                .map(new Func1<Long, House>() {
                    @Override
                    public House call(Long position) {
                        return houses.get(position.intValue());
                    }
                }).take(houses.size());//这里的take是为了防止houses.get(position.intValue())数组越界

//用来实现每秒发送一个新的Long型数据
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

houseSequence.join(tictoc,
        new Func1<House, Observable<Long>>() {
            @Override
            public Observable<Long> call(House house) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        },
        new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(0, TimeUnit.SECONDS);
            }
        },
        new Func2<House, Long, String>() {
            @Override
            public String call(House house, Long aLong) {
                return aLong + "-->" + house.getDesc();
            }
        }
).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:"+e.getMessage());
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

程序输出:

0-->中粮海景壹号新出大平层!总价4500W起
1-->中粮海景壹号新出大平层!总价4500W起
1-->满五唯一,黄金地段
2-->中粮海景壹号新出大平层!总价4500W起
2-->满五唯一,黄金地段
2-->一楼自带小花园
3-->一楼自带小花园
3-->毗邻汤臣一品
4-->毗邻汤臣一品
4-->顶级住宅,给您总统般尊贵体验
5-->顶级住宅,给您总统般尊贵体验
5-->顶层户型,两室一厅
6-->顶层户型,两室一厅
6-->南北通透,豪华五房
7-->南北通透,豪华五房

通过转换操作符、过滤操作符、组合操作符三个篇幅将RxJava主要的操作符也介绍的七七八八了。更多操作符的介绍建议大家去查阅官方文档,并自己动手实践一下。这一系列的文章也会持续更新,欢迎大家保持关注!:)

RxJava系列 从微观角度解读RxJava源码

前言

通过前面五个篇幅的介绍,相信大家对RxJava的基本使用以及操作符应该有了一定的认识。但是知其然还要知其所以然;所以从这一章开始我们聊聊源码,分析RxJava的实现原理。本文我们主要从三个方面来分析RxJava的实现:

  • RxJava基本流程分析
  • 操作符原理分析
  • 线程调度原理分析

本章节基于RxJava1.1.9版本的源码

一、RxJava执行流程分析

在RxJava系列2(基本概念及使用介绍)中我们介绍过,一个*基本的RxJava调用是这样的:

示例A

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

首先调用Observable.create()创建一个被观察者Observable,同时创建一个OnSubscribe作为create()方法的入参;接着创建一个观察者Subscriber,然后通过subseribe()实现二者的订阅关系。这里涉及到三个关键对象和一个核心的方法:

  • Observable(被观察者)
  • OnSubscribe (从纯设计模式的角度来理解,OnSubscribe.call()可以看做是观察者模式中被观察者用来通知观察者的notifyObservers()方法)
  • Subscriber (观察者)
  • subscribe() (实现观察者与被观察者订阅关系的方法)

1、Observable.create()源码分析

首先我们来看看Observable.create()的实现:

public static <T> Observable<T> create(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

这里创建了一个被观察者Observable,同时将RxJavaHooks.onCreate(f)作为构造函数的参数,源码如下:

protected Observable(OnSubscribe<T> f) {
    this.onSubscribe = f;
}

我们看到源码中直接将参数RxJavaHooks.onCreate(f)赋值给了当前我们构造的被观察者Observable的成员变量onSubscribe。那么RxJavaHooks.onCreate(f)返回的又是什么呢?我们接着往下看:

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<OnSubscribe, OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

由于我们并没调用RxJavaHooks.initCreate(),所以上面代码中的onObservableCreate为null;因此RxJavaHooks.onCreate(f)*终返回的就是f,也就是我们在Observable.create()的时候new出来的OnSubscribe。(由于对RxJavaHooks的理解并不影响我们对RxJava执行流程的分析,因此在这里我们不做进一步的探讨。为了方便理解我们只需要知道RxJavaHooks一系列方法的返回值就是入参本身就OK了,例如这里的RxJavaHooks.onCreate(f)返回的就是f)。

至此我们做下逻辑梳理:Observable.create()方法构造了一个被观察者Observable对象,同时将new出来的OnSubscribe赋值给了该Observable的成员变量onSubscribe

2、Subscriber源码分析

接着我们看下观察者Subscriber的源码,为了增加可读性,我去掉了源码中的注释和部分代码。

public abstract class Subscriber<T> implements Observer<T>, Subscription {
    
    private final SubscriptionList subscriptions;//订阅事件集,所有发送给当前Subscriber的事件都会保存在这里
    
    ...

    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
        this.subscriber = subscriber;
        this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }

    ...

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }

    @Override
    public final boolean isUnsubscribed() {
        return subscriptions.isUnsubscribed();
    }

    public void onStart() {
    }
    
    ...
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

Subscriber实现了Subscription接口,从而对外提供isUnsubscribed()unsubscribe()方法。前者用于判断是否已经取消订阅;后者用于将订阅事件列表(也就是当前观察者的成员变量subscriptions)中的所有Subscription取消订阅,并且不再接受观察者Observable发送的后续事件。

3、subscribe()源码分析

前面我们分析了观察者和被观察者相关的源码,那么接下来便是整个订阅流程中**关键的环节了。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {

    ...

    subscriber.onStart();
    
    if (!(subscriber instanceof SafeSubscriber)) {
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);

        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

subscribe()方法中将传进来的subscriber包装成了SafeSubscriberSafeSubscriber其实是subscriber的一个代理,对subscriber的一系列方法做了更加严格的安全校验。保证了onCompleted()onError()只会有一个被执行且只执行一次,一旦它们其中方法被执行过后onNext()就不在执行了。

上述代码中*关键的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)。这里的RxJavaHooks和之前提到的一样,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)返回的正是他的第二个入参observable.onSubscribe,也就是当前observable的成员变量onSubscribe。而这个成员变量我们前面提到过,它是我们在Observable.create()的时候new出来的。所以这段代码可以简化为onSubscribe.call(subscriber)。这也印证了我在RxJava系列2(基本概念及使用介绍)中说的,onSubscribe.call(subscriber)中的subscriber正是我们在subscribe()方法中new出来的观察者。

到这里,我们对RxJava的执行流程做个总结:首先我们调用crate()创建一个观察者,同时创建一个OnSubscribe作为该方法的入参;接着调用subscribe()来订阅我们自己创建的观察者Subscriber
一旦调用subscribe()方法后就会触发执行OnSubscribe.call()。然后我们就可以在call方法调用观察者subscriberonNext(),onCompleted(),onError()

*后我用张图来总结下之前的分析结果:

%title插图%num
RxJava基本流程分析

二、操作符原理分析

之前我们介绍过几十个操作符,要一一分析它们的源码显然不太现实。在这里我抛砖引玉,选取一个相对简单且常用的map操作符来分析。

我们先来看一个map操作符的简单应用:

示例B

Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
}).map(new Func1<Integer, String>() {
    @Override
    public String call(Integer integer) {
        return "This is " + integer;
    }
}).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

为了便于表述,我将上面的代码做了如下拆解:

Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }
});

Subscriber<String> subscriberOne = new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted!");
    }
    @Override
    public void onError(Throwable e) {
        System.out.println(e.getMessage());
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};

Observable<String> observableB = 
        observableA.map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return "This is " + integer;;
                }
            });

observableB.subscribe(subscriberOne);

map()的源码和上一小节介绍的create()一样位于Observable这个类中。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

通过查看源码我们发现调用map()的时候实际上是创建了一个新的被观察者Observable,我们姑且称它为ObservableB;一开始通过Observable.create()创建的Observable我们称之为ObservableA。在创建ObservableB的时候同时创建了一个OnSubscribeMap,而ObservableA和变换函数Func1则作为构造OnSubscribeMap的参数。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;//ObservableA
    
    final Func1<? super T, ? extends R> transformer;//map操作符中的转换函数Func1。T为转换前的数据类型,在上面的例子中为Integer;R为转换后的数据类型,在该例中为String。

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
    
    @Override
    public void call(final Subscriber<? super R> o) {//结合*小节的分析结果,我们知道这里的入参o其实就是我们自己new的观察者subscriberOne。
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }
    
    static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        final Subscriber<? super R> actual;//这里的actual就是我们在调用subscribe()时创建的观察者mSubscriber
        final Func1<? super T, ? extends R> mapper;//变换函数
        boolean done;
        
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
        
        @Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
            actual.onNext(result);
        }
        
        @Override
        public void onError(Throwable e) {
            ...
            actual.onError(e);
        }
        
        @Override
        public void onCompleted() {
            ...
            actual.onCompleted();
        }
        
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }
}

OnSubscribeMap实现了OnSubscribe接口,因此OnSubscribeMap就是一个OnSubscribe。在调用map()的时候创建了一个新的被观察者ObservableB,然后我们用ObservableB.subscribe(subscriberOne)订阅了观察者subscriberOne。结合我们在*小节的分析结果,所以OnSubscribeMap.call(o)中的o就是subscribe(subscriberOne)中的subscriberOne;一旦调用了ObservableB.subscribe(subscriberOne)就会执行OnSubscribeMap.call()

call()方法中,首先通过我们的观察者o和转换函数transformer构造了一个MapSubscriber,*后调用了source也就是observableAunsafeSubscribe()方法。即observableA订阅了一个观察者MapSubscriber

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        ...
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}

上面这段代码*终执行了onSubscribe也就是OnSubscribeMapcall()方法,call()方法中的参数就是之前在OnSubscribeMap.call()中new出来的MapSubscriber。*后在call()方法中执行了我们自己的业务代码:

subscriber.onNext(1);
subscriber.onCompleted();

其实也就是执行了MapSubscriberonNext()onCompleted()

@Override
public void onNext(T t) {
    R result;
    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        ...
        return;
    }
    actual.onNext(result);
}

onNext(T t)方法中的的mapper就是变换函数,actual就是我们在调用subscribe()时创建的观察者subscriberOne。这个T就是我们例子中的IntegerR就是String。在onNext()中首先调用变换函数mapper.call()T转换成R(在我们的例子中就是将Integer类型的1转换成了String类型的“This is 1”);接着调用subscriberOne.onNext(String result)。同样在调用MapSubscriber.onCompleted()时会执行subscriberOne.onCompleted()。这样就完成了一直完成的调用流程。

我承认太啰嗦了,花费了这么大的篇幅才将map()的转换原理解释清楚。我也是希望尽量的将每个细节都呈现出来方便大家理解,如果看我啰嗦了这么久还是没能理解,请看下面我画的这张执行流程图。

%title插图%num
加入Map操作符后的执行流程

三、线程调度原理分析

在前面的文章中我介绍过RxJava可以很方便的通过subscribeOn()observeOn()来指定数据流的每一部分运行在哪个线程。其中subscribeOn()指定了处理Observable的全部的过程(包括发射数据和通知)的线程;observeOn()指定了观察者的onNext()onError()onCompleted()执行的线程。接下来我们就分析分析源码,看看线程调度是如何实现的。

在分析源码前我们先看看一段常见的通过RxJava实现的线程调度代码:

示例C

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello RxJava!");
        subscriber.onCompleted();
    }
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        System.out.println("completed!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

1、subscribeOn()源码分析

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ...
    return create(new OperatorSubscribeOn<T>(this, scheduler));
}

通过上面的代码我们可以看到,subscribeOn()map()一样是创建了一个新的被观察者Observable。因此我大致就能猜到subscribeOn()的执行流程应该和map()差不多,OperatorSubscribeOn肯定也是一个OnSubscribe。那我们接下来就看看OperatorSubscribeOn的源码:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;//线程调度器,用来指定订阅事件发送、处理等所在的线程
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    
                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

OperatorSubscribeOn实现了OnSubscribe接口,call()中对Subscriber的处理也和OperatorMapSubscriber的处理类似。首先通过scheduler构建了一个Worker;然后用传进来的subscriber构造了一个新的Subscriber s,并将s丢到Worker.schedule()中来处理;*后用原Observable去订阅观察者s。而这个Worker就是线程调度的关键!前面的例子中我们通过subscribeOn(Schedulers.io())指定了Observable发射处理事件以及通知观察者的一系列操作的执行线程,正是通过这个Schedulers.io()创建了我们前面提到的Worker。所以我们来看看Schedulers.io()的实现。

首先通过Schedulers.io()获得了ioScheduler并返回,上面的OperatorSubscribeOn通过这个的SchedulercreateWorker()方法创建了我们前面提到的Worker

public static Scheduler io() {
    return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

接着我们看看这个ioScheduler是怎么来的,下面的代码向我们展现了是如何在Schedulers的构造函数中通过RxJavaSchedulersHook.createIoScheduler()来初始化ioScheduler的。

private Schedulers() {

    ...

    Scheduler io = hook.getIOScheduler();
    if (io != null) {
        ioScheduler = io;
    } else {
        ioScheduler = RxJavaSchedulersHook.createIoScheduler();
    }

    ...
}

*终RxJavaSchedulersHook.createIoScheduler()返回了一个CachedThreadScheduler,并赋值给了ioScheduler

public static Scheduler createIoScheduler() {
    return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
    ...
    return new CachedThreadScheduler(threadFactory);
}

到这一步既然我们知道了ioScheduler就是一个CachedThreadScheduler,那我们就来看看它的createWorker()的实现。

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

上面的代码向我们赤裸裸的呈现了前面OperatorSubscribeOn中的Worker其实就是EventLoopWorker。我们重点要关注的是他的scheduleActual()

static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
    private final CompositeSubscription innerSubscription = new CompositeSubscription();
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;
    final AtomicBoolean once;

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.once = new AtomicBoolean();
        this.threadWorker = pool.get();
    }

    ...

    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        ...
        ScheduledAction s = threadWorker.scheduleActual(new Action0() {
            @Override
            public void call() {
                if (isUnsubscribed()) {
                    return;
                }
                action.call();
            }
        }, delayTime, unit);
        innerSubscription.add(s);
        s.addParent(innerSubscription);
        return s;
    }
}

通过对源码的一步步追踪,我们知道了前面OperatorSubscribeOn.call()中的inner.schedule()*终会执行到ThreadWorkerscheduleActual()方法。

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    ScheduledAction run = new ScheduledAction(decoratedAction);
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit(run);
    } else {
        f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
}

scheduleActual()中的ScheduledAction实现了Runnable接口,通过线程池executor*终实现了线程切换。上面便是subscribeOn(Schedulers.io())实现线程切换的全部过程。

2、observeOn()源码分析

observeOn()切换线程是通过lift来实现的,相比subscribeOn()在实现原理上相对复杂些。不过本质上*终还是创建了一个新的Observable

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

OperatorObserveOn作为OnSubscribeLift构造函数的参数用来创建了一个新的OnSubscribeLift对象,接下来我们看看OnSubscribeLift的实现:

public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
    
    final OnSubscribe<T> parent;

    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                st.onError(e);
            }
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            o.onError(e);
        }
    }
}

OnSubscribeLift继承自OnSubscribe,通过前面的分析我们知道一旦调用了subscribe()将观察者与被观察绑定后就会触发被观察者所对应的OnSubscribecall()方法,所以这里会触发OnSubscribeLift.call()。在call()中调用了OperatorObserveOn.call()并返回了一个新的观察者Subscriber st,接着调用了前一级Observable对应OnSubscriber.call(st)

我们再看看OperatorObserveOn.call()的实现:

public Subscriber<? super T> call(Subscriber<? super T> child) {
    ...
    ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
    parent.init();
    return parent;
}

OperatorObserveOn.call()中创建了一个ObserveOnSubscriber并调用init()进行了初始化。

static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

    ...

    @Override
    public void onNext(final T t) {
        ...
        schedule();
    }

    @Override
    public void onCompleted() {
        ...
        schedule();
    }

    @Override
    public void onError(final Throwable e) {
        ...
        schedule();
    }

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }

    @Override
    public void call() {
        long missed = 1L;
        long currentEmission = emitted;

        final Queue<Object> q = this.queue;
        final Subscriber<? super T> localChild = this.child;
        final NotificationLite<T> localOn = this.on;
        
        for (;;) {
            long requestAmount = requested.get();
            
            while (requestAmount != currentEmission) {
                boolean done = finished;
                Object v = q.poll();
                boolean empty = v == null;
                
                if (checkTerminated(done, empty, localChild, q)) {
                    return;
                }
                
                if (empty) {
                    break;
                }
                
                localChild.onNext(localOn.getValue(v));

                currentEmission++;
                if (currentEmission == limit) {
                    requestAmount = BackpressureUtils.produced(requested, currentEmission);
                    request(currentEmission);
                    currentEmission = 0L;
                }
            }
            
            if (requestAmount == currentEmission) {
                if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                    return;
                }
            }

            emitted = currentEmission;
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }
    
    ...
}

ObserveOnSubscriber继承自Subscriber,并实现了Action0接口。我们看到ObserveOnSubscriberonNext()onCompleted()onError()都有个schedule(),这个方法就是我们线程调度的关键;通过schedule()将新观察者ObserveOnSubscriber发送给subscriberOne的所有事件都切换到了recursiveScheduler所对应的线程,简单的说就是把subscriberOneonNext()onCompleted()onError()方法丢到了recursiveScheduler对应的线程中来执行。

那么schedule()又是如何做到这一点的呢?他内部调用了recursiveScheduler.schedule(this)recursiveScheduler其实就是一个Worker,和我们在介绍subscribeOn()时提到的worker一样,执行schedule()实际上*终是创建了一个runable,然后把这个runnable丢到了特定的线程池中去执行。在runnablerun()方法中调用了ObserveOnSubscriber.call(),看上面的代码大家就会发现在call()方法中*终调用了subscriberOneonNext()onCompleted()onError()方法。这便是它实现线程切换的原理。

好了,我们*后再看看示例C对应的执行流程图,帮助大家加深理解。

%title插图%num
RxJava执行流程

总结

这一章以执行流程操作符实现以及线程调度三个方面为切入点剖析了RxJava源码。下一章将站在更宏观的角度来分析整个RxJava的框架结构、设计思想等等。敬请期待~~ 🙂

RxJava系列 *佳实践

前言

有点标题党了,其实谈不上什么*佳实践。前段时间公司实行996,所以也没什么时间和精力来更新博客(好吧我承认是我懒)。因此这篇文章只是简单的通过两个例子介绍了RxJava在生产环境中的使用。不过本篇中的每个例子我都配上了完整的代码。

按照计划这一期是要介绍RxJava框架结构和设计思想的,但是考虑到Netflix将在十月底发布RxJava2.0正式版;因此决定将RxJava框架结构和设计思想分析放到2.0正式版发布后再做。后续我也会有一系列的文章来介绍RxJava1.x和2.x的区别。

示例一、获取手机上已安装的App

*个例子我们需要在Android设备上展示已安装的第三方app列表,关于环境搭建、依赖配置、RecyclerView的使用等这些基础内容我就不做陈述了。需要了解的同学可以去GitHub上把项目clone下来看看。这里我主要讲讲如何通过RxJava实现核心功能。

首选我们需要调用系统api来获取所有已安装的app,所以在OnSubscribecall方法中调用getApplicationInfoList()。但是getApplicationInfoList()获取的数据并不能完全满足我们的业务需求:

  1. 由于我们只需要展示手机上已安装的第三方App,因此需要通过filter操作符来过滤掉系统app;
  2. ApplicationInfo并不是我们所需要的类型,因此需要通过map操作符将其转换为AppInfo
  3. 由于获取ApplicationInfo、过滤数据、转换数据相对比较耗时,因此需要通过subscribeOn操作符将这一系列操作放到子线程中来处理;
  4. 而要将信息展示在页面上涉及到UI操作,因此需要通过observeOn操作符将onNextonCompletedonError调度到主线程,接着我们在这些方法中更新UI。

下面是核心代码:

final PackageManager pm = MainActivity.this.getPackageManager();
Observable.create(new Observable.OnSubscribe<ApplicationInfo>() {
        @Override
        public void call(Subscriber<? super ApplicationInfo> subscriber) {
            List<ApplicationInfo> infoList = getApplicationInfoList(pm);
            for (ApplicationInfo info : infoList) {
                subscriber.onNext(info);
            }
            subscriber.onCompleted();
        }
    }).filter(new Func1<ApplicationInfo, Boolean>() {
        @Override
        public Boolean call(ApplicationInfo applicationInfo) {
            return (applicationInfo.flags & ApplicationInfo.FLAG_SYSTEM) <= 0;
        }
    }).map(new Func1<ApplicationInfo, AppInfo>() {

        @Override
        public AppInfo call(ApplicationInfo applicationInfo) {
            AppInfo info = new AppInfo();
            info.setAppIcon(applicationInfo.loadIcon(pm));
            info.setAppName(applicationInfo.loadLabel(pm).toString());
            return info;
        }
    }).subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<AppInfo>() {
        @Override
        public void onCompleted() {
            mAppListAdapter.notifyDataSetChanged();
            mPullDownSRL.setRefreshing(false);
        }

        @Override
        public void onError(Throwable e) {
            mPullDownSRL.setRefreshing(false);
        }

        @Override
        public void onNext(AppInfo appInfo) {
            mAppInfoList.add(appInfo);
        }
    });

程序执行效果图:

%title插图%num
效果图

完整的代码我放到了GitHub上,有兴趣大家可以去clone下来自己运行看看。

源码地址:https://github.com/BaronZ88/HelloRxAndroid

示例二、RxJava+Retrofit2实现获取天气数据

RxJava + Retrofit2几乎是Android应用开发的标配了,这个例子中我们就来聊聊这二者是如何配合起来帮助我们快速开发的。

Retrofit2中一个标准的接口定义是这样的:

@GET("weather")
Observable<Weather> getWeather(@Query("cityId") String cityId);

现在有了RxJava,一个基本的网络请求我们便可以这样实现:

ApiClient.weatherService.getWeather(cityId)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Weather>() {
                    @Override
                    public void call(Weather weather) {
                        weatherView.displayWeatherInformation(weather);
                    }
                });

但有时候可能一开始我们并不知道cityId,我们只知道cityName。所以就需要我们先访问服务器,拿到对应城市名的cityId,然后通过这个cityId再去获取天气数据。

同样的,我们需要定义一个获取cityId的接口:

@GET("city")
Observable<String> getCityIdByName(@Query("cityName") String cityName);

紧接着我们便可以使用无所不能的RxJava来实现需求了。

ApiClient.weatherService.getCityIdByName("上海")
             .flatMap(new Func1<String, Observable<Weather>>() {
                 @Override
                 public Observable<Weather> call(String cityId) {
                     return ApiClient.weatherService.getWeather(cityId);
                 }
             }).subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Action1<Weather>() {
                 @Override
                 public void call(Weather weather) {
                     weatherView.displayWeatherInformation(weather);
                 }
             });

哇哦!~ so easy!!!妈妈再也不用担心….

 

WeatherStyle这个项目还在开发中,这个项目不只包含了RxJava和Retrofit的使用,同时还包含MVP、ORMLite、RetroLambda、ButterKnife等等开源库的使用

RxJava1.X的系列文章就到此结束了,由于本人对RxJava的理解有限,这一系列文章中如有错误还请大家指正。在使用RxJava过程中有任何疑问也欢迎大家和我交流。共同学习!共同进步!

 

友情链接: SITEMAP | 旋风加速器官网 | 旋风软件中心 | textarea | 黑洞加速器 | jiaohess | 老王加速器 | 烧饼哥加速器 | 小蓝鸟 | tiktok加速器 | 旋风加速度器 | 旋风加速 | quickq加速器 | 飞驰加速器 | 飞鸟加速器 | 狗急加速器 | hammer加速器 | trafficace | 原子加速器 | 葫芦加速器 | 麦旋风 | 油管加速器 | anycastly | INS加速器 | INS加速器免费版 | 免费vqn加速外网 | 旋风加速器 | 快橙加速器 | 啊哈加速器 | 迷雾通 | 优途加速器 | 海外播 | 坚果加速器 | 海外vqn加速 | 蘑菇加速器 | 毛豆加速器 | 接码平台 | 接码S | 西柚加速器 | 快柠檬加速器 | 黑洞加速 | falemon | 快橙加速器 | anycast加速器 | ibaidu | moneytreeblog | 坚果加速器 | 派币加速器 | 飞鸟加速器 | 毛豆APP | PIKPAK | 安卓vqn免费 | 一元机场加速器 | 一元机场 | 老王加速器 | 黑洞加速器 | 白石山 | 小牛加速器 | 黑洞加速 | 迷雾通官网 | 迷雾通 | 迷雾通加速器 | 十大免费加速神器 | 猎豹加速器 | 蚂蚁加速器 | 坚果加速器 | 黑洞加速 | 银河加速器 | 猎豹加速器 | 海鸥加速器 | 芒果加速器 | 小牛加速器 | 极光加速器 | 黑洞加速 | movabletype中文网 | 猎豹加速器官网 | 烧饼哥加速器官网 | 旋风加速器度器 | 哔咔漫画 | PicACG | 雷霆加速