Java 异步编程与 CompletableFuture 详解
CompletableFuture是Java 8引入的异步编程工具,它实现了Future接口,并提供了丰富的API来支持异步编程、函数式编程和响应式编程。本文将详细介绍CompletableFuture的使用方法和最佳实践。
核心价值
CompletableFuture = 异步编程 + 函数式接口 + 任务编排 + 异常处理
- 🚀 高性能:无需阻塞等待结果,提高程序吞吐量
- 🧩 组合能力:支持复杂的异步任务编排和组合
- ⚡ 响应性:提高应用程序的响应性和用户体验
- 🛡️ 健壮性:内置异常处理和超时控制机制
- 🔄 流式API:支持函数式编程和链式调用
1. 异步编程概述
1.1 为什么需要异步编程?
核心概念
异步编程是一种编程范式,允许程序在等待某个操作完成时继续执行其他任务,从而提高程序的响应性和吞吐量。
1.2 同步 vs 异步
同步与异步编程对比
| 特性 | 同步编程 | 异步编程 |
|---|---|---|
| 执行方式 | 顺序执行,阻塞等待 | 并发执行,非阻塞 |
| 响应性 | 低,容易阻塞 | 高,保持响应 |
| 资源利用 | 效率低,资源浪费 | 效率高,资源充分利用 |
| 编程复杂度 | 简单直观 | 相对复杂 |
| 调试难度 | 容易调试 | 调试困难 |
1.3 异步编程的优势
- 同步代码
- 异步代码
- 性能对比
java
1/**2 * 同步执行示例3 */4public static class SynchronousExample {5 public static void main(String[] args) {6 long start = System.currentTimeMillis();7 8 // 同步执行,每个任务需要1秒9 String result1 = doTask1(); // 阻塞1秒10 String result2 = doTask2(); // 阻塞1秒11 String result3 = doTask3(); // 阻塞1秒12 13 // 总耗时3秒14 System.out.println("结果: " + result1 + ", " + result2 + ", " + result3);15 System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");16 }17 18 private static String doTask1() {19 try {20 Thread.sleep(1000); // 模拟耗时操作21 } catch (InterruptedException e) {22 Thread.currentThread().interrupt();23 }24 return "Task1完成";25 }26 27 private static String doTask2() {28 try {29 Thread.sleep(1000); // 模拟耗时操作30 } catch (InterruptedException e) {31 Thread.currentThread().interrupt();32 }33 return "Task2完成";34 }35 36 private static String doTask3() {37 try {38 Thread.sleep(1000); // 模拟耗时操作39 } catch (InterruptedException e) {40 Thread.currentThread().interrupt();41 }42 return "Task3完成";43 }44}同步代码特点:
- 顺序执行,前一个任务完成后才会执行下一个任务
- 线程会阻塞等待每个任务完成
- 总耗时是所有任务耗时的总和
- 任务之间有明确的执行顺序
- 代码简单直观,容易理解和调试
java
1import java.util.concurrent.CompletableFuture;23/**4 * 异步执行示例5 */6public static class AsynchronousExample {7 public static void main(String[] args) {8 long start = System.currentTimeMillis();9 10 // 异步执行,所有任务并行运行11 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> doTask1());12 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> doTask2());13 CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> doTask3());14 15 // 等待所有任务完成16 CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);17 18 // 当所有任务完成后处理结果19 allFutures.thenRun(() -> {20 try {21 String result1 = future1.get(); // 此时已完成,不会阻塞22 String result2 = future2.get();23 String result3 = future3.get();24 25 System.out.println("结果: " + result1 + ", " + result2 + ", " + result3);26 System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");27 } catch (Exception e) {28 e.printStackTrace();29 }30 });31 32 // 等待完成33 allFutures.join();34 }35 36 // 任务方法同上37 private static String doTask1() { /* 同上 */ return "Task1完成"; }38 private static String doTask2() { /* 同上 */ return "Task2完成"; }39 private static String doTask3() { /* 同上 */ return "Task3完成"; }40}异步代码特点:
- 并行执行,所有任务同时启动
- 主线程不会阻塞,可以执行其他操作
- 总耗时接近最长任务的耗时
- 任务之间的执行顺序不确定
- 使用回调处理完成的任务
同步与异步执行性能对比:
假设有10个相互独立的任务,每个任务执行时间为1秒:
| 执行方式 | 执行时间 | CPU利用率 | 内存占用 | 线程数 |
|---|---|---|---|---|
| 同步执行 | 10秒 | 低 | 低 | 1 |
| 异步执行 | 1秒左右 | 高 | 略高 | 多个 |
异步编程的主要优势:
- 性能提升:并行处理多个任务,总体耗时减少
- 资源利用:充分利用系统资源,提高吞吐量
- 响应性:不阻塞主线程,保持UI响应或快速响应请求
- 可扩展性:更容易处理大量并发操作
适用场景:
- 网络请求
- 文件I/O操作
- 多API调用
- 复杂计算任务
- 需要保持UI响应的应用
1.4 Java中的异步编程模型
Java异步编程模型演进
| 版本 | 特性 | 优缺点 |
|---|---|---|
| Java 1.0 | Thread类 | 直接操作线程,灵活但复杂 |
| Java 1.5 | Executor框架 | 线程池管理,简化线程创建 |
| Java 1.5 | Future接口 | 异步结果封装,但功能有限 |
| Java 5 | ExecutorService | 提供更完整的线程池管理 |
| Java 5 | Callable和Future | 支持有返回值的任务 |
| Java 7 | ForkJoinPool | 支持工作窃取算法,适合递归任务 |
| Java 8 | CompletableFuture | 支持函数式、链式异步编程 |
| Java 9+ | 增强CompletableFuture | 添加超时控制等新功能 |
从Future到CompletableFuture的提升:
- Future只能通过阻塞的get()获取结果
- Future无法链式组合多个异步操作
- Future缺乏异常处理机制
- CompletableFuture解决了上述所有问题
2. CompletableFuture API 详解
2.1 创建 CompletableFuture
- 创建方法
- 创建方式对比
- 线程池策略
java
1import java.util.concurrent.CompletableFuture;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;45public class CompletableFutureCreation {6 public static void main(String[] args) throws Exception {7 // 1. 使用supplyAsync创建有返回值的异步任务8 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {9 // 异步执行的代码10 try {11 Thread.sleep(1000);12 } catch (InterruptedException e) {13 Thread.currentThread().interrupt();14 }15 return "异步任务结果";16 });17 18 // 2. 使用runAsync创建无返回值的异步任务19 CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {20 // 异步执行的代码,无返回值21 try {22 Thread.sleep(1000);23 } catch (InterruptedException e) {24 Thread.currentThread().interrupt();25 }26 System.out.println("异步任务执行完成");27 });28 29 // 3. 使用自定义线程池30 ExecutorService executor = Executors.newFixedThreadPool(3);31 CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {32 return "使用自定义线程池";33 }, executor);34 35 // 4. 手动完成Future36 CompletableFuture<String> future4 = new CompletableFuture<>();37 new Thread(() -> {38 try {39 Thread.sleep(1000);40 future4.complete("手动完成的结果"); // 正常完成41 // 或者异常完成42 // future4.completeExceptionally(new RuntimeException("出错了"));43 } catch (InterruptedException e) {44 Thread.currentThread().interrupt();45 future4.completeExceptionally(e); // 异常完成46 }47 }).start();48 49 // 5. 使用completedFuture创建已完成的Future50 CompletableFuture<String> future5 = CompletableFuture.completedFuture("立即完成的结果");51 52 // 获取结果并打印53 System.out.println("Future1结果: " + future1.get());54 future2.get(); // 等待完成55 System.out.println("Future3结果: " + future3.get());56 System.out.println("Future4结果: " + future4.get());57 System.out.println("Future5结果: " + future5.get());58 59 // 关闭线程池60 executor.shutdown();61 }62}不同创建方式对比:
| 创建方式 | 返回值 | 执行线程池 | 使用场景 | 示例 |
|---|---|---|---|---|
| supplyAsync() | 有 | ForkJoinPool.commonPool() | 需要返回结果的任务 | supplyAsync(() -> "result") |
| supplyAsync(executor) | 有 | 自定义线程池 | 定制线程池策略 | supplyAsync(() -> "result", myExecutor) |
| runAsync() | 无 | ForkJoinPool.commonPool() | 不需要返回结果的任务 | runAsync(() -> { doWork(); }) |
| runAsync(executor) | 无 | 自定义线程池 | 定制线程池的无返回值任务 | runAsync(() -> { doWork(); }, myExecutor) |
| completedFuture() | 有 | 不使用线程池 | 提供默认值或已知结果 | completedFuture("default") |
| new CompletableFuture() | 取决于complete时提供的值 | 不使用线程池 | 手动控制完成时机 | future.complete("result") |
选择指南:
- 有返回值使用
supplyAsync(),无返回值使用runAsync() - 高并发场景建议使用自定义线程池
- 已知结果时直接使用
completedFuture() - 需要精确控制完成时机时使用手动完成方式
java
1import java.util.concurrent.ExecutorService;2import java.util.concurrent.Executors;34/**5 * 不同场景下的线程池策略6 */7public static class ThreadPoolStrategies {8 // CPU密集型任务的线程池9 public static ExecutorService cpuIntensivePool() {10 int processors = Runtime.getRuntime().availableProcessors();11 return Executors.newFixedThreadPool(processors);12 }13 14 // IO密集型任务的线程池15 public static ExecutorService ioIntensivePool() {16 int processors = Runtime.getRuntime().availableProcessors();17 return Executors.newFixedThreadPool(processors * 2);18 }19 20 // 定时任务的线程池21 public static ExecutorService scheduledPool() {22 return Executors.newScheduledThreadPool(5);23 }24 25 // 使用示例26 public static void main(String[] args) {27 ExecutorService cpuPool = cpuIntensivePool();28 ExecutorService ioPool = ioIntensivePool();29 30 // CPU密集型任务31 CompletableFuture<Integer> cpuTask = CompletableFuture.supplyAsync(() -> {32 // 计算密集型任务33 return performComplexCalculation();34 }, cpuPool);35 36 // IO密集型任务37 CompletableFuture<String> ioTask = CompletableFuture.supplyAsync(() -> {38 // IO密集型任务39 return fetchDataFromNetwork();40 }, ioPool);41 42 // 不要忘记关闭线程池43 cpuPool.shutdown();44 ioPool.shutdown();45 }46 47 private static int performComplexCalculation() {48 // 模拟复杂计算49 return 42;50 }51 52 private static String fetchDataFromNetwork() {53 // 模拟网络请求54 return "data";55 }56}2.2 链式调用
CompletableFuture常用链式方法
数据转换方法:
thenApply(Function)- 转换结果并返回新值thenApplyAsync(Function)- 异步线程中转换
结果消费方法:
thenAccept(Consumer)- 消费结果但不返回新值thenRun(Runnable)- 完成后执行操作,不使用结果
组合方法:
thenCompose(Function)- 链接两个CompletableFuturethenCombine(CompletableFuture, BiFunction)- 组合两个Future的结果
执行线程选择:
- 不带Async后缀的方法 - 使用上一阶段的线程执行
- 带Async后缀的方法 - 使用ForkJoinPool或指定线程池
- 基本链式调用
- 异步链式调用
- 方法对比
java
1import java.util.concurrent.CompletableFuture;23/**4 * CompletableFuture链式调用示例5 */6public static void main(String[] args) throws Exception {7 // 创建并链式调用CompletableFuture8 CompletableFuture<String> future = CompletableFuture9 .supplyAsync(() -> "Hello") // 返回Hello10 .thenApply(s -> s + " World") // 转换结果为"Hello World"11 .thenApply(s -> s + "!") // 转换结果为"Hello World!"12 .thenApply(String::toUpperCase); // 转换为大写13 14 // 获取最终结果15 String result = future.get();16 System.out.println("链式调用结果: " + result); // 输出: HELLO WORLD!17 18 // 演示thenAccept和thenRun19 CompletableFuture.supplyAsync(() -> "CompletableFuture")20 .thenApply(s -> s + " is awesome")21 .thenAccept(s -> System.out.println("消费结果: " + s)) // 消费结果22 .thenRun(() -> System.out.println("处理完成")); // 最后执行23}java
1import java.util.concurrent.CompletableFuture;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;45/**6 * 异步链式调用示例7 */8public static void main(String[] args) throws Exception {9 // 创建自定义线程池10 ExecutorService executor = Executors.newFixedThreadPool(3);11 12 // 异步链式调用13 CompletableFuture<String> asyncFuture = CompletableFuture14 .supplyAsync(() -> {15 System.out.println("第一步:" + Thread.currentThread().getName());16 return "第一步";17 }, executor)18 .thenApplyAsync(s -> {19 System.out.println("第二步:" + Thread.currentThread().getName());20 return s + " -> 第二步";21 }, executor)22 .thenApplyAsync(s -> {23 System.out.println("第三步:" + Thread.currentThread().getName());24 return s + " -> 第三步";25 }, executor);26 27 // 获取最终结果28 String result = asyncFuture.get();29 System.out.println("异步链式调用结果: " + result);30 31 // 关闭线程池32 executor.shutdown();33}| 方法类别 | 方法名 | 入参 | 出参 | 常用场景 |
|---|---|---|---|---|
| 转换类 | thenApply | Function | CompletableFuture | 将结果转换为新类型 |
| 转换类 | thenApplyAsync | Function | CompletableFuture | 异步线程中转换结果 |
| 消费类 | thenAccept | Consumer | CompletableFuture | 使用结果但不需要返回值 |
| 消费类 | thenAcceptAsync | Consumer | CompletableFuture | 异步消费结果 |
| 执行类 | thenRun | Runnable | CompletableFuture | 完成后执行,不使用结果 |
| 执行类 | thenRunAsync | Runnable | CompletableFuture | 异步执行后续操作 |
| 组合类 | thenCompose | Function | CompletableFuture | 链接两个Future |
| 组合类 | thenCombine | CompletableFuture, BiFunction | CompletableFuture | 组合两个Future结果 |
代码示例:
java
1// thenApply - 转换结果2CompletableFuture<Integer> future1 = 3 CompletableFuture.supplyAsync(() -> "42")4 .thenApply(Integer::parseInt);5 6// thenAccept - 消费结果7CompletableFuture<Void> future2 = 8 CompletableFuture.supplyAsync(() -> "Hello")9 .thenAccept(System.out::println);10 11// thenRun - 完成后执行12CompletableFuture<Void> future3 = 13 CompletableFuture.supplyAsync(() -> "Hello")14 .thenRun(() -> System.out.println("完成"));15 16// thenCompose - 链接另一个Future17CompletableFuture<String> future4 = 18 CompletableFuture.supplyAsync(() -> "Hello")19 .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));2.3 组合多个 Future
CompletableFuture组合示例
java
1public class CompletableFutureCombination {2 3 /**4 * 组合多个Future5 */6 public static void main(String[] args) {7 System.out.println("=== CompletableFuture组合 ===");8 9 // 1. thenCombine - 组合两个Future的结果10 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");11 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");12 13 CompletableFuture<String> combined = future1.thenCombine(future2, (result1, result2) -> 14 result1 + " " + result2);15 16 try {17 System.out.println("组合结果: " + combined.get());18 } catch (Exception e) {19 e.printStackTrace();20 }21 22 // 2. thenCompose - 链式组合23 CompletableFuture<String> composed = future1.thenCompose(result -> 24 CompletableFuture.supplyAsync(() -> result + " 被处理"));25 26 try {27 System.out.println("链式组合结果: " + composed.get());28 } catch (Exception e) {29 e.printStackTrace();30 }31 32 // 3. allOf - 等待所有Future完成33 CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");34 CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");35 CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "任务3");36 37 CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);38 39 allTasks.thenRun(() -> {40 try {41 System.out.println("所有任务完成: " + task1.get() + ", " + task2.get() + ", " + task3.get());42 } catch (Exception e) {43 e.printStackTrace();44 }45 });46 47 allTasks.join();48 49 // 4. anyOf - 等待任意一个Future完成50 CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {51 try {52 Thread.sleep(500);53 } catch (InterruptedException e) {54 Thread.currentThread().interrupt();55 }56 return "快速任务";57 });58 59 CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {60 try {61 Thread.sleep(2000);62 } catch (InterruptedException e) {63 Thread.currentThread().interrupt();64 }65 return "慢速任务";66 });67 68 CompletableFuture<Object> anyTask = CompletableFuture.anyOf(fastTask, slowTask);69 try {70 System.out.println("第一个完成的任务: " + anyTask.get());71 } catch (Exception e) {72 e.printStackTrace();73 }74 }75}2.4 异常处理
CompletableFuture异常处理示例
java
1public class CompletableFutureExceptionHandling {2 3 /**4 * 异常处理示例5 */6 public static void main(String[] args) {7 System.out.println("=== CompletableFuture异常处理 ===");8 9 // 1. exceptionally - 处理异常并返回默认值10 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {11 if (Math.random() > 0.5) {12 throw new RuntimeException("随机异常");13 }14 return "成功结果";15 }).exceptionally(throwable -> {16 System.out.println("捕获异常: " + throwable.getMessage());17 return "默认值";18 });19 20 try {21 System.out.println("Future1结果: " + future1.get());22 } catch (Exception e) {23 e.printStackTrace();24 }25 26 // 2. handle - 处理成功和异常情况27 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {28 if (Math.random() > 0.5) {29 throw new RuntimeException("随机异常");30 }31 return "成功结果";32 }).handle((result, throwable) -> {33 if (throwable != null) {34 System.out.println("处理异常: " + throwable.getMessage());35 return "异常时的默认值";36 } else {37 return result + " 处理成功";38 }39 });40 41 try {42 System.out.println("Future2结果: " + future2.get());43 } catch (Exception e) {44 e.printStackTrace();45 }46 47 // 3. whenComplete - 无论成功失败都执行48 CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {49 if (Math.random() > 0.5) {50 throw new RuntimeException("随机异常");51 }52 return "成功结果";53 }).whenComplete((result, throwable) -> {54 if (throwable != null) {55 System.out.println("任务失败: " + throwable.getMessage());56 } else {57 System.out.println("任务成功: " + result);58 }59 });60 61 try {62 future3.get();63 } catch (Exception e) {64 System.out.println("Future3抛出异常: " + e.getMessage());65 }66 }67}3. 实际应用场景
3.1 并行数据处理
并行数据处理示例
java
1import java.util.Arrays;2import java.util.List;3import java.util.stream.Collectors;45public class ParallelDataProcessing {6 7 /**8 * 并行数据处理9 */10 public static void main(String[] args) {11 System.out.println("=== 并行数据处理 ===");12 13 List<String> data = Arrays.asList("数据1", "数据2", "数据3", "数据4", "数据5");14 15 // 并行处理数据16 List<CompletableFuture<String>> futures = data.stream()17 .map(item -> CompletableFuture.supplyAsync(() -> processData(item)))18 .collect(Collectors.toList());19 20 // 等待所有处理完成21 CompletableFuture<Void> allFutures = CompletableFuture.allOf(22 futures.toArray(new CompletableFuture[0]));23 24 allFutures.thenRun(() -> {25 List<String> results = futures.stream()26 .map(CompletableFuture::join)27 .collect(Collectors.toList());28 29 System.out.println("处理结果: " + results);30 });31 32 allFutures.join();33 }34 35 private static String processData(String data) {36 try {37 Thread.sleep(1000); // 模拟处理时间38 } catch (InterruptedException e) {39 Thread.currentThread().interrupt();40 }41 return data + " 已处理";42 }43}3.2 异步API调用
异步API调用示例
java
1public class AsyncApiCall {2 3 /**4 * 异步API调用5 */6 public static void main(String[] args) {7 System.out.println("=== 异步API调用 ===");8 9 // 模拟异步API调用10 CompletableFuture<User> userFuture = getUserAsync(1L);11 CompletableFuture<Order> orderFuture = getOrderAsync(1L);12 CompletableFuture<Product> productFuture = getProductAsync(1L);13 14 // 组合多个API调用结果15 CompletableFuture<UserOrderInfo> combinedFuture = userFuture16 .thenCombine(orderFuture, (user, order) -> new UserOrderInfo(user, order))17 .thenCombine(productFuture, (userOrder, product) -> {18 userOrder.setProduct(product);19 return userOrder;20 });21 22 try {23 UserOrderInfo result = combinedFuture.get();24 System.out.println("组合结果: " + result);25 } catch (Exception e) {26 e.printStackTrace();27 }28 }29 30 private static CompletableFuture<User> getUserAsync(Long userId) {31 return CompletableFuture.supplyAsync(() -> {32 try {33 Thread.sleep(1000);34 } catch (InterruptedException e) {35 Thread.currentThread().interrupt();36 }37 return new User(userId, "用户" + userId);38 });39 }40 41 private static CompletableFuture<Order> getOrderAsync(Long orderId) {42 return CompletableFuture.supplyAsync(() -> {43 try {44 Thread.sleep(1000);45 } catch (InterruptedException e) {46 Thread.currentThread().interrupt();47 }48 return new Order(orderId, "订单" + orderId);49 });50 }51 52 private static CompletableFuture<Product> getProductAsync(Long productId) {53 return CompletableFuture.supplyAsync(() -> {54 try {55 Thread.sleep(1000);56 } catch (InterruptedException e) {57 Thread.currentThread().interrupt();58 }59 return new Product(productId, "产品" + productId);60 });61 }62 63 static class User {64 private Long id;65 private String name;66 67 public User(Long id, String name) {68 this.id = id;69 this.name = name;70 }71 72 @Override73 public String toString() {74 return "User{id=" + id + ", name='" + name + "'}";75 }76 }77 78 static class Order {79 private Long id;80 private String name;81 82 public Order(Long id, String name) {83 this.id = id;84 this.name = name;85 }86 87 @Override88 public String toString() {89 return "Order{id=" + id + ", name='" + name + "'}";90 }91 }92 93 static class Product {94 private Long id;95 private String name;96 97 public Product(Long id, String name) {98 this.id = id;99 this.name = name;100 }101 102 @Override103 public String toString() {104 return "Product{id=" + id + ", name='" + name + "'}";105 }106 }107 108 static class UserOrderInfo {109 private User user;110 private Order order;111 private Product product;112 113 public UserOrderInfo(User user, Order order) {114 this.user = user;115 this.order = order;116 }117 118 public void setProduct(Product product) {119 this.product = product;120 }121 122 @Override123 public String toString() {124 return "UserOrderInfo{user=" + user + ", order=" + order + ", product=" + product + "}";125 }126 }127}3.3 超时控制
超时控制示例
java
1import java.util.concurrent.TimeUnit;2import java.util.concurrent.TimeoutException;34public class CompletableFutureTimeout {5 6 /**7 * 超时控制8 */9 public static void main(String[] args) {10 System.out.println("=== 超时控制 ===");11 12 // 创建可能超时的任务13 CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {14 try {15 Thread.sleep(3000); // 3秒16 } catch (InterruptedException e) {17 Thread.currentThread().interrupt();18 }19 return "慢任务完成";20 });21 22 // 添加超时控制23 CompletableFuture<String> timeoutTask = slowTask24 .orTimeout(2, TimeUnit.SECONDS) // 2秒超时25 .exceptionally(throwable -> {26 if (throwable instanceof TimeoutException) {27 return "任务超时";28 }29 return "其他异常: " + throwable.getMessage();30 });31 32 try {33 String result = timeoutTask.get();34 System.out.println("结果: " + result);35 } catch (Exception e) {36 e.printStackTrace();37 }38 39 // 使用completeOnTimeout设置超时时的默认值40 CompletableFuture<String> defaultTimeoutTask = CompletableFuture.supplyAsync(() -> {41 try {42 Thread.sleep(3000);43 } catch (InterruptedException e) {44 Thread.currentThread().interrupt();45 }46 return "慢任务完成";47 }).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);48 49 try {50 String result = defaultTimeoutTask.get();51 System.out.println("超时默认值结果: " + result);52 } catch (Exception e) {53 e.printStackTrace();54 }55 }56}4. 性能优化
4.1 使用自定义线程池
自定义线程池示例
java
1import java.util.concurrent.ExecutorService;2import java.util.concurrent.Executors;3import java.util.concurrent.ThreadPoolExecutor;45public class CompletableFutureOptimization {6 7 /**8 * 使用自定义线程池9 */10 public static void main(String[] args) {11 System.out.println("=== 自定义线程池优化 ===");12 13 // 创建专用线程池14 ExecutorService executor = Executors.newFixedThreadPool(10);15 16 // 使用自定义线程池17 CompletableFuture<String> future = CompletableFuture18 .supplyAsync(() -> "任务1", executor)19 .thenApplyAsync(s -> s + " 处理", executor)20 .thenApplyAsync(s -> s + " 完成", executor);21 22 try {23 System.out.println("结果: " + future.get());24 } catch (Exception e) {25 e.printStackTrace();26 }27 28 executor.shutdown();29 }30 31 /**32 * 线程池配置建议33 */34 public static class ThreadPoolConfig {35 36 // CPU密集型任务37 public static ExecutorService createCpuIntensivePool() {38 int processors = Runtime.getRuntime().availableProcessors();39 return Executors.newFixedThreadPool(processors + 1);40 }41 42 // I/O密集型任务43 public static ExecutorService createIoIntensivePool() {44 int processors = Runtime.getRuntime().availableProcessors();45 return Executors.newFixedThreadPool(processors * 2);46 }47 48 // 混合型任务49 public static ExecutorService createMixedPool() {50 int processors = Runtime.getRuntime().availableProcessors();51 return Executors.newFixedThreadPool(processors * 3);52 }53 }54}4.2 避免阻塞操作
避免阻塞操作示例
java
1public class NonBlockingOperations {2 3 /**4 * 避免阻塞操作5 */6 public static void main(String[] args) {7 System.out.println("=== 避免阻塞操作 ===");8 9 // 避免在异步任务中使用阻塞操作10 CompletableFuture<String> future = CompletableFuture11 .supplyAsync(() -> "数据")12 .thenApplyAsync(data -> processData(data)) // 异步处理13 .thenAcceptAsync(result -> System.out.println("处理结果: " + result)); // 异步消费14 15 // 等待完成16 future.join();17 }18 19 private static String processData(String data) {20 // 模拟数据处理21 return data + " 已处理";22 }23 24 /**25 * 正确的异步处理方式26 */27 public static class CorrectAsyncProcessing {28 29 public static CompletableFuture<String> processAsync(String input) {30 return CompletableFuture.supplyAsync(() -> {31 // 异步处理逻辑32 return input + " 异步处理";33 });34 }35 36 public static CompletableFuture<String> processWithTimeout(String input, long timeout) {37 return CompletableFuture.supplyAsync(() -> {38 // 异步处理逻辑39 return input + " 异步处理";40 }).orTimeout(timeout, TimeUnit.SECONDS);41 }42 }43}5. 最佳实践
5.1 异常处理最佳实践
异常处理最佳实践示例
java
1public class ExceptionHandlingBestPractices {2 3 /**4 * 异常处理最佳实践5 */6 public static void main(String[] args) {7 System.out.println("=== 异常处理最佳实践 ===");8 9 // 1. 使用exceptionally处理特定异常10 CompletableFuture<String> future1 = CompletableFuture11 .supplyAsync(() -> {12 if (Math.random() > 0.5) {13 throw new RuntimeException("业务异常");14 }15 return "成功";16 })17 .exceptionally(throwable -> {18 if (throwable instanceof RuntimeException) {19 return "处理业务异常: " + throwable.getMessage();20 }21 return "处理其他异常: " + throwable.getMessage();22 });23 24 // 2. 使用handle处理所有情况25 CompletableFuture<String> future2 = CompletableFuture26 .supplyAsync(() -> {27 if (Math.random() > 0.5) {28 throw new RuntimeException("业务异常");29 }30 return "成功";31 })32 .handle((result, throwable) -> {33 if (throwable != null) {34 return "异常处理: " + throwable.getMessage();35 }36 return "成功处理: " + result;37 });38 39 // 3. 使用whenComplete记录日志40 CompletableFuture<String> future3 = CompletableFuture41 .supplyAsync(() -> {42 if (Math.random() > 0.5) {43 throw new RuntimeException("业务异常");44 }45 return "成功";46 })47 .whenComplete((result, throwable) -> {48 if (throwable != null) {49 System.out.println("任务失败,记录日志: " + throwable.getMessage());50 } else {51 System.out.println("任务成功,记录日志: " + result);52 }53 });54 55 try {56 System.out.println("Future1: " + future1.get());57 System.out.println("Future2: " + future2.get());58 future3.get();59 } catch (Exception e) {60 e.printStackTrace();61 }62 }63}5.2 资源管理最佳实践
资源管理最佳实践示例
java
1public class ResourceManagementBestPractices {2 3 /**4 * 资源管理最佳实践5 */6 public static void main(String[] args) {7 System.out.println("=== 资源管理最佳实践 ===");8 9 ExecutorService executor = Executors.newFixedThreadPool(5);10 11 try {12 // 使用try-with-resources管理资源13 CompletableFuture<String> future = CompletableFuture14 .supplyAsync(() -> "任务", executor)15 .thenApplyAsync(result -> result + " 处理", executor);16 17 String result = future.get();18 System.out.println("结果: " + result);19 20 } catch (Exception e) {21 e.printStackTrace();22 } finally {23 // 确保线程池关闭24 executor.shutdown();25 try {26 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {27 executor.shutdownNow();28 }29 } catch (InterruptedException e) {30 executor.shutdownNow();31 Thread.currentThread().interrupt();32 }33 }34 }35 36 /**37 * 自动资源管理38 */39 public static class AutoResourceManager {40 41 public static <T> CompletableFuture<T> withExecutor(42 Function<ExecutorService, CompletableFuture<T>> task,43 int poolSize) {44 45 ExecutorService executor = Executors.newFixedThreadPool(poolSize);46 47 return task.apply(executor)48 .whenComplete((result, throwable) -> {49 executor.shutdown();50 try {51 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {52 executor.shutdownNow();53 }54 } catch (InterruptedException e) {55 executor.shutdownNow();56 Thread.currentThread().interrupt();57 }58 });59 }60 }61}6. 面试题
6.1 基础概念
Q: CompletableFuture和Future有什么区别?
A:
- Future:只能通过get()方法获取结果,会阻塞
- CompletableFuture:支持链式调用和异步处理
- CompletableFuture:提供了丰富的API来处理异步操作
- CompletableFuture:支持异常处理和超时控制
Q: thenApply和thenApplyAsync有什么区别?
A:
- thenApply:在调用线程中执行
- thenApplyAsync:在ForkJoinPool.commonPool()中执行
- thenApplyAsync:可以指定自定义线程池
- thenApplyAsync:更适合CPU密集型任务
6.2 异常处理
Q: 如何处理CompletableFuture中的异常?
A:
- 使用exceptionally()处理异常并返回默认值
- 使用handle()同时处理成功和异常情况
- 使用whenComplete()无论成功失败都执行
- 使用completeExceptionally()手动完成异常
Q: CompletableFuture.allOf和anyOf有什么区别?
A:
- allOf:等待所有Future完成
- anyOf:等待任意一个Future完成
- allOf:返回Void类型
- anyOf:返回Object类型(第一个完成的Future的结果)
6.3 性能优化
Q: 如何实现CompletableFuture的超时控制?
A:
- 使用orTimeout()方法设置超时时间
- 使用completeOnTimeout()设置超时时的默认值
- 使用get()方法的超时版本
- 结合Timer或ScheduledExecutorService实现自定义超时
Q: 如何选择合适的线程池?
A:
- CPU密集型:线程数 = CPU核心数 + 1
- I/O密集型:线程数 = CPU核心数 × 2
- 混合型:线程数 = CPU核心数 × 3
- 根据实际业务场景调整
6.4 实际应用
Q: CompletableFuture在微服务架构中的应用?
A:
- 并行调用多个微服务
- 异步处理用户请求
- 实现熔断和降级
- 提高系统响应性
Q: 如何避免CompletableFuture中的常见陷阱?
A:
- 及时处理异常
- 正确管理线程池资源
- 避免在异步任务中使用阻塞操作
- 合理设置超时时间
7. 总结
CompletableFuture为Java异步编程提供了强大而灵活的支持。
7.1 关键要点
- 异步编程优势:提高响应性、充分利用资源
- API丰富性:支持链式调用、组合、异常处理
- 性能优化:使用自定义线程池、避免阻塞操作
- 最佳实践:正确异常处理、资源管理
7.2 使用建议
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 简单异步任务 | supplyAsync/runAsync | 简单易用 |
| 复杂异步流程 | 链式调用 | 代码清晰 |
| 多个任务组合 | allOf/anyOf | 灵活控制 |
| 异常处理 | exceptionally/handle | 优雅处理 |
| 超时控制 | orTimeout | 防止阻塞 |
7.3 学习建议
- 理解原理:深入理解异步编程的工作原理
- 实践验证:通过编写代码验证不同API的效果
- 性能测试:对比不同实现方式的性能差异
- 场景应用:在实际项目中应用异步编程
通过深入理解和熟练运用CompletableFuture,我们能够构建出更加高效、健壮和可维护的Java异步应用程序。
评论