Java 线程基础详解
线程是Java并发编程的基础,理解线程的基本概念、创建方式、生命周期管理对于掌握并发编程至关重要。本文将详细介绍Java线程的核心知识。
核心价值
Java线程 = 轻量级处理单元 + 并发执行 + 资源共享 + 高效通信 + 可靠性保障
- 🚀 轻量级处理:比进程创建更轻量,系统开销小
- 👥 并发执行:多线程并行处理任务,提高CPU利用率
- 🔄 资源共享:共享进程内存空间,减少通信开销
- ⚡ 响应性能:提高应用响应性能,避免阻塞主线程
- 🛡️ 隔离性:各线程独立执行,互不干扰
1. 线程概述
1.1 什么是线程?
核心概念
线程是程序执行的最小单位,是CPU调度的基本单元。一个进程可以包含多个线程,线程共享进程的内存空间,但拥有独立的执行栈。
1.2 线程与进程的关系
| 特性 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 操作系统分配资源的基本单位 | 共享进程的资源 |
| 内存空间 | 独立的内存空间 | 共享进程的内存空间 |
| 创建开销 | 较大 | 较小 |
| 通信方式 | 进程间通信(IPC) | 直接共享内存 |
| 切换开销 | 较大 | 较小 |
| 并发性 | 进程级并发 | 线程级并发 |
| 稳定性 | 一个进程崩溃不影响其他进程 | 一个线程崩溃可能导致整个进程崩溃 |
1.3 线程的优势
- 提高响应性
- 资源共享
- 经济性
java
1// 主线程保持UI响应2SwingUtilities.invokeLater(() -> {3 // UI相关操作4 frame.setVisible(true);5});6 7// 工作线程处理耗时操作8new Thread(() -> {9 // 处理耗时的数据加载10 loadLargeDataset();11 12 // 完成后更新UI13 SwingUtilities.invokeLater(() -> {14 updateUIWithResults();15 });16}).start();java
1// 共享数据2SharedData sharedData = new SharedData();3 4// 多个线程访问共享数据5Thread thread1 = new Thread(() -> {6 sharedData.increment();7 System.out.println("线程1: " + sharedData.getValue());8});9 10Thread thread2 = new Thread(() -> {11 sharedData.increment();12 System.out.println("线程2: " + sharedData.getValue());13});14 15thread1.start();16thread2.start();java
1// 创建多个线程处理任务,比创建多个进程开销小2for (int i = 0; i < 10; i++) {3 final int taskId = i;4 Thread thread = new Thread(() -> {5 System.out.println("处理任务: " + taskId);6 processTask(taskId);7 });8 thread.start();9}2. 线程创建方式详解
线程创建方法对比
| 创建方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 继承Thread | 简单直接,可访问线程方法 | 单继承限制,不能继承其他类 | 简单任务,需要直接操作线程 |
| 实现Runnable | 避免单继承限制,更灵活 | 不能直接访问线程方法 | 大多数场景下的首选方式 |
| 实现Callable | 可返回结果,可抛出异常 | 使用相对复杂,需要Future | 需要返回值或异常处理的任务 |
2.1 继承Thread类
- 代码实现
- 特点分析
- 使用示例
java
1// 自定义线程类2public class CustomThread extends Thread {3 private String threadName;4 5 public CustomThread(String threadName) {6 this.threadName = threadName;7 }8 9 @Override10 public void run() {11 System.out.println("线程 " + threadName + " 开始执行");12 // 线程任务代码...13 System.out.println("线程 " + threadName + " 执行完成");14 }15}1617// 使用方式18CustomThread thread = new CustomThread("工作线程");19thread.start(); // 启动线程继承Thread类特点
- 优点:直接访问Thread的方法(如getName(), getPriority()等)
- 缺点:Java单继承限制,无法再继承其他类
- 控制:可以直接控制线程的行为
- 耦合度:任务逻辑与线程控制耦合在一起
java
1public class ThreadExample {2 public static void main(String[] args) {3 // 创建线程实例4 CustomThread thread1 = new CustomThread("工作线程1");5 CustomThread thread2 = new CustomThread("工作线程2");6 7 // 设置线程属性8 thread1.setPriority(Thread.MAX_PRIORITY); // 最高优先级9 thread2.setPriority(Thread.MIN_PRIORITY); // 最低优先级10 11 // 启动线程12 thread1.start();13 thread2.start();14 15 try {16 thread1.join(); // 等待thread1完成17 thread2.join(); // 等待thread2完成18 } catch (InterruptedException e) {19 Thread.currentThread().interrupt();20 }21 }22}2.2 实现Runnable接口
- 基本实现
- Lambda表达式
- 方法引用
- 特点分析
java
1// 实现Runnable接口2public class TaskRunnable implements Runnable {3 private String taskName;4 5 public TaskRunnable(String taskName) {6 this.taskName = taskName;7 }8 9 @Override10 public void run() {11 System.out.println("任务 " + taskName + " 开始执行");12 // 任务代码...13 System.out.println("任务 " + taskName + " 执行完成");14 }15}1617// 使用方式18TaskRunnable task = new TaskRunnable("数据处理");19Thread thread = new Thread(task);20thread.start();java
1// 使用Lambda表达式创建线程2Thread thread = new Thread(() -> {3 System.out.println("Lambda线程开始执行");4 // 任务代码...5 System.out.println("Lambda线程执行完成");6});78thread.start();java
1public class RunnableExample {2 // 任务方法3 public static void performTask() {4 System.out.println("方法引用任务开始执行");5 // 任务代码...6 System.out.println("方法引用任务执行完成");7 }8 9 public static void main(String[] args) {10 // 使用方法引用11 Thread thread = new Thread(RunnableExample::performTask);12 thread.start();13 }14}Runnable接口特点
- 优点:避免单继承限制,可以继承其他类
- 灵活性:同一个Runnable可以被多个线程使用
- 分离关注点:将任务逻辑与线程控制分离
- 函数式风格:支持Lambda表达式,代码更简洁
- 缺点:无法直接访问Thread的方法,需要通过Thread.currentThread()
最佳实践
在大多数情况下,实现Runnable接口是创建线程的推荐方式,它提供了更好的灵活性和可扩展性。
2.3 实现Callable接口
- 基本实现
- Lambda表达式
- 超时处理
- 特点分析
java
1import java.util.concurrent.Callable;2import java.util.concurrent.FutureTask;34// 实现Callable接口5public class CalculationTask implements Callable<Integer> {6 private int number;7 8 public CalculationTask(int number) {9 this.number = number;10 }11 12 @Override13 public Integer call() throws Exception {14 System.out.println("计算任务开始");15 if (number < 0) {16 throw new IllegalArgumentException("负数不支持");17 }18 int result = number * number;19 System.out.println("计算结果: " + result);20 return result;21 }22}2324// 使用方式25CalculationTask task = new CalculationTask(10);26FutureTask<Integer> futureTask = new FutureTask<>(task);27Thread thread = new Thread(futureTask);28thread.start();2930// 获取结果31try {32 Integer result = futureTask.get(); // 阻塞等待结果33 System.out.println("获取到计算结果: " + result);34} catch (Exception e) {35 e.printStackTrace();36}java
1import java.util.concurrent.Callable;2import java.util.concurrent.FutureTask;34// 使用Lambda表达式创建Callable5Callable<String> callable = () -> {6 System.out.println("Callable任务开始执行");7 Thread.sleep(2000); // 模拟耗时操作8 return "任务执行结果";9};1011FutureTask<String> futureTask = new FutureTask<>(callable);12Thread thread = new Thread(futureTask);13thread.start();1415// 获取结果16try {17 String result = futureTask.get();18 System.out.println("结果: " + result);19} catch (Exception e) {20 e.printStackTrace();21}java
1import java.util.concurrent.Callable;2import java.util.concurrent.FutureTask;3import java.util.concurrent.TimeUnit;4import java.util.concurrent.TimeoutException;56// 长时间运行的任务7Callable<String> longTask = () -> {8 Thread.sleep(10000); // 10秒9 return "长时间任务完成";10};1112FutureTask<String> futureTask = new FutureTask<>(longTask);13Thread thread = new Thread(futureTask);14thread.start();1516try {17 // 设置5秒超时18 String result = futureTask.get(5, TimeUnit.SECONDS);19 System.out.println("任务结果: " + result);20} catch (TimeoutException e) {21 System.out.println("任务超时");22 thread.interrupt(); // 中断线程23} catch (Exception e) {24 e.printStackTrace();25}Callable接口特点
- 返回值:可以返回任务执行结果
- 异常处理:可以抛出检查型异常
- Future支持:支持获取结果、取消任务、检查完成状态
- 超时机制:支持设置等待结果的超时时间
- 缺点:使用较复杂,需要通过FutureTask包装
3. 线程生命周期详解
3.1 线程状态概述
- 线程状态
- 状态检查
- 状态转换
Java线程的6种状态
- NEW:新创建的线程,尚未启动
- RUNNABLE:可运行状态,可能正在运行也可能在等待CPU资源
- BLOCKED:阻塞状态,等待获取监视器锁
- WAITING:等待状态,等待其他线程执行特定操作
- TIMED_WAITING:超时等待状态,指定时间内等待其他线程
- TERMINATED:终止状态,线程执行完毕
java
1Thread thread = new Thread(() -> {2 try {3 Thread.sleep(2000);4 } catch (InterruptedException e) {5 Thread.currentThread().interrupt();6 }7});89System.out.println("初始状态: " + thread.getState()); // NEW1011thread.start();12System.out.println("启动后状态: " + thread.getState()); // RUNNABLE1314try {15 Thread.sleep(1000); // 让线程进入睡眠状态16 System.out.println("睡眠中状态: " + thread.getState()); // TIMED_WAITING17 18 thread.join(); // 等待线程结束19 System.out.println("结束后状态: " + thread.getState()); // TERMINATED20} catch (InterruptedException e) {21 Thread.currentThread().interrupt();22}| 当前状态 | 触发条件 | 目标状态 |
|---|---|---|
| NEW | 调用start()方法 | RUNNABLE |
| RUNNABLE | 获取synchronized锁失败 | BLOCKED |
| RUNNABLE | 调用wait()方法 | WAITING |
| RUNNABLE | 调用sleep(time)或wait(time)方法 | TIMED_WAITING |
| BLOCKED | 获得锁 | RUNNABLE |
| WAITING | 调用notify/notifyAll | RUNNABLE/BLOCKED |
| TIMED_WAITING | 超时或被唤醒 | RUNNABLE/BLOCKED |
| RUNNABLE | 执行完成 | TERMINATED |
3.2 线程状态转换图
3.3 线程状态示例
BLOCKED状态示例
java
1public class BlockedStateExample {2 private static final Object lock = new Object();3 4 public static void main(String[] args) throws InterruptedException {5 Thread thread1 = new Thread(() -> {6 synchronized (lock) {7 System.out.println("线程1获得锁");8 try {9 Thread.sleep(5000); // 持有锁5秒10 } catch (InterruptedException e) {11 Thread.currentThread().interrupt();12 }13 }14 });15 16 // 线程2将被阻塞17 Thread thread2 = new Thread(() -> {18 synchronized (lock) {19 System.out.println("线程2获得锁");20 }21 });22 23 thread1.start();24 Thread.sleep(100); // 确保线程1先启动25 26 thread2.start();27 Thread.sleep(1000); // 给线程2时间进入BLOCKED状态28 29 System.out.println("线程2状态: " + thread2.getState()); // 输出BLOCKED30 }31}WAITING状态示例
java
1public class WaitingStateExample {2 private static final Object lock = new Object();3 4 public static void main(String[] args) throws InterruptedException {5 Thread waitingThread = new Thread(() -> {6 synchronized (lock) {7 try {8 System.out.println("等待线程进入等待状态");9 lock.wait(); // 进入WAITING状态10 System.out.println("等待线程被唤醒");11 } catch (InterruptedException e) {12 Thread.currentThread().interrupt();13 }14 }15 });16 17 Thread notifyThread = new Thread(() -> {18 try {19 Thread.sleep(2000); // 等待2秒20 synchronized (lock) {21 System.out.println("通知线程唤醒等待线程");22 lock.notify();23 }24 } catch (InterruptedException e) {25 Thread.currentThread().interrupt();26 }27 });28 29 waitingThread.start();30 Thread.sleep(500); // 确保等待线程先执行31 System.out.println("等待线程状态: " + waitingThread.getState()); // 输出WAITING32 33 notifyThread.start();34 }35}TIMED_WAITING状态示例
java
1public class TimedWaitingStateExample {2 public static void main(String[] args) throws InterruptedException {3 Thread sleepingThread = new Thread(() -> {4 try {5 System.out.println("线程开始睡眠5秒");6 Thread.sleep(5000);7 System.out.println("线程睡眠结束");8 } catch (InterruptedException e) {9 System.out.println("线程睡眠被中断");10 Thread.currentThread().interrupt();11 }12 });13 14 sleepingThread.start();15 Thread.sleep(1000); // 给线程时间进入TIMED_WAITING状态16 17 System.out.println("线程状态: " + sleepingThread.getState()); // 输出TIMED_WAITING18 }19}4. 线程控制方法详解
4.1 线程控制核心方法
| 控制方法 | 描述 | 注意事项 |
|---|---|---|
| start() | 启动线程,使线程进入就绪状态 | 不能多次调用同一线程的start() |
| join() | 等待线程终止 | 可能导致调用线程阻塞 |
| join(long) | 等待线程终止,有超时时间 | 超时后继续执行,不管线程是否完成 |
| sleep(long) | 当前线程休眠指定时间 | 不会释放对象锁 |
| yield() | 提示线程调度器让出CPU执行权 | 只是提示,调度器可能忽略 |
| interrupt() | 中断线程 | 设置线程的中断状态 |
| isInterrupted() | 检查线程是否被中断 | 不会清除中断状态 |
| interrupted() | 检查当前线程是否被中断 | 会清除中断状态 |
4.2 线程中断机制
- 基本中断
- 轮询中断状态
- 自定义标志
java
1// 可中断的线程任务2Thread thread = new Thread(() -> {3 while (!Thread.currentThread().isInterrupted()) {4 try {5 System.out.println("线程工作中...");6 Thread.sleep(1000);7 } catch (InterruptedException e) {8 System.out.println("线程被中断");9 // 重新设置中断状态(因为InterruptedException会清除中断状态)10 Thread.currentThread().interrupt();11 break;12 }13 }14 System.out.println("线程退出");15});1617thread.start();1819// 等待一段时间后中断线程20try {21 Thread.sleep(3000);22 thread.interrupt(); // 中断线程23} catch (InterruptedException e) {24 Thread.currentThread().interrupt();25}java
1Thread thread = new Thread(() -> {2 // 循环检查中断状态3 while (!Thread.currentThread().isInterrupted()) {4 // 执行一段非阻塞操作5 System.out.println("执行计算任务...");6 7 // 模拟计算工作8 long sum = 0;9 for (int i = 0; i < 1000000; i++) {10 sum += i;11 }12 13 // 在适当的检查点检查中断14 if (Thread.currentThread().isInterrupted()) {15 System.out.println("检测到中断,准备退出");16 break;17 }18 }19 System.out.println("线程正常退出");20});2122thread.start();23thread.interrupt(); // 中断线程java
1public class ControlledThread extends Thread {2 private volatile boolean running = true;3 4 public void stopThread() {5 running = false;6 }7 8 @Override9 public void run() {10 while (running) {11 System.out.println("线程运行中...");12 try {13 Thread.sleep(500);14 } catch (InterruptedException e) {15 System.out.println("睡眠被中断");16 }17 }18 System.out.println("线程停止");19 }20}2122// 使用方式23ControlledThread thread = new ControlledThread();24thread.start();2526// 等待一段时间后停止线程27try {28 Thread.sleep(2000);29 thread.stopThread(); // 优雅停止30} catch (InterruptedException e) {31 Thread.currentThread().interrupt();32}注意事项
- 永远不要使用已废弃的
Thread.stop()、Thread.suspend()和Thread.resume()方法 - 中断是一种协作机制,不会强制终止线程
- 线程在
InterruptedException异常处理中应当重新设置中断状态或传递异常
4.3 线程等待与通知
wait/notify机制详解
重要说明:
wait()、notify()和notifyAll()方法必须在同步代码块或同步方法中调用- 这些方法是
Object类的方法,不是Thread类的方法 wait()会释放锁,而sleep()不会释放锁- 调用
notify()后,被通知的线程不会立即执行,而是需要等待当前线程释放锁
等待/通知示例
java
1public class WaitNotifyExample {2 private static final Object lock = new Object();3 private static boolean dataReady = false;4 private static String data = null;5 6 public static void main(String[] args) {7 // 消费者线程8 Thread consumer = new Thread(() -> {9 synchronized (lock) {10 System.out.println("消费者等待数据...");11 while (!dataReady) { // 使用循环检查条件防止虚假唤醒12 try {13 lock.wait(); // 释放锁并等待14 } catch (InterruptedException e) {15 Thread.currentThread().interrupt();16 return;17 }18 }19 System.out.println("消费者消费数据: " + data);20 }21 });22 23 // 生产者线程24 Thread producer = new Thread(() -> {25 try {26 Thread.sleep(2000); // 模拟耗时操作27 } catch (InterruptedException e) {28 Thread.currentThread().interrupt();29 return;30 }31 32 synchronized (lock) {33 data = "重要数据";34 dataReady = true;35 System.out.println("生产者准备数据: " + data);36 lock.notify(); // 通知等待的消费者37 }38 });39 40 consumer.start();41 producer.start();42 }43}4.4 线程优先级和守护线程
- 线程优先级
- 守护线程
java
1public class ThreadPriorityExample {2 public static void main(String[] args) {3 // 创建三个优先级不同的线程4 Thread lowPriority = new Thread(() -> {5 for (int i = 0; i < 5; i++) {6 System.out.println("低优先级线程: " + i);7 Thread.yield(); // 提示可以让出CPU8 }9 });10 11 Thread normalPriority = new Thread(() -> {12 for (int i = 0; i < 5; i++) {13 System.out.println("普通优先级线程: " + i);14 Thread.yield();15 }16 });17 18 Thread highPriority = new Thread(() -> {19 for (int i = 0; i < 5; i++) {20 System.out.println("高优先级线程: " + i);21 Thread.yield();22 }23 });24 25 // 设置优先级26 lowPriority.setPriority(Thread.MIN_PRIORITY); // 127 normalPriority.setPriority(Thread.NORM_PRIORITY); // 528 highPriority.setPriority(Thread.MAX_PRIORITY); // 1029 30 // 启动线程31 lowPriority.start();32 normalPriority.start();33 highPriority.start();34 }35}java
1public class DaemonThreadExample {2 public static void main(String[] args) {3 // 创建守护线程4 Thread daemonThread = new Thread(() -> {5 while (true) {6 try {7 System.out.println("守护线程运行中...");8 Thread.sleep(1000);9 } catch (InterruptedException e) {10 break;11 }12 }13 });14 15 // 设置为守护线程(必须在start前设置)16 daemonThread.setDaemon(true);17 daemonThread.start();18 19 // 主线程(用户线程)20 try {21 System.out.println("主线程运行3秒后退出");22 Thread.sleep(3000);23 } catch (InterruptedException e) {24 Thread.currentThread().interrupt();25 }26 27 System.out.println("主线程退出,守护线程将自动终止");28 }29}守护线程特点
- 守护线程的优先级较低,但具体调度取决于操作系统
- 当JVM中只剩下守护线程时,JVM将退出
- 典型的守护线程如垃圾回收器、JIT编译器线程
- 守护线程创建的线程也是守护线程
5. 实际应用场景
5.1 并行计算
- 并行数组计算
- 并行文件处理
java
1public class ParallelCalculation {2 public static void main(String[] args) {3 int[] numbers = new int[10_000_000]; // 一千万个数4 // 初始化数组5 for (int i = 0; i < numbers.length; i++) {6 numbers[i] = i + 1;7 }8 9 // 使用多线程并行计算10 int processors = Runtime.getRuntime().availableProcessors();11 System.out.println("CPU核心数: " + processors);12 13 // 创建线程池14 ParallelCalculator[] calculators = new ParallelCalculator[processors];15 Thread[] threads = new Thread[processors];16 int segmentSize = numbers.length / processors;17 18 long startTime = System.currentTimeMillis();19 20 // 分配任务并启动线程21 for (int i = 0; i < processors; i++) {22 int startIndex = i * segmentSize;23 int endIndex = (i == processors - 1) ? numbers.length : (i + 1) * segmentSize;24 25 calculators[i] = new ParallelCalculator(numbers, startIndex, endIndex);26 threads[i] = new Thread(calculators[i]);27 threads[i].start();28 }29 30 // 等待所有线程完成并合并结果31 long sum = 0;32 for (int i = 0; i < processors; i++) {33 try {34 threads[i].join();35 sum += calculators[i].getResult();36 } catch (InterruptedException e) {37 Thread.currentThread().interrupt();38 }39 }40 41 long endTime = System.currentTimeMillis();42 System.out.println("并行计算结果: " + sum);43 System.out.println("耗时: " + (endTime - startTime) + "ms");44 }45 46 static class ParallelCalculator implements Runnable {47 private final int[] numbers;48 private final int startIndex;49 private final int endIndex;50 private long result = 0;51 52 public ParallelCalculator(int[] numbers, int startIndex, int endIndex) {53 this.numbers = numbers;54 this.startIndex = startIndex;55 this.endIndex = endIndex;56 }57 58 @Override59 public void run() {60 for (int i = startIndex; i < endIndex; i++) {61 result += numbers[i];62 }63 }64 65 public long getResult() {66 return result;67 }68 }69}java
1import java.io.*;2import java.nio.file.*;3import java.util.concurrent.*;45public class ParallelFileProcessor {6 public static void main(String[] args) {7 final Path directory = Paths.get("./data");8 final String searchKeyword = "important";9 10 try {11 // 获取目录中的所有文件12 if (!Files.exists(directory)) {13 System.err.println("目录不存在: " + directory);14 return;15 }16 17 File[] files = directory.toFile().listFiles();18 if (files == null || files.length == 0) {19 System.out.println("没有找到文件");20 return;21 }22 23 // 创建线程池并行处理文件24 int threadCount = Math.min(files.length, Runtime.getRuntime().availableProcessors());25 ExecutorService executor = Executors.newFixedThreadPool(threadCount);26 CountDownLatch latch = new CountDownLatch(files.length);27 28 // 提交搜索任务29 for (File file : files) {30 if (file.isFile()) {31 executor.submit(() -> {32 try {33 long occurrences = searchInFile(file, searchKeyword);34 System.out.println("文件 " + file.getName() + 35 " 中找到 " + occurrences + " 个匹配项");36 } finally {37 latch.countDown();38 }39 });40 } else {41 latch.countDown();42 }43 }44 45 // 等待所有任务完成46 latch.await();47 System.out.println("所有文件处理完成");48 49 // 关闭线程池50 executor.shutdown();51 52 } catch (IOException | InterruptedException e) {53 e.printStackTrace();54 }55 }56 57 private static long searchInFile(File file, String keyword) throws IOException {58 long count = 0;59 try (BufferedReader reader = new BufferedReader(new FileReader(file))) {60 String line;61 while ((line = reader.readLine()) != null) {62 if (line.contains(keyword)) {63 count++;64 }65 }66 }67 return count;68 }69}5.2 异步任务处理
java
1import java.util.concurrent.*;2import java.util.ArrayList;3import java.util.List;45public class AsyncTaskProcessing {6 private final int MAX_TASKS = 10;7 private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();8 private final List<WorkerThread> workers = new ArrayList<>();9 private volatile boolean isRunning = true;10 11 public AsyncTaskProcessing(int workerCount) {12 // 创建并启动工作线程13 for (int i = 0; i < workerCount; i++) {14 WorkerThread worker = new WorkerThread("Worker-" + i);15 workers.add(worker);16 worker.start();17 }18 }19 20 // 提交任务21 public void submitTask(Runnable task) throws InterruptedException {22 if (!isRunning) {23 throw new RejectedExecutionException("任务处理器已关闭");24 }25 taskQueue.put(task);26 }27 28 // 关闭任务处理器29 public void shutdown() {30 isRunning = false;31 for (WorkerThread worker : workers) {32 worker.interrupt();33 }34 }35 36 // 工作线程37 private class WorkerThread extends Thread {38 public WorkerThread(String name) {39 super(name);40 }41 42 @Override43 public void run() {44 while (isRunning || !taskQueue.isEmpty()) {45 try {46 Runnable task = taskQueue.poll(1, TimeUnit.SECONDS);47 if (task != null) {48 System.out.println(getName() + " 处理任务...");49 task.run();50 }51 } catch (InterruptedException e) {52 System.out.println(getName() + " 被中断");53 Thread.currentThread().interrupt();54 break;55 } catch (Exception e) {56 System.err.println(getName() + " 执行任务时出错: " + e.getMessage());57 }58 }59 System.out.println(getName() + " 已终止");60 }61 }62 63 // 使用示例64 public static void main(String[] args) {65 AsyncTaskProcessing processor = new AsyncTaskProcessing(3);66 67 try {68 // 提交多个任务69 for (int i = 0; i < 10; i++) {70 final int taskId = i;71 processor.submitTask(() -> {72 try {73 System.out.println("执行任务 " + taskId);74 Thread.sleep((long) (Math.random() * 1000));75 System.out.println("任务 " + taskId + " 完成");76 } catch (InterruptedException e) {77 Thread.currentThread().interrupt();78 }79 });80 }81 82 // 等待一段时间后关闭处理器83 Thread.sleep(5000);84 } catch (InterruptedException e) {85 Thread.currentThread().interrupt();86 } finally {87 processor.shutdown();88 }89 }90}异步任务处理的优势
- 非阻塞操作:提交任务后立即返回,不等待任务完成
- 并发处理:可同时处理多个任务,提高吞吐量
- 资源控制:通过工作线程数量限制并发量
- 优雅关闭:支持完成已提交任务后关闭
5.3 实时事件处理
实时事件处理架构
关键组件:
- 事件源:产生需要处理的事件
- 事件队列:存储待处理的事件
- 事件分发器:独立线程,从队列取出事件并分发
- 事件处理器:处理特定类型的事件
- 结果处理:汇总处理结果
java
1public class EventProcessor {2 private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();3 private final Map<EventType, EventHandler> handlers = new HashMap<>();4 private final Thread dispatcherThread;5 private volatile boolean running = true;6 7 public EventProcessor() {8 // 注册事件处理器9 handlers.put(EventType.USER_ACTION, new UserActionHandler());10 handlers.put(EventType.SYSTEM_ALERT, new SystemAlertHandler());11 handlers.put(EventType.DATA_CHANGE, new DataChangeHandler());12 13 // 创建分发线程14 dispatcherThread = new Thread(() -> {15 while (running) {16 try {17 Event event = eventQueue.take();18 processEvent(event);19 } catch (InterruptedException e) {20 Thread.currentThread().interrupt();21 break;22 }23 }24 }, "EventDispatcher");25 26 dispatcherThread.start();27 }28 29 public void submitEvent(Event event) {30 try {31 eventQueue.put(event);32 } catch (InterruptedException e) {33 Thread.currentThread().interrupt();34 }35 }36 37 private void processEvent(Event event) {38 EventHandler handler = handlers.get(event.getType());39 if (handler != null) {40 try {41 handler.handle(event);42 } catch (Exception e) {43 System.err.println("处理事件时出错: " + e.getMessage());44 }45 } else {46 System.out.println("未找到事件处理器: " + event.getType());47 }48 }49 50 public void shutdown() {51 running = false;52 dispatcherThread.interrupt();53 }54 55 // 事件类型和处理器接口 (为简化示例仅展示骨架)56 enum EventType { USER_ACTION, SYSTEM_ALERT, DATA_CHANGE }57 58 static class Event {59 private final EventType type;60 private final Object data;61 62 public Event(EventType type, Object data) {63 this.type = type;64 this.data = data;65 }66 67 public EventType getType() { return type; }68 public Object getData() { return data; }69 }70 71 interface EventHandler {72 void handle(Event event);73 }74 75 // 具体处理器实现76 static class UserActionHandler implements EventHandler {77 @Override78 public void handle(Event event) {79 System.out.println("处理用户动作: " + event.getData());80 }81 }82 83 static class SystemAlertHandler implements EventHandler {84 @Override85 public void handle(Event event) {86 System.out.println("处理系统警报: " + event.getData());87 }88 }89 90 static class DataChangeHandler implements EventHandler {91 @Override92 public void handle(Event event) {93 System.out.println("处理数据变更: " + event.getData());94 }95 }96}6. 线程安全和最佳实践
6.1 线程安全问题
- 竞态条件问题
- 线程安全解决方案
java
1// 非线程安全的计数器2public class UnsafeCounter {3 private int count = 0;4 5 public void increment() {6 count++; // 非原子操作,可能导致竞态条件7 }8 9 public int getCount() {10 return count;11 }12 13 public static void main(String[] args) throws InterruptedException {14 UnsafeCounter counter = new UnsafeCounter();15 16 // 创建多个线程同时递增计数器17 Thread[] threads = new Thread[1000];18 for (int i = 0; i < threads.length; i++) {19 threads[i] = new Thread(() -> {20 for (int j = 0; j < 1000; j++) {21 counter.increment();22 }23 });24 threads[i].start();25 }26 27 // 等待所有线程完成28 for (Thread thread : threads) {29 thread.join();30 }31 32 // 理论上结果应该是1,000,000,但实际可能小于这个值33 System.out.println("计数器最终值: " + counter.getCount());34 }35}java
1import java.util.concurrent.atomic.AtomicInteger;23// 线程安全的计数器:使用synchronized4public class ThreadSafeCounter {5 private int syncCount = 0;6 private final AtomicInteger atomicCount = new AtomicInteger(0);7 8 // 方案1: 使用synchronized9 public synchronized void incrementSync() {10 syncCount++;11 }12 13 public synchronized int getSyncCount() {14 return syncCount;15 }16 17 // 方案2: 使用AtomicInteger18 public void incrementAtomic() {19 atomicCount.incrementAndGet();20 }21 22 public int getAtomicCount() {23 return atomicCount.get();24 }25 26 public static void main(String[] args) throws InterruptedException {27 ThreadSafeCounter counter = new ThreadSafeCounter();28 29 // 创建多个线程30 Thread[] threads = new Thread[1000];31 for (int i = 0; i < threads.length; i++) {32 threads[i] = new Thread(() -> {33 for (int j = 0; j < 1000; j++) {34 counter.incrementSync();35 counter.incrementAtomic();36 }37 });38 threads[i].start();39 }40 41 // 等待所有线程完成42 for (Thread thread : threads) {43 thread.join();44 }45 46 System.out.println("synchronized计数器: " + counter.getSyncCount());47 System.out.println("AtomicInteger计数器: " + counter.getAtomicCount());48 }49}6.2 线程安全性策略
确保线程安全的策略
- 不可变性:使用不可变对象避免状态变化
- 线程封闭:将数据限制在单个线程内使用
- 同步机制:
- 使用synchronized同步访问共享数据
- 使用显式锁(ReentrantLock等)管理访问
- volatile变量确保可见性
- 原子类避免复合操作问题
- 线程安全集合:使用并发容器(ConcurrentHashMap等)
- 安全发布:确保对象安全地在线程之间传递
6.3 线程池最佳实践
线程池使用建议
- 合理设置大小:线程池大小 = CPU核心数 * (1 + 等待时间/计算时间)
- 区分任务类型:CPU密集型任务池应小一些,IO密集型任务池可以大一些
- 使用工厂方法:使用ThreadFactory为线程池创建的线程设置有意义的名称
- 处理异常:捕获并处理提交任务内的所有异常
- 优雅关闭:使用shutdown()而非shutdownNow(),确保任务完成
- 监控线程池:跟踪活动线程数、完成任务数、队列大小等指标
6.4 避免死锁的建议
java
1public class DeadlockAvoidance {2 // 规则1: 按固定顺序获取锁3 public void correctResourceAccess(Object resource1, Object resource2) {4 // 总是先获取resource1,再获取resource25 synchronized (resource1) {6 System.out.println("获取第一个锁");7 synchronized (resource2) {8 System.out.println("获取第二个锁");9 // 使用资源...10 }11 }12 }13 14 // 规则2: 使用超时获取锁15 public boolean tryResourceAccess(Lock lock1, Lock lock2) {16 try {17 // 尝试获取锁,但不永远等待18 if (lock1.tryLock(1, TimeUnit.SECONDS)) {19 try {20 if (lock2.tryLock(1, TimeUnit.SECONDS)) {21 try {22 // 使用资源...23 return true;24 } finally {25 lock2.unlock();26 }27 }28 } finally {29 lock1.unlock();30 }31 }32 } catch (InterruptedException e) {33 Thread.currentThread().interrupt();34 }35 return false; // 未能获取所有锁36 }37 38 // 规则3: 避免在持有锁时调用外部方法39 public void avoidNestedLocking(Object lock) {40 synchronized (lock) {41 // 不要在这里调用可能获取其他锁的方法42 safeOperation(); // 只调用不获取锁的安全方法43 }44 }45 46 private void safeOperation() {47 // 不获取任何锁的操作48 }49}7. 总结
7.1 线程核心知识点总结
- 线程创建:继承Thread、实现Runnable、实现Callable
- 线程状态:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED
- 线程控制:start()、join()、sleep()、yield()、interrupt()
- 线程通信:wait()、notify()、notifyAll()
- 线程安全:同步、锁、原子变量、线程安全集合
- 应用场景:并行计算、异步任务、实时事件处理
7.2 学习路径与深入方向
7.3 性能与调试建议
- 适量创建线程:线程数不应远超CPU核心数
- 避免过度同步:仅在必要时进行同步
- 使用线程栈:利用线程栈快速定位问题
- 善用JVM参数:调整线程栈大小(
-Xss) - 使用线程转储:通过
jstack获取线程转储分析死锁 - 性能分析:使用JVisualVM、YourKit等工具监控线程性能
参与讨论