• 首页

  • 归档

  • 标签

  • 分类

  • 友链
M S B l o g
M S B l o g

ms

获取中...

07
11
java
总结
教程

Java8 CompletableFuture异步编程

发表于 2021-07-11 • java 总结 java8 多线程 • 被 920 人看爆

CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过 回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类

为什么使用CompletableFuture,不使用Future?

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行。

虽然Future以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。

1,创建异步方法

CompletableFuture 提供了四个静态方法来创建一个异步操作。

static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

  • runAsync方法不支持返回值。
  • supplyAsync可以支持返回值。

示例:

public class ThreadTest2 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");
//        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//            System.out.println("当前线程:" + Thread.currentThread().getId());
//            int i = 10 / 2;
//            System.out.println("运行结果:" + i);
//        }, service);

//        CompletableFuture<Integer> future = C ompletableFuture.supplyAsync(() -> {
//            System.out.println("当前线程:" + Thread.currentThread().getId());
//            int i = 10 / 0;
//            System.out.println("运行结果:" + i);
//            return i;
//        }, service).whenCompleteAsync((res,exc)->{
//            System.out.println("异步任务完成了。。。。结果是:"+res+",异常是:"+exc);
//        }).exceptionally(throwable -> {
//            return 10;
//        });

//        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
//            System.out.println("当前线程:" + Thread.currentThread().getId());
//            int i = 10 / 1;
//            System.out.println("运行结果:" + i);
//            return i;
//        }, service).handle((res,thr)->{
//            if(res != null){
//                return res*2;
//            }
//            if(thr != null){
//                return 0;
//            }
//            return 0;
//        });

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 1;
            System.out.println("运行结果:" + i);
            return i;
        }, service).thenApplyAsync(res->{
            System.out.println("线程2启动了......."+res*10);
            return res*2;
        },service);

        System.out.println("main................end..."+future.get());
    }

    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

2,计算完成时回调方法

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor);

public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。BiConsumer<? super T,? super Throwable>可以定义处理业务

whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

public class CompletableFutureDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @Override
            public Object get() {
                System.out.println(Thread.currentThread().getName() + "\t completableFuture");
                int i = 10 / 0;
                return 1024;
            }
        }).whenComplete(new BiConsumer<Object, Throwable>() {
            @Override
            public void accept(Object o, Throwable throwable) {
                System.out.println("-------o=" + o.toString());
                System.out.println("-------throwable=" + throwable);
            }
        }).exceptionally(new Function<Throwable, Object>() {
            @Override
            public Object apply(Throwable throwable) {
                System.out.println("throwable=" + throwable);
                return 6666;
            }
        });
        System.out.println(future.get());
    }
}

3,handle 方法

handle 是执行任务完成时对结果的处理。
handle 是在任务完成后再执行,还可以处理异常的任务。

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

示例:

public class ThreadTest2 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 1;
            System.out.println("运行结果:" + i);
            return i;
        }, service).handle((res,thr)->{
            if(res != null){
                return res*2;
            }
            if(thr != null){
                return 0;
            }
            return 0;
        });

        System.out.println("main................end..."+future.get());
    }

    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

4,线程串行化方法

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。

thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行 thenRun的后续操作

带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

示例:

public class ThreadTest2 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");

        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 1;
            System.out.println("运行结果:" + i);
            return i;
        }, service).thenApplyAsync(res->{
            System.out.println("线程2启动了......."+res*10);
            return res*2;
        },service);

        System.out.println("main................end..."+future.get());
    }

    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

5,两任务组合 - 都要完成

两个任务必须都完成,触发该任务。

  • thenCombine:组合两个future,获取两个future的返回结果,并返回当前任务的返回值

  • thenAcceptBoth:组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值。

  • runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn, Executor executor);
    

public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action);

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action, Executor executor);


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action);

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action,
                                                 Executor executor);

示例:

public class ThreadTest3 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程1:" + Thread.currentThread().getId());
            int i = 10 / 1;
            System.out.println("线程1结束");
            return i;
        }, service);

        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程2:" + Thread.currentThread().getId());
            int i = 10 / 1;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程2结束:");
            return "hello";
        }, service);

        future01.runAfterBothAsync(future02,()->{
            System.out.println("任务3开始...");
        },service);

        future01.thenAcceptBothAsync(future02,(f1,f2)->{
            System.out.println("任务3开始...之前的结果:"+f1+"--->"+f2);
        },service);
        System.out.println("main................end...");
    }

    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

6,两任务组合 - 一个完成

当两个任务中,任意一个future任务完成的时候,执行任务。

  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。

  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。

  • runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn);

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn,
    Executor executor);

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action);

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action,
    Executor executor);

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action);

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action,
                                                   Executor executor);

示例:

public class ThreadTest3 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");

        CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程1:" + Thread.currentThread().getId());
            int i = 10 / 1;
            System.out.println("线程1结束");
            return i;
        }, service);

        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("线程2:" + Thread.currentThread().getId());
            int i = 10 / 1;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("线程2结束:");
            return "hello";
        }, service);


        future01.runAfterEitherAsync(future02,()->{
            System.out.println("任务3开始...");
        });

        System.out.println("main................end...");
    }

    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}

7.多任务组合

allOf:等待所有任务完成

anyOf:只要有一个任务完成

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

示例:

public class Thread4 {

    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main..........start");

        CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品页面");
            return "hello.jsp";
        }, service);

        CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("查询商品属性");
            return "黑色+256G";
        }, service);

        CompletableFuture<String> futureDesc = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品介绍");
            return "华为";
        }, service);

//        CompletableFuture<Void> allOf = CompletableFuture.allOf(futureImg, futureAttr, futureDesc);
//
//        System.out.println("结果:"+futureImg.get()+"----"+futureAttr.get()+"--------"+futureDesc.get());

        CompletableFuture<Object> anyOf = CompletableFuture.anyOf(futureImg, futureAttr, futureDesc);

        System.out.println("结果:"+anyOf.get());

        System.out.println("main..........end");
    }
    public static class Runable01 implements Runnable{

        @Override
        public void run() {
            System.out.println("当前线程:"+Thread.currentThread().getId());
            int i = 10/2;
            System.out.println("运行结果:"+i);
        }
    }
}
分享到:
springcloud 整合OAuth2社交登录
Java多线程实现的四种方式
  • 文章目录
  • 站点概览
ms

MSms

⚓️HelloWorld⚓️

QQ Email RSS
看爆 Top5
  • MyBatis-Plus分页查询 5,936次看爆
  • @Autowired与@Resource的区别 4,754次看爆
  • feign远程调用及异步调用丢失请求头问题 4,526次看爆
  • spring cloud中OpenFeign整合Sentinel启动报错 4,423次看爆
  • Certbot查看证书过期时间,手动续期以及自动续期 3,302次看爆

Copyright © 2025 ms · 湘ICP备20015239号

Proudly published with Halo · Theme by fyang · 站点地图