Observable、 回调与线程

泡在网上的日子 / 文 发表于2016-05-10 09:35 次阅读 RxJava

英文原文:Observables, and Callbacks and Threading, Oh My! 

所以你想让你的安卓app响应式化?good!欢迎入坑!如果你是跟我一样性格的人你可能会从RxBindings 的几个库开始上手,把安卓上那些丑陋的UI回调代码替换成性感的Observable。也许你还会跟我一样,在使用了一段时间之后,你会发现RxBinding缺少了一些非常需要的callback。

那么你准备自己写吗!恭喜!但是等等,你还完全不知道如何开始做这件事呢...让我们走一遍某些代码看看如何才能在现有的回调之上创建自己的Observable,并注意一些“陷阱”。

Listener vs. RxJava

让我们从每个人都熟悉的回调开始,View.OnClickListener。

这里是原始的非Rx版本:

view.setOnClickListener(new OnClickListener() {
  @Override
  public void onClick(View v) {
    doSomething();
  }
});

而这里是使用了RxJava和RxBindings的版本:

RxView.clicks(view).subscribe(new Action1<Void>() {
  @Override
  public void call(Void aVoid) {
    doSomething();
  }
});

RxView.clicks: The What and the How

让我们浏览一遍RxView.clicks()方法看看底层做了些什么。

@CheckResult @NonNull
public static Observable<Void> clicks(@NonNull View view) {
  checkNotNull(view, "view == null");
  return Observable.create(new ViewClickOnSubscribe(view));
}

首先,检查传递过来的View是不是null。即使RxJava也不能免去我们必须预防空指针异常。

然后,我们使用create方法创建了一个新的Observable。与可以用任何一个对象创建Observable的Observable.just() 或者 Observable.from()方法不同,create方法需要接收一个实现了OnSubscribe接口的对象。

告诉我更多关于这个OnSubscribe的事情

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>>

OnSubscribe的文档简短到位,即:

在Observable.subscribe被调用的时候被触发。

看起来挺简单。要使用 Observable.create()创建一个Observable你需要一个实现了OnSubscribe接口的对象。有了这个对象,你就能写你想在Observable的subscribe被调用时执行的任何神奇功能。虽然技术上讲是这样,但并是不那么简单。

如果你查看create()的文档,你可以看到作者对于如何实现OnSubscribe接口有很好的解释。

写入传递到 {@code create} 的函数,那样它就会表现为一个Observable。注:Java不允许单独的方法存在,所以在Java里其实是拥有这个函数的接口。

意思是你创建的方法应该接收一个Subscriber<T> 并恰当的调用 Subscriber<T>的 onNext, onError, 和 onCompleted 方法。

现在我们已经有了知道了实现OnSubcribe的要求,让我们回到View.OnClickListener的例子并看看RxBindings是如何实现在ViewClickOnSubscribe类中实现它的。

ViewClickOnSubscribe: An OnSubscribe Implementation

final class ViewClickOnSubscribe implements Observable.OnSubscribe<Void> {
  final View view;

  ViewClickOnSubscribe(View view) {
    this.view = view;
  }

  @Override
  public void call(final Subscriber<? super Void> subscriber) {
    checkUiThread();

    View.OnClickListener listener = new View.OnClickListener() {
      @Override
      public void onClick(View v) {
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(null);
        }
      }
    };

    view.setOnClickListener(listener);

    subscriber.add(new MainThreadSubscription() {
      @Override
      protected void onUnsubscribe() {
        view.setOnClickListener(null);
      }
    });
  }
}

看到熟悉的东西没?我们的老朋友View.OnClickListener!这里我们最终看到了从listener(callback)到Observable转换的过程。我们还能看到实现前面提到的OnSubscribe的具体过程。

但是这里还有其它几个事情也在进行。你可能注意到了这个类持有一个对被观察View的强引用。因此为了内存管理的需要,取消对Observable的订阅以释放引用也是很重要的。

But there’s something else going on here: there’s a good deal of talk about the main thread.

线程与Observable

Observable可以使用Scheduler实现“观察于”和“订阅于”特定的线程。有时你可以手动设置这些线程而有时它是设置好了的。关于线程和Observable,有大量比解释得我详细得多的资料。不过目前,让我们把注意力放在线程是如何在ViewClickOnSubscribe中使用的上面。

首先,让我们进入checkUiThread()方法。就如View类中声明的那样:

整个View树都是单线程的。当你调用View上的任何方法的时候都必须是在UI线程。

因此我们必须确保我们创建的Observable是在主线程被订阅。就如必须在主线程调用Observable上的subscribe()一样,在主线程调用Observable的unSubscribe()方法也同等重要。

在这个类的末尾我们可以看到ViewClickOnSubscribe通过一个MainThreadSubscription为我们处理好了这点。这个Subscription确保了unSubscribe是在主线程执行的。

Lather, Rinse, Repeat

瞧,我们现在把丑陋的回调变成了性感的Observable!在我们继续之前,让我们回顾一下从一个listener创建一个Observable的基本步骤。

  1. 创建一个实现了Observable.OnSubscribe<T>的类,其中T是将要传递到subscriber.onNext()中的对象的类型。

  2. 在call(final Subscriber<? super T> subscriber)的实现中,创建一个代转换的listener的实例。

  3. 确保在call() 方法中使用Subscriber的onNext, onError or onCompleted方法,这样你的函数就“表现得像个Observable”。

Easy peasy! What can go wrong?

就如你在ViewClickOnSubscribe类中看到的,当你在处理UI 相关的回调时,你肯定知道所有的工作都是发生在主线程的。但是如果我们不是处理UI回调呢?

在ViewClickOnSubscribe类中,当我设置内部listener监听来自View的事件时,我们需要注意后台所发生的事情。回调是从什么线程触发的?此时,它是主线程。但并不都是这么明显。

回调被触发的线程是非常重要的。它影响到OnSubscribe的实现将如何工作。在call()方法中传递的Subscriber将接收Observable在回调被触发所在线程上的事件(比如onNext(), onError(), 以及 onCompleted())。

实现了OnSubscribe接口的对象在触发回调的线程上发出事件。

那么,为什么我们应该关心呢?

我知道我说过不会讲太多线程的东西,但是要理解为什么需要关心回调是从什么线程触发的,一点点背景知识还是必要的。

subscribeOn 与 observeOn

就如Michael Parkers的 Effective-RxJava中所陈述的那样:

By default, Observable instances specify an execution policy of “immediate.”

虽然多数情况下都没有问题,但对于可能阻塞主线程的更大,更密集的操作可能就不一样了。幸运的是RxJava给了我们subscribeOn(Scheduler scheduler) 和 observeOn(Scheduler scheduler)来解决这个问题。

subscribeOn(Scheduler scheduler)应用到上游的Observable实例,它的scheduler指定上游的subscribe方法被调用所在的线程。在subscribeOn(Scheduler scheduler)之上的每个Observable 和 call 都将发生在subscribeOn()指定的线程。

反过来,observeOn(Scheduler scheduler)应用到下游的Observable实例。它的Scheduler参数指定被下游被观察的事件所在的线程。

好吧,但是严格说来,为什么我要在意呢?

使用observeOn和subscribeOn意味着你可以恰当的假定什么事情在什么线程完成。但是如果你在使用一个与这样的Observable结合的方法,你就不能得到保证了,一个拥有从别的线程上的回调触发的内部listener的Observable。因为现在你的自定义Observable在内部切换线程。

那意味着你对于subscribeOn 或者 observeOn的调用在遇到这个自定义Observable的时候讲会被忽略,因为它会在你不知道的情况下切换线程。


引用伟大的波基猪(动画)的话,“就这么多了伙计们!”。


译完的感受:感觉作者的确想阐明什么,但是都在很关键的时候嘎然而止,不知道是不是自己水平有限。最大的收获可能是了解了一下ViewClickOnSubscribe的代码。另外,有些地方的翻译有点难,如果再给我一次机会,我的选择是不翻译,不过做事要有始有终。

收藏 赞 (3) 踩 (0)
上一篇:AlarmManager实践
利用AlarmManager安排非必要数据下载
下一篇:用一张图解释RxJava中的线程控制
上周五和团队一起讨论了 RxJava 的用法和实现机制。在讨论中,@坚坚老师 问了一个有趣的问题:如果调用链中包含多个subscribeOn和observeOn,会是什么情况? 这实际上是一个至关重要的问题,因为在任何情况下,我们都应该弄清楚我们写的每一行代码到底是运行