Java 线程池详解
线程池是Java并发编程中的重要组件,它通过复用线程来减少线程创建和销毁的开销,提高系统的性能和稳定性。本文将详细介绍Java线程池的原理、配置和最佳实践。
1. 线程池概述
1.1 什么是线程池?
核心概念
线程池是一种线程复用机制,它预先创建一定数量的线程,并将这些线程放入池中。当有任务需要执行时,从池中取出一个线程来执行任务,任务执行完毕后,线程返回池中等待下一个任务。
1.2 线程池的优势
| 优势 | 具体体现 | 业务价值 |
|---|---|---|
| 减少开销 | 避免频繁创建和销毁线程 | 提高系统性能 |
| 控制并发数 | 限制同时运行的线程数量 | 防止资源耗尽 |
| 提高响应速度 | 任务到达时立即执行 | 提升用户体验 |
| 统一管理 | 集中管理线程生命周期 | 简化编程模型 |
| 提供监控 | 可以监控线程池状态 | 便于运维管理 |
1.3 线程池的核心组件
线程池核心组件示例
java
1public class ThreadPoolComponents {2 3 /**4 * 线程池的核心组件5 */6 public static void explainComponents() {7 System.out.println("=== 线程池核心组件 ===");8 9 // 1. 核心线程数 (corePoolSize)10 // 线程池中会维护的最小线程数量11 int corePoolSize = 5;12 System.out.println("核心线程数: " + corePoolSize);13 14 // 2. 最大线程数 (maximumPoolSize)15 // 线程池中允许的最大线程数量16 int maximumPoolSize = 10;17 System.out.println("最大线程数: " + maximumPoolSize);18 19 // 3. 工作队列 (workQueue)20 // 用于存储等待执行的任务21 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);22 System.out.println("工作队列容量: " + workQueue.remainingCapacity());23 24 // 4. 线程工厂 (threadFactory)25 // 用于创建新线程的工厂26 ThreadFactory threadFactory = r -> {27 Thread t = new Thread(r);28 t.setName("CustomThread-" + t.getId());29 return t;30 };31 System.out.println("线程工厂: " + threadFactory.getClass().getSimpleName());32 33 // 5. 拒绝策略 (rejectedExecutionHandler)34 // 当线程池和队列都满时的处理策略35 RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.AbortPolicy();36 System.out.println("拒绝策略: " + rejectedHandler.getClass().getSimpleName());37 38 // 6. 线程存活时间 (keepAliveTime)39 // 非核心线程的空闲存活时间40 long keepAliveTime = 60L;41 System.out.println("线程存活时间: " + keepAliveTime + "秒");42 }43}2. Executor 框架
2.1 Executor 接口体系
- Executor
- ExecutorService
- ScheduledExecutorService
java
1// 最简单的Executor接口实现2Executor directExecutor = new Executor() {3 @Override4 public void execute(Runnable command) {5 // 直接在当前线程执行任务6 command.run();7 }8};910// 使用Executor11directExecutor.execute(() -> {12 System.out.println("在当前线程执行的任务");13});java
1ExecutorService executorService = Executors.newFixedThreadPool(2);23// 提交任务并获取Future4Future<String> future = executorService.submit(() -> {5 Thread.sleep(1000);6 return "任务完成";7});89// 获取结果(阻塞等待)10try {11 String result = future.get();12 System.out.println(result);13} catch (Exception e) {14 e.printStackTrace();15}1617// 生命周期管理18executorService.shutdown();19boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);20System.out.println("是否在5秒内终止: " + terminated);2122// 如果需要立即终止23if (!terminated) {24 List<Runnable> unfinishedTasks = executorService.shutdownNow();25 System.out.println("未完成的任务数: " + unfinishedTasks.size());26}java
1ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);23// 延迟执行一次4ScheduledFuture<?> delayedTask = scheduler.schedule(5 () -> System.out.println("延迟3秒执行"), 6 3, 7 TimeUnit.SECONDS8);910// 固定频率执行(从任务开始时计时)11ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(12 () -> System.out.println("每2秒执行一次"), 13 0, // 初始延迟14 2, // 周期15 TimeUnit.SECONDS16);1718// 固定延迟执行(从上一次任务结束时计时)19ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(20 () -> System.out.println("上一次任务结束后延迟1秒再执行"), 21 0, // 初始延迟22 1, // 延迟时间23 TimeUnit.SECONDS24);2526// 取消任务27delayedTask.cancel(false);2829// 安全关闭30scheduler.shutdown();2.2 预定义线程池
线程池类型比较
| 线程池类型 | 核心线程数 | 最大线程数 | 空闲线程生存时间 | 工作队列 | 适用场景 |
|---|---|---|---|---|---|
| FixedThreadPool | 固定值n | 固定值n | 0秒 | LinkedBlockingQueue (无界) | 需要限制线程数的 CPU密集型任务 |
| CachedThreadPool | 0 | Integer.MAX_VALUE | 60秒 | SynchronousQueue (同步交付) | 执行大量短生命周期的 异步任务 |
| SingleThreadExecutor | 1 | 1 | 0秒 | LinkedBlockingQueue (无界) | 需要保证 顺序执行的任务 |
| ScheduledThreadPool | 指定值n | Integer.MAX_VALUE | 0秒 | DelayedWorkQueue (延时队列) | 需要定时 或周期性执行的任务 |
| WorkStealingPool | 并行度 | 并行度 | - | 工作窃取队列 | 需要充分利用 多核CPU的任务 |
重要说明:
- FixedThreadPool使用无界队列可能导致OOM(队列过大)
- CachedThreadPool允许无限创建线程可能导致OOM(线程过多)
- SingleThreadExecutor使用无界队列可能导致OOM(队列过大)
- 实际生产环境中通常应该使用ThreadPoolExecutor自定义线程池参数
- FixedThreadPool
- CachedThreadPool
- SingleThreadExecutor
- ScheduledThreadPool
- WorkStealingPool
特点:
- 固定数量线程池,核心线程数=最大线程数
- 使用无界队列存储待执行任务
- 线程不会超时终止(keepAliveTime = 0)
- 任务顺序执行,不支持优先级
创建方式:
java
1// 创建含有5个线程的固定大小线程池2ExecutorService fixedPool = Executors.newFixedThreadPool(5);34// 指定线程工厂5ExecutorService fixedPoolWithFactory = Executors.newFixedThreadPool(6 5,7 new ThreadFactory() {8 @Override9 public Thread newThread(Runnable r) {10 Thread t = new Thread(r);11 t.setName("CustomFixedPool-" + t.getId());12 return t;13 }14 }15);内部实现:
java
1// 等价于2new ThreadPoolExecutor(3 nThreads, // 核心线程数4 nThreads, // 最大线程数5 0L, TimeUnit.MILLISECONDS, // 线程空闲时间(永不回收)6 new LinkedBlockingQueue<Runnable>() // 无界队列7);特点:
- 可缓存的线程池,按需创建线程
- 核心线程数为0,最大线程数为Integer.MAX_VALUE
- 线程空闲60秒后回收
- 使用SynchronousQueue直接交付任务给线程
创建方式:
java
1// 创建可缓存线程池2ExecutorService cachedPool = Executors.newCachedThreadPool();34// 指定线程工厂5ExecutorService cachedPoolWithFactory = Executors.newCachedThreadPool(6 new ThreadFactory() {7 @Override8 public Thread newThread(Runnable r) {9 Thread t = new Thread(r);10 t.setDaemon(true); // 设置为守护线程11 return t;12 }13 }14);内部实现:
java
1// 等价于2new ThreadPoolExecutor(3 0, // 核心线程数4 Integer.MAX_VALUE, // 最大线程数5 60L, TimeUnit.SECONDS, // 线程空闲时间6 new SynchronousQueue<Runnable>() // 同步队列(直接交付)7);特点:
- 单线程执行器,只有一个工作线程
- 所有任务保证按提交顺序执行
- 使用无界队列存储待执行任务
- 任务串行执行,适合要求顺序的场景
创建方式:
java
1// 创建单线程执行器2ExecutorService singlePool = Executors.newSingleThreadExecutor();34// 指定线程工厂5ExecutorService singlePoolWithFactory = Executors.newSingleThreadExecutor(6 r -> {7 Thread t = new Thread(r);8 t.setName("SingleWorker");9 return t;10 }11);内部实现:
java
1// 注意:实际上使用了FinalizableDelegatedExecutorService包装2// 等价于3new ThreadPoolExecutor(4 1, // 核心线程数5 1, // 最大线程数6 0L, TimeUnit.MILLISECONDS, // 线程空闲时间7 new LinkedBlockingQueue<Runnable>() // 无界队列8);特点:
- 支持定时和周期性任务执行
- 核心线程数固定,最大线程数无限
- 使用DelayedWorkQueue存储定时任务
- 可以替代Timer,更灵活且线程安全
创建方式:
java
1// 创建包含2个线程的定时任务线程池2ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);34// 单线程定时执行器5ScheduledExecutorService singleScheduledPool = Executors.newSingleThreadScheduledExecutor();内部实现:
java
1// 等价于2new ScheduledThreadPoolExecutor(3 corePoolSize, // 核心线程数4 new ThreadPoolExecutor.AbortPolicy() // 拒绝策略5);67// ScheduledThreadPoolExecutor构造器8public ScheduledThreadPoolExecutor(int corePoolSize) {9 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,10 new DelayedWorkQueue());11}特点:
- 基于工作窃取算法的线程池
- 每个线程有自己的任务队列
- 当线程空闲时,会从其他繁忙线程队列窃取任务
- 使用ForkJoinPool实现,适合并行计算
创建方式:
java
1// 创建基于当前可用处理器数量的工作窃取线程池2ExecutorService stealingPool = Executors.newWorkStealingPool();34// 指定并行度5ExecutorService stealingPoolWithParallelism = Executors.newWorkStealingPool(4);内部实现:
java
1// 等价于2new ForkJoinPool(3 parallelism, // 并行度4 ForkJoinPool.defaultForkJoinWorkerThreadFactory,5 null,6 true // 异步模式7);预定义线程池的隐患
- FixedThreadPool 和 SingleThreadExecutor 使用无界队列,当任务持续高速提交可能导致内存溢出
- CachedThreadPool 允许创建无限数量的线程,线程数量激增可能导致系统资源耗尽
- 在生产环境中应该使用 ThreadPoolExecutor 自定义线程池,避免使用Executors创建的预定义线程池
生产环境线程池创建示例
java
1ThreadPoolExecutor productionPool = new ThreadPoolExecutor(2 5, // 核心线程数3 10, // 最大线程数4 60, TimeUnit.SECONDS, // 线程空闲时间5 new ArrayBlockingQueue<>(100), // 有界队列6 new ThreadFactoryBuilder() // 自定义线程工厂7 .setNameFormat("prod-pool-%d")8 .setDaemon(false)9 .build(),10 new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略11);3. ThreadPoolExecutor 详解
3.1 核心参数与工作原理
- 核心参数
- 工作原理
- 工作队列类型
- 线程工厂
ThreadPoolExecutor 构造参数
| 参数 | 描述 | 推荐配置 |
|---|---|---|
| corePoolSize | 核心线程数,线程池中保持活动的最小线程数 | CPU密集型:N+1 IO密集型:2N |
| maximumPoolSize | 最大线程数,线程池允许的最大线程数 | 核心线程数的2-3倍 |
| keepAliveTime | 空闲线程存活时间,超过核心线程数的空闲线程在等待新任务时的最长存活时间 | 根据业务和系统负载调整 |
| unit | 时间单位,keepAliveTime的时间单位 | 通常使用秒或毫秒 |
| workQueue | 工作队列,存放等待执行的任务 | 根据任务特性选择合适队列类型 |
| threadFactory | 线程工厂,创建新线程的工厂 | 自定义命名和优先级 |
| handler | 拒绝策略,线程池和队列都满时的处理策略 | 根据业务需求选择合适的策略 |
执行流程:
- 当任务提交时,如果线程数小于核心线程数,创建新的核心线程执行任务
- 如果线程数已达到核心线程数,将任务放入工作队列
- 如果队列已满,但线程数小于最大线程数,创建新的非核心线程执行任务
- 如果队列已满且线程数已达到最大线程数,触发拒绝策略
- 非核心线程在空闲超过keepAliveTime后会被终止
1. ArrayBlockingQueue
- 有界队列,基于数组
- 需要指定容量
- FIFO顺序处理任务
- 适用于已知任务边界的场景
java
1// 创建容量为100的有界队列2BlockingQueue<Runnable> arrayQueue = new ArrayBlockingQueue<>(100);2. LinkedBlockingQueue
- 可选有界/无界队列,基于链表
- 不指定容量时为无界队列(Integer.MAX_VALUE)
- FIFO顺序处理任务
- 无界队列风险:可能导致OOM
java
1// 有界队列2BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<>(500);3// 无界队列(不推荐)4BlockingQueue<Runnable> unboundedQueue = new LinkedBlockingQueue<>();3. SynchronousQueue
- 无容量的同步队列
- 直接传递模式,没有实际存储
- 适用于任务间直接传递或需要立即处理的场景
java
1// 同步队列2BlockingQueue<Runnable> syncQueue = new SynchronousQueue<>();4. PriorityBlockingQueue
- 无界优先级队列
- 根据任务优先级排序(需实现Comparable)
- 适用于任务有优先级要求的场景
java
1// 优先级队列2BlockingQueue<Runnable> priorityQueue = new PriorityBlockingQueue<>();5. DelayQueue
- 延时队列,元素在到期后才能被取出
- 适用于定时任务或需要延迟执行的场景
java
1// 延时队列2BlockingQueue<Runnable> delayQueue = new DelayQueue<>();java
1// 自定义线程工厂2ThreadFactory threadFactory = new ThreadFactory() {3 private final AtomicInteger threadNumber = new AtomicInteger(1);4 private final String namePrefix = "MyApp-Worker-";5 6 @Override7 public Thread newThread(Runnable r) {8 Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());9 10 // 设置为非守护线程11 t.setDaemon(false);12 13 // 设置优先级14 t.setPriority(Thread.NORM_PRIORITY);15 16 // 设置异常处理器17 t.setUncaughtExceptionHandler((thread, exception) -> {18 System.err.println("Thread " + thread.getName() + 19 " encountered an exception: " + exception.getMessage());20 exception.printStackTrace();21 });22 23 return t;24 }25};2627// 使用自定义线程工厂创建线程池28ThreadPoolExecutor executor = new ThreadPoolExecutor(29 5, 10, 60, TimeUnit.SECONDS, 30 new ArrayBlockingQueue<>(100),31 threadFactory,32 new ThreadPoolExecutor.CallerRunsPolicy()33);3.2 拒绝策略详解
拒绝策略的工作原理
当线程池无法接受任务(线程池已关闭或线程池和队列都已满)时,就会触发拒绝策略。Java提供了四种标准拒绝策略,也可以自定义拒绝策略。
每种策略的处理方式不同,需要根据业务需求选择合适的策略:
- 线程池关闭后:所有拒绝策略都会拒绝新的任务
- 线程池和队列都满:根据不同的拒绝策略进行处理
- AbortPolicy
- CallerRunsPolicy
- DiscardPolicy
- DiscardOldestPolicy
- 自定义拒绝策略
java
1// 中止策略:抛出异常(默认策略)2ThreadPoolExecutor executor = new ThreadPoolExecutor(3 2, 2, 0, TimeUnit.SECONDS,4 new ArrayBlockingQueue<>(2),5 new ThreadPoolExecutor.AbortPolicy()6);78// 模拟任务提交9try {10 // 提交5个任务,但线程池只能处理4个(2个线程+2个队列)11 for (int i = 1; i <= 5; i++) {12 final int taskId = i;13 executor.execute(() -> {14 try {15 System.out.println("任务" + taskId + "开始执行");16 Thread.sleep(1000);17 } catch (InterruptedException e) {18 Thread.currentThread().interrupt();19 }20 });21 System.out.println("成功提交任务" + taskId);22 }23} catch (RejectedExecutionException e) {24 System.err.println("任务被拒绝: " + e.getMessage());25}特点:
- 拒绝任务时抛出RejectedExecutionException
- 调用者可以捕获异常进行处理
- 适合需要立即感知任务被拒绝的场景
java
1// 调用者运行策略:由提交任务的线程执行任务2ThreadPoolExecutor executor = new ThreadPoolExecutor(3 2, 2, 0, TimeUnit.SECONDS,4 new ArrayBlockingQueue<>(2),5 new ThreadPoolExecutor.CallerRunsPolicy()6);78// 模拟任务提交9for (int i = 1; i <= 10; i++) {10 final int taskId = i;11 System.out.println("准备提交任务" + taskId + 12 ", 当前线程: " + Thread.currentThread().getName());13 14 executor.execute(() -> {15 String threadName = Thread.currentThread().getName();16 System.out.println("任务" + taskId + "开始执行, 线程: " + threadName);17 try {18 Thread.sleep(1000);19 } catch (InterruptedException e) {20 Thread.currentThread().interrupt();21 }22 System.out.println("任务" + taskId + "执行完成, 线程: " + threadName);23 });24}特点:
- 由调用者线程直接执行被拒绝的任务
- 提供反馈压力机制,降低任务提交速度
- 不会丢失任务,适合任务必须执行且可接受延迟的场景
java
1// 丢弃策略:静默丢弃被拒绝的任务2ThreadPoolExecutor executor = new ThreadPoolExecutor(3 2, 2, 0, TimeUnit.SECONDS,4 new ArrayBlockingQueue<>(2),5 new ThreadPoolExecutor.DiscardPolicy()6);78// 监控线程池状态9new Thread(() -> {10 while (!executor.isShutdown()) {11 System.out.println(12 "线程池状态: 活动线程=" + executor.getActiveCount() +13 ", 队列大小=" + executor.getQueue().size() +14 ", 已完成任务=" + executor.getCompletedTaskCount()15 );16 try {17 Thread.sleep(1000);18 } catch (InterruptedException e) {19 Thread.currentThread().interrupt();20 }21 }22}).start();2324// 模拟任务提交25for (int i = 1; i <= 10; i++) {26 final int taskId = i;27 System.out.println("提交任务" + taskId);28 executor.execute(() -> {29 try {30 System.out.println("执行任务" + taskId);31 Thread.sleep(2000);32 } catch (InterruptedException e) {33 Thread.currentThread().interrupt();34 }35 });36}特点:
- 静默丢弃被拒绝的任务,不做任何处理
- 适合允许任务丢失的场景
- 对任务重要性要求低的场景
java
1// 丢弃最旧任务策略:丢弃队列头部的任务,然后尝试执行当前任务2ThreadPoolExecutor executor = new ThreadPoolExecutor(3 2, 2, 0, TimeUnit.SECONDS,4 new ArrayBlockingQueue<>(2),5 new ThreadPoolExecutor.DiscardOldestPolicy()6);78// 模拟任务提交9for (int i = 1; i <= 10; i++) {10 final int taskId = i;11 System.out.println("提交任务" + taskId);12 executor.execute(() -> {13 try {14 System.out.println("执行任务" + taskId + 15 ", 线程: " + Thread.currentThread().getName());16 Thread.sleep(1000);17 } catch (InterruptedException e) {18 Thread.currentThread().interrupt();19 }20 });21 22 // 让提交速度慢一点,观察效果23 try {24 Thread.sleep(100);25 } catch (InterruptedException e) {26 Thread.currentThread().interrupt();27 }28}特点:
- 丢弃队列中最旧的任务,执行当前任务
- 队列头部任务被丢弃,队列尾部插入新任务
- 适合更注重新任务,可以接受丢弃旧任务的场景
java
1// 自定义拒绝策略2RejectedExecutionHandler customHandler = new RejectedExecutionHandler() {3 @Override4 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {5 // 1. 记录被拒绝的任务6 System.err.println("任务被拒绝,当前线程池状态:");7 System.err.println(" 活动线程数:" + executor.getActiveCount());8 System.err.println(" 已完成任务数:" + executor.getCompletedTaskCount());9 System.err.println(" 工作队列大小:" + executor.getQueue().size());10 11 // 2. 可以尝试重新提交到其他线程池12 fallbackExecutor.execute(r);13 14 // 3. 或者尝试延迟后重试15 // 注意:这种方式可能导致调用栈增长16 /*17 try {18 Thread.sleep(1000); // 延迟1秒后重试19 executor.execute(r);20 } catch (InterruptedException | RejectedExecutionException e) {21 Thread.currentThread().interrupt();22 }23 */24 25 // 4. 或者保存到数据库等待后续处理26 // saveTaskToDatabase(r);27 }28};2930// 创建一个备用线程池31ExecutorService fallbackExecutor = Executors.newSingleThreadExecutor();3233// 使用自定义拒绝策略创建线程池34ThreadPoolExecutor executor = new ThreadPoolExecutor(35 2, 2, 0, TimeUnit.SECONDS,36 new ArrayBlockingQueue<>(2),37 customHandler38);3940// 模拟任务提交41for (int i = 1; i <= 10; i++) {42 final int taskId = i;43 executor.execute(() -> {44 System.out.println("执行任务" + taskId);45 try {46 Thread.sleep(500);47 } catch (InterruptedException e) {48 Thread.currentThread().interrupt();49 }50 });51}自定义策略优势:
- 可以实现业务定制化的拒绝处理逻辑
- 支持任务重试、日志记录、监控告警等复杂处理
- 可以结合队列优先级处理重要任务
3.3 线程池状态与生命周期
java
1ThreadPoolExecutor executor = new ThreadPoolExecutor(2 5, 10, 60, TimeUnit.SECONDS,3 new ArrayBlockingQueue<>(100)4);56// 使用线程池执行一些任务7for (int i = 0; i < 20; i++) {8 final int taskId = i;9 executor.execute(() -> {10 System.out.println("执行任务" + taskId);11 try {12 Thread.sleep(500);13 } catch (InterruptedException e) {14 Thread.currentThread().interrupt();15 }16 });17}1819// 优雅关闭线程池20executor.shutdown();21System.out.println("线程池状态(shutdown后): " + executor.isShutdown());2223// 等待任务完成24try {25 // 等待所有任务完成或超时26 boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);27 System.out.println("所有任务是否已完成: " + terminated);28 29 if (!terminated) {30 // 强制关闭31 System.out.println("尚未完成的任务数: " + executor.getQueue().size());32 System.out.println("立即关闭线程池...");33 34 // 获取未完成的任务列表35 List<Runnable> unfinishedTasks = executor.shutdownNow();36 System.out.println("未执行的任务数: " + unfinishedTasks.size());37 }38} catch (InterruptedException e) {39 // 如果当前线程被中断,立即关闭线程池40 Thread.currentThread().interrupt();41 executor.shutdownNow();42}4344System.out.println("线程池是否已终止: " + executor.isTerminated());线程池生命周期状态
- RUNNING: 初始状态,线程池能够接受新任务并处理排队任务
- SHUTDOWN: 不再接受新任务,但仍会处理排队任务
- STOP: 不接受新任务,不处理排队任务,中断进行中任务
- TIDYING: 所有任务已终止,工作线程数量为0,即将运行terminated()方法
- TERMINATED: terminated()方法执行完成,线程池完全终止
4. 线程池监控与管理
4.1 线程池状态监控
线程池监控示例
java
1public class ThreadPoolMonitoring {2 3 /**4 * 线程池监控器5 */6 public static class ThreadPoolMonitor {7 private final ThreadPoolExecutor executor;8 private final ScheduledExecutorService monitor;9 10 public ThreadPoolMonitor(ThreadPoolExecutor executor) {11 this.executor = executor;12 this.monitor = Executors.newSingleThreadScheduledExecutor();13 }14 15 public void startMonitoring() {16 monitor.scheduleAtFixedRate(() -> {17 printStatus();18 }, 0, 1, TimeUnit.SECONDS);19 }20 21 public void printStatus() {22 System.out.println("=== 线程池状态 ===");23 System.out.println("核心线程数: " + executor.getCorePoolSize());24 System.out.println("最大线程数: " + executor.getMaximumPoolSize());25 System.out.println("当前池大小: " + executor.getPoolSize());26 System.out.println("活跃线程数: " + executor.getActiveCount());27 System.out.println("队列大小: " + executor.getQueue().size());28 System.out.println("已完成任务数: " + executor.getCompletedTaskCount());29 System.out.println("总任务数: " + executor.getTaskCount());30 System.out.println("是否关闭: " + executor.isShutdown());31 System.out.println("是否终止: " + executor.isTerminated());32 System.out.println("------------------------");33 }34 35 public void stopMonitoring() {36 monitor.shutdown();37 }38 }39 40 /**41 * 线程池性能统计42 */43 public static class ThreadPoolStatistics {44 private final ThreadPoolExecutor executor;45 private long startTime;46 private long totalTasks;47 private long completedTasks;48 49 public ThreadPoolStatistics(ThreadPoolExecutor executor) {50 this.executor = executor;51 this.startTime = System.currentTimeMillis();52 }53 54 public void recordTaskSubmission() {55 totalTasks++;56 }57 58 public void recordTaskCompletion() {59 completedTasks++;60 }61 62 public void printStatistics() {63 long currentTime = System.currentTimeMillis();64 long runningTime = currentTime - startTime;65 66 System.out.println("=== 线程池统计信息 ===");67 System.out.println("运行时间: " + runningTime + "ms");68 System.out.println("总提交任务: " + totalTasks);69 System.out.println("已完成任务: " + completedTasks);70 System.out.println("任务完成率: " + (totalTasks > 0 ? (completedTasks * 100.0 / totalTasks) : 0) + "%");71 System.out.println("平均任务处理时间: " + (completedTasks > 0 ? (runningTime / completedTasks) : 0) + "ms");72 }73 }74}4.2 线程池生命周期管理
线程池生命周期管理示例
java
1public class ThreadPoolLifecycle {2 3 /**4 * 优雅关闭线程池5 */6 public static class GracefulShutdown {7 private final ThreadPoolExecutor executor;8 9 public GracefulShutdown(ThreadPoolExecutor executor) {10 this.executor = executor;11 }12 13 public void shutdownGracefully() {14 System.out.println("开始优雅关闭线程池...");15 16 // 1. 停止接受新任务17 executor.shutdown();18 19 try {20 // 2. 等待现有任务完成21 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {22 System.out.println("等待超时,强制关闭...");23 executor.shutdownNow();24 25 // 3. 再次等待26 if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {27 System.err.println("线程池无法关闭");28 }29 }30 } catch (InterruptedException e) {31 System.out.println("等待过程中被中断,强制关闭...");32 executor.shutdownNow();33 Thread.currentThread().interrupt();34 }35 36 System.out.println("线程池已关闭");37 }38 }39 40 /**41 * 线程池重启机制42 */43 public static class ThreadPoolRestart {44 private volatile ThreadPoolExecutor executor;45 private final Object lock = new Object();46 47 public ThreadPoolRestart() {48 this.executor = createNewExecutor();49 }50 51 private ThreadPoolExecutor createNewExecutor() {52 return new ThreadPoolExecutor(53 5, 10, 60L, TimeUnit.SECONDS,54 new ArrayBlockingQueue<>(100),55 new ThreadPoolExecutor.CallerRunsPolicy()56 );57 }58 59 public void restart() {60 synchronized (lock) {61 System.out.println("开始重启线程池...");62 63 // 1. 关闭旧线程池64 ThreadPoolExecutor oldExecutor = executor;65 oldExecutor.shutdown();66 67 try {68 if (!oldExecutor.awaitTermination(30, TimeUnit.SECONDS)) {69 oldExecutor.shutdownNow();70 }71 } catch (InterruptedException e) {72 oldExecutor.shutdownNow();73 Thread.currentThread().interrupt();74 }75 76 // 2. 创建新线程池77 executor = createNewExecutor();78 System.out.println("线程池重启完成");79 }80 }81 82 public void submitTask(Runnable task) {83 synchronized (lock) {84 executor.submit(task);85 }86 }87 }88}5. 线程池最佳实践
5.1 线程池配置策略
核心原则
配置线程池时需要考虑以下因素:
- 任务类型:CPU密集型、IO密集型、混合型
- 系统资源:CPU核心数、内存大小
- 业务需求:响应时间、吞吐量要求
- 监控能力:线程池状态监控
线程池配置最佳实践示例
java
1public class ThreadPoolBestPractices {2 3 /**4 * 1. CPU密集型任务配置5 */6 public static class CPUIntensiveConfig {7 public static ThreadPoolExecutor createCPUIntensivePool() {8 int cpuCores = Runtime.getRuntime().availableProcessors();9 return new ThreadPoolExecutor(10 cpuCores, // 核心线程数 = CPU核心数11 cpuCores, // 最大线程数 = CPU核心数12 0L, TimeUnit.MILLISECONDS, // 不保留空闲线程13 new LinkedBlockingQueue<>(), // 无界队列14 new ThreadPoolExecutor.CallerRunsPolicy()15 );16 }17 }18 19 /**20 * 2. IO密集型任务配置21 */22 public static class IOIntensiveConfig {23 public static ThreadPoolExecutor createIOIntensivePool() {24 int cpuCores = Runtime.getRuntime().availableProcessors();25 return new ThreadPoolExecutor(26 cpuCores * 2, // 核心线程数 = CPU核心数 * 227 cpuCores * 4, // 最大线程数 = CPU核心数 * 428 60L, TimeUnit.SECONDS, // 保留空闲线程60秒29 new ArrayBlockingQueue<>(1000), // 有界队列30 new ThreadPoolExecutor.CallerRunsPolicy()31 );32 }33 }34 35 /**36 * 3. 混合型任务配置37 */38 public static class MixedTaskConfig {39 public static ThreadPoolExecutor createMixedTaskPool() {40 int cpuCores = Runtime.getRuntime().availableProcessors();41 return new ThreadPoolExecutor(42 cpuCores, // 核心线程数 = CPU核心数43 cpuCores * 2, // 最大线程数 = CPU核心数 * 244 30L, TimeUnit.SECONDS, // 保留空闲线程30秒45 new ArrayBlockingQueue<>(500), // 有界队列46 new ThreadPoolExecutor.CallerRunsPolicy()47 );48 }49 }50 51 /**52 * 4. 定时任务配置53 */54 public static class ScheduledTaskConfig {55 public static ScheduledExecutorService createScheduledPool() {56 int cpuCores = Runtime.getRuntime().availableProcessors();57 return Executors.newScheduledThreadPool(cpuCores);58 }59 }60}5.2 任务提交最佳实践
任务提交最佳实践示例
java
1public class TaskSubmissionBestPractices {2 3 /**4 * 1. 使用合适的提交方法5 */6 public static class TaskSubmissionMethods {7 private final ExecutorService executor = Executors.newFixedThreadPool(5);8 9 public void demonstrateSubmissionMethods() {10 // 1. execute() - 不关心返回值11 executor.execute(() -> {12 System.out.println("使用execute提交任务");13 });14 15 // 2. submit() - 关心返回值16 Future<String> future = executor.submit(() -> {17 return "任务执行结果";18 });19 20 // 3. submit() - 提交Callable21 Future<Integer> result = executor.submit(() -> {22 return 42;23 });24 25 // 4. invokeAll() - 批量提交26 List<Callable<String>> tasks = Arrays.asList(27 () -> "任务1",28 () -> "任务2",29 () -> "任务3"30 );31 32 try {33 List<Future<String>> futures = executor.invokeAll(tasks);34 for (Future<String> f : futures) {35 System.out.println("结果: " + f.get());36 }37 } catch (InterruptedException | ExecutionException e) {38 e.printStackTrace();39 }40 }41 }42 43 /**44 * 2. 异常处理45 */46 public static class ExceptionHandling {47 private final ExecutorService executor = Executors.newFixedThreadPool(5);48 49 public void handleExceptions() {50 // 1. 在任务内部处理异常51 executor.submit(() -> {52 try {53 // 可能抛出异常的操作54 throw new RuntimeException("任务异常");55 } catch (Exception e) {56 System.err.println("任务内部处理异常: " + e.getMessage());57 }58 });59 60 // 2. 通过Future处理异常61 Future<String> future = executor.submit(() -> {62 throw new RuntimeException("任务异常");63 });64 65 try {66 String result = future.get();67 } catch (ExecutionException e) {68 System.err.println("通过Future捕获异常: " + e.getCause().getMessage());69 } catch (InterruptedException e) {70 Thread.currentThread().interrupt();71 }72 }73 }74 75 /**76 * 3. 任务取消77 */78 public static class TaskCancellation {79 private final ExecutorService executor = Executors.newFixedThreadPool(5);80 81 public void demonstrateCancellation() {82 Future<?> future = executor.submit(() -> {83 try {84 Thread.sleep(10000); // 长时间运行的任务85 } catch (InterruptedException e) {86 System.out.println("任务被中断");87 Thread.currentThread().interrupt();88 }89 });90 91 // 取消任务92 boolean cancelled = future.cancel(true);93 System.out.println("任务取消结果: " + cancelled);94 }95 }96}5.3 性能优化技巧
线程池性能优化示例
java
1public class ThreadPoolPerformanceOptimization {2 3 /**4 * 1. 任务分解5 */6 public static class TaskDecomposition {7 private final ExecutorService executor = Executors.newFixedThreadPool(5);8 9 public void decomposeLargeTask() {10 int[] data = new int[1000];11 // 初始化数据...12 13 // 将大任务分解为小任务14 int chunkSize = 100;15 List<Future<Integer>> futures = new ArrayList<>();16 17 for (int i = 0; i < data.length; i += chunkSize) {18 final int start = i;19 final int end = Math.min(i + chunkSize, data.length);20 21 Future<Integer> future = executor.submit(() -> {22 int sum = 0;23 for (int j = start; j < end; j++) {24 sum += data[j];25 }26 return sum;27 });28 futures.add(future);29 }30 31 // 收集结果32 int totalSum = 0;33 for (Future<Integer> future : futures) {34 try {35 totalSum += future.get();36 } catch (InterruptedException | ExecutionException e) {37 e.printStackTrace();38 }39 }40 }41 }42 43 /**44 * 2. 任务优先级45 */46 public static class TaskPriority {47 private final ThreadPoolExecutor executor;48 49 public TaskPriority() {50 this.executor = new ThreadPoolExecutor(51 5, 10, 60L, TimeUnit.SECONDS,52 new PriorityBlockingQueue<>(),53 new ThreadPoolExecutor.CallerRunsPolicy()54 );55 }56 57 public void submitPriorityTasks() {58 // 高优先级任务59 executor.submit(new PriorityTask("高优先级任务", 1));60 61 // 低优先级任务62 executor.submit(new PriorityTask("低优先级任务", 3));63 64 // 中优先级任务65 executor.submit(new PriorityTask("中优先级任务", 2));66 }67 68 private static class PriorityTask implements Runnable, Comparable<PriorityTask> {69 private final String name;70 private final int priority;71 72 public PriorityTask(String name, int priority) {73 this.name = name;74 this.priority = priority;75 }76 77 @Override78 public void run() {79 System.out.println("执行任务: " + name + ",优先级: " + priority);80 }81 82 @Override83 public int compareTo(PriorityTask other) {84 return Integer.compare(this.priority, other.priority);85 }86 }87 }88 89 /**90 * 3. 线程池预热91 */92 public static class ThreadPoolWarmup {93 private final ThreadPoolExecutor executor;94 95 public ThreadPoolWarmup() {96 this.executor = new ThreadPoolExecutor(97 5, 10, 60L, TimeUnit.SECONDS,98 new ArrayBlockingQueue<>(100),99 new ThreadPoolExecutor.CallerRunsPolicy()100 );101 }102 103 public void warmup() {104 System.out.println("开始预热线程池...");105 106 // 提交一些轻量级任务来预热线程池107 for (int i = 0; i < 5; i++) {108 executor.submit(() -> {109 System.out.println("预热任务执行");110 });111 }112 113 // 等待预热完成114 try {115 Thread.sleep(1000);116 } catch (InterruptedException e) {117 Thread.currentThread().interrupt();118 }119 120 System.out.println("线程池预热完成");121 }122 }123}6. 总结
线程池是Java并发编程中的重要组件,合理使用线程池可以显著提高系统性能和稳定性。
6.1 关键要点
- 线程池类型:FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool
- 配置参数:核心线程数、最大线程数、队列类型、拒绝策略
- 监控管理:状态监控、生命周期管理、性能统计
- 最佳实践:合理配置、异常处理、任务分解、性能优化
6.2 选择建议
| 场景 | 推荐线程池 | 配置要点 |
|---|---|---|
| 固定并发数 | FixedThreadPool | 核心线程数 = 最大线程数 |
| 短期异步任务 | CachedThreadPool | 适合任务执行时间短 |
| 顺序执行 | SingleThreadExecutor | 保证任务顺序执行 |
| 定时任务 | ScheduledThreadPool | 支持延迟和周期性执行 |
| 自定义需求 | ThreadPoolExecutor | 完全控制配置参数 |
6.3 学习建议
- 理解原理:深入理解线程池的工作原理和参数含义
- 实践验证:通过编写代码验证不同配置的效果
- 性能测试:对比不同线程池配置的性能差异
- 监控运维:建立线程池监控和告警机制
通过深入理解和熟练运用线程池技术,我们能够构建出更加高效、健壮和可维护的Java并发应用程序。
参与讨论