Skip to main content

Java 异步编程与 CompletableFuture 详解

CompletableFuture是Java 8引入的异步编程工具,它实现了Future接口,并提供了丰富的API来支持异步编程、函数式编程和响应式编程。本文将详细介绍CompletableFuture的使用方法和最佳实践。

核心价值

CompletableFuture = 异步编程 + 函数式接口 + 任务编排 + 异常处理

  • 🚀 高性能:无需阻塞等待结果,提高程序吞吐量
  • 🧩 组合能力:支持复杂的异步任务编排和组合
  • 响应性:提高应用程序的响应性和用户体验
  • 🛡️ 健壮性:内置异常处理和超时控制机制
  • 🔄 流式API:支持函数式编程和链式调用

1. 异步编程概述

1.1 为什么需要异步编程?

核心概念

异步编程是一种编程范式,允许程序在等待某个操作完成时继续执行其他任务,从而提高程序的响应性和吞吐量。

1.2 同步 vs 异步

同步与异步编程对比

特性同步编程异步编程
执行方式顺序执行,阻塞等待并发执行,非阻塞
响应性低,容易阻塞高,保持响应
资源利用效率低,资源浪费效率高,资源充分利用
编程复杂度简单直观相对复杂
调试难度容易调试调试困难

1.3 异步编程的优势

java
1/**
2 * 同步执行示例
3 */
4public static class SynchronousExample {
5 public static void main(String[] args) {
6 long start = System.currentTimeMillis();
7
8 // 同步执行,每个任务需要1秒
9 String result1 = doTask1(); // 阻塞1秒
10 String result2 = doTask2(); // 阻塞1秒
11 String result3 = doTask3(); // 阻塞1秒
12
13 // 总耗时3秒
14 System.out.println("结果: " + result1 + ", " + result2 + ", " + result3);
15 System.out.println("总耗时: " + (System.currentTimeMillis() - start) + "ms");
16 }
17
18 private static String doTask1() {
19 try {
20 Thread.sleep(1000); // 模拟耗时操作
21 } catch (InterruptedException e) {
22 Thread.currentThread().interrupt();
23 }
24 return "Task1完成";
25 }
26
27 private static String doTask2() {
28 try {
29 Thread.sleep(1000); // 模拟耗时操作
30 } catch (InterruptedException e) {
31 Thread.currentThread().interrupt();
32 }
33 return "Task2完成";
34 }
35
36 private static String doTask3() {
37 try {
38 Thread.sleep(1000); // 模拟耗时操作
39 } catch (InterruptedException e) {
40 Thread.currentThread().interrupt();
41 }
42 return "Task3完成";
43 }
44}

同步代码特点:

  • 顺序执行,前一个任务完成后才会执行下一个任务
  • 线程会阻塞等待每个任务完成
  • 总耗时是所有任务耗时的总和
  • 任务之间有明确的执行顺序
  • 代码简单直观,容易理解和调试

1.4 Java中的异步编程模型

Java异步编程模型演进

版本特性优缺点
Java 1.0Thread类直接操作线程,灵活但复杂
Java 1.5Executor框架线程池管理,简化线程创建
Java 1.5Future接口异步结果封装,但功能有限
Java 5ExecutorService提供更完整的线程池管理
Java 5Callable和Future支持有返回值的任务
Java 7ForkJoinPool支持工作窃取算法,适合递归任务
Java 8CompletableFuture支持函数式、链式异步编程
Java 9+增强CompletableFuture添加超时控制等新功能

从Future到CompletableFuture的提升:

  • Future只能通过阻塞的get()获取结果
  • Future无法链式组合多个异步操作
  • Future缺乏异常处理机制
  • CompletableFuture解决了上述所有问题

2. CompletableFuture API 详解

2.1 创建 CompletableFuture

java
1import java.util.concurrent.CompletableFuture;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
4
5public class CompletableFutureCreation {
6 public static void main(String[] args) throws Exception {
7 // 1. 使用supplyAsync创建有返回值的异步任务
8 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
9 // 异步执行的代码
10 try {
11 Thread.sleep(1000);
12 } catch (InterruptedException e) {
13 Thread.currentThread().interrupt();
14 }
15 return "异步任务结果";
16 });
17
18 // 2. 使用runAsync创建无返回值的异步任务
19 CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
20 // 异步执行的代码,无返回值
21 try {
22 Thread.sleep(1000);
23 } catch (InterruptedException e) {
24 Thread.currentThread().interrupt();
25 }
26 System.out.println("异步任务执行完成");
27 });
28
29 // 3. 使用自定义线程池
30 ExecutorService executor = Executors.newFixedThreadPool(3);
31 CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
32 return "使用自定义线程池";
33 }, executor);
34
35 // 4. 手动完成Future
36 CompletableFuture<String> future4 = new CompletableFuture<>();
37 new Thread(() -> {
38 try {
39 Thread.sleep(1000);
40 future4.complete("手动完成的结果"); // 正常完成
41 // 或者异常完成
42 // future4.completeExceptionally(new RuntimeException("出错了"));
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 future4.completeExceptionally(e); // 异常完成
46 }
47 }).start();
48
49 // 5. 使用completedFuture创建已完成的Future
50 CompletableFuture<String> future5 = CompletableFuture.completedFuture("立即完成的结果");
51
52 // 获取结果并打印
53 System.out.println("Future1结果: " + future1.get());
54 future2.get(); // 等待完成
55 System.out.println("Future3结果: " + future3.get());
56 System.out.println("Future4结果: " + future4.get());
57 System.out.println("Future5结果: " + future5.get());
58
59 // 关闭线程池
60 executor.shutdown();
61 }
62}

2.2 链式调用

CompletableFuture常用链式方法

数据转换方法:

  • thenApply(Function) - 转换结果并返回新值
  • thenApplyAsync(Function) - 异步线程中转换

结果消费方法:

  • thenAccept(Consumer) - 消费结果但不返回新值
  • thenRun(Runnable) - 完成后执行操作,不使用结果

组合方法:

  • thenCompose(Function) - 链接两个CompletableFuture
  • thenCombine(CompletableFuture, BiFunction) - 组合两个Future的结果

执行线程选择:

  • 不带Async后缀的方法 - 使用上一阶段的线程执行
  • 带Async后缀的方法 - 使用ForkJoinPool或指定线程池
java
1import java.util.concurrent.CompletableFuture;
2
3/**
4 * CompletableFuture链式调用示例
5 */
6public static void main(String[] args) throws Exception {
7 // 创建并链式调用CompletableFuture
8 CompletableFuture<String> future = CompletableFuture
9 .supplyAsync(() -> "Hello") // 返回Hello
10 .thenApply(s -> s + " World") // 转换结果为"Hello World"
11 .thenApply(s -> s + "!") // 转换结果为"Hello World!"
12 .thenApply(String::toUpperCase); // 转换为大写
13
14 // 获取最终结果
15 String result = future.get();
16 System.out.println("链式调用结果: " + result); // 输出: HELLO WORLD!
17
18 // 演示thenAccept和thenRun
19 CompletableFuture.supplyAsync(() -> "CompletableFuture")
20 .thenApply(s -> s + " is awesome")
21 .thenAccept(s -> System.out.println("消费结果: " + s)) // 消费结果
22 .thenRun(() -> System.out.println("处理完成")); // 最后执行
23}

2.3 组合多个 Future

CompletableFuture组合示例
java
1public class CompletableFutureCombination {
2
3 /**
4 * 组合多个Future
5 */
6 public static void main(String[] args) {
7 System.out.println("=== CompletableFuture组合 ===");
8
9 // 1. thenCombine - 组合两个Future的结果
10 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
11 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
12
13 CompletableFuture<String> combined = future1.thenCombine(future2, (result1, result2) ->
14 result1 + " " + result2);
15
16 try {
17 System.out.println("组合结果: " + combined.get());
18 } catch (Exception e) {
19 e.printStackTrace();
20 }
21
22 // 2. thenCompose - 链式组合
23 CompletableFuture<String> composed = future1.thenCompose(result ->
24 CompletableFuture.supplyAsync(() -> result + " 被处理"));
25
26 try {
27 System.out.println("链式组合结果: " + composed.get());
28 } catch (Exception e) {
29 e.printStackTrace();
30 }
31
32 // 3. allOf - 等待所有Future完成
33 CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");
34 CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");
35 CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "任务3");
36
37 CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
38
39 allTasks.thenRun(() -> {
40 try {
41 System.out.println("所有任务完成: " + task1.get() + ", " + task2.get() + ", " + task3.get());
42 } catch (Exception e) {
43 e.printStackTrace();
44 }
45 });
46
47 allTasks.join();
48
49 // 4. anyOf - 等待任意一个Future完成
50 CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
51 try {
52 Thread.sleep(500);
53 } catch (InterruptedException e) {
54 Thread.currentThread().interrupt();
55 }
56 return "快速任务";
57 });
58
59 CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
60 try {
61 Thread.sleep(2000);
62 } catch (InterruptedException e) {
63 Thread.currentThread().interrupt();
64 }
65 return "慢速任务";
66 });
67
68 CompletableFuture<Object> anyTask = CompletableFuture.anyOf(fastTask, slowTask);
69 try {
70 System.out.println("第一个完成的任务: " + anyTask.get());
71 } catch (Exception e) {
72 e.printStackTrace();
73 }
74 }
75}

2.4 异常处理

CompletableFuture异常处理示例
java
1public class CompletableFutureExceptionHandling {
2
3 /**
4 * 异常处理示例
5 */
6 public static void main(String[] args) {
7 System.out.println("=== CompletableFuture异常处理 ===");
8
9 // 1. exceptionally - 处理异常并返回默认值
10 CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
11 if (Math.random() > 0.5) {
12 throw new RuntimeException("随机异常");
13 }
14 return "成功结果";
15 }).exceptionally(throwable -> {
16 System.out.println("捕获异常: " + throwable.getMessage());
17 return "默认值";
18 });
19
20 try {
21 System.out.println("Future1结果: " + future1.get());
22 } catch (Exception e) {
23 e.printStackTrace();
24 }
25
26 // 2. handle - 处理成功和异常情况
27 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
28 if (Math.random() > 0.5) {
29 throw new RuntimeException("随机异常");
30 }
31 return "成功结果";
32 }).handle((result, throwable) -> {
33 if (throwable != null) {
34 System.out.println("处理异常: " + throwable.getMessage());
35 return "异常时的默认值";
36 } else {
37 return result + " 处理成功";
38 }
39 });
40
41 try {
42 System.out.println("Future2结果: " + future2.get());
43 } catch (Exception e) {
44 e.printStackTrace();
45 }
46
47 // 3. whenComplete - 无论成功失败都执行
48 CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
49 if (Math.random() > 0.5) {
50 throw new RuntimeException("随机异常");
51 }
52 return "成功结果";
53 }).whenComplete((result, throwable) -> {
54 if (throwable != null) {
55 System.out.println("任务失败: " + throwable.getMessage());
56 } else {
57 System.out.println("任务成功: " + result);
58 }
59 });
60
61 try {
62 future3.get();
63 } catch (Exception e) {
64 System.out.println("Future3抛出异常: " + e.getMessage());
65 }
66 }
67}

3. 实际应用场景

3.1 并行数据处理

并行数据处理示例
java
1import java.util.Arrays;
2import java.util.List;
3import java.util.stream.Collectors;
4
5public class ParallelDataProcessing {
6
7 /**
8 * 并行数据处理
9 */
10 public static void main(String[] args) {
11 System.out.println("=== 并行数据处理 ===");
12
13 List<String> data = Arrays.asList("数据1", "数据2", "数据3", "数据4", "数据5");
14
15 // 并行处理数据
16 List<CompletableFuture<String>> futures = data.stream()
17 .map(item -> CompletableFuture.supplyAsync(() -> processData(item)))
18 .collect(Collectors.toList());
19
20 // 等待所有处理完成
21 CompletableFuture<Void> allFutures = CompletableFuture.allOf(
22 futures.toArray(new CompletableFuture[0]));
23
24 allFutures.thenRun(() -> {
25 List<String> results = futures.stream()
26 .map(CompletableFuture::join)
27 .collect(Collectors.toList());
28
29 System.out.println("处理结果: " + results);
30 });
31
32 allFutures.join();
33 }
34
35 private static String processData(String data) {
36 try {
37 Thread.sleep(1000); // 模拟处理时间
38 } catch (InterruptedException e) {
39 Thread.currentThread().interrupt();
40 }
41 return data + " 已处理";
42 }
43}

3.2 异步API调用

异步API调用示例
java
1public class AsyncApiCall {
2
3 /**
4 * 异步API调用
5 */
6 public static void main(String[] args) {
7 System.out.println("=== 异步API调用 ===");
8
9 // 模拟异步API调用
10 CompletableFuture<User> userFuture = getUserAsync(1L);
11 CompletableFuture<Order> orderFuture = getOrderAsync(1L);
12 CompletableFuture<Product> productFuture = getProductAsync(1L);
13
14 // 组合多个API调用结果
15 CompletableFuture<UserOrderInfo> combinedFuture = userFuture
16 .thenCombine(orderFuture, (user, order) -> new UserOrderInfo(user, order))
17 .thenCombine(productFuture, (userOrder, product) -> {
18 userOrder.setProduct(product);
19 return userOrder;
20 });
21
22 try {
23 UserOrderInfo result = combinedFuture.get();
24 System.out.println("组合结果: " + result);
25 } catch (Exception e) {
26 e.printStackTrace();
27 }
28 }
29
30 private static CompletableFuture<User> getUserAsync(Long userId) {
31 return CompletableFuture.supplyAsync(() -> {
32 try {
33 Thread.sleep(1000);
34 } catch (InterruptedException e) {
35 Thread.currentThread().interrupt();
36 }
37 return new User(userId, "用户" + userId);
38 });
39 }
40
41 private static CompletableFuture<Order> getOrderAsync(Long orderId) {
42 return CompletableFuture.supplyAsync(() -> {
43 try {
44 Thread.sleep(1000);
45 } catch (InterruptedException e) {
46 Thread.currentThread().interrupt();
47 }
48 return new Order(orderId, "订单" + orderId);
49 });
50 }
51
52 private static CompletableFuture<Product> getProductAsync(Long productId) {
53 return CompletableFuture.supplyAsync(() -> {
54 try {
55 Thread.sleep(1000);
56 } catch (InterruptedException e) {
57 Thread.currentThread().interrupt();
58 }
59 return new Product(productId, "产品" + productId);
60 });
61 }
62
63 static class User {
64 private Long id;
65 private String name;
66
67 public User(Long id, String name) {
68 this.id = id;
69 this.name = name;
70 }
71
72 @Override
73 public String toString() {
74 return "User{id=" + id + ", name='" + name + "'}";
75 }
76 }
77
78 static class Order {
79 private Long id;
80 private String name;
81
82 public Order(Long id, String name) {
83 this.id = id;
84 this.name = name;
85 }
86
87 @Override
88 public String toString() {
89 return "Order{id=" + id + ", name='" + name + "'}";
90 }
91 }
92
93 static class Product {
94 private Long id;
95 private String name;
96
97 public Product(Long id, String name) {
98 this.id = id;
99 this.name = name;
100 }
101
102 @Override
103 public String toString() {
104 return "Product{id=" + id + ", name='" + name + "'}";
105 }
106 }
107
108 static class UserOrderInfo {
109 private User user;
110 private Order order;
111 private Product product;
112
113 public UserOrderInfo(User user, Order order) {
114 this.user = user;
115 this.order = order;
116 }
117
118 public void setProduct(Product product) {
119 this.product = product;
120 }
121
122 @Override
123 public String toString() {
124 return "UserOrderInfo{user=" + user + ", order=" + order + ", product=" + product + "}";
125 }
126 }
127}

3.3 超时控制

超时控制示例
java
1import java.util.concurrent.TimeUnit;
2import java.util.concurrent.TimeoutException;
3
4public class CompletableFutureTimeout {
5
6 /**
7 * 超时控制
8 */
9 public static void main(String[] args) {
10 System.out.println("=== 超时控制 ===");
11
12 // 创建可能超时的任务
13 CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
14 try {
15 Thread.sleep(3000); // 3秒
16 } catch (InterruptedException e) {
17 Thread.currentThread().interrupt();
18 }
19 return "慢任务完成";
20 });
21
22 // 添加超时控制
23 CompletableFuture<String> timeoutTask = slowTask
24 .orTimeout(2, TimeUnit.SECONDS) // 2秒超时
25 .exceptionally(throwable -> {
26 if (throwable instanceof TimeoutException) {
27 return "任务超时";
28 }
29 return "其他异常: " + throwable.getMessage();
30 });
31
32 try {
33 String result = timeoutTask.get();
34 System.out.println("结果: " + result);
35 } catch (Exception e) {
36 e.printStackTrace();
37 }
38
39 // 使用completeOnTimeout设置超时时的默认值
40 CompletableFuture<String> defaultTimeoutTask = CompletableFuture.supplyAsync(() -> {
41 try {
42 Thread.sleep(3000);
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 }
46 return "慢任务完成";
47 }).completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
48
49 try {
50 String result = defaultTimeoutTask.get();
51 System.out.println("超时默认值结果: " + result);
52 } catch (Exception e) {
53 e.printStackTrace();
54 }
55 }
56}

4. 性能优化

4.1 使用自定义线程池

自定义线程池示例
java
1import java.util.concurrent.ExecutorService;
2import java.util.concurrent.Executors;
3import java.util.concurrent.ThreadPoolExecutor;
4
5public class CompletableFutureOptimization {
6
7 /**
8 * 使用自定义线程池
9 */
10 public static void main(String[] args) {
11 System.out.println("=== 自定义线程池优化 ===");
12
13 // 创建专用线程池
14 ExecutorService executor = Executors.newFixedThreadPool(10);
15
16 // 使用自定义线程池
17 CompletableFuture<String> future = CompletableFuture
18 .supplyAsync(() -> "任务1", executor)
19 .thenApplyAsync(s -> s + " 处理", executor)
20 .thenApplyAsync(s -> s + " 完成", executor);
21
22 try {
23 System.out.println("结果: " + future.get());
24 } catch (Exception e) {
25 e.printStackTrace();
26 }
27
28 executor.shutdown();
29 }
30
31 /**
32 * 线程池配置建议
33 */
34 public static class ThreadPoolConfig {
35
36 // CPU密集型任务
37 public static ExecutorService createCpuIntensivePool() {
38 int processors = Runtime.getRuntime().availableProcessors();
39 return Executors.newFixedThreadPool(processors + 1);
40 }
41
42 // I/O密集型任务
43 public static ExecutorService createIoIntensivePool() {
44 int processors = Runtime.getRuntime().availableProcessors();
45 return Executors.newFixedThreadPool(processors * 2);
46 }
47
48 // 混合型任务
49 public static ExecutorService createMixedPool() {
50 int processors = Runtime.getRuntime().availableProcessors();
51 return Executors.newFixedThreadPool(processors * 3);
52 }
53 }
54}

4.2 避免阻塞操作

避免阻塞操作示例
java
1public class NonBlockingOperations {
2
3 /**
4 * 避免阻塞操作
5 */
6 public static void main(String[] args) {
7 System.out.println("=== 避免阻塞操作 ===");
8
9 // 避免在异步任务中使用阻塞操作
10 CompletableFuture<String> future = CompletableFuture
11 .supplyAsync(() -> "数据")
12 .thenApplyAsync(data -> processData(data)) // 异步处理
13 .thenAcceptAsync(result -> System.out.println("处理结果: " + result)); // 异步消费
14
15 // 等待完成
16 future.join();
17 }
18
19 private static String processData(String data) {
20 // 模拟数据处理
21 return data + " 已处理";
22 }
23
24 /**
25 * 正确的异步处理方式
26 */
27 public static class CorrectAsyncProcessing {
28
29 public static CompletableFuture<String> processAsync(String input) {
30 return CompletableFuture.supplyAsync(() -> {
31 // 异步处理逻辑
32 return input + " 异步处理";
33 });
34 }
35
36 public static CompletableFuture<String> processWithTimeout(String input, long timeout) {
37 return CompletableFuture.supplyAsync(() -> {
38 // 异步处理逻辑
39 return input + " 异步处理";
40 }).orTimeout(timeout, TimeUnit.SECONDS);
41 }
42 }
43}

5. 最佳实践

5.1 异常处理最佳实践

异常处理最佳实践示例
java
1public class ExceptionHandlingBestPractices {
2
3 /**
4 * 异常处理最佳实践
5 */
6 public static void main(String[] args) {
7 System.out.println("=== 异常处理最佳实践 ===");
8
9 // 1. 使用exceptionally处理特定异常
10 CompletableFuture<String> future1 = CompletableFuture
11 .supplyAsync(() -> {
12 if (Math.random() > 0.5) {
13 throw new RuntimeException("业务异常");
14 }
15 return "成功";
16 })
17 .exceptionally(throwable -> {
18 if (throwable instanceof RuntimeException) {
19 return "处理业务异常: " + throwable.getMessage();
20 }
21 return "处理其他异常: " + throwable.getMessage();
22 });
23
24 // 2. 使用handle处理所有情况
25 CompletableFuture<String> future2 = CompletableFuture
26 .supplyAsync(() -> {
27 if (Math.random() > 0.5) {
28 throw new RuntimeException("业务异常");
29 }
30 return "成功";
31 })
32 .handle((result, throwable) -> {
33 if (throwable != null) {
34 return "异常处理: " + throwable.getMessage();
35 }
36 return "成功处理: " + result;
37 });
38
39 // 3. 使用whenComplete记录日志
40 CompletableFuture<String> future3 = CompletableFuture
41 .supplyAsync(() -> {
42 if (Math.random() > 0.5) {
43 throw new RuntimeException("业务异常");
44 }
45 return "成功";
46 })
47 .whenComplete((result, throwable) -> {
48 if (throwable != null) {
49 System.out.println("任务失败,记录日志: " + throwable.getMessage());
50 } else {
51 System.out.println("任务成功,记录日志: " + result);
52 }
53 });
54
55 try {
56 System.out.println("Future1: " + future1.get());
57 System.out.println("Future2: " + future2.get());
58 future3.get();
59 } catch (Exception e) {
60 e.printStackTrace();
61 }
62 }
63}

5.2 资源管理最佳实践

资源管理最佳实践示例
java
1public class ResourceManagementBestPractices {
2
3 /**
4 * 资源管理最佳实践
5 */
6 public static void main(String[] args) {
7 System.out.println("=== 资源管理最佳实践 ===");
8
9 ExecutorService executor = Executors.newFixedThreadPool(5);
10
11 try {
12 // 使用try-with-resources管理资源
13 CompletableFuture<String> future = CompletableFuture
14 .supplyAsync(() -> "任务", executor)
15 .thenApplyAsync(result -> result + " 处理", executor);
16
17 String result = future.get();
18 System.out.println("结果: " + result);
19
20 } catch (Exception e) {
21 e.printStackTrace();
22 } finally {
23 // 确保线程池关闭
24 executor.shutdown();
25 try {
26 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
27 executor.shutdownNow();
28 }
29 } catch (InterruptedException e) {
30 executor.shutdownNow();
31 Thread.currentThread().interrupt();
32 }
33 }
34 }
35
36 /**
37 * 自动资源管理
38 */
39 public static class AutoResourceManager {
40
41 public static <T> CompletableFuture<T> withExecutor(
42 Function<ExecutorService, CompletableFuture<T>> task,
43 int poolSize) {
44
45 ExecutorService executor = Executors.newFixedThreadPool(poolSize);
46
47 return task.apply(executor)
48 .whenComplete((result, throwable) -> {
49 executor.shutdown();
50 try {
51 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
52 executor.shutdownNow();
53 }
54 } catch (InterruptedException e) {
55 executor.shutdownNow();
56 Thread.currentThread().interrupt();
57 }
58 });
59 }
60 }
61}

6. 面试题

6.1 基础概念

Q: CompletableFuture和Future有什么区别?

A:

  • Future:只能通过get()方法获取结果,会阻塞
  • CompletableFuture:支持链式调用和异步处理
  • CompletableFuture:提供了丰富的API来处理异步操作
  • CompletableFuture:支持异常处理和超时控制

Q: thenApply和thenApplyAsync有什么区别?

A:

  • thenApply:在调用线程中执行
  • thenApplyAsync:在ForkJoinPool.commonPool()中执行
  • thenApplyAsync:可以指定自定义线程池
  • thenApplyAsync:更适合CPU密集型任务

6.2 异常处理

Q: 如何处理CompletableFuture中的异常?

A:

  • 使用exceptionally()处理异常并返回默认值
  • 使用handle()同时处理成功和异常情况
  • 使用whenComplete()无论成功失败都执行
  • 使用completeExceptionally()手动完成异常

Q: CompletableFuture.allOf和anyOf有什么区别?

A:

  • allOf:等待所有Future完成
  • anyOf:等待任意一个Future完成
  • allOf:返回Void类型
  • anyOf:返回Object类型(第一个完成的Future的结果)

6.3 性能优化

Q: 如何实现CompletableFuture的超时控制?

A:

  • 使用orTimeout()方法设置超时时间
  • 使用completeOnTimeout()设置超时时的默认值
  • 使用get()方法的超时版本
  • 结合Timer或ScheduledExecutorService实现自定义超时

Q: 如何选择合适的线程池?

A:

  • CPU密集型:线程数 = CPU核心数 + 1
  • I/O密集型:线程数 = CPU核心数 × 2
  • 混合型:线程数 = CPU核心数 × 3
  • 根据实际业务场景调整

6.4 实际应用

Q: CompletableFuture在微服务架构中的应用?

A:

  • 并行调用多个微服务
  • 异步处理用户请求
  • 实现熔断和降级
  • 提高系统响应性

Q: 如何避免CompletableFuture中的常见陷阱?

A:

  • 及时处理异常
  • 正确管理线程池资源
  • 避免在异步任务中使用阻塞操作
  • 合理设置超时时间

7. 总结

CompletableFuture为Java异步编程提供了强大而灵活的支持。

7.1 关键要点

  1. 异步编程优势:提高响应性、充分利用资源
  2. API丰富性:支持链式调用、组合、异常处理
  3. 性能优化:使用自定义线程池、避免阻塞操作
  4. 最佳实践:正确异常处理、资源管理

7.2 使用建议

场景推荐方式原因
简单异步任务supplyAsync/runAsync简单易用
复杂异步流程链式调用代码清晰
多个任务组合allOf/anyOf灵活控制
异常处理exceptionally/handle优雅处理
超时控制orTimeout防止阻塞

7.3 学习建议

  1. 理解原理:深入理解异步编程的工作原理
  2. 实践验证:通过编写代码验证不同API的效果
  3. 性能测试:对比不同实现方式的性能差异
  4. 场景应用:在实际项目中应用异步编程

通过深入理解和熟练运用CompletableFuture,我们能够构建出更加高效、健壮和可维护的Java异步应用程序。

参与讨论