Java 并发工具类详解
Java并发包(java.util.concurrent)提供了丰富的并发工具类,用于简化多线程编程。这些工具类提供了比传统synchronized更灵活和高效的并发控制机制。本文将详细介绍各种并发工具类的使用方法和最佳实践。
本文内容概览
核心价值
并发工具类 = 灵活同步 + 性能优化 + 易用性 + 丰富功能
- 🔄 灵活控制:提供比synchronized更灵活的同步机制
- 🚀 高性能:采用更高效的算法和实现,优化竞争处理
- 🧩 功能丰富:针对不同场景提供专用工具
- 🛡️ 健壮性:内置超时、中断、异常处理等机制
- 📊 可伸缩性:适应各种复杂并发场景
1. 并发工具类概述
1.1 什么是并发工具类?
核心概念
并发工具类是Java并发包中提供的一组高级同步工具,它们基于AQS(AbstractQueuedSynchronizer)实现,提供了比传统synchronized更灵活、更高效的并发控制机制。
1.2 并发工具类的优势
并发工具类与传统同步机制对比
| 优势 | 具体体现 | 业务价值 |
|---|---|---|
| 灵活性 | 支持超时、中断、公平性等特性 | 适应复杂业务场景 |
| 高性能 | 基于AQS实现,性能优于synchronized | 提高系统吞吐量 |
| 功能丰富 | 提供多种同步机制 | 简化编程复杂度 |
| 可扩展性 | 支持自定义同步器 | 满足特殊需求 |
| 可读性 | 语义清晰,代码易理解 | 提高代码质量 |
1.3 并发工具类分类
- 同步工具类
- 交换工具类
- 异步工具类
- 原子工具类
同步工具类主要用于协调多个线程的执行顺序和状态:
| 工具类 | 主要功能 | 特点 | 适用场景 |
|---|---|---|---|
| CountDownLatch | 让一个或多个线程等待一组操作完成 | 一次性使用,计数器为0后无法重置 | 启动信号、资源初始化等待 |
| CyclicBarrier | 让一组线程在某个点相互等待 | 可重复使用,自动重置计数 | 分阶段计算、并行迭代算法 |
| Semaphore | 控制同时访问某个资源的线程数量 | 支持公平/非公平模式,可动态调整许可 | 连接池管理、资源限制 |
| Phaser | 允许多阶段同步,动态调整参与方数量 | 功能最强大、最复杂 | 复杂多阶段并行任务 |
交换工具类用于线程间数据交换:
| 工具类 | 主要功能 | 特点 | 适用场景 |
|---|---|---|---|
| Exchanger | 两个线程之间交换数据 | 双向交换,支持超时 | 生产者/消费者模式、双向数据流 |
异步工具类用于处理异步计算结果:
| 工具类 | 主要功能 | 特点 | 适用场景 |
|---|---|---|---|
| FutureTask | 表示一个可取消的异步计算 | 可获取结果、检查状态、取消任务 | 单个异步任务处理 |
| CompletableFuture | 支持组合式异步编程 | 支持任务链式处理、异常处理 | 复杂异步工作流 |
原子工具类用于原子操作:
| 工具类 | 主要功能 | 特点 | 适用场景 |
|---|---|---|---|
| AtomicInteger | 原子更新整型值 | CAS操作,无锁 | 计数器、序列号生成 |
| AtomicReference | 原子更新引用类型 | 支持引用的原子更新 | 原子更新对象 |
| LongAdder | 高并发计数器 | 分段计数,减少竞争 | 高并发统计场景 |
2. CountDownLatch(倒计时门闩)
2.1 CountDownLatch 基本概念
CountDownLatch核心特性
CountDownLatch是一个同步工具类,允许一个或多个线程等待其他线程完成操作。
关键特性:
- 初始化时指定计数值
- 调用
countDown()方法使计数值减1 - 调用
await()方法等待计数值变为0 - 一次性使用:计数值为0后无法重置
- 支持超时等待:
await(long timeout, TimeUnit unit)
主要应用场景:
- 启动信号:等待所有服务就绪后再开始
- 任务分解:将大任务分解为多个小任务并行处理
- 资源初始化:等待多个资源初始化完成
- 基本用法
- 超时等待
- 实际应用
java
1import java.util.concurrent.CountDownLatch;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;45/**6 * CountDownLatch基本用法7 */8public static class BasicUsage {9 public static void main(String[] args) throws InterruptedException {10 int threadCount = 3;11 // 创建一个初始计数为3的CountDownLatch12 CountDownLatch latch = new CountDownLatch(threadCount);13 ExecutorService executor = Executors.newFixedThreadPool(threadCount);14 15 // 启动多个工作线程16 for (int i = 0; i < threadCount; i++) {17 final int taskId = i;18 executor.submit(() -> {19 try {20 System.out.println("任务 " + taskId + " 开始执行");21 Thread.sleep(1000 + taskId * 500); // 模拟工作22 System.out.println("任务 " + taskId + " 执行完成");23 } catch (InterruptedException e) {24 Thread.currentThread().interrupt();25 } finally {26 latch.countDown(); // 计数器减127 System.out.println("任务 " + taskId + " 计数器减1,剩余: " + latch.getCount());28 }29 });30 }31 32 System.out.println("主线程等待所有任务完成...");33 latch.await(); // 等待所有任务完成34 System.out.println("所有任务已完成!");35 36 executor.shutdown();37 }38}工作原理:
- 创建
CountDownLatch并设置初始计数值 - 启动多个线程执行任务
- 每个任务完成后调用
countDown()使计数值减1 - 主线程调用
await()等待计数值变为0 - 当所有任务完成后,主线程继续执行
java
1import java.util.concurrent.CountDownLatch;2import java.util.concurrent.TimeUnit;34/**5 * CountDownLatch超时等待6 */7public static class TimeoutUsage {8 public static void main(String[] args) throws InterruptedException {9 CountDownLatch latch = new CountDownLatch(3);10 11 // 启动工作线程12 for (int i = 0; i < 3; i++) {13 final int taskId = i;14 new Thread(() -> {15 try {16 // 每个任务执行时间不同17 Thread.sleep(2000 + taskId * 1000); // 模拟长时间工作18 System.out.println("任务 " + taskId + " 完成");19 latch.countDown();20 } catch (InterruptedException e) {21 Thread.currentThread().interrupt();22 }23 }).start();24 }25 26 // 最多等待5秒27 boolean completed = latch.await(5, TimeUnit.SECONDS);28 if (completed) {29 System.out.println("所有任务在超时时间内完成");30 } else {31 System.out.println("等待超时,还有 " + latch.getCount() + " 个任务未完成");32 }33 }34}超时处理的优势:
- 避免无限期等待
- 提供失败快速路径
- 增强系统鲁棒性
- 支持优雅降级策略
java
1import java.util.concurrent.CountDownLatch;23/**4 * 数据加载场景5 */6public static class DataLoader {7 public void loadData() throws InterruptedException {8 // 创建用于等待三类数据加载完成的CountDownLatch9 CountDownLatch latch = new CountDownLatch(3);10 11 // 并行加载不同类型的数据12 new Thread(() -> {13 try {14 loadUserData();15 System.out.println("用户数据加载完成");16 } finally {17 latch.countDown();18 }19 }).start();20 21 new Thread(() -> {22 try {23 loadProductData();24 System.out.println("产品数据加载完成");25 } finally {26 latch.countDown();27 }28 }).start();29 30 new Thread(() -> {31 try {32 loadOrderData();33 System.out.println("订单数据加载完成");34 } finally {35 latch.countDown();36 }37 }).start();38 39 // 等待所有数据加载完成40 latch.await();41 System.out.println("所有数据加载完成,开始处理业务逻辑");42 processAllData();43 }44 45 // 数据加载和处理方法46 private void loadUserData() { /* 实现省略 */ }47 private void loadProductData() { /* 实现省略 */ }48 private void loadOrderData() { /* 实现省略 */ }49 private void processAllData() { /* 实现省略 */ }50}应用场景示例:
- 微服务启动:等待多个依赖服务就绪
- 并行数据处理:分片处理大型数据集
- 测试多线程:确保多个线程同时开始执行
- 资源初始化:等待多个资源准备完毕
2.2 CountDownLatch vs CyclicBarrier
CountDownLatch与CyclicBarrier对比
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 等待方向 | 一个或多个线程等待其他线程 | 一组线程相互等待 |
| 重用性 | 一次性使用,不可重置 | 可重复使用,自动重置 |
| 计数操作 | countDown()减少计数 | 调用await()增加计数 |
| 回调动作 | 不支持 | 支持屏障动作(barrier action) |
| 超时支持 | 支持超时等待 | 支持超时等待 |
| 创建方式 | 指定计数值 | 指定参与线程数和可选的屏障动作 |
| 使用场景 | 启动信号、任务分解 | 分阶段计算、并行迭代 |
2.3 CountDownLatch 实现原理
CountDownLatch内部实现细节
CountDownLatch基于AQS(AbstractQueuedSynchronizer)实现,它使用AQS的共享模式:
-
状态管理:
- AQS的state变量用作计数器
- 初始化时设置为指定的计数值
- 每次countDown()时将state减1
- 当state变为0时,所有等待的线程被释放
-
关键方法实现:
countDown():调用AQS的releaseShared()方法,减少计数await():调用AQS的acquireShared()方法,如果计数不为0则阻塞await(timeout, unit):调用AQS的tryAcquireSharedNanos()方法,支持超时
-
线程排队机制:
- 使用AQS内部的FIFO队列来管理等待的线程
- 当计数器变为0时,AQS会释放所有等待的线程
-
为什么不可重用:
- CountDownLatch的计数器只能减不能增
- 一旦计数器达到0,就无法恢复到初始状态
- 这是设计决策,而非技术限制
3. CyclicBarrier(循环屏障)
3.1 CyclicBarrier 基本概念
CyclicBarrier是一个同步工具类,允许一组线程互相等待,直到所有线程都到达某个公共屏障点。
CyclicBarrier基本用法示例
java
1import java.util.concurrent.CyclicBarrier;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;4import java.util.concurrent.BrokenBarrierException;56public class CyclicBarrierExamples {7 8 /**9 * CyclicBarrier基本用法10 */11 public static class BasicUsage {12 public static void main(String[] args) {13 int threadCount = 3;14 CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {15 System.out.println("所有线程都到达屏障点,开始下一轮");16 });17 18 ExecutorService executor = Executors.newFixedThreadPool(threadCount);19 20 System.out.println("=== CyclicBarrier基本用法 ===");21 22 for (int i = 0; i < threadCount; i++) {23 final int threadId = i;24 executor.submit(() -> {25 try {26 for (int round = 0; round < 3; round++) {27 System.out.println("线程 " + threadId + " 执行第 " + round + " 轮");28 Thread.sleep(1000 + threadId * 200);29 System.out.println("线程 " + threadId + " 到达屏障点");30 barrier.await(); // 等待其他线程31 }32 } catch (InterruptedException | BrokenBarrierException e) {33 Thread.currentThread().interrupt();34 }35 });36 }37 38 executor.shutdown();39 }40}41 42 /**43 * CyclicBarrier超时处理44 */45 public static class TimeoutUsage {46 public static void main(String[] args) {47 int threadCount = 3;48 CyclicBarrier barrier = new CyclicBarrier(threadCount);49 50 for (int i = 0; i < threadCount; i++) {51 final int threadId = i;52 new Thread(() -> {53 try {54 for (int round = 0; round < 2; round++) {55 System.out.println("线程 " + threadId + " 开始第 " + round + " 轮");56 57 // 模拟不同线程的执行时间58 Thread.sleep(1000 + threadId * 1000);59 60 try {61 // 等待最多3秒62 barrier.await(3, TimeUnit.SECONDS);63 System.out.println("线程 " + threadId + " 第 " + round + " 轮完成");64 } catch (TimeoutException e) {65 System.out.println("线程 " + threadId + " 等待超时");66 barrier.reset(); // 重置屏障67 break;68 }69 }70 } catch (InterruptedException | BrokenBarrierException e) {71 Thread.currentThread().interrupt();72 }73 }).start();74 }75 }76 }77 78 /**79 * CyclicBarrier实际应用场景80 */81 public static class PracticalApplications {82 83 /**84 * 矩阵处理场景85 */86 public static class MatrixProcessor {87 public void processMatrix(int[][] matrix) {88 int rows = matrix.length;89 CyclicBarrier barrier = new CyclicBarrier(rows, () -> {90 System.out.println("所有行处理完成,开始下一阶段");91 });92 93 for (int i = 0; i < rows; i++) {94 final int rowIndex = i;95 new Thread(() -> {96 try {97 // 第一阶段:处理矩阵的每一行98 processRow(matrix[rowIndex]);99 barrier.await();100 101 // 第二阶段:等待所有行处理完成后,进行下一阶段102 processRowPhase2(matrix[rowIndex]);103 barrier.await();104 105 // 第三阶段:最终处理106 processRowPhase3(matrix[rowIndex]);107 barrier.await();108 109 } catch (InterruptedException | BrokenBarrierException e) {110 Thread.currentThread().interrupt();111 }112 }).start();113 }114 }115 116 private void processRow(int[] row) {117 try {118 System.out.println("处理行数据: " + Arrays.toString(row));119 Thread.sleep(500);120 } catch (InterruptedException e) {121 Thread.currentThread().interrupt();122 }123 }124 125 private void processRowPhase2(int[] row) {126 try {127 System.out.println("第二阶段处理行数据: " + Arrays.toString(row));128 Thread.sleep(300);129 } catch (InterruptedException e) {130 Thread.currentThread().interrupt();131 }132 }133 134 private void processRowPhase3(int[] row) {135 try {136 System.out.println("第三阶段处理行数据: " + Arrays.toString(row));137 Thread.sleep(200);138 } catch (InterruptedException e) {139 Thread.currentThread().interrupt();140 }141 }142 }143 144 /**145 * 游戏同步场景146 */147 public static class GameSynchronizer {148 public void startGame(int playerCount) {149 CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> {150 System.out.println("所有玩家准备就绪,开始新一轮");151 });152 153 for (int i = 0; i < playerCount; i++) {154 final int playerId = i;155 new Thread(() -> {156 try {157 for (int round = 0; round < 3; round++) {158 System.out.println("玩家 " + playerId + " 准备第 " + round + " 轮");159 Thread.sleep(1000 + playerId * 200);160 161 barrier.await();162 163 System.out.println("玩家 " + playerId + " 完成第 " + round + " 轮");164 }165 } catch (InterruptedException | BrokenBarrierException e) {166 Thread.currentThread().interrupt();167 }168 }).start();169 }170 }171 }172 }173}4. Semaphore(信号量)
4.1 Semaphore 基本概念
Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。
Semaphore基本用法示例
java
1import java.util.concurrent.Semaphore;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;4import java.util.concurrent.TimeUnit;56public class SemaphoreExamples {7 8 /**9 * Semaphore基本用法10 */11 public static class BasicUsage {12 public static void main(String[] args) {13 // 限制同时访问的线程数为214 Semaphore semaphore = new Semaphore(2);15 ExecutorService executor = Executors.newFixedThreadPool(5);16 17 System.out.println("=== Semaphore基本用法 ===");18 19 for (int i = 0; i < 5; i++) {20 final int taskId = i;21 executor.submit(() -> {22 try {23 System.out.println("任务 " + taskId + " 尝试获取许可");24 semaphore.acquire(); // 获取许可25 26 System.out.println("任务 " + taskId + " 获得许可,开始执行");27 Thread.sleep(2000); // 模拟工作28 System.out.println("任务 " + taskId + " 执行完成");29 30 } catch (InterruptedException e) {31 Thread.currentThread().interrupt();32 } finally {33 semaphore.release(); // 释放许可34 System.out.println("任务 " + taskId + " 释放许可");35 }36 });37 }38 39 executor.shutdown();40 }41}42 43 /**44 * Semaphore公平模式45 */46 public static class FairUsage {47 public static void main(String[] args) {48 // 创建公平模式的信号量49 Semaphore semaphore = new Semaphore(2, true);50 51 for (int i = 0; i < 4; i++) {52 final int threadId = i;53 new Thread(() -> {54 try {55 System.out.println("线程 " + threadId + " 尝试获取许可");56 semaphore.acquire();57 58 System.out.println("线程 " + threadId + " 获得许可");59 Thread.sleep(1000);60 61 } catch (InterruptedException e) {62 Thread.currentThread().interrupt();63 } finally {64 semaphore.release();65 System.out.println("线程 " + threadId + " 释放许可");66 }67 }).start();68 }69 }70 }71 72 /**73 * Semaphore超时控制74 */75 public static class TimeoutUsage {76 public static void main(String[] args) {77 Semaphore semaphore = new Semaphore(1);78 79 // 线程1:长时间持有许可80 new Thread(() -> {81 try {82 semaphore.acquire();83 System.out.println("线程1获得许可,开始长时间工作");84 Thread.sleep(5000);85 System.out.println("线程1完成工作");86 } catch (InterruptedException e) {87 Thread.currentThread().interrupt();88 } finally {89 semaphore.release();90 }91 }).start();92 93 // 线程2:尝试获取许可,但会超时94 new Thread(() -> {95 try {96 System.out.println("线程2尝试获取许可");97 boolean acquired = semaphore.tryAcquire(2, TimeUnit.SECONDS);98 if (acquired) {99 System.out.println("线程2获得许可");100 semaphore.release();101 } else {102 System.out.println("线程2获取许可超时");103 }104 } catch (InterruptedException e) {105 Thread.currentThread().interrupt();106 }107 }).start();108 }109 }110 111 /**112 * Semaphore实际应用场景113 */114 public static class PracticalApplications {115 116 /**117 * 连接池实现118 */119 public static class ConnectionPool {120 private final Semaphore semaphore;121 private final List<Connection> connections;122 123 public ConnectionPool(int poolSize) {124 this.semaphore = new Semaphore(poolSize);125 this.connections = new ArrayList<>();126 127 // 初始化连接池128 for (int i = 0; i < poolSize; i++) {129 connections.add(new Connection("Connection-" + i));130 }131 }132 133 public Connection getConnection() throws InterruptedException {134 semaphore.acquire(); // 获取许可135 synchronized (connections) {136 return connections.remove(0);137 }138 }139 140 public void releaseConnection(Connection connection) {141 synchronized (connections) {142 connections.add(connection);143 }144 semaphore.release(); // 释放许可145 }146 147 public int getAvailableConnections() {148 return semaphore.availablePermits();149 }150 151 static class Connection {152 private final String name;153 154 public Connection(String name) {155 this.name = name;156 }157 158 public String getName() {159 return name;160 }161 162 @Override163 public String toString() {164 return "Connection{name='" + name + "'}";165 }166 }167 }168 169 /**170 * 限流器实现171 */172 public static class RateLimiter {173 private final Semaphore semaphore;174 private final int maxPermits;175 176 public RateLimiter(int maxPermits) {177 this.maxPermits = maxPermits;178 this.semaphore = new Semaphore(maxPermits);179 }180 181 public boolean tryAcquire() {182 return semaphore.tryAcquire();183 }184 185 public boolean tryAcquire(long timeout, TimeUnit unit) {186 try {187 return semaphore.tryAcquire(timeout, unit);188 } catch (InterruptedException e) {189 Thread.currentThread().interrupt();190 return false;191 }192 }193 194 public void acquire() throws InterruptedException {195 semaphore.acquire();196 }197 198 public void release() {199 semaphore.release();200 }201 202 public int getAvailablePermits() {203 return semaphore.availablePermits();204 }205 206 public void reset() {207 semaphore.drainPermits();208 semaphore.release(maxPermits);209 }210 }211 }212}5. Exchanger(数据交换器)
5.1 Exchanger 基本概念
Exchanger是一个同步工具类,用于两个线程之间交换数据。
Exchanger基本用法示例
java
1import java.util.concurrent.Exchanger;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;45public class ExchangerExamples {6 7 /**8 * Exchanger基本用法9 */10 public static class BasicUsage {11 public static void main(String[] args) {12 Exchanger<String> exchanger = new Exchanger<>();13 ExecutorService executor = Executors.newFixedThreadPool(2);14 15 System.out.println("=== Exchanger基本用法 ===");16 17 // 生产者线程18 executor.submit(() -> {19 try {20 String data = "生产者数据";21 System.out.println("生产者准备交换数据: " + data);22 String received = exchanger.exchange(data);23 System.out.println("生产者收到数据: " + received);24 } catch (InterruptedException e) {25 Thread.currentThread().interrupt();26 }27 });28 29 // 消费者线程30 executor.submit(() -> {31 try {32 Thread.sleep(1000); // 模拟处理时间33 String data = "消费者数据";34 System.out.println("消费者准备交换数据: " + data);35 String received = exchanger.exchange(data);36 System.out.println("消费者收到数据: " + received);37 } catch (InterruptedException e) {38 Thread.currentThread().interrupt();39 }40 });41 42 executor.shutdown();43 }44}45 46 /**47 * Exchanger超时控制48 */49 public static class TimeoutUsage {50 public static void main(String[] args) {51 Exchanger<String> exchanger = new Exchanger<>();52 53 // 线程1:立即交换54 new Thread(() -> {55 try {56 String data = "数据1";57 System.out.println("线程1准备交换: " + data);58 String received = exchanger.exchange(data, 2, TimeUnit.SECONDS);59 System.out.println("线程1收到: " + received);60 } catch (InterruptedException | TimeoutException e) {61 System.out.println("线程1交换超时或中断");62 }63 }).start();64 65 // 线程2:延迟交换66 new Thread(() -> {67 try {68 Thread.sleep(3000); // 延迟3秒69 String data = "数据2";70 System.out.println("线程2准备交换: " + data);71 String received = exchanger.exchange(data);72 System.out.println("线程2收到: " + received);73 } catch (InterruptedException e) {74 Thread.currentThread().interrupt();75 }76 }).start();77 }78 }79 80 /**81 * Exchanger实际应用场景82 */83 public static class PracticalApplications {84 85 /**86 * 生产者消费者模式87 */88 public static class ProducerConsumer {89 public static void main(String[] args) {90 Exchanger<List<String>> exchanger = new Exchanger<>();91 92 // 生产者93 new Thread(() -> {94 try {95 List<String> buffer = new ArrayList<>();96 for (int i = 0; i < 5; i++) {97 buffer.add("数据-" + i);98 if (buffer.size() == 3) {99 System.out.println("生产者交换缓冲区");100 buffer = exchanger.exchange(buffer);101 Thread.sleep(1000);102 }103 }104 } catch (InterruptedException e) {105 Thread.currentThread().interrupt();106 }107 }).start();108 109 // 消费者110 new Thread(() -> {111 try {112 List<String> buffer = new ArrayList<>();113 for (int i = 0; i < 5; i++) {114 if (buffer.isEmpty()) {115 System.out.println("消费者等待数据");116 buffer = exchanger.exchange(buffer);117 }118 String data = buffer.remove(0);119 System.out.println("消费者处理: " + data);120 }121 } catch (InterruptedException e) {122 Thread.currentThread().interrupt();123 }124 }).start();125 }126 }127 128 /**129 * 数据校验场景130 */131 public static class DataValidator {132 public static void main(String[] args) {133 Exchanger<DataPacket> exchanger = new Exchanger<>();134 135 // 数据生成器136 new Thread(() -> {137 try {138 for (int i = 0; i < 3; i++) {139 DataPacket packet = new DataPacket("数据包-" + i, i * 100);140 System.out.println("生成数据包: " + packet);141 142 DataPacket validatedPacket = exchanger.exchange(packet);143 System.out.println("收到校验结果: " + validatedPacket);144 }145 } catch (InterruptedException e) {146 Thread.currentThread().interrupt();147 }148 }).start();149 150 // 数据校验器151 new Thread(() -> {152 try {153 for (int i = 0; i < 3; i++) {154 DataPacket packet = exchanger.exchange(null);155 System.out.println("校验数据包: " + packet);156 157 // 模拟校验过程158 Thread.sleep(500);159 packet.setValidated(true);160 161 exchanger.exchange(packet);162 }163 } catch (InterruptedException e) {164 Thread.currentThread().interrupt();165 }166 }).start();167 }168 169 static class DataPacket {170 private String data;171 private int value;172 private boolean validated;173 174 public DataPacket(String data, int value) {175 this.data = data;176 this.value = value;177 this.validated = false;178 }179 180 public void setValidated(boolean validated) {181 this.validated = validated;182 }183 184 @Override185 public String toString() {186 return "DataPacket{data='" + data + "', value=" + value + ", validated=" + validated + "}";187 }188 }189 }190 }191}6. Phaser(阶段器)
6.1 Phaser 基本概念
Phaser是一个更灵活的同步工具类,可以动态调整参与同步的线程数量。
Phaser基本用法示例
java
1import java.util.concurrent.Phaser;23public class PhaserExamples {4 5 /**6 * Phaser基本用法7 */8 public static class BasicUsage {9 public static void main(String[] args) {10 Phaser phaser = new Phaser(3); // 初始参与线程数为311 12 System.out.println("=== Phaser基本用法 ===");13 14 for (int i = 0; i < 3; i++) {15 final int threadId = i;16 new Thread(() -> {17 try {18 System.out.println("线程 " + threadId + " 开始第一阶段");19 Thread.sleep(1000);20 phaser.arriveAndAwaitAdvance(); // 等待所有线程完成第一阶段21 22 System.out.println("线程 " + threadId + " 开始第二阶段");23 Thread.sleep(1000);24 phaser.arriveAndAwaitAdvance(); // 等待所有线程完成第二阶段25 26 System.out.println("线程 " + threadId + " 完成所有阶段");27 phaser.arriveAndDeregister(); // 退出同步28 29 } catch (InterruptedException e) {30 Thread.currentThread().interrupt();31 }32 }).start();33 }34 }35}36 37 /**38 * Phaser动态注册39 */40 public static class DynamicRegistration {41 public static void main(String[] args) {42 Phaser phaser = new Phaser(1); // 主线程注册43 44 for (int i = 0; i < 3; i++) {45 final int threadId = i;46 new Thread(() -> {47 phaser.register(); // 动态注册48 try {49 System.out.println("线程 " + threadId + " 注册,当前参与数: " + phaser.getRegisteredParties());50 51 System.out.println("线程 " + threadId + " 开始工作");52 Thread.sleep(1000);53 54 phaser.arriveAndDeregister(); // 完成工作并注销55 System.out.println("线程 " + threadId + " 注销,当前参与数: " + phaser.getRegisteredParties());56 57 } catch (InterruptedException e) {58 Thread.currentThread().interrupt();59 }60 }).start();61 }62 63 // 主线程等待所有工作线程完成64 phaser.arriveAndAwaitAdvance();65 System.out.println("所有工作线程完成");66 }67 }68 69 /**70 * Phaser实际应用场景71 */72 public static class PracticalApplications {73 74 /**75 * 多阶段任务处理76 */77 public static class MultiPhaseProcessor {78 public void processMultiPhaseTask() {79 Phaser phaser = new Phaser(1);80 81 for (int i = 0; i < 3; i++) {82 final int workerId = i;83 new Thread(() -> {84 phaser.register();85 try {86 // 第一阶段:数据准备87 System.out.println("工作者 " + workerId + " 开始数据准备");88 Thread.sleep(1000);89 phaser.arriveAndAwaitAdvance();90 91 // 第二阶段:数据处理92 System.out.println("工作者 " + workerId + " 开始数据处理");93 Thread.sleep(1500);94 phaser.arriveAndAwaitAdvance();95 96 // 第三阶段:结果输出97 System.out.println("工作者 " + workerId + " 开始结果输出");98 Thread.sleep(800);99 phaser.arriveAndDeregister();100 101 } catch (InterruptedException e) {102 Thread.currentThread().interrupt();103 }104 }).start();105 }106 107 // 主线程等待所有阶段完成108 phaser.arriveAndAwaitAdvance();109 System.out.println("所有阶段完成");110 }111 }112 113 /**114 * 游戏回合制处理115 */116 public static class GameRoundProcessor {117 public void processGameRounds(int playerCount, int roundCount) {118 Phaser phaser = new Phaser(playerCount);119 120 for (int i = 0; i < playerCount; i++) {121 final int playerId = i;122 new Thread(() -> {123 try {124 for (int round = 0; round < roundCount; round++) {125 System.out.println("玩家 " + playerId + " 开始第 " + round + " 轮");126 127 // 模拟玩家行动128 Thread.sleep(500 + playerId * 100);129 130 System.out.println("玩家 " + playerId + " 完成第 " + round + " 轮");131 phaser.arriveAndAwaitAdvance(); // 等待所有玩家完成当前轮132 }133 } catch (InterruptedException e) {134 Thread.currentThread().interrupt();135 }136 }).start();137 }138 }139 }140 }141}7. 并发工具类最佳实践
7.1 工具类选择指南
核心原则
选择合适的并发工具类需要考虑以下因素:
- 同步需求:一次性等待还是循环等待
- 线程数量:固定数量还是动态变化
- 超时要求:是否需要超时控制
- 公平性:是否需要公平性保证
工具类选择指南示例
java
1public class ToolSelectionGuide {2 3 /**4 * 工具类选择指南5 */6 public static void selectionGuide() {7 System.out.println("=== 并发工具类选择指南 ===");8 9 // 1. 一次性等待多个任务完成10 System.out.println("1. 一次性等待多个任务完成 -> CountDownLatch");11 System.out.println(" 适用场景:服务启动、数据加载、任务完成等待");12 13 // 2. 循环等待多个线程同步14 System.out.println("2. 循环等待多个线程同步 -> CyclicBarrier");15 System.out.println(" 适用场景:多阶段处理、游戏回合、矩阵计算");16 17 // 3. 控制并发访问数量18 System.out.println("3. 控制并发访问数量 -> Semaphore");19 System.out.println(" 适用场景:连接池、限流器、资源控制");20 21 // 4. 两个线程交换数据22 System.out.println("4. 两个线程交换数据 -> Exchanger");23 System.out.println(" 适用场景:生产者消费者、数据校验、缓冲区交换");24 25 // 5. 动态调整参与线程数量26 System.out.println("5. 动态调整参与线程数量 -> Phaser");27 System.out.println(" 适用场景:多阶段任务、游戏处理、复杂同步");28 }29}7.2 性能优化技巧
性能优化示例
java
1public class PerformanceOptimization {2 3 /**4 * 避免过度同步5 */6 public static class AvoidOverSynchronization {7 public void optimizedApproach() {8 // 不推荐:过度使用同步9 CountDownLatch latch = new CountDownLatch(1);10 Semaphore semaphore = new Semaphore(1);11 12 // 推荐:选择合适的工具13 // 简单等待 -> CountDownLatch14 // 资源控制 -> Semaphore15 // 数据交换 -> Exchanger16 }17 }18 19 /**20 * 合理设置超时21 */22 public static class TimeoutOptimization {23 public void optimizedTimeout() {24 // 设置合理的超时时间25 Semaphore semaphore = new Semaphore(1);26 27 try {28 // 根据业务需求设置超时时间29 boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);30 if (acquired) {31 // 处理业务逻辑32 semaphore.release();33 } else {34 // 处理超时情况35 System.out.println("获取资源超时");36 }37 } catch (InterruptedException e) {38 Thread.currentThread().interrupt();39 }40 }41 }42 43 /**44 * 避免死锁45 */46 public static class DeadlockPrevention {47 public void preventDeadlock() {48 // 1. 固定资源获取顺序49 Semaphore sem1 = new Semaphore(1);50 Semaphore sem2 = new Semaphore(1);51 52 // 总是先获取sem1,再获取sem253 try {54 sem1.acquire();55 sem2.acquire();56 // 使用资源57 } finally {58 sem2.release();59 sem1.release();60 }61 62 // 2. 使用超时机制63 try {64 if (sem1.tryAcquire(1, TimeUnit.SECONDS)) {65 if (sem2.tryAcquire(1, TimeUnit.SECONDS)) {66 // 使用资源67 sem2.release();68 }69 sem1.release();70 }71 } catch (InterruptedException e) {72 Thread.currentThread().interrupt();73 }74 }75 }76}7.3 常见陷阱与解决方案
| 陷阱 | 问题 | 解决方案 |
|---|---|---|
| CountDownLatch重用 | CountDownLatch是一次性的 | 使用CyclicBarrier或Phaser |
| CyclicBarrier线程数不匹配 | 线程数变化导致死锁 | 使用Phaser动态调整 |
| Semaphore忘记释放 | 导致其他线程无法获取资源 | 使用try-finally确保释放 |
| Exchanger超时处理 | 一个线程超时导致另一个线程阻塞 | 使用超时版本的exchange方法 |
| Phaser过度注册 | 注册过多线程影响性能 | 合理控制注册数量 |
8. 总结
Java并发工具类为多线程编程提供了强大而灵活的支持。
8.1 关键要点
- 工具类特性:CountDownLatch、CyclicBarrier、Semaphore、Exchanger、Phaser
- 使用场景:根据具体需求选择合适的工具类
- 性能优化:避免过度同步、合理设置超时、防止死锁
- 最佳实践:正确使用、及时清理、异常处理
8.2 选择建议
| 场景 | 推荐工具类 | 原因 |
|---|---|---|
| 等待多个任务完成 | CountDownLatch | 一次性等待,简单高效 |
| 循环同步 | CyclicBarrier | 支持重置,适合多轮处理 |
| 资源控制 | Semaphore | 控制并发数量,支持公平性 |
| 数据交换 | Exchanger | 专门用于两个线程交换数据 |
| 动态同步 | Phaser | 支持动态调整参与线程数量 |
8.3 学习建议
- 理解原理:深入理解各种工具类的工作原理
- 实践验证:通过编写代码验证不同工具类的效果
- 场景选择:根据具体需求选择合适的工具类
- 性能测试:对比不同实现方式的性能差异
通过深入理解和熟练运用这些并发工具类,我们能够构建出更加高效、健壮和可维护的Java并发应用程序。
参与讨论