Java8-Lambda编程[5] 并发与CompletableFuture类

昔日未来之星 Future接口

  上一章讲了并行流,这一张也该讲讲流的并发了。要用到并发,无非就是多开些线程,Thread、Runable之类的俗套咱就不多说了,先来简单讲讲Future接口。没有接触过并发编程的同学可以先自行了解一下相关知识,以免对后面的内容感到生涩难懂。Future顾名思义,就是未来要产生的值,如果我要耗费较长时间来计算出一个值,并且在后面用到这个值之前还会做一些简单的其他任务,就可以使用Future。

例5.0:
    Future<Integer> future =
            Executors.newSingleThreadExecutor().submit(() -> calculator());
    //do something else ...
    future.get();

  上面的例子中,我们Executors的工厂方法实例化了一个单线程线程池并将其开启,然后通过submit方法向其提交了一个任务calculator。后面这个Lambda表达式代表着一个匿名的Callable或Runnble对象,Callable的call方法相对于Runnable的run方法多了一个返回值,在call方法执行结束后会进行回调,将值传给我们在最前面声明的Future。在线程池执行任务期间,我们可以继续在当前线程中做其他事情,然后在最后需要用到计算结果的时候,调用Future的get方法来获取。如果这时已经计算出结果,get会直接返回此结果,否则将会一直阻塞下去,我们可以为其传入第二个参数设置超时时间,超出时限还没有计算出结果就会抛出异常,结束计算。我们也可以调用cancel方法来手动关停任务,另外还有两个方法isCancel与isDone来检查Future所对应任务的当前执行状态。

  了解了Furture,我们就来试试如何将其与函数式编程结合起来,和流结合起来。算了,不想试了,反正最后也不会用它,直接来看新东西吧。

迟早要完 CompletableFuture类

  Future固然很好,但是结合起Stream来还是不太方便,为此Java8新加入了CompletableFuture类,该类实现了Future接口与CompletionStage接口,它的新方法会更好的与函数式编程相结合。CompletableFuture意为可完成的未来,即未来要完成之事,由于名字较长,我们姑且称其为要完。老Future一般只能通过向线程池提交任务的方式来获取实例,而我们可爱的要完君则添加了supplyAsync工厂方法,可直接使用该类默认的线程池来异步执行任务。

例5.1:
   List<CompletableFuture<String>> completableFutures= Stream.iterate(1, i->i+1)
            .map(i-> CompletableFuture.supplyAsync(()->i.toString()))
            .collect(Collectors.toList());
   List<String> strings=completableFutures.stream()
           .map(CompletableFuture::join)
           .collect(Collectors.toList());

  为了方便大家理解,我沿用了上面迭代生成1~1000的例子,当然上面已经说过这样会带来频繁装包拆包的问题,但是我们不管它,直接看代码。在map方法中,要完类通过supplyAsync方法,以异步方式提交了一个将整数转化为字符串的任务,并返回相应的要完对象,然后由collect方法收集成一个List<CompletableFuture>。一套流程执行完毕后,又进行了第二次迭代,通过join方法将要完映射成字符串并收集起来。join方法与get一样都是用来获取运算结果的,只不过不会抛出异常,不然就需要在我们美丽的函数式语句中加入臃肿的try/catch块。

  可能有读者看了上面的代码会产生以下三个疑问:为何代码要分成两部分来写,而不是直接再调用一次map方法,然后收集成List?是否每一次的迭代都会单独执行在一个线程之中?相比并行,并发代码看起来很繁琐,效率上会比并发更高吗?下面我就来一一解答。

抽刀断水 分步迭代

  之所以拆成两部分写,是因为合在一起的写法会影响并发。上述代码中第二个map操作会通过join方法将要完映射为字符串,而这时如果这时第一个map所提交的异步任务还没有计算出结果,join方法显然会发生阻塞。一旦某一迭代元素的join方法发生了阻塞,那么后面的元素就只能苦苦等待,等待前面元素的join方法计算完毕再通过第一个map方法提交自己的异步任务,依次类推。如果所有元素的join方法都阻塞,我们这个并发和串行还有什么区别呢?相比之下,分开的写法会先将所有的任务提交而不进行求值,直接在最后收集成一个List,再在第二次迭代的时候调用join方法,这样不同线程上的任务就可以一并等待了。

化龙金池 Executor接口

  前面说过Future对象一般只能通过向线程池提交任务来获得,而要完则是通过supplyAsync工厂方法来获得。为什么它可以这么拽呢?我们通过分析源码便可得知,它原来是内置了ForkJoinPool。ForkJoin的意思就是分支合并,和我们讲并行是提到的分支合并框架都是一路货色,所以默认情况下的并发和并行在具体实现和运行效率没有多大差别。这里可以回答我们后面的两个问题,我们的并发会开启多少线程以及效率是否比并行要好。由于ForkJoinPool所提供的线程是有限的,所以肯定不会每次迭代都开启一个新线程,具体会开多少我也没有深入研究,大家还请继续自行查验,不过肯定不会很多,不能说你想提交多少个任务人家就给你开几个进程。效率方面,既然跟并行是一个路子的东西,肯定也差不多了多少。相比之下,并行只需要调用一个方法,简洁得很,那么并发还有用的必要吗?答案当然是肯定的,不然我也不会在这里讲,并发既然专门有个类了,那肯定就比只有一个方法的并行灵活的多。

  首先要解决的就是效率问题,之所以并发并行差不多,就是因为并发默认用了并行的那一套东西,如果我们想根据实际情况对并发进行优化,就必须要革换线程池。为此,要完提供了supplyAsync的重载版本,允许我们在第二个参数位置传入我们自己选择的线程池,即一个Executor类型的对象。可以将原代码进行如下修改。

例5.2:
    Executor executor = Executors.newFixedThreadPool(1000);
    Stream.iterate(1, i -> i + 1)
            .map(i -> CompletableFuture.supplyAsync(() -> i.toString(),executor))
            ...

  上述代码中,我新建了一个线程池,该线程池拥有固定数量为1000的线程池,平均每个元素都能分到一个线程专门给自己用来迭代。那么线程池中的线程是不是越多越好呢?当然不是,若果你没有那么多的任务,开那么多线程不也是浪费啊,具体开多少才合适,就要涉及到任务密集类型与CPU线程相关的内容了,这里就不多讲了。有关Executor的更多原理与详细用法,这里也不想多讲,真正要到的读者可以专门去看看Java并发编程相关的书籍资料。

奔流相继 then方法族

  前面两个例子中,都是只在map方法中简单提交了一个异步任务,要完本身似乎没有显露出什么函数式的风范。其实它的本事可大着呢,真操练起来丝毫不逊色于前面的Stream与Optional,它的可级联的方法皆以then开头,很符合要完用于提交异步任务的本质。

例5.3:
      CompletableFuture.supplyAsync(() -> "CF")
            .thenApply(s -> s + ".thenApply")
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ".thenCompose"))
            .thenAccept(System.out::println);

  这里用到了三个then族方法,看名字与参数大致可以推测一下它们的含义。thenApply方法会在任务执行完毕后再继续执行其他操作,消费之前的结果并产生新的结果,有一点像map类的方法。thenCompose方法跟thenApply功能类似,只是传入的表达式需要返回CompletionStage类型的值。CompletionStage前面说过,是要完除了Future外所实现的另一个接口,可以看做是它的养父。尽管在表达式的返回值上有区别,thenCompose方法的返回值却仍然和thenApply一样是单层泛型参数的要完,而不会发生嵌套,返回一个CompletableFuture<CompletableFuture>。这就很像flatMap类的方法。这两个比喻,已经被写在Javadoc中,这就说明,要完的设计思想与Stream、Optional、Collector等接口是很类似的,都体现了级联式函数式编程的思想。下面再来看看最后一个方法thenAccept,该方法的参数与前两者不同,前两者都是传入的Function可以产生新的运算结果,而它则传入一个Consumer直接把之前算出来的结果给干掉了。尽管如此,为了和其他then族方法保持一致,免得级联在它这里掉队,它还是很诚实的返回一个CompletableFuture,注意这里的Void是开头大写的包装类,我们如果对它返回的空要完再进行join、get之类的求值操作就只能得到空空如也的null。

并驾齐驱 双元then族

  我们虽然可以通过thenCompose方法将两个要完结合在一起,但是毕竟作为方法调用者的要完与作为表达式结果的要完是不平等的关系,他们中间隔着一个then,就意味着后者需要在前者执行完毕后,在处理它计算出来的结果,嚼人家吃剩的东西。而有些情况下,两个要完并不存在依赖关系,二者可以同时进行,只需要在最后将他们的结果合并到一起,这就需要用到thenCombine方法,传入一个BiFunction。

例5.4:
    CompletableFuture.supplyAsync(() -> "former")
            .thenCombine(CompletableFuture.supplyAsync(()->"latter"),
                    (former,latter)->former+latter)
            .thenRun(()->{});

  thenCombine方法有两个参数,第一个是要合并的要完,第二个就是合并的规则,当然你也可以选择无视前两者的运算结果,当他们都运行完毕后直接返回一个空要完,当然这一般没什么意义。上述代码中,级联到最后调用了一个thenRun,该方法传入一个Runable参数,来对级联进行收尾工作。我们知道Runnable接口中需要实现的run方法是没有返回值的,所以thenRun和thenAccept一个德行,执行后返回一个空要完,后面再接什么join、get甚至thenAccept都没有什么意义了。不过thenAccept后面到是可以接一个thenRun,在我们万事皆毕之后,用它来做一些收尾工作,比如发送个完成任务的消息给控制台或者用户界面。thenCombine的第二个参数是个BiFunction(二元函数),类似的还有BiConsumer(二元消费者),如果把第二个参数换成它,就得到了thenAcceptBoth方法。这个方法名字和thenAccept很像,thenAccept方法吃掉了前面计算出来的值后就不再往外吐东西,thenAcceptBoth比它多了个Both就变本加厉,可以吃掉两个要完计算出来的值。当然这话说得对于他们两个有失公正,既然设计了他们,就必然会有用处,Consumer和BiConsumer没有返回值,所以就只能通过在代码中产生副作用来对外部造成影响。

  then族方法到这里只讲了三分之一,不过不用担心,剩下的方法都是在前面方法的基础上设计出来的。他们的名字都是在前面讲到的方法后面加上Async,显然会异步的执行新增的任务,比如thenCombineAsync就会单独开启一个线程来执行两个要完的合并操作。这些方法还有一种重载,在后面加上了Executor参数,让我们自行选择线程池来执行异步操作。

更多惊喜 更多then族

  正当我天真的以为then族方法到这里就讲完了,却发现要完还有很多不姓then但是仍然属于then族的方法,比如下面这俩货。

例5.5:
      CompletableFuture.supplyAsync(() -> "first task")
            .applyToEither(CompletableFuture.supplyAsync(() -> "second task")
                    ,s -> s+" is quickest")
            .acceptEither(CompletableFuture.supplyAsync(() -> "third task is quickest")
                    ,System.out::println);

  applyToEither方法会在两个要完有一个先完的时候,将它的计算结果作为参数进行下一步运算,acceptEither与之同理,会消费掉先完犊子那货的计算结果。此外还有,runAterBoth与runAfterEither,看名字也知道一个是在都完成的时候run,一个是在有一个先完的时候run。与正统的then族方法一样,这些方法也都有对应的两种Async系方法。

  以上这些方法都没有考虑计算出错的问题,一旦在Lambda表达式中抛出了异常,后果便不堪设想,为此就出现了以下三个救世方法:exceptionally、whenComplete、handle。exceptionally方法用于对任务执行时发生了异常的要完进行补偿,它的参数是一个Function<Throwable,T>,需要传入一个并返回一个补偿后计算结果,有点像异常处理结构中的catch块。whenComplete方法与之类似,参数是一个BiConsumer,传入计算结果与异常,在计算结束的时候进行处理,如果顺利计算出结果,那么异常就会是null,否则计算结果是null。与thenAccept系方法不同,whenComplete方法不会只吃不吐,它的返回值的泛型参数是计算结果的类型而不是Void,调用该方法会返回要完自身。如果不想返回自身,而是在此基础上对结果进行进一步的操作,就像thenApply做的那样,我们可以调用handle方法,它的参数是一个BiFunction,我们在前面已经讲过它与BiConsumer的区别,相信大家看到这里已经能领悟到此方法的功能了。whenComplete与handle两个方法也都有对应的Async系方法。

要完不完 取值方法

  以上所有的方法都有一个共同的特点,就是返回值都是要完类型,所以它们才能进行级联。这些方法很像是Stream接口中的延时求值方法,而get和join就类似相应的及时求值方法。除了这两个方法,我们还可以getNow方法来获取要完的计算结果,它只有一个参数,代表当缺少计算结果是要提供的默认值,如果你很心急,现在就要,那么可以试试getNow。除此之外,还可以调用complete方法,传入一个计算结果给要完返回一个boolean,要玩这时候会处于完成状态,后面如果再调用join之类的方法,就会立即获得我们传入的值。除了complete,要完还从老Future那里继承来了一个cancel方法,两个方法名字看上去很类似,功能也很类似,但具体差别在哪里,我这里没做研究就不乱说了,以免误人子弟,只是推荐大家优先使用complete,毕竟新方法新气象不是?

终于要完 其他方法

  讲了这么多,要完的方法还没讲完,不过剩的也没几个。有的名字又长又生僻,让人看了就不想用,我就不讲了,还有的诸如isCancel、isDone一看就知道什么意思我也不讲了。最后就再提一个有意思的方法吧,名字叫做toCompletableFuture,该方法会返回要完本身。之所以会有这样的奇葩方法,不是为了让要完的方法再多一些,而是因为要完的养父CompletionStage接口里面有这么个方法要它来实现,所以也只好出此下策了。

还没有完 静态方法

  本来以为到这里终于要把要完的方法讲完了,却发现漏了静态方法,吐血。除了supplyAsync方法,要完类还有allOf、anyOf、CompleteFuture、runAsync四个工厂方法。allOf与anyOf方法通过传入不同泛型参数的要完数组来开启多个要完,不同的是allOf会在所有的要完都完了之后返回一个空要完,而anyOf会在任意一个要完完成后,返回一个包含了该要完计算结果的新要完。CompleteFuture直接返回一个包含了你想要的计算结果的新要完,而runAsync和supplyAsync很相似,只不过参数是一个Runable,所以按照惯例返回值也只能是个泛型参数为Void的空要完。

结语

  看了这么多的方法,单独使用他们其实还不够,最终的目的是要和流搭配起来。具体的写法很多样,可以在map方法里传入一个要完,再在后面级联的map方法中写入诸如cf->cf.thenCompose(CompletableFuture.supplyAsync(e->e))的表达式,也可以直接在第一个map中就直接调用要完的各个级联。具体的代码我就先不展示了,等有空写个大点的工程,挑点好的代码给大家一起玩味一番。