Skip to main content

Java 并发工具类详解

Java并发包(java.util.concurrent)提供了丰富的并发工具类,用于简化多线程编程。这些工具类提供了比传统synchronized更灵活和高效的并发控制机制。本文将详细介绍各种并发工具类的使用方法和最佳实践。

核心价值

并发工具类 = 灵活同步 + 性能优化 + 易用性 + 丰富功能

  • 🔄 灵活控制:提供比synchronized更灵活的同步机制
  • 🚀 高性能:采用更高效的算法和实现,优化竞争处理
  • 🧩 功能丰富:针对不同场景提供专用工具
  • 🛡️ 健壮性:内置超时、中断、异常处理等机制
  • 📊 可伸缩性:适应各种复杂并发场景

1. 并发工具类概述

1.1 什么是并发工具类?

核心概念

并发工具类是Java并发包中提供的一组高级同步工具,它们基于AQS(AbstractQueuedSynchronizer)实现,提供了比传统synchronized更灵活、更高效的并发控制机制。

1.2 并发工具类的优势

并发工具类与传统同步机制对比

优势具体体现业务价值
灵活性支持超时、中断、公平性等特性适应复杂业务场景
高性能基于AQS实现,性能优于synchronized提高系统吞吐量
功能丰富提供多种同步机制简化编程复杂度
可扩展性支持自定义同步器满足特殊需求
可读性语义清晰,代码易理解提高代码质量

1.3 并发工具类分类

同步工具类主要用于协调多个线程的执行顺序和状态:

工具类主要功能特点适用场景
CountDownLatch让一个或多个线程等待一组操作完成一次性使用,计数器为0后无法重置启动信号、资源初始化等待
CyclicBarrier让一组线程在某个点相互等待可重复使用,自动重置计数分阶段计算、并行迭代算法
Semaphore控制同时访问某个资源的线程数量支持公平/非公平模式,可动态调整许可连接池管理、资源限制
Phaser允许多阶段同步,动态调整参与方数量功能最强大、最复杂复杂多阶段并行任务

2. CountDownLatch(倒计时门闩)

2.1 CountDownLatch 基本概念

CountDownLatch核心特性

CountDownLatch是一个同步工具类,允许一个或多个线程等待其他线程完成操作。

关键特性:

  • 初始化时指定计数值
  • 调用countDown()方法使计数值减1
  • 调用await()方法等待计数值变为0
  • 一次性使用:计数值为0后无法重置
  • 支持超时等待:await(long timeout, TimeUnit unit)

主要应用场景:

  • 启动信号:等待所有服务就绪后再开始
  • 任务分解:将大任务分解为多个小任务并行处理
  • 资源初始化:等待多个资源初始化完成
java
1import java.util.concurrent.CountDownLatch;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
4
5/**
6 * CountDownLatch基本用法
7 */
8public static class BasicUsage {
9 public static void main(String[] args) throws InterruptedException {
10 int threadCount = 3;
11 // 创建一个初始计数为3的CountDownLatch
12 CountDownLatch latch = new CountDownLatch(threadCount);
13 ExecutorService executor = Executors.newFixedThreadPool(threadCount);
14
15 // 启动多个工作线程
16 for (int i = 0; i < threadCount; i++) {
17 final int taskId = i;
18 executor.submit(() -> {
19 try {
20 System.out.println("任务 " + taskId + " 开始执行");
21 Thread.sleep(1000 + taskId * 500); // 模拟工作
22 System.out.println("任务 " + taskId + " 执行完成");
23 } catch (InterruptedException e) {
24 Thread.currentThread().interrupt();
25 } finally {
26 latch.countDown(); // 计数器减1
27 System.out.println("任务 " + taskId + " 计数器减1,剩余: " + latch.getCount());
28 }
29 });
30 }
31
32 System.out.println("主线程等待所有任务完成...");
33 latch.await(); // 等待所有任务完成
34 System.out.println("所有任务已完成!");
35
36 executor.shutdown();
37 }
38}

工作原理:

  1. 创建CountDownLatch并设置初始计数值
  2. 启动多个线程执行任务
  3. 每个任务完成后调用countDown()使计数值减1
  4. 主线程调用await()等待计数值变为0
  5. 当所有任务完成后,主线程继续执行

2.2 CountDownLatch vs CyclicBarrier

CountDownLatch与CyclicBarrier对比

特性CountDownLatchCyclicBarrier
等待方向一个或多个线程等待其他线程一组线程相互等待
重用性一次性使用,不可重置可重复使用,自动重置
计数操作countDown()减少计数调用await()增加计数
回调动作不支持支持屏障动作(barrier action)
超时支持支持超时等待支持超时等待
创建方式指定计数值指定参与线程数和可选的屏障动作
使用场景启动信号、任务分解分阶段计算、并行迭代

2.3 CountDownLatch 实现原理

CountDownLatch内部实现细节

CountDownLatch基于AQS(AbstractQueuedSynchronizer)实现,它使用AQS的共享模式:

  1. 状态管理

    • AQS的state变量用作计数器
    • 初始化时设置为指定的计数值
    • 每次countDown()时将state减1
    • 当state变为0时,所有等待的线程被释放
  2. 关键方法实现

    • countDown():调用AQS的releaseShared()方法,减少计数
    • await():调用AQS的acquireShared()方法,如果计数不为0则阻塞
    • await(timeout, unit):调用AQS的tryAcquireSharedNanos()方法,支持超时
  3. 线程排队机制

    • 使用AQS内部的FIFO队列来管理等待的线程
    • 当计数器变为0时,AQS会释放所有等待的线程
  4. 为什么不可重用

    • CountDownLatch的计数器只能减不能增
    • 一旦计数器达到0,就无法恢复到初始状态
    • 这是设计决策,而非技术限制

3. CyclicBarrier(循环屏障)

3.1 CyclicBarrier 基本概念

CyclicBarrier是一个同步工具类,允许一组线程互相等待,直到所有线程都到达某个公共屏障点。

CyclicBarrier基本用法示例
java
1import java.util.concurrent.CyclicBarrier;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
4import java.util.concurrent.BrokenBarrierException;
5
6public class CyclicBarrierExamples {
7
8 /**
9 * CyclicBarrier基本用法
10 */
11 public static class BasicUsage {
12 public static void main(String[] args) {
13 int threadCount = 3;
14 CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
15 System.out.println("所有线程都到达屏障点,开始下一轮");
16 });
17
18 ExecutorService executor = Executors.newFixedThreadPool(threadCount);
19
20 System.out.println("=== CyclicBarrier基本用法 ===");
21
22 for (int i = 0; i < threadCount; i++) {
23 final int threadId = i;
24 executor.submit(() -> {
25 try {
26 for (int round = 0; round < 3; round++) {
27 System.out.println("线程 " + threadId + " 执行第 " + round + " 轮");
28 Thread.sleep(1000 + threadId * 200);
29 System.out.println("线程 " + threadId + " 到达屏障点");
30 barrier.await(); // 等待其他线程
31 }
32 } catch (InterruptedException | BrokenBarrierException e) {
33 Thread.currentThread().interrupt();
34 }
35 });
36 }
37
38 executor.shutdown();
39 }
40}
41
42 /**
43 * CyclicBarrier超时处理
44 */
45 public static class TimeoutUsage {
46 public static void main(String[] args) {
47 int threadCount = 3;
48 CyclicBarrier barrier = new CyclicBarrier(threadCount);
49
50 for (int i = 0; i < threadCount; i++) {
51 final int threadId = i;
52 new Thread(() -> {
53 try {
54 for (int round = 0; round < 2; round++) {
55 System.out.println("线程 " + threadId + " 开始第 " + round + " 轮");
56
57 // 模拟不同线程的执行时间
58 Thread.sleep(1000 + threadId * 1000);
59
60 try {
61 // 等待最多3秒
62 barrier.await(3, TimeUnit.SECONDS);
63 System.out.println("线程 " + threadId + " 第 " + round + " 轮完成");
64 } catch (TimeoutException e) {
65 System.out.println("线程 " + threadId + " 等待超时");
66 barrier.reset(); // 重置屏障
67 break;
68 }
69 }
70 } catch (InterruptedException | BrokenBarrierException e) {
71 Thread.currentThread().interrupt();
72 }
73 }).start();
74 }
75 }
76 }
77
78 /**
79 * CyclicBarrier实际应用场景
80 */
81 public static class PracticalApplications {
82
83 /**
84 * 矩阵处理场景
85 */
86 public static class MatrixProcessor {
87 public void processMatrix(int[][] matrix) {
88 int rows = matrix.length;
89 CyclicBarrier barrier = new CyclicBarrier(rows, () -> {
90 System.out.println("所有行处理完成,开始下一阶段");
91 });
92
93 for (int i = 0; i < rows; i++) {
94 final int rowIndex = i;
95 new Thread(() -> {
96 try {
97 // 第一阶段:处理矩阵的每一行
98 processRow(matrix[rowIndex]);
99 barrier.await();
100
101 // 第二阶段:等待所有行处理完成后,进行下一阶段
102 processRowPhase2(matrix[rowIndex]);
103 barrier.await();
104
105 // 第三阶段:最终处理
106 processRowPhase3(matrix[rowIndex]);
107 barrier.await();
108
109 } catch (InterruptedException | BrokenBarrierException e) {
110 Thread.currentThread().interrupt();
111 }
112 }).start();
113 }
114 }
115
116 private void processRow(int[] row) {
117 try {
118 System.out.println("处理行数据: " + Arrays.toString(row));
119 Thread.sleep(500);
120 } catch (InterruptedException e) {
121 Thread.currentThread().interrupt();
122 }
123 }
124
125 private void processRowPhase2(int[] row) {
126 try {
127 System.out.println("第二阶段处理行数据: " + Arrays.toString(row));
128 Thread.sleep(300);
129 } catch (InterruptedException e) {
130 Thread.currentThread().interrupt();
131 }
132 }
133
134 private void processRowPhase3(int[] row) {
135 try {
136 System.out.println("第三阶段处理行数据: " + Arrays.toString(row));
137 Thread.sleep(200);
138 } catch (InterruptedException e) {
139 Thread.currentThread().interrupt();
140 }
141 }
142 }
143
144 /**
145 * 游戏同步场景
146 */
147 public static class GameSynchronizer {
148 public void startGame(int playerCount) {
149 CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> {
150 System.out.println("所有玩家准备就绪,开始新一轮");
151 });
152
153 for (int i = 0; i < playerCount; i++) {
154 final int playerId = i;
155 new Thread(() -> {
156 try {
157 for (int round = 0; round < 3; round++) {
158 System.out.println("玩家 " + playerId + " 准备第 " + round + " 轮");
159 Thread.sleep(1000 + playerId * 200);
160
161 barrier.await();
162
163 System.out.println("玩家 " + playerId + " 完成第 " + round + " 轮");
164 }
165 } catch (InterruptedException | BrokenBarrierException e) {
166 Thread.currentThread().interrupt();
167 }
168 }).start();
169 }
170 }
171 }
172 }
173}

4. Semaphore(信号量)

4.1 Semaphore 基本概念

Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。

Semaphore基本用法示例
java
1import java.util.concurrent.Semaphore;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
4import java.util.concurrent.TimeUnit;
5
6public class SemaphoreExamples {
7
8 /**
9 * Semaphore基本用法
10 */
11 public static class BasicUsage {
12 public static void main(String[] args) {
13 // 限制同时访问的线程数为2
14 Semaphore semaphore = new Semaphore(2);
15 ExecutorService executor = Executors.newFixedThreadPool(5);
16
17 System.out.println("=== Semaphore基本用法 ===");
18
19 for (int i = 0; i < 5; i++) {
20 final int taskId = i;
21 executor.submit(() -> {
22 try {
23 System.out.println("任务 " + taskId + " 尝试获取许可");
24 semaphore.acquire(); // 获取许可
25
26 System.out.println("任务 " + taskId + " 获得许可,开始执行");
27 Thread.sleep(2000); // 模拟工作
28 System.out.println("任务 " + taskId + " 执行完成");
29
30 } catch (InterruptedException e) {
31 Thread.currentThread().interrupt();
32 } finally {
33 semaphore.release(); // 释放许可
34 System.out.println("任务 " + taskId + " 释放许可");
35 }
36 });
37 }
38
39 executor.shutdown();
40 }
41}
42
43 /**
44 * Semaphore公平模式
45 */
46 public static class FairUsage {
47 public static void main(String[] args) {
48 // 创建公平模式的信号量
49 Semaphore semaphore = new Semaphore(2, true);
50
51 for (int i = 0; i < 4; i++) {
52 final int threadId = i;
53 new Thread(() -> {
54 try {
55 System.out.println("线程 " + threadId + " 尝试获取许可");
56 semaphore.acquire();
57
58 System.out.println("线程 " + threadId + " 获得许可");
59 Thread.sleep(1000);
60
61 } catch (InterruptedException e) {
62 Thread.currentThread().interrupt();
63 } finally {
64 semaphore.release();
65 System.out.println("线程 " + threadId + " 释放许可");
66 }
67 }).start();
68 }
69 }
70 }
71
72 /**
73 * Semaphore超时控制
74 */
75 public static class TimeoutUsage {
76 public static void main(String[] args) {
77 Semaphore semaphore = new Semaphore(1);
78
79 // 线程1:长时间持有许可
80 new Thread(() -> {
81 try {
82 semaphore.acquire();
83 System.out.println("线程1获得许可,开始长时间工作");
84 Thread.sleep(5000);
85 System.out.println("线程1完成工作");
86 } catch (InterruptedException e) {
87 Thread.currentThread().interrupt();
88 } finally {
89 semaphore.release();
90 }
91 }).start();
92
93 // 线程2:尝试获取许可,但会超时
94 new Thread(() -> {
95 try {
96 System.out.println("线程2尝试获取许可");
97 boolean acquired = semaphore.tryAcquire(2, TimeUnit.SECONDS);
98 if (acquired) {
99 System.out.println("线程2获得许可");
100 semaphore.release();
101 } else {
102 System.out.println("线程2获取许可超时");
103 }
104 } catch (InterruptedException e) {
105 Thread.currentThread().interrupt();
106 }
107 }).start();
108 }
109 }
110
111 /**
112 * Semaphore实际应用场景
113 */
114 public static class PracticalApplications {
115
116 /**
117 * 连接池实现
118 */
119 public static class ConnectionPool {
120 private final Semaphore semaphore;
121 private final List<Connection> connections;
122
123 public ConnectionPool(int poolSize) {
124 this.semaphore = new Semaphore(poolSize);
125 this.connections = new ArrayList<>();
126
127 // 初始化连接池
128 for (int i = 0; i < poolSize; i++) {
129 connections.add(new Connection("Connection-" + i));
130 }
131 }
132
133 public Connection getConnection() throws InterruptedException {
134 semaphore.acquire(); // 获取许可
135 synchronized (connections) {
136 return connections.remove(0);
137 }
138 }
139
140 public void releaseConnection(Connection connection) {
141 synchronized (connections) {
142 connections.add(connection);
143 }
144 semaphore.release(); // 释放许可
145 }
146
147 public int getAvailableConnections() {
148 return semaphore.availablePermits();
149 }
150
151 static class Connection {
152 private final String name;
153
154 public Connection(String name) {
155 this.name = name;
156 }
157
158 public String getName() {
159 return name;
160 }
161
162 @Override
163 public String toString() {
164 return "Connection{name='" + name + "'}";
165 }
166 }
167 }
168
169 /**
170 * 限流器实现
171 */
172 public static class RateLimiter {
173 private final Semaphore semaphore;
174 private final int maxPermits;
175
176 public RateLimiter(int maxPermits) {
177 this.maxPermits = maxPermits;
178 this.semaphore = new Semaphore(maxPermits);
179 }
180
181 public boolean tryAcquire() {
182 return semaphore.tryAcquire();
183 }
184
185 public boolean tryAcquire(long timeout, TimeUnit unit) {
186 try {
187 return semaphore.tryAcquire(timeout, unit);
188 } catch (InterruptedException e) {
189 Thread.currentThread().interrupt();
190 return false;
191 }
192 }
193
194 public void acquire() throws InterruptedException {
195 semaphore.acquire();
196 }
197
198 public void release() {
199 semaphore.release();
200 }
201
202 public int getAvailablePermits() {
203 return semaphore.availablePermits();
204 }
205
206 public void reset() {
207 semaphore.drainPermits();
208 semaphore.release(maxPermits);
209 }
210 }
211 }
212}

5. Exchanger(数据交换器)

5.1 Exchanger 基本概念

Exchanger是一个同步工具类,用于两个线程之间交换数据。

Exchanger基本用法示例
java
1import java.util.concurrent.Exchanger;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
4
5public class ExchangerExamples {
6
7 /**
8 * Exchanger基本用法
9 */
10 public static class BasicUsage {
11 public static void main(String[] args) {
12 Exchanger<String> exchanger = new Exchanger<>();
13 ExecutorService executor = Executors.newFixedThreadPool(2);
14
15 System.out.println("=== Exchanger基本用法 ===");
16
17 // 生产者线程
18 executor.submit(() -> {
19 try {
20 String data = "生产者数据";
21 System.out.println("生产者准备交换数据: " + data);
22 String received = exchanger.exchange(data);
23 System.out.println("生产者收到数据: " + received);
24 } catch (InterruptedException e) {
25 Thread.currentThread().interrupt();
26 }
27 });
28
29 // 消费者线程
30 executor.submit(() -> {
31 try {
32 Thread.sleep(1000); // 模拟处理时间
33 String data = "消费者数据";
34 System.out.println("消费者准备交换数据: " + data);
35 String received = exchanger.exchange(data);
36 System.out.println("消费者收到数据: " + received);
37 } catch (InterruptedException e) {
38 Thread.currentThread().interrupt();
39 }
40 });
41
42 executor.shutdown();
43 }
44}
45
46 /**
47 * Exchanger超时控制
48 */
49 public static class TimeoutUsage {
50 public static void main(String[] args) {
51 Exchanger<String> exchanger = new Exchanger<>();
52
53 // 线程1:立即交换
54 new Thread(() -> {
55 try {
56 String data = "数据1";
57 System.out.println("线程1准备交换: " + data);
58 String received = exchanger.exchange(data, 2, TimeUnit.SECONDS);
59 System.out.println("线程1收到: " + received);
60 } catch (InterruptedException | TimeoutException e) {
61 System.out.println("线程1交换超时或中断");
62 }
63 }).start();
64
65 // 线程2:延迟交换
66 new Thread(() -> {
67 try {
68 Thread.sleep(3000); // 延迟3秒
69 String data = "数据2";
70 System.out.println("线程2准备交换: " + data);
71 String received = exchanger.exchange(data);
72 System.out.println("线程2收到: " + received);
73 } catch (InterruptedException e) {
74 Thread.currentThread().interrupt();
75 }
76 }).start();
77 }
78 }
79
80 /**
81 * Exchanger实际应用场景
82 */
83 public static class PracticalApplications {
84
85 /**
86 * 生产者消费者模式
87 */
88 public static class ProducerConsumer {
89 public static void main(String[] args) {
90 Exchanger<List<String>> exchanger = new Exchanger<>();
91
92 // 生产者
93 new Thread(() -> {
94 try {
95 List<String> buffer = new ArrayList<>();
96 for (int i = 0; i < 5; i++) {
97 buffer.add("数据-" + i);
98 if (buffer.size() == 3) {
99 System.out.println("生产者交换缓冲区");
100 buffer = exchanger.exchange(buffer);
101 Thread.sleep(1000);
102 }
103 }
104 } catch (InterruptedException e) {
105 Thread.currentThread().interrupt();
106 }
107 }).start();
108
109 // 消费者
110 new Thread(() -> {
111 try {
112 List<String> buffer = new ArrayList<>();
113 for (int i = 0; i < 5; i++) {
114 if (buffer.isEmpty()) {
115 System.out.println("消费者等待数据");
116 buffer = exchanger.exchange(buffer);
117 }
118 String data = buffer.remove(0);
119 System.out.println("消费者处理: " + data);
120 }
121 } catch (InterruptedException e) {
122 Thread.currentThread().interrupt();
123 }
124 }).start();
125 }
126 }
127
128 /**
129 * 数据校验场景
130 */
131 public static class DataValidator {
132 public static void main(String[] args) {
133 Exchanger<DataPacket> exchanger = new Exchanger<>();
134
135 // 数据生成器
136 new Thread(() -> {
137 try {
138 for (int i = 0; i < 3; i++) {
139 DataPacket packet = new DataPacket("数据包-" + i, i * 100);
140 System.out.println("生成数据包: " + packet);
141
142 DataPacket validatedPacket = exchanger.exchange(packet);
143 System.out.println("收到校验结果: " + validatedPacket);
144 }
145 } catch (InterruptedException e) {
146 Thread.currentThread().interrupt();
147 }
148 }).start();
149
150 // 数据校验器
151 new Thread(() -> {
152 try {
153 for (int i = 0; i < 3; i++) {
154 DataPacket packet = exchanger.exchange(null);
155 System.out.println("校验数据包: " + packet);
156
157 // 模拟校验过程
158 Thread.sleep(500);
159 packet.setValidated(true);
160
161 exchanger.exchange(packet);
162 }
163 } catch (InterruptedException e) {
164 Thread.currentThread().interrupt();
165 }
166 }).start();
167 }
168
169 static class DataPacket {
170 private String data;
171 private int value;
172 private boolean validated;
173
174 public DataPacket(String data, int value) {
175 this.data = data;
176 this.value = value;
177 this.validated = false;
178 }
179
180 public void setValidated(boolean validated) {
181 this.validated = validated;
182 }
183
184 @Override
185 public String toString() {
186 return "DataPacket{data='" + data + "', value=" + value + ", validated=" + validated + "}";
187 }
188 }
189 }
190 }
191}

6. Phaser(阶段器)

6.1 Phaser 基本概念

Phaser是一个更灵活的同步工具类,可以动态调整参与同步的线程数量。

Phaser基本用法示例
java
1import java.util.concurrent.Phaser;
2
3public class PhaserExamples {
4
5 /**
6 * Phaser基本用法
7 */
8 public static class BasicUsage {
9 public static void main(String[] args) {
10 Phaser phaser = new Phaser(3); // 初始参与线程数为3
11
12 System.out.println("=== Phaser基本用法 ===");
13
14 for (int i = 0; i < 3; i++) {
15 final int threadId = i;
16 new Thread(() -> {
17 try {
18 System.out.println("线程 " + threadId + " 开始第一阶段");
19 Thread.sleep(1000);
20 phaser.arriveAndAwaitAdvance(); // 等待所有线程完成第一阶段
21
22 System.out.println("线程 " + threadId + " 开始第二阶段");
23 Thread.sleep(1000);
24 phaser.arriveAndAwaitAdvance(); // 等待所有线程完成第二阶段
25
26 System.out.println("线程 " + threadId + " 完成所有阶段");
27 phaser.arriveAndDeregister(); // 退出同步
28
29 } catch (InterruptedException e) {
30 Thread.currentThread().interrupt();
31 }
32 }).start();
33 }
34 }
35}
36
37 /**
38 * Phaser动态注册
39 */
40 public static class DynamicRegistration {
41 public static void main(String[] args) {
42 Phaser phaser = new Phaser(1); // 主线程注册
43
44 for (int i = 0; i < 3; i++) {
45 final int threadId = i;
46 new Thread(() -> {
47 phaser.register(); // 动态注册
48 try {
49 System.out.println("线程 " + threadId + " 注册,当前参与数: " + phaser.getRegisteredParties());
50
51 System.out.println("线程 " + threadId + " 开始工作");
52 Thread.sleep(1000);
53
54 phaser.arriveAndDeregister(); // 完成工作并注销
55 System.out.println("线程 " + threadId + " 注销,当前参与数: " + phaser.getRegisteredParties());
56
57 } catch (InterruptedException e) {
58 Thread.currentThread().interrupt();
59 }
60 }).start();
61 }
62
63 // 主线程等待所有工作线程完成
64 phaser.arriveAndAwaitAdvance();
65 System.out.println("所有工作线程完成");
66 }
67 }
68
69 /**
70 * Phaser实际应用场景
71 */
72 public static class PracticalApplications {
73
74 /**
75 * 多阶段任务处理
76 */
77 public static class MultiPhaseProcessor {
78 public void processMultiPhaseTask() {
79 Phaser phaser = new Phaser(1);
80
81 for (int i = 0; i < 3; i++) {
82 final int workerId = i;
83 new Thread(() -> {
84 phaser.register();
85 try {
86 // 第一阶段:数据准备
87 System.out.println("工作者 " + workerId + " 开始数据准备");
88 Thread.sleep(1000);
89 phaser.arriveAndAwaitAdvance();
90
91 // 第二阶段:数据处理
92 System.out.println("工作者 " + workerId + " 开始数据处理");
93 Thread.sleep(1500);
94 phaser.arriveAndAwaitAdvance();
95
96 // 第三阶段:结果输出
97 System.out.println("工作者 " + workerId + " 开始结果输出");
98 Thread.sleep(800);
99 phaser.arriveAndDeregister();
100
101 } catch (InterruptedException e) {
102 Thread.currentThread().interrupt();
103 }
104 }).start();
105 }
106
107 // 主线程等待所有阶段完成
108 phaser.arriveAndAwaitAdvance();
109 System.out.println("所有阶段完成");
110 }
111 }
112
113 /**
114 * 游戏回合制处理
115 */
116 public static class GameRoundProcessor {
117 public void processGameRounds(int playerCount, int roundCount) {
118 Phaser phaser = new Phaser(playerCount);
119
120 for (int i = 0; i < playerCount; i++) {
121 final int playerId = i;
122 new Thread(() -> {
123 try {
124 for (int round = 0; round < roundCount; round++) {
125 System.out.println("玩家 " + playerId + " 开始第 " + round + " 轮");
126
127 // 模拟玩家行动
128 Thread.sleep(500 + playerId * 100);
129
130 System.out.println("玩家 " + playerId + " 完成第 " + round + " 轮");
131 phaser.arriveAndAwaitAdvance(); // 等待所有玩家完成当前轮
132 }
133 } catch (InterruptedException e) {
134 Thread.currentThread().interrupt();
135 }
136 }).start();
137 }
138 }
139 }
140 }
141}

7. 并发工具类最佳实践

7.1 工具类选择指南

核心原则

选择合适的并发工具类需要考虑以下因素:

  • 同步需求:一次性等待还是循环等待
  • 线程数量:固定数量还是动态变化
  • 超时要求:是否需要超时控制
  • 公平性:是否需要公平性保证
工具类选择指南示例
java
1public class ToolSelectionGuide {
2
3 /**
4 * 工具类选择指南
5 */
6 public static void selectionGuide() {
7 System.out.println("=== 并发工具类选择指南 ===");
8
9 // 1. 一次性等待多个任务完成
10 System.out.println("1. 一次性等待多个任务完成 -> CountDownLatch");
11 System.out.println(" 适用场景:服务启动、数据加载、任务完成等待");
12
13 // 2. 循环等待多个线程同步
14 System.out.println("2. 循环等待多个线程同步 -> CyclicBarrier");
15 System.out.println(" 适用场景:多阶段处理、游戏回合、矩阵计算");
16
17 // 3. 控制并发访问数量
18 System.out.println("3. 控制并发访问数量 -> Semaphore");
19 System.out.println(" 适用场景:连接池、限流器、资源控制");
20
21 // 4. 两个线程交换数据
22 System.out.println("4. 两个线程交换数据 -> Exchanger");
23 System.out.println(" 适用场景:生产者消费者、数据校验、缓冲区交换");
24
25 // 5. 动态调整参与线程数量
26 System.out.println("5. 动态调整参与线程数量 -> Phaser");
27 System.out.println(" 适用场景:多阶段任务、游戏处理、复杂同步");
28 }
29}

7.2 性能优化技巧

性能优化示例
java
1public class PerformanceOptimization {
2
3 /**
4 * 避免过度同步
5 */
6 public static class AvoidOverSynchronization {
7 public void optimizedApproach() {
8 // 不推荐:过度使用同步
9 CountDownLatch latch = new CountDownLatch(1);
10 Semaphore semaphore = new Semaphore(1);
11
12 // 推荐:选择合适的工具
13 // 简单等待 -> CountDownLatch
14 // 资源控制 -> Semaphore
15 // 数据交换 -> Exchanger
16 }
17 }
18
19 /**
20 * 合理设置超时
21 */
22 public static class TimeoutOptimization {
23 public void optimizedTimeout() {
24 // 设置合理的超时时间
25 Semaphore semaphore = new Semaphore(1);
26
27 try {
28 // 根据业务需求设置超时时间
29 boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);
30 if (acquired) {
31 // 处理业务逻辑
32 semaphore.release();
33 } else {
34 // 处理超时情况
35 System.out.println("获取资源超时");
36 }
37 } catch (InterruptedException e) {
38 Thread.currentThread().interrupt();
39 }
40 }
41 }
42
43 /**
44 * 避免死锁
45 */
46 public static class DeadlockPrevention {
47 public void preventDeadlock() {
48 // 1. 固定资源获取顺序
49 Semaphore sem1 = new Semaphore(1);
50 Semaphore sem2 = new Semaphore(1);
51
52 // 总是先获取sem1,再获取sem2
53 try {
54 sem1.acquire();
55 sem2.acquire();
56 // 使用资源
57 } finally {
58 sem2.release();
59 sem1.release();
60 }
61
62 // 2. 使用超时机制
63 try {
64 if (sem1.tryAcquire(1, TimeUnit.SECONDS)) {
65 if (sem2.tryAcquire(1, TimeUnit.SECONDS)) {
66 // 使用资源
67 sem2.release();
68 }
69 sem1.release();
70 }
71 } catch (InterruptedException e) {
72 Thread.currentThread().interrupt();
73 }
74 }
75 }
76}

7.3 常见陷阱与解决方案

陷阱问题解决方案
CountDownLatch重用CountDownLatch是一次性的使用CyclicBarrier或Phaser
CyclicBarrier线程数不匹配线程数变化导致死锁使用Phaser动态调整
Semaphore忘记释放导致其他线程无法获取资源使用try-finally确保释放
Exchanger超时处理一个线程超时导致另一个线程阻塞使用超时版本的exchange方法
Phaser过度注册注册过多线程影响性能合理控制注册数量

8. 总结

Java并发工具类为多线程编程提供了强大而灵活的支持。

8.1 关键要点

  1. 工具类特性:CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser
  2. 使用场景:根据具体需求选择合适的工具类
  3. 性能优化:避免过度同步、合理设置超时、防止死锁
  4. 最佳实践:正确使用、及时清理、异常处理

8.2 选择建议

场景推荐工具类原因
等待多个任务完成CountDownLatch一次性等待,简单高效
循环同步CyclicBarrier支持重置,适合多轮处理
资源控制Semaphore控制并发数量,支持公平性
数据交换Exchanger专门用于两个线程交换数据
动态同步Phaser支持动态调整参与线程数量

8.3 学习建议

  1. 理解原理:深入理解各种工具类的工作原理
  2. 实践验证:通过编写代码验证不同工具类的效果
  3. 场景选择:根据具体需求选择合适的工具类
  4. 性能测试:对比不同实现方式的性能差异

通过深入理解和熟练运用这些并发工具类,我们能够构建出更加高效、健壮和可维护的Java并发应用程序。

参与讨论