RxJava应用-过滤一定时间内的重复输入源

文/ BlackSwift(简书作者)

原文链接:http://www.jianshu.com/p/94a1a016a461 

简单的一个RxJava操作符的笔记

需求

过滤一定时间内重复的输入源。

举个例子,许多用户同时对服务器进行投票请求,为了防止刷票,我希望实现24小时内过滤重复IP的投票。

再举例一枚,用户在APP中下拉刷新时,可能无意识进行了多次刷新,但是这样大多数情况是低效的,我只希望获得在3秒内刷新动画中的首次请求。

如果用传统方法实现,可能一堆if与全局变量嵌在代码中,非常丑陋,既然RxJava是面向事件编程的方法,为何不将其转为管道操作呢?

实现

自己写一个操作符,它实现RxJava中的接口Observable.Operator

代码主要参考了distinct与debounce。

public class DistinctWithTimeout<T, U> implements Observable.Operator<T, T> {
  //调度器,建议用主线程
  final Scheduler scheduler;
  //与Distinct中的select含义相同,用于判断两个时间是否重复
  final Func1<? super T, ? extends U> selector;
  final TimeUnit timeUnit;
  final long timeInMilliseconds;
  //被过滤时的回调
  final Action1<T> action1;
  public DistinctWithTimeout(long timeInMilliseconds, TimeUnit timeUnit,
      Func1<? super T, ? extends U> selector, Scheduler scheduler, Action1<T> duplicateAction) {
    this.timeUnit = timeUnit;
    this.selector = selector;
    this.action1 = duplicateAction;
    this.timeInMilliseconds = timeInMilliseconds;
    this.scheduler = scheduler;
  }
  @Override public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
    return new Subscriber<T>() {
      private Map<U, Long> lastMap = new WeakHashMap<>();
      @Override public void onCompleted() {
        subscriber.onCompleted();
      }
      @Override public void onError(Throwable e) {
        subscriber.onError(e);
      }
      @Override public void onNext(T t) {
        long now = scheduler.now();
        U u = selector.call(t);
        //解开装箱
        long last = lastMap.get(u) == null ? (0) : (lastMap.get(u));
        //如果没有超时
        if (now - last >= timeUnit.toMillis(timeInMilliseconds)) {
          lastMap.put(u, now);
          subscriber.onNext(t);
        } else {
            //调用超时后的Action
          action1.call(t);
        }
      }
    };
  }
}

本方法仅仅用于并发量不大的请求,比如界面编程中使用。如果是长间隔、大对象下,Map可能会内存爆炸的。在服务端真正使用的话可以用Redis替换掉上文的Map,比如它的expire属性。

举例

以过滤投票为例,首先确认了超时时间,接着确定了如何去验证请求是否重复,此步骤进行了一个同步的查询数据库请求。

RxBus.getInstance().observe()
    .lift(
    new DistinctWithTimeout<>(
        24, TimeUnit.HOURS, /*超时时间*/
        requst -> DatabeseHelper.getInstance().get(requst),/*验证重复的方法*/ 
        Schedulers.io(),/*处理此操作符的线程池*/
        requst -> showToast("在1天内不需要重复发送"))/*如果重复,返回的错误消息*/)
    .doOnNext(...);

在Android中,下面是去掉重复请求事件的例子

EventBus.<SMSRequst>getInstance().observe()
        .lift(
       new DistinctWithTimeout<>(
           10, TimeUnit.SECONDS, 
           SMSRequst::getMessage,
           AndroidSchedulers.mainThread(),
           requst -> showToast("在10秒内不需要重复发送")))

总结

  1. 函数式编程在面向事件的开发中使用起来非常爽。

  2. 多看RxJava的操作符源码,可以秒杀很多笔试题。

来自:RxJava