RxAndroid(RxJava) 与 AsyncTask

整理自使用RxJava.Observable取代AsyncTask和AsyncTaskLoader

I. AsyncTask包含的问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
private class CallWebServiceTask extends AsyncTask<String, Result, Void> {
protected Result doInBackground(String... someData) {
Result result = webService.doSomething(someData);
return result;
}
protected void onPostExecute(Result result) {
if (result.isSuccess() {
resultText.setText("It worked!");
}
}
}
  1. 书写复杂
  2. 异常处理困难(try/catch?)
  3. Activity/Fragment的生命周期导致AsyncTask有不可预见的问题
  4. 无法足够简单的做复杂的异步(串行异步/并行异步 同步UI)
  5. 难以测试(如何成功测试AyncTask的帖子
  6. 异步数据无法得到良好的缓存

II. 利用RxAndroid(RxJava)解决这些问题

1. 关于书写复杂的问题:

RxJava结合lambda是一个很好的解决方案

1
2
3
4
5
6
webService.doSomething(someData)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> resultText.setText("It worked!")),
e -> handleError(e)
);

2. 关于异常处理困难的问题:

RxJava 中所有的错误都会回调到onError

1
2
3
4
5
6
7
8
9
10
11
12
Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
// 如果正确的终结,最后会调到这里
@Override
public void onCompleted() { }
// 只要有异常抛出(包括操作符中的调用),会调到这里
@Override
public void onError(Throwable e) { }
};

3. 关于Activity/Fragment生命周期导致AsyncTask难以维护的问题:

RxAndroid给出了很好的解决方案

1
2
3
4
5
AppObservable.bindFragment(this, webService.doSomething(someData))
.subscribe(
result -> resultText.setText("It worked!")),
e -> handleError(e)
);

4. 关于AsyncTask无法足够简单做复杂的异步的问题:

RxJava中的各类”操作”可以解决这个问题,这里如果有较多的线程切换,可以考虑使用compose

1
2
3
4
5
6
7
8
//这里是一个链式Web Service调用的例子,这些请求互相依赖,在线程池中运行第二批并行调用,然后在将结果返回给Observer之前,对数据进行合并和排序。
public Observable<List<CityWeather>> getWeatherForLargeUsCapitals() {
return cityDirectory.getUsCapitals()
.flatMap(cityList -> Observable.from(cityList))
.filter(city -> city.getPopulation() > 500,000)
.flatMap(city -> weatherService.getCurrentWeather(city)) //each runs in parallel
.toSortedList((cw1,cw2) -> cw1.getCityName().compare(cw2.getCityName()));
}

5. 关于AsyncTask难以测试:

RxJava通过toblocking()将一个异步方法变为同步方法来完成测试

1
2
List results = getWeatherForLargeUsCapitals().toBlocking().first();
assertEquals(12, results.size());

6. 关于AsyncTask异步数据无法得到良好的缓存

通过RxAndroid提供的方法,保存一个对Observable 的缓存的引用

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setRetainInstance(true);
weatherObservable = weatherManager.getWeatherForLargeUsCapitals().cache();
}
public void onViewCreated(...) {
super.onViewCreated(...)
bind(weatherObservable).subscribe(this);
}

如果你想要避免缓存的Fragment,可以通过使用AsyncSubject实现缓存(无论何时被订阅,AsyncSubject 都会重新发出最后的事件。或者我们可以使用BehaviorSubject获得最后的值和新值改变整个应用程序。)

1
2
3
4
5
6
//WeatherListFragment.java
public void onViewCreated() {
super.onViewCreated()
bind(weatherManager.getWeatherForLargeUsCapitals()).subscribe(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//WeatherManager.java
public Observable<List<CityWeather>> getWeatherForLargeUsCapitals() {
if (weatherSubject == null) {
weatherSubject = AsyncSubject.create();
cityDirectory.getUsCapitals()
.flatMap(cityList -> Observable.from(cityList))
.filter(city -> city.getPopulation() > 500,000)
.flatMap(city -> weatherService.getCurrentWeather(city))
.toSortedList((cw1,cw2) -> cw1.getCityName().compare(cw2.getCityName()))
.subscribe(weatherSubject);
}
return weatherSubject;
}
// weatherManager.invalidate(); //invalidate cache on fresh start

Jacksgong wechat
欢迎关注Jacks Blog公众号,第一时间接收原创技术沉淀干货。