# CompletableFuture
对于异步线程编排
# API
# 线程船行化
runAsync 无返回值
- runAsync(Runnable)
- runAsync(Runnable, Executor)
supplyAsync 有返回值
- supplyAsync(Runnable)
- supplyAsync(Runnable, Executor)
whenComplete/whenCompleteAsync 任务编排,将上一个结果获取执行下一个任务
以Async结尾的,当上一个任务执行完之后,下一个任务放入线程池,由线程池执行线程,相反不带Async则是用上一个执行完的线程继续执行
- exceptionally 感知异常,并且处理错误内容
- handle 执行完成后的处理(无论成功还是失败)
- thenRun 接下来做什么(不获取上一步任务结果)
- thenAccept 接下来做什么(获取上一步任务结果)
- thenApply 接下来做什么(获取上一步结果,并返回本次结果)
# 两任务组合
- runAfterBoth 组合两个任务,不需要获取结果,只需要执行任务
- thenAcceptBoth 组合两个任务,获取上一次执行结果
- thenCombine 组合两个任务,获取上一次执行结果,并返回本次任务返回值
# 两任务组合,任意一个完成
- applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
- acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
- runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。
# 多任务组合
- allOf:所有任务完成
- anyOf:只有一个任务完成
# 线程串行化
# runAsync
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture.runAsync(() -> {
log.info("CompletableFuture");
}, executorService);
executorService.shutdown();
}
}
# supplyAsync7
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> suppliedAsync = CompletableFuture.supplyAsync(() -> {
log.info("CompletableFuture");
return 1;
}, executorService);
log.info("{}", suppliedAsync.get());
executorService.shutdown();
}
}
# whenComplete
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> suppliedAsync = CompletableFuture.supplyAsync(() -> {
log.info("CompletableFuture");
return 1;
}, executorService)
.whenComplete((res, err) -> {
log.info("上一个结果返回的内容 :{}", res);
int i = res + 1;
log.info("计算的新结果 :{}", i);
});
log.info("{}", suppliedAsync.get());
executorService.shutdown();
}
}
# exceptionally 异常处理
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> suppliedAsync = CompletableFuture.supplyAsync(() -> {
log.info("CompletableFuture");
int i = 1 / 0;
return 1;
}, executorService)
.whenComplete((res, err) -> {
//虽然能感知到异常信息,但是没法修改返回数据
log.info("上一个结果返回的内容 :{}", res);
log.error("错误信息:{}", err.getMessage());
int i = res + 1;
log.info("计算的新结果 :{}", i);
})
.exceptionally(err -> {
log.error("专门处理错误信息,感知到错误");
log.error(err.getMessage());
//上一步产生错误信息,返回默认数值
return 10;
});
log.info("{}", suppliedAsync.get());
executorService.shutdown();
}
# handle
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> suppliedAsync = CompletableFuture.supplyAsync(() -> {
log.info("CompletableFuture");
int i = 1 / 0;
return 1;
}, executorService)
.handle((res, err) -> {
//虽然能感知到异常信息,但是没法修改返回数据
log.info("上一个结果返回的内容 :{}", res);
log.error("错误信息:{}", err.getMessage());
if (res != null) {
return res + 10;
}
if (err != null) {
return 10;
}
return 20;
});
log.info("{}", suppliedAsync.get());
executorService.shutdown();
}
}
# 两任务组合
@Slf4j
public class Demo {
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture<Integer> firstThread = CompletableFuture.supplyAsync(() -> {
log.info("线程一开始");
return 10;
}, executorService);
CompletableFuture<Integer> secondThread = CompletableFuture.supplyAsync(() -> {
log.info("线程二开始");
return 10;
}, executorService);
firstThread.runAfterBothAsync(secondThread, () -> {
log.info("线程三开始");
},executorService);
}
}
← Cyclicbarrier 原理流程 →