响应式编程(观察者模式)
ps: 如果不了解Lambda的话,最好先看下Lambda, 文中都是使用Lambda语法
I. 核心
被观察者: Observables (发出一系列事件)
观察者: Subscribers (处理这些事件)
Observable和Subscriber可以做任何事情
Observable和Subscriber是独立于中间的变换过程的。
II. 基本原型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Observable<String> myObservable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> sub) { sub.onNext("Hello, world!"); sub.onCompleted(); } } );
Subscriber<String> mySubscriber = new Subscriber<String>() { @Override public void onNext(String s) { System.out.println(s); }
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { } };
myObservable.subscribe(mySubscriber);
|
III. 通用接口
1 2 3 4
| myObservable.subscribe(onNextAction, onErrorAction, onCompleteAction);
myObservable.subscribe(onNextAction);
|
上面的代码最终可以变成这样
1 2 3
| Observable.just("Hello, world!") .subscribe(s -> System.out.println(s));
|
IV. 一些典型的操作符(Operators)
操作符用于在Observable和最终的Subscriber之间修改Observable发出的时间(RxJava提供了很多有用的操作符)
1 2 3 4
| query(String) : Observable<List<String>> getTitle(String) : Observable<String> saveTitle(String) : boolean
|
1. map操作符
把一个事件转换为另一个事件 ( 不必返回Observable对象返回的类型,如下面就返回了int,而Observable返回的是String )
map官方文档
1 2 3 4
| Observable.just("Hello, world!") .map(s -> s.hashCode()) .map(i -> Integer.toString(i)) .subscribe(s -> System.out.println(s));
|
2. from操作符
接收一个集合作为输入,然后每次输出一个元素给subscriber
from官方文档
1 2
| Observable.from("url1", "url2", "url3") .subscribe(url -> System.out.println(url));
|
3. flatMap操作符
接收一个Observable的输出作为输入,同时输出另外一个Observable (可以用来很好的解决多重嵌套回调的问题)
flatMap官方文档
1 2 3 4 5 6
|
query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .subscribe(title -> System.out.println(title));
|
4. filter操作符
输入与输出为相同元素,过滤掉不满足检查条件的
filter官方文档
1 2 3 4 5
| query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null) .subscribe(title -> System.out.println(title));
|
5. take操作符
输出最多指定数量的结果
take官方文档
1 2 3 4 5 6
| query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null) .take(5) .subscribe(title -> System.out.println(title));
|
6. doOnNext操作符
在每次输出一个元素之前做一些额外的事情
1 2 3 4 5 6 7
| query("Hello, world!") .flatMap(urls -> Observable.from(urls)) .flatMap(url -> getTitle(url)) .filter(title -> title != null) .take(5) .doOnNext(title -> saveTitle(title)) .subscribe(title -> System.out.println(title));
|
7. subscribeOn/observerOn操作符
通过subscribeOn()
指定观察者运行的线程,observerOn()
指定订阅者运行的线程
What’s the difference between SubscribeOn and ObserveOn
1 2 3 4 5
| Observable.from(someSource) .map(data -> manipulate(data)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(data -> doSomething(data));
|
这里值得一提的是:看到了这里的Schedulers.io()
来定义I/O线程十分的欣喜,之前在看Fresco
的时候其中的一个Pipeline结构,它通过按照硬件资源占用比例,分类线程池,提高了Fresco
的整体速度,由于CPU
/GPU
的速度远快于其他模块, 可以利用尽量占满CPU资源的原则,创建了多个线程池(如CPU
、I/O
、NET
)来完成。使得资源得到最大的利用以提升速度。而Schedulers.io
这种方式,也是通过架构的层面达到这种效果。
V. 取消订阅(Subscriptions)
当调用Observable.subscribe()
,会返回一个Subscription
对象。这个对象代表了被观察者和订阅者之间的联系。
1 2 3 4 5 6
| ubscription subscription = Observable.just("Hello, World!") .subscribe(s -> System.out.println(s));
subscription.unsubscribe(); System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
|
VI. RxAndroid
是RxJava的一个针对Android平台的扩展。它包含了一些能够简化Android开发的工具
地址: https://github.com/ReactiveX/RxAndroid
1. AndroidSchedulers
提供了针对Android的线程系统的调度
1 2 3 4
| retrofitService.getImage(url) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap));
|
2. AndroidObservable
它提供了跟踪Android生命周期的功能。bindActivity()
和bindFragment()
方法默认在UI线程调用,并且这两个方法会在生命周期结束的时候通知Observable停止发出新的消息。
1 2 3
| AndroidObservable.bindActivity(this, retrofitService.getImage(url)) .subscribeOn(Schedulers.io()) .subscribe(bitmap -> myImageView.setImageBitmap(bitmap);
|
3. AndroidObservable.fromBroadcast
功能类似BroadcastReceiver
1 2 3 4
| IntentFilter filter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION); AndroidObservable.fromBroadcast(context, filter) .subscribe(intent -> handleConnectivityChange(intent));
|
4. ViewObservable
可以很轻易的在View触发某些Action时,被通知
1 2 3 4 5
| ViewObservable.clicks(mCardNameEditText, false) .subscribe(view -> handleClick(view));
|
VII. 常见问题解决
1. 在configuration改变(比如转屏)之后继续之前的Subscription/使用Retrofit发出了一个REST请求,接着想在listview中展示结果。如果在网络请求的时候用户旋转了屏幕怎么办?你当然想继续刚才的请求,但是怎么搞?
通过RxJava内置缓存机制解决
原理: cache()
(或者replay()
)不会使unsubscribe
打断,网络请求,因此在unsubscribe
以后直接从cache()
的返回值中创建一个新的Observable
对象。
1 2 3 4 5 6 7 8
| Observable<Photo> request = service.getUserPhoto(id).cache(); Subscription sub = request.subscribe(photo -> handleUserPhoto(photo));
sub.unsubscribe();
request.subscribe(photo -> handleUserPhoto(photo));
|
2. Observable持有Context导致的内存泄露
参考解决方案: 在生命周期的某个时刻取消订阅
原理: 利用CompositeSubscription
持有所有的Subscriptions
,然后在onDestory()
或者onDestroyView()
里取消所有的订阅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private CompositeSubscription mCompositeSubscription = new CompositeSubscription();
private void doSomething() { mCompositeSubscription.add( AndroidObservable.bindActivity(this, Observable.just("Hello, World!")) .subscribe(s -> System.out.println(s))); }
@Override protected void onDestroy() { super.onDestroy();
mCompositeSubscription.unsubscribe(); }
|
VIII. 拓展
1. Retrofit
功能: REST的网络架构,目前有测试结果比Volley、AsyncTask快
目前Retrofit库内置了对RxJava的支持
1 2 3 4 5 6 7 8 9 10 11 12 13
| @GET("/user/{id}/photo") Observable<Photo> getUserPhoto(@Path("id") int id);
@GET("/user/{id}/photo/metadata") Observable<Photo> getPhotoMetadata(@Path("id") int id);
Observable.zip( service.getUserPhoto(id), service.getPhotoMetadata(id), (photo, metadata) -> createPhotoWithData(photo, metadata)) .subscribe(photoWithData -> showPhoto(photoWithData));
|
2. 旧代码整合RxJava
比较简单的办法
如果oldMethod
足够快:
1 2 3 4 5
| private Object oldMethod() { ... }
public Observable<Object> newMethod() { return Observable.just(oldMethod()); }
|
如果oldMethod
很慢,为了防止阻塞所在线程:
1 2 3 4 5
| private Object slowBlockingMethod() { ... }
public Observable<Object> newMethod() { return Observable.defer(() -> Observable.just(slowBlockingMethod())); }
|
简单案例
案例1
1 2 3 4
| Observable.just("Alpha","Beta","Gamma","Delta","Epsilon") .map(s -> s.length()) .distinct() .subscribe(l -> System.out.println(l));
|
输出
案例2
1 2 3 4
| Observable.just("1/5/8", "1/9/11/58/16/", "9/15/56/49/21"); .flatMap(s -> Observable.from(s.split("/"))) .map(s -> Integer.valueOf(s)) .subscribe(i -> System.out.println(i));
|
输出
1 2 3 4 5 6 7 8 9 10 11 12 13
| 1 5 8 1 9 11 58 16 9 15 56 49 21
|
案例3
提供多个Observable根据不同的数据进行网络请求,当其中有一个成功,就停止请求,如果所有请求都失败就失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| modelProvider.getItems() .flatMap(retroApiInterface::doBackendRequest) .takeUntil(response -> response.isSuccessful()) .lastOrDefault(ServerResponse.createUnsuccessful()) .toSingle() .subscribe(response -> { if (response.isSuccessful()) { } else { } }, throwable -> { })
modelProvider.getItems() .flatMap(retroApiInterface::doBackendRequest) .firstOrDefault(ServerResponse.createUnsuccessful(), response -> response.isSuccessful()) .toSingle()
|
更多了解请移步>>
参考以下文档整理:
参考以下博客的翻译校对:
大头鬼Bruce
拓展阅读: