Java 并发容器详解
Java并发包提供了多种线程安全的容器类,这些容器专门为多线程环境设计,提供了比使用synchronized更高效的并发访问机制。本文将详细介绍各种并发容器的使用方法和最佳实践。
本文内容概览
核心价值
并发容器 = 线程安全性 + 高性能访问 + 低竞争设计 + 功能扩展 + 便捷API
- 🛡️ 线程安全:无需额外同步,免除并发错误困扰
- ⚡ 高性能:采用分段锁、无锁算法等现代并发技术
- 🔄 一致性模型:提供清晰的一致性保证
- 🚀 扩展性强:良好应对高并发负载场景
- 🔧 功能丰富:提供阻塞、超时等特性满足多样需求
1. 并发容器概述
1.1 什么是并发容器?
核心概念
并发容器是Java并发包中提供的线程安全集合类,它们通过不同的并发控制机制(如分段锁、CAS操作、读写锁等)实现线程安全,避免了使用synchronized的性能开销。
1.2 并发容器与传统同步容器对比
| 特性 | 传统同步容器 | 并发容器 |
|---|---|---|
| 实现方式 | Collections.synchronizedXxx | java.util.concurrent 包 |
| 同步机制 | 方法级synchronized锁 | 分段锁、CAS、读写分离等 |
| 锁粒度 | 粗粒度(整个容器) | 细粒度(部分数据) |
| 性能 | 高并发下性能较差 | 高并发下性能更好 |
| 迭代器 | fail-fast机制 | 弱一致性或快照迭代 |
| 阻塞操作 | 不支持 | 部分容器支持 |
| 原子复合操作 | 需要额外同步 | 部分容器原生支持 |
1.3 并发容器分类
- 并发Map
- 并发List/Set
- 并发队列
并发Map实现类
- ConcurrentHashMap:分段锁实现的高性能线程安全哈希表
- ConcurrentSkipListMap:基于跳表的有序并发Map,适合高并发读取
java
1// 创建并发Map2Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();3// 线程安全操作4concurrentMap.put("one", 1);5concurrentMap.putIfAbsent("two", 2); // 原子性的"如果不存在则放入"6// 复合操作7concurrentMap.compute("three", (k, v) -> (v == null) ? 3 : v + 1);并发List/Set实现类
- CopyOnWriteArrayList:写时复制的List实现,读多写少场景的理想选择
- CopyOnWriteArraySet:基于CopyOnWriteArrayList的Set实现
- ConcurrentSkipListSet:基于跳表的有序并发Set
java
1// 创建并发List2List<String> concurrentList = new CopyOnWriteArrayList<>();3// 安全的并发修改4concurrentList.add("item1");5// 遍历时不会抛出ConcurrentModificationException6for (String item : concurrentList) {7 System.out.println(item);8 concurrentList.add("newItem"); // 在遍历时修改是安全的9}并发队列实现类
阻塞队列
- ArrayBlockingQueue:基于数组的有界阻塞队列
- LinkedBlockingQueue:基于链表的可选有界阻塞队列
- PriorityBlockingQueue:支持优先级的无界阻塞队列
- DelayQueue:延迟元素队列,元素到期才能被取出
- SynchronousQueue:没有内部容量的阻塞队列,直接传递
非阻塞队列
- ConcurrentLinkedQueue:基于链表的无界非阻塞队列
- ConcurrentLinkedDeque:基于链表的无界非阻塞双端队列
java
1// 创建阻塞队列2BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);34// 生产者 - 可以阻塞5try {6 taskQueue.put(new Task()); // 如果队列满,会阻塞7} catch (InterruptedException e) {8 Thread.currentThread().interrupt();9}1011// 消费者 - 可以阻塞12try {13 Task task = taskQueue.take(); // 如果队列空,会阻塞14 processTask(task);15} catch (InterruptedException e) {16 Thread.currentThread().interrupt();17}2. ConcurrentHashMap详解
2.1 ConcurrentHashMap 原理
内部结构与工作原理
JDK 7实现:
- 使用分段锁(Segment)机制,将数据分为多个段
- 每个段独立加锁,减少锁竞争
- Segment继承自ReentrantLock
JDK 8+实现:
- 移除Segment概念,采用Node数组+链表+红黑树结构
- 使用CAS操作和synchronized实现更细粒度锁
- 当链表长度超过阈值(8)时转换为红黑树,提升性能
- 对桶(bucket)级别加锁,进一步减少锁竞争
2.2 基本用法
- 创建与基本操作
- 原子操作
- 批量操作
java
1import java.util.concurrent.ConcurrentHashMap;23// 创建ConcurrentHashMap4ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();56// 添加元素7map.put("apple", 10);8map.put("banana", 20);910// 获取元素11int appleCount = map.get("apple"); // 101213// 原子性的"如果不存在则添加"14map.putIfAbsent("orange", 15); // 添加成功,返回null15map.putIfAbsent("apple", 25); // 添加失败,返回101617// 移除元素18map.remove("banana"); // 返回201920// 原子性的"如果值匹配则移除"21boolean removed = map.remove("apple", 10); // 返回true,移除成功java
1// 原子性的替换操作2map.replace("orange", 15, 25); // 仅当当前值为15时,替换为2534// 计算API - 原子性的基于当前值计算新值5map.compute("apple", (key, value) -> 6 (value == null) ? 100 : value + 50); // 如果不存在,设为100;否则增加5078// 仅当键存在时计算9map.computeIfPresent("orange", (key, value) -> value * 2);1011// 仅当键不存在时计算12map.computeIfAbsent("grape", key -> 30);1314// 合并操作 - 结合现有值和给定值生成新值15map.merge("apple", 10, (oldValue, value) -> oldValue + value);java
1// 批量添加2ConcurrentHashMap<String, Integer> fruitPrices = new ConcurrentHashMap<>();3fruitPrices.put("apple", 100);4fruitPrices.put("banana", 80);56map.putAll(fruitPrices);78// 遍历 - 弱一致性9map.forEach((key, value) -> 10 System.out.println(key + ": " + value));1112// 使用键值计算值13map.forEach(2, (key, value) -> 14 key + "=" + value, // 转换函数15 results -> System.out.println("处理结果: " + results) // 结果处理16);1718// 规约操作19Integer sum = map.reduce(2, 20 (key, value) -> value, // 转换函数21 (v1, v2) -> v1 + v2 // 规约函数22);2.3 性能特性与最佳实践
- 性能特性
- 最佳实践
- 常见陷阱
ConcurrentHashMap的性能特点:
- 读操作完全并行:多线程可以同时读取,无阻塞
- 写操作局部锁定:仅锁定需要修改的部分,允许其他部分并发访问
- 弱一致性迭代器:迭代时不会抛出ConcurrentModificationException,但可能不反映最新修改
- 高度优化的并发访问:使用内部优化减少锁竞争
- 调整大小操作并发化:多个线程可同时参与扩容过程
java
1// 1. 避免对ConcurrentHashMap进行同步包装(没有必要且损害性能)2Map<String, String> map = new ConcurrentHashMap<>();3// 错误:不要这样做4Map<String, String> syncMap = Collections.synchronizedMap(map);56// 2. 使用原子操作替代"检查后执行"模式7// 不好:非原子的检查再更新8if (!map.containsKey("key")) {9 map.put("key", "value");10}11// 好:使用原子操作12map.putIfAbsent("key", "value");1314// 3. 利用ConcurrentHashMap提供的原子复合操作15map.compute("counter", (k, v) -> (v == null) ? 1 : Integer.parseInt(v) + 1);1617// 4. 合理设置初始容量,避免频繁扩容18// 预计元素数量为500,负载因子默认0.7519int initialCapacity = (int) (500 / 0.75) + 1;20Map<String, Object> optimizedMap = new ConcurrentHashMap<>(initialCapacity);java
1ConcurrentHashMap<String, List<String>> mapOfLists = new ConcurrentHashMap<>();23// 陷阱1: 虽然ConcurrentHashMap是线程安全的,但其中的值可能不是4mapOfLists.putIfAbsent("cities", new ArrayList<>()); // ArrayList不是线程安全的5// 正确做法:使用线程安全的集合作为值6mapOfLists.putIfAbsent("cities", new CopyOnWriteArrayList<>());78// 陷阱2: 复合操作仍需原子方法保护9// 不安全:10if (mapOfLists.containsKey("cities")) {11 mapOfLists.get("cities").add("New York"); // 不是原子操作12}1314// 安全:使用原子性compute操作15mapOfLists.compute("cities", (k, v) -> {16 List<String> cities = (v == null) ? new CopyOnWriteArrayList<>() : v;17 cities.add("New York");18 return cities;19});2021// 陷阱3: 迭代器的弱一致性可能看不到最新修改22// 迭代开始后的修改可能不会反映在当前迭代过程中3. CopyOnWriteArrayList 详解
3.1 写时复制机制
CopyOnWriteArrayList工作原理
CopyOnWriteArrayList的核心特性:
- 写时复制:每次修改操作都会创建底层数组的新副本
- 读操作无锁:读取操作不需要加锁,提供了最大程度的并发读取性能
- 写操作同步:写操作需要获取独占锁,一次只能有一个线程修改
- 适用场景:读多写少的场景,写操作频繁的场景性能较差
- 内存开销:每次修改都创建新数组,可能导致GC压力和内存使用增加
3.2 基本用法
- 基本操作
- 多线程场景
- 快照迭代器
java
1import java.util.concurrent.CopyOnWriteArrayList;23// 创建CopyOnWriteArrayList4List<String> cowList = new CopyOnWriteArrayList<>();56// 添加元素7cowList.add("Java");8cowList.add("Python");9cowList.addAll(Arrays.asList("Go", "Rust"));1011// 获取元素12String language = cowList.get(0); // "Java"1314// 迭代 - 安全,不会抛出ConcurrentModificationException15for (String lang : cowList) {16 System.out.println(lang);17 // 即使此处修改cowList,迭代器仍然基于原始快照18 cowList.add("新语言"); // 不会影响当前迭代19}2021// 修改元素22cowList.set(1, "Python 3");2324// 删除元素25cowList.remove("Go");26cowList.remove(0); // 删除第一个元素java
1CopyOnWriteArrayList<String> safeList = new CopyOnWriteArrayList<>();23// 多线程添加元素4ExecutorService executor = Executors.newFixedThreadPool(10);56for (int i = 0; i < 100; i++) {7 final int index = i;8 executor.submit(() -> {9 safeList.add("Item " + index);10 });11}1213executor.shutdown();14try {15 executor.awaitTermination(5, TimeUnit.SECONDS);16} catch (InterruptedException e) {17 Thread.currentThread().interrupt();18}1920// 线程安全的迭代21for (String item : safeList) {22 System.out.println(item);23}java
1CopyOnWriteArrayList<Integer> numbers = new CopyOnWriteArrayList<>(2 Arrays.asList(1, 2, 3, 4, 5));34// 获取迭代器 - 它反映当前的快照5Iterator<Integer> iterator = numbers.iterator();67// 在获取迭代器后修改列表8numbers.add(6);9numbers.remove(0);1011// 迭代器仍然反映原始状态12while (iterator.hasNext()) {13 Integer num = iterator.next();14 System.out.print(num + " "); // 输出 1 2 3 4 515 16 // 注意:CopyOnWriteArrayList的迭代器不支持修改操作17 // iterator.remove(); // 将抛出UnsupportedOperationException18}1920System.out.println();21// 新迭代器会反映最新状态22for (Integer num : numbers) {23 System.out.print(num + " "); // 输出 2 3 4 5 624}3.3 适用场景与最佳实践
java
1// 场景:事件监听器列表2public class EventManager {3 // 使用CopyOnWriteArrayList存储监听器 - 非常适合监听器模式4 private final List<EventListener> listeners = new CopyOnWriteArrayList<>();5 6 // 添加监听器 - 写操作,相对不频繁7 public void addListener(EventListener listener) {8 listeners.add(listener);9 }10 11 // 移除监听器 - 写操作,相对不频繁12 public void removeListener(EventListener listener) {13 listeners.remove(listener);14 }15 16 // 触发事件 - 读操作,频繁执行17 public void fireEvent(Event event) {18 // 安全迭代,即使有线程同时添加/移除监听器19 for (EventListener listener : listeners) {20 listener.onEvent(event);21 }22 }23}性能注意事项
CopyOnWriteArrayList 适用于读操作远多于写操作的场景。对于频繁写入的场景,它的性能会显著下降,因为:
- 每次写操作都会复制整个底层数组
- 写操作需要获取独占锁,导致写操作串行化
- 内存使用率高,可能增加GC压力
在元素数量较大且修改频繁的场景,应考虑使用其他并发容器。
4. 阻塞队列
4.1 BlockingQueue 接口
阻塞队列的核心操作对比
| 操作类型 | 抛出异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 检查 | element() | peek() | 不适用 | 不适用 |
操作行为说明:
- 抛出异常:队列满/空时抛出异常
- 返回特殊值:队列满返回false,队列空返回null
- 阻塞:队列满/空时阻塞等待
- 超时:阻塞指定时间后仍无法操作则返回特殊值
- 基本用法
- 生产者-消费者
- 优先级队列
java
1import java.util.concurrent.*;23public class BlockingQueueExample {4 public static void main(String[] args) throws InterruptedException {5 // 创建有界阻塞队列 - 容量为56 BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);7 8 // 添加元素 - 多种方式9 queue.add("元素1"); // 成功添加,队列未满10 queue.offer("元素2"); // 成功添加,返回true11 queue.put("元素3"); // 添加元素,可能阻塞12 queue.offer("元素4", 1, TimeUnit.SECONDS); // 添加元素,最多等待1秒13 14 // 检索元素但不移除15 String peek = queue.peek(); // 查看队首元素16 System.out.println("队首元素: " + peek);17 18 // 移除元素 - 多种方式19 String item1 = queue.remove(); // 移除并返回队首元素,队列为空时抛异常20 String item2 = queue.poll(); // 移除并返回队首元素,队列为空时返回null21 String item3 = queue.take(); // 移除并返回队首元素,队列为空时阻塞22 String item4 = queue.poll(1, TimeUnit.SECONDS); // 移除元素,最多等待1秒23 24 System.out.println("已移除: " + item1 + ", " + item2 + ", " + item3 + ", " + item4);25 26 // 检查队列状态27 System.out.println("队列是否为空: " + queue.isEmpty());28 System.out.println("队列元素数量: " + queue.size());29 System.out.println("队列是否包含'元素1': " + queue.contains("元素1"));30 }31}java
1import java.util.concurrent.*;2import java.util.concurrent.atomic.AtomicInteger;34public class ProducerConsumerExample {5 public static void main(String[] args) {6 // 创建有界阻塞队列7 BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);8 9 // 创建生产者线程10 Thread producer = new Thread(new Producer(queue));11 12 // 创建消费者线程13 Thread consumer = new Thread(new Consumer(queue));14 15 // 启动线程16 producer.start();17 consumer.start();18 }19 20 // 生产者类21 static class Producer implements Runnable {22 private final BlockingQueue<Integer> queue;23 private final AtomicInteger counter = new AtomicInteger();24 25 public Producer(BlockingQueue<Integer> queue) {26 this.queue = queue;27 }28 29 @Override30 public void run() {31 try {32 while (!Thread.currentThread().isInterrupted()) {33 int num = counter.incrementAndGet();34 // 使用put方法,如果队列满则阻塞35 queue.put(num);36 System.out.println("生产: " + num);37 TimeUnit.MILLISECONDS.sleep(100); // 生产速率控制38 }39 } catch (InterruptedException e) {40 Thread.currentThread().interrupt();41 }42 }43 }44 45 // 消费者类46 static class Consumer implements Runnable {47 private final BlockingQueue<Integer> queue;48 49 public Consumer(BlockingQueue<Integer> queue) {50 this.queue = queue;51 }52 53 @Override54 public void run() {55 try {56 while (!Thread.currentThread().isInterrupted()) {57 // 使用take方法,如果队列空则阻塞58 Integer value = queue.take();59 System.out.println("消费: " + value);60 TimeUnit.MILLISECONDS.sleep(200); // 消费速率控制61 }62 } catch (InterruptedException e) {63 Thread.currentThread().interrupt();64 }65 }66 }67}java
1import java.util.concurrent.*;2import java.util.Comparator;34public class PriorityBlockingQueueExample {5 public static void main(String[] args) throws InterruptedException {6 // 创建优先级阻塞队列,使用自定义比较器7 BlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>(8 11, Comparator.comparingInt(Task::getPriority).reversed());9 10 // 添加不同优先级的任务11 priorityQueue.put(new Task(5, "普通任务"));12 priorityQueue.put(new Task(1, "低优先级任务"));13 priorityQueue.put(new Task(10, "高优先级任务"));14 priorityQueue.put(new Task(7, "中高优先级任务"));15 priorityQueue.put(new Task(3, "中低优先级任务"));16 17 System.out.println("任务将按优先级顺序执行(高到低):");18 19 // 取出并处理所有任务(会按优先级顺序取出)20 while (!priorityQueue.isEmpty()) {21 Task task = priorityQueue.take();22 System.out.println("执行: " + task);23 // 模拟任务执行24 TimeUnit.MILLISECONDS.sleep(100);25 }26 }27 28 // 带优先级的任务类29 static class Task {30 private final int priority;31 private final String name;32 33 public Task(int priority, String name) {34 this.priority = priority;35 this.name = name;36 }37 38 public int getPriority() {39 return priority;40 }41 42 @Override43 public String toString() {44 return name + " (优先级: " + priority + ")";45 }46 }47}4.2 ArrayBlockingQueue
ArrayBlockingQueue特点
核心特性:
- 基于数组实现的有界阻塞队列
- 创建时必须指定容量
- 按照FIFO(先进先出)原则对元素进行排序
- 使用单一的锁来控制对队列的访问
- 支持公平策略
适用场景:
- 明确知道队列大小上限的场景
- 需要FIFO顺序的场景
- 生产者和消费者速度相近的情况
- 需要限制系统资源使用的情况
4.3 LinkedBlockingQueue
LinkedBlockingQueue特点
核心特性:
- 基于链表实现的可选有界阻塞队列
- 默认容量为
Integer.MAX_VALUE(可视为无界) - 可以指定容量使其成为有界队列
- 使用两个锁(takeLock和putLock)分别控制入队和出队操作
- 相比ArrayBlockingQueue,并发性能更好
适用场景:
- 不确定队列大小上限的场景
- 生产者和消费者速度差异较大的场景
- 需要更高并发吞吐量的场景
java
1// 创建无界LinkedBlockingQueue2BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>();34// 创建有界LinkedBlockingQueue(容量1000)5BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(1000);67// 两个独立的锁提高并发性8// putLock控制入队9// takeLock控制出队10// 两个操作可以并发执行4.4 DelayQueue
DelayQueue特点
核心特性:
- 无界阻塞延迟队列
- 元素只有在其指定的延迟时间到期后才能被取出
- 队列头部是延迟最先到期的元素
- 元素必须实现
Delayed接口 - 如果没有元素到期,take()方法会阻塞
适用场景:
- 定时任务调度
- 缓存过期策略实现
- 请求超时处理
- 限流算法实现
- 基本用法
- 缓存实现
java
1import java.util.concurrent.*;23public class DelayQueueExample {4 public static void main(String[] args) throws InterruptedException {5 // 创建DelayQueue6 DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();7 8 // 添加延迟任务(当前时间 + 指定延迟)9 long now = System.currentTimeMillis();10 delayQueue.put(new DelayedTask("Task1", now + 2000)); // 延迟2秒11 delayQueue.put(new DelayedTask("Task2", now + 5000)); // 延迟5秒12 delayQueue.put(new DelayedTask("Task3", now + 1000)); // 延迟1秒13 delayQueue.put(new DelayedTask("Task4", now + 3000)); // 延迟3秒14 15 System.out.println("所有任务已添加到队列");16 17 // 按延迟时间顺序取出任务执行18 while (!delayQueue.isEmpty()) {19 // take() 会一直阻塞直到有可用元素20 DelayedTask task = delayQueue.take();21 System.out.println(System.currentTimeMillis() - now + 22 "ms 后执行: " + task);23 }24 }25 26 // 实现Delayed接口的任务类27 static class DelayedTask implements Delayed {28 private final String name;29 private final long executeTime;30 31 public DelayedTask(String name, long executeTime) {32 this.name = name;33 this.executeTime = executeTime;34 }35 36 @Override37 public long getDelay(TimeUnit unit) {38 // 返回剩余延迟时间39 return unit.convert(executeTime - System.currentTimeMillis(), 40 TimeUnit.MILLISECONDS);41 }42 43 @Override44 public int compareTo(Delayed other) {45 // 比较剩余延迟时间46 if (other == this) {47 return 0;48 }49 50 if (other instanceof DelayedTask) {51 DelayedTask otherTask = (DelayedTask) other;52 return Long.compare(this.executeTime, otherTask.executeTime);53 }54 55 return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), 56 other.getDelay(TimeUnit.MILLISECONDS));57 }58 59 @Override60 public String toString() {61 return name;62 }63 }64}java
1import java.util.concurrent.*;2import java.util.Map;3import java.util.HashMap;45/**6 * 使用DelayQueue实现简单的缓存过期策略7 */8public class DelayQueueCache<K, V> {9 // 存储缓存内容10 private final Map<K, V> cache = new ConcurrentHashMap<>();11 // 存储过期条目12 private final DelayQueue<DelayedItem<K>> expirationQueue = new DelayQueue<>();13 14 /**15 * 添加缓存项,并设置过期时间16 */17 public void put(K key, V value, long expiryTimeMillis) {18 // 移除已有的相同key的过期项19 removeIfExists(key);20 21 // 添加到缓存22 cache.put(key, value);23 24 // 添加过期条目到延迟队列25 expirationQueue.put(new DelayedItem<>(key, expiryTimeMillis));26 }27 28 /**29 * 获取缓存项,如果已过期则返回null30 */31 public V get(K key) {32 // 如果缓存中不存在,直接返回null33 if (!cache.containsKey(key)) {34 return null;35 }36 37 // 如果已过期,返回null38 if (isExpired(key)) {39 cache.remove(key);40 return null;41 }42 43 return cache.get(key);44 }45 46 /**47 * 检查是否已过期48 */49 private boolean isExpired(K key) {50 // 遍历所有已过期的条目并删除51 DelayedItem<K> delayedItem = expirationQueue.peek();52 while (delayedItem != null && delayedItem.getDelay(TimeUnit.MILLISECONDS) <= 0) {53 expirationQueue.poll(); // 移除过期条目54 cache.remove(delayedItem.getKey()); // 从缓存中删除55 56 // 如果这就是我们要找的key,说明已过期57 if (delayedItem.getKey().equals(key)) {58 return true;59 }60 61 delayedItem = expirationQueue.peek();62 }63 64 return false;65 }66 67 /**68 * 移除现有的过期项69 */70 private void removeIfExists(K key) {71 // 先从缓存中移除72 cache.remove(key);73 74 // 尝试从过期队列中移除75 // 注意:DelayQueue无法通过键直接删除元素,需要创建专门的移除线程76 // 这里简化处理,实际应用中可以使用更高效的方法77 }78 79 /**80 * 清空过期缓存的任务81 */82 public void startExpiryProcessor() {83 Thread processor = new Thread(() -> {84 while (!Thread.currentThread().isInterrupted()) {85 try {86 // 取出并移除一个过期元素(如果没有过期元素,会阻塞)87 DelayedItem<K> expiredItem = expirationQueue.take();88 // 从缓存中删除89 cache.remove(expiredItem.getKey());90 System.out.println("过期移除: " + expiredItem.getKey());91 } catch (InterruptedException e) {92 Thread.currentThread().interrupt();93 }94 }95 });96 97 // 设置为守护线程98 processor.setDaemon(true);99 processor.start();100 }101 102 /**103 * Delayed实现类,用于缓存过期104 */105 static class DelayedItem<T> implements Delayed {106 private final T key;107 private final long expiryTime;108 109 public DelayedItem(T key, long delayMillis) {110 this.key = key;111 this.expiryTime = System.currentTimeMillis() + delayMillis;112 }113 114 public T getKey() {115 return key;116 }117 118 @Override119 public long getDelay(TimeUnit unit) {120 return unit.convert(expiryTime - System.currentTimeMillis(), 121 TimeUnit.MILLISECONDS);122 }123 124 @Override125 public int compareTo(Delayed other) {126 if (other == this) {127 return 0;128 }129 130 long diff = getDelay(TimeUnit.MILLISECONDS) - 131 other.getDelay(TimeUnit.MILLISECONDS);132 return Long.compare(diff, 0);133 }134 }135 136 // 示例用法137 public static void main(String[] args) throws InterruptedException {138 DelayQueueCache<String, String> cache = new DelayQueueCache<>();139 140 // 启动过期处理器141 cache.startExpiryProcessor();142 143 // 添加缓存项144 cache.put("key1", "value1", 2000); // 2秒后过期145 cache.put("key2", "value2", 5000); // 5秒后过期146 cache.put("key3", "value3", 1000); // 1秒后过期147 148 // 读取缓存149 System.out.println("key1: " + cache.get("key1"));150 System.out.println("key2: " + cache.get("key2"));151 System.out.println("key3: " + cache.get("key3"));152 153 // 等待一段时间后再次读取154 Thread.sleep(3000);155 System.out.println("3秒后...");156 System.out.println("key1: " + cache.get("key1")); // 已过期157 System.out.println("key2: " + cache.get("key2")); // 未过期158 System.out.println("key3: " + cache.get("key3")); // 已过期159 160 // 再次等待161 Thread.sleep(3000);162 System.out.println("再过3秒后...");163 System.out.println("key2: " + cache.get("key2")); // 已过期164 }165}4.5 SynchronousQueue
SynchronousQueue特点
核心特性:
- 特殊的阻塞队列,内部容量为零
- 不存储元素,每个插入操作必须等待对应的移除操作
- 直接从生产者传递给消费者("直通车"队列)
- 支持公平和非公平两种模式
- 适用于"交接"场景
适用场景:
- 需要即时交付任务的场景
- 生产者和消费者需要"手递手"交接的场景
- Executors.newCachedThreadPool()使用SynchronousQueue作为工作队列
java
1// SynchronousQueue示例2public class SynchronousQueueExample {3 public static void main(String[] args) {4 // 创建SynchronousQueue (默认非公平模式)5 BlockingQueue<String> syncQueue = new SynchronousQueue<>();6 // 也可以指定为公平模式: new SynchronousQueue<>(true);7 8 // 消费者线程9 new Thread(() -> {10 try {11 // 模拟消费者延迟12 Thread.sleep(2000);13 14 // 取元素15 System.out.println("消费者准备取元素: " + System.currentTimeMillis());16 String item = syncQueue.take();17 System.out.println("消费者已取到元素: " + item + " - " + System.currentTimeMillis());18 19 // 再次取元素20 Thread.sleep(2000);21 System.out.println("消费者再次准备取元素: " + System.currentTimeMillis());22 item = syncQueue.take();23 System.out.println("消费者已取到元素: " + item + " - " + System.currentTimeMillis());24 } catch (InterruptedException e) {25 Thread.currentThread().interrupt();26 }27 }).start();28 29 // 生产者线程30 new Thread(() -> {31 try {32 // 存入元素33 System.out.println("生产者准备放入元素: " + System.currentTimeMillis());34 syncQueue.put("元素A");35 System.out.println("生产者已放入元素A - " + System.currentTimeMillis());36 37 // 再次存入元素38 System.out.println("生产者准备放入元素: " + System.currentTimeMillis());39 syncQueue.put("元素B");40 System.out.println("生产者已放入元素B - " + System.currentTimeMillis());41 } catch (InterruptedException e) {42 Thread.currentThread().interrupt();43 }44 }).start();45 }46}5. ConcurrentLinkedQueue详解
5.1 ConcurrentLinkedQueue 基本概念
ConcurrentLinkedQueue是一个无界线程安全的队列,基于链表实现。
ConcurrentLinkedQueue基本用法示例
java
1import java.util.concurrent.ConcurrentLinkedQueue;2import java.util.concurrent.ExecutorService;3import java.util.concurrent.Executors;45public class ConcurrentLinkedQueueExamples {6 7 /**8 * ConcurrentLinkedQueue基本用法9 */10 public static class BasicUsage {11 public static void main(String[] args) {12 ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();13 ExecutorService executor = Executors.newFixedThreadPool(4);14 15 System.out.println("=== ConcurrentLinkedQueue基本用法 ===");16 17 // 多个生产者18 for (int i = 0; i < 2; i++) {19 final int producerId = i;20 executor.submit(() -> {21 for (int j = 0; j < 5; j++) {22 String item = "Producer" + producerId + "-Item" + j;23 queue.offer(item);24 System.out.println("生产者" + producerId + "放入: " + item);25 try {26 Thread.sleep(200);27 } catch (InterruptedException e) {28 Thread.currentThread().interrupt();29 }30 }31 });32 }33 34 // 多个消费者35 for (int i = 0; i < 2; i++) {36 final int consumerId = i;37 executor.submit(() -> {38 for (int j = 0; j < 5; j++) {39 String item = queue.poll();40 if (item != null) {41 System.out.println("消费者" + consumerId + "取出: " + item);42 }43 try {44 Thread.sleep(300);45 } catch (InterruptedException e) {46 Thread.currentThread().interrupt();47 }48 }49 });50 }51 52 executor.shutdown();53 }54 }55 56 /**57 * ConcurrentLinkedQueue实际应用场景58 */59 public static class PracticalApplications {60 61 /**62 * 消息队列实现63 */64 public static class MessageQueue {65 private final ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();66 67 public void sendMessage(Message message) {68 queue.offer(message);69 }70 71 public Message receiveMessage() {72 return queue.poll();73 }74 75 public boolean isEmpty() {76 return queue.isEmpty();77 }78 79 public int size() {80 return queue.size();81 }82 83 static class Message {84 private final String id;85 private final String content;86 private final long timestamp;87 88 public Message(String id, String content) {89 this.id = id;90 this.content = content;91 this.timestamp = System.currentTimeMillis();92 }93 94 @Override95 public String toString() {96 return "Message{id='" + id + "', content='" + content + "', timestamp=" + timestamp + "}";97 }98 }99 }100 }101}6. 其他并发容器
6.1 ConcurrentSkipListMap
ConcurrentSkipListMap基本用法示例
java
1import java.util.concurrent.ConcurrentSkipListMap;23public class ConcurrentSkipListMapExamples {4 5 /**6 * ConcurrentSkipListMap基本用法7 */8 public static class BasicUsage {9 public static void main(String[] args) {10 ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();11 12 System.out.println("=== ConcurrentSkipListMap基本用法 ===");13 14 // 添加元素(自动排序)15 map.put("zebra", 1);16 map.put("apple", 2);17 map.put("banana", 3);18 map.put("cat", 4);19 20 System.out.println("排序后的Map:");21 map.forEach((key, value) -> System.out.println(key + " = " + value));22 23 // 获取第一个和最后一个元素24 System.out.println("第一个元素: " + map.firstKey());25 System.out.println("最后一个元素: " + map.lastKey());26 27 // 获取子Map28 System.out.println("a到c之间的元素:");29 map.subMap("a", "d").forEach((key, value) -> 30 System.out.println(key + " = " + value));31 }32 }33}6.2 ConcurrentSkipListSet
ConcurrentSkipListSet基本用法示例
java
1import java.util.concurrent.ConcurrentSkipListSet;23public class ConcurrentSkipListSetExamples {4 5 /**6 * ConcurrentSkipListSet基本用法7 */8 public static class BasicUsage {9 public static void main(String[] args) {10 ConcurrentSkipListSet<String> set = new ConcurrentSkipListSet<>();11 12 System.out.println("=== ConcurrentSkipListSet基本用法 ===");13 14 // 添加元素(自动排序)15 set.add("zebra");16 set.add("apple");17 set.add("banana");18 set.add("cat");19 20 System.out.println("排序后的Set:");21 set.forEach(System.out::println);22 23 // 获取第一个和最后一个元素24 System.out.println("第一个元素: " + set.first());25 System.out.println("最后一个元素: " + set.last());26 27 // 获取子Set28 System.out.println("a到c之间的元素:");29 set.subSet("a", "d").forEach(System.out::println);30 }31 }32}7. 性能比较
7.1 不同容器的性能特点
容器性能比较示例
java
1import java.util.*;2import java.util.concurrent.*;34public class ContainerPerformanceComparison {5 6 /**7 * 容器性能比较8 */9 public static void main(String[] args) {10 System.out.println("=== 容器性能比较 ===");11 12 // HashMap vs ConcurrentHashMap13 System.out.println("=== HashMap vs ConcurrentHashMap ===");14 15 // HashMap(非线程安全)16 Map<String, Integer> hashMap = new HashMap<>();17 long start = System.currentTimeMillis();18 for (int i = 0; i < 100000; i++) {19 hashMap.put("key" + i, i);20 }21 System.out.println("HashMap写入时间: " + (System.currentTimeMillis() - start) + "ms");22 23 // ConcurrentHashMap(线程安全)24 Map<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();25 start = System.currentTimeMillis();26 for (int i = 0; i < 100000; i++) {27 concurrentHashMap.put("key" + i, i);28 }29 System.out.println("ConcurrentHashMap写入时间: " + (System.currentTimeMillis() - start) + "ms");30 31 // ArrayList vs CopyOnWriteArrayList32 System.out.println("\n=== ArrayList vs CopyOnWriteArrayList ===");33 34 // ArrayList(非线程安全)35 List<String> arrayList = new ArrayList<>();36 start = System.currentTimeMillis();37 for (int i = 0; i < 10000; i++) {38 arrayList.add("item" + i);39 }40 System.out.println("ArrayList写入时间: " + (System.currentTimeMillis() - start) + "ms");41 42 // CopyOnWriteArrayList(线程安全)43 List<String> copyOnWriteArrayList = new CopyOnWriteArrayList<>();44 start = System.currentTimeMillis();45 for (int i = 0; i < 10000; i++) {46 copyOnWriteArrayList.add("item" + i);47 }48 System.out.println("CopyOnWriteArrayList写入时间: " + (System.currentTimeMillis() - start) + "ms");49 }50}8. 最佳实践
8.1 选择合适的容器
容器选择指南示例
java
1public class ContainerSelectionGuide {2 3 /**4 * 容器选择指南5 */6 public static void selectionGuide() {7 System.out.println("=== 并发容器选择指南 ===");8 9 // 高并发读写 - 使用ConcurrentHashMap10 System.out.println("1. 高并发读写 -> ConcurrentHashMap");11 System.out.println(" 适用场景:缓存、计数器、配置管理");12 13 // 读多写少 - 使用CopyOnWriteArrayList14 System.out.println("2. 读多写少 -> CopyOnWriteArrayList");15 System.out.println(" 适用场景:监听器列表、配置列表");16 17 // 生产者消费者 - 使用BlockingQueue18 System.out.println("3. 生产者消费者 -> BlockingQueue");19 System.out.println(" 适用场景:任务队列、消息队列");20 21 // 需要排序 - 使用ConcurrentSkipListMap22 System.out.println("4. 需要排序 -> ConcurrentSkipListMap");23 System.out.println(" 适用场景:有序缓存、排行榜");24 25 // 简单同步 - 使用Collections.synchronizedXXX()26 System.out.println("5. 简单同步 -> Collections.synchronizedXXX()");27 System.out.println(" 适用场景:低并发场景");28 }29 30 // 高并发读写 - 使用ConcurrentHashMap31 public static <K, V> Map<K, V> createHighConcurrencyMap() {32 return new ConcurrentHashMap<>();33 }34 35 // 读多写少 - 使用CopyOnWriteArrayList36 public static <T> List<T> createReadHeavyList() {37 return new CopyOnWriteArrayList<>();38 }39 40 // 生产者消费者 - 使用BlockingQueue41 public static <T> BlockingQueue<T> createProducerConsumerQueue() {42 return new LinkedBlockingQueue<>();43 }44 45 // 需要排序 - 使用ConcurrentSkipListMap46 public static <K extends Comparable<K>, V> Map<K, V> createSortedMap() {47 return new ConcurrentSkipListMap<>();48 }49}8.2 避免常见陷阱
常见陷阱示例
java
1public class CommonPitfalls {2 3 /**4 * 避免常见陷阱5 */6 public static void avoidPitfalls() {7 System.out.println("=== 避免常见陷阱 ===");8 9 // 错误:在迭代时修改集合10 System.out.println("1. 避免在迭代时修改集合");11 List<String> list = new CopyOnWriteArrayList<>();12 list.add("a");13 list.add("b");14 list.add("c");15 16 // 这样是安全的,因为CopyOnWriteArrayList在迭代时创建副本17 for (String item : list) {18 list.add("new"); // 不会影响当前迭代19 }20 21 // 正确:使用原子操作22 System.out.println("2. 使用原子操作");23 ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();24 25 // 使用原子操作而不是检查然后设置26 map.computeIfAbsent("key", k -> 1);27 28 // 而不是29 // if (!map.containsKey("key")) {30 // map.put("key", 1);31 // }32 33 // 正确:避免过度同步34 System.out.println("3. 避免过度同步");35 // 使用并发容器而不是手动同步36 Map<String, String> safeMap = new ConcurrentHashMap<>();37 // 而不是38 // Map<String, String> unsafeMap = Collections.synchronizedMap(new HashMap<>());39 }40}9. 面试题
9.1 基础概念
Q: ConcurrentHashMap和Hashtable有什么区别?
A:
- Hashtable:使用synchronized关键字,锁粒度大,性能较差
- ConcurrentHashMap:使用分段锁,锁粒度小,性能更好
- Hashtable:不允许null键值,ConcurrentHashMap允许null值
- ConcurrentHashMap:迭代器是弱一致性的
Q: CopyOnWriteArrayList适用于什么场景?
A:
- 读多写少的场景
- 监听器列表等需要频繁遍历但很少修改的场景
- 写操作会创建新副本,内存开销较大
- 迭代器不会抛出ConcurrentModificationException
9.2 性能相关
Q: BlockingQueue的几种实现有什么区别?
A:
- ArrayBlockingQueue:有界队列,基于数组
- LinkedBlockingQueue:有界或无界队列,基于链表
- PriorityBlockingQueue:无界优先级队列
- SynchronousQueue:不存储元素的阻塞队列
Q: 如何选择合适的并发容器?
A:
- 高并发读写:ConcurrentHashMap
- 读多写少:CopyOnWriteArrayList
- 生产者消费者:BlockingQueue
- 需要排序:ConcurrentSkipListMap
- 简单同步:Collections.synchronizedXXX()
9.3 实际应用
Q: ConcurrentHashMap的size()方法是如何实现的?
A:
- 遍历所有段,累加每个段的元素数量
- 由于并发修改,size()返回的是近似值
- 如果需要精确值,可以使用mappingCount()方法
- 在高并发环境下,size()的性能可能不如预期
Q: 如何避免并发容器的常见问题?
A:
- 使用原子操作而不是检查然后设置
- 避免在迭代时修改集合(除了CopyOnWriteArrayList)
- 合理选择容器类型
- 注意内存开销
10. 总结
Java并发容器为多线程编程提供了强大而高效的支持。
10.1 关键要点
- 容器特性:ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue等
- 性能特点:不同容器适用于不同场景
- 选择原则:根据读写比例、并发程度、功能需求选择
- 最佳实践:避免常见陷阱,合理使用
10.2 选择建议
| 场景 | 推荐容器 | 原因 |
|---|---|---|
| 高并发读写 | ConcurrentHashMap | 分段锁,性能好 |
| 读多写少 | CopyOnWriteArrayList | 写时复制,读性能好 |
| 生产者消费者 | BlockingQueue | 阻塞操作,线程安全 |
| 需要排序 | ConcurrentSkipListMap | 自动排序,并发安全 |
| 简单同步 | Collections.synchronizedXXX() | 简单易用 |
10.3 学习建议
- 理解原理:深入理解各种容器的实现原理
- 性能测试:对比不同容器的性能差异
- 场景应用:在实际项目中应用合适的容器
- 持续学习:关注新的并发容器技术
通过深入理解和熟练运用这些并发容器,我们能够构建出更加高效、健壮和可维护的Java并发应用程序。
评论