RxJava应用场景:使用zip操作符等待多个网络请求完成

假设这样一种场景,我们利用github api开发一个app,在user界面,我既要请求user基本信息,又要列举user下的event数据,为此,我准备使用Retrofit来做网络请求,首先写好interfaces

public interface GitHubUser {
  @GET("users/{user}")
  Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
  @GET("users/{user}/events")
  Observable<JsonArray> listEvents(@Path("user") String user);
}

然后定义好我们的两个Observable:

Retrofit repo = new Retrofit.Builder()
        .baseUrl("https://api.github.com")
        .addConverterFactory(GsonConverterFactory.create())
        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
        .build();
Observable<JsonObject> userObservable = repo
        .create(GitHubUser.class)
        .getUser(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
        .create(GitHubEvents.class)
        .listEvents(loginName)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread());

分别是userObservable和eventsObservable,很显然的,我们将会需要两次请求。两次就两次嘛,但是这里有个问题。

虽然在后台有两次请求,但是在前台,我们希望用户打开这个页面,然后等待加载,然后显示。用户只有一次等待加载的过程。所以说,我们需要等待这两个请求都返回结果了,再开始显示数据。

怎么办?自己写判断两个都加载已完成的代码吗?逻辑好像也不是很复杂,但是代码看起来就没有那么高大上了啊。

其实既然你都用过了还有,那么直觉上你应该意识到也许RxJava可以解决这个问题。没错,就是RxJava,使用zip操作符。

zip( ):使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果

第一次知道这个操作符是在大头鬼翻译的这篇文章中:深入浅出RxJava四-在Android中使用响应式编程 。

“Retrofit对Observable的支持使得它可以很简单的将多个REST请求结合起来。比如我们有一个请求是获取照片的,还有一个请求是获取元数据的,我们就可以将这两个请求并发的发出,并且等待两个结果都返回之后再做处理:

Observable.zip(
    service.getUserPhoto(id),
    service.getPhotoMetadata(id),
    (photo, metadata) -> createPhotoWithData(photo, metadata))
    .subscribe(photoWithData -> showPhoto(photoWithData));

文中只是简短的提到了一下,看了你也不知道该如何用。这篇文章就来进一步解释一下。

zip操作符其实就是通过Observable.zip()方法把多个Observable组合成新的Observable,这个新的Observable对应的数据流由call方法决定:

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
  @Override
  public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
    return new UserAndEvents(jsonObject, jsonElements);
  }
});

这里的UserAndEvents是我们自己定义的,目的是把原来的两个结果的信息柔和在一个对象中,毕竟新的Observable也只能返回一次。UserAndEvents定义如下:

public class UserAndEvents {
  public UserAndEvents(JsonObject user, JsonArray events) {
    this.events = events;
    this.user = user;
  }
  public JsonArray events;
  public JsonObject user;
}

最后我们调用新组合的combined Observable的subscribe方法:

combined.subscribe(new Subscriber<UserAndEvents>() {
          ...
          @Override
          public void onNext(UserAndEvents o) {
            // You can access the results of the 
            // two observabes via the POJO now
          }
        });

虽然我们这篇文章是以Retrofit为例子的,但是它适用于任何的Observable。还有就是本文只涉及到了两个请求,但是也可以是三个或者更多请求,自己触类旁通吧。

注:本文的代码和知识点都是来自于这篇文章:https://blog.denevell.org/android-rxjava-wait-for-network-calls-finish.html

来自:RxJava