Skip to main content

Java Stream API完全指南

Java 8引入的Stream API是Java函数式编程的重要里程碑,它提供了一种声明式的方式来处理集合数据。Stream API不仅让代码更加简洁、可读,还支持并行处理,大大提升了Java处理数据的能力和开发效率。

核心价值

Stream API = 声明式编程 + 函数式风格 + 并行处理 + 流水线操作

  • 📝 声明式编程:关注做什么而非怎么做,提高代码可读性
  • λ 函数式风格:使用函数组合和表达式,减少副作用
  • 并行处理:简单切换并行执行,充分利用多核处理能力
  • 🔄 流水线操作:链式API支持多阶段处理,简化复杂数据转换
  • 🧩 内置操作:丰富的内置操作符,简化常见数据处理任务

1. Stream基础概念与原理

1.1 Stream核心概念

Stream不是数据结构,而是数据源的视图。它不存储数据,而是按需计算。Stream操作分为中间操作和终端操作。

Stream特性对比表

特性传统集合操作Stream API优势
执行方式立即执行惰性求值性能优化,避免不必要计算
数据修改可能修改原数据不修改原数据数据安全,函数式编程
并行处理需要手动实现内置并行支持简化并发编程
代码风格命令式声明式代码更简洁易读
链式调用不支持完全支持流畅的API体验
内存使用可能创建中间集合按需处理内存效率更高

Stream创建与基本操作

Stream创建方式完整示例
java
1import java.util.*;
2import java.util.stream.*;
3import java.nio.file.*;
4import java.io.IOException;
5
6public class StreamCreationDemo {
7 public static void main(String[] args) throws IOException {
8 // 1. 从集合创建Stream
9 List<String> languages = Arrays.asList("Java", "Python", "JavaScript", "C++", "Go");
10 Stream<String> streamFromList = languages.stream();
11 System.out.println("从List创建: " + streamFromList.collect(Collectors.toList()));
12
13 // 2. 从数组创建Stream
14 String[] array = {"Apple", "Banana", "Cherry", "Date"};
15 Stream<String> streamFromArray = Arrays.stream(array);
16 System.out.println("从数组创建: " + streamFromArray.collect(Collectors.toList()));
17
18 // 3. 使用Stream.of()创建
19 Stream<Integer> streamOf = Stream.of(1, 2, 3, 4, 5);
20 System.out.println("使用of()创建: " + streamOf.collect(Collectors.toList()));
21
22 // 4. 创建空Stream
23 Stream<String> emptyStream = Stream.empty();
24 System.out.println("空Stream元素数量: " + emptyStream.count());
25
26 // 5. 使用Stream.generate()创建无限流
27 Stream<Double> randomStream = Stream.generate(Math::random).limit(5);
28 System.out.println("随机数流: " + randomStream.collect(Collectors.toList()));
29
30 // 6. 使用Stream.iterate()创建
31 Stream<Integer> iterateStream = Stream.iterate(0, n -> n + 2).limit(10);
32 System.out.println("迭代流(偶数): " + iterateStream.collect(Collectors.toList()));
33
34 // 7. 从范围创建IntStream
35 IntStream rangeStream = IntStream.range(1, 6);
36 System.out.println("范围流: " + rangeStream.boxed().collect(Collectors.toList()));
37
38 // 8. 从文件创建Stream
39 try {
40 Stream<String> fileStream = Files.lines(Paths.get("example.txt"));
41 // 注意:实际使用时需要确保文件存在
42 // System.out.println("文件行数: " + fileStream.count());
43 } catch (Exception e) {
44 System.out.println("文件不存在,跳过文件流示例");
45 }
46
47 // 9. 从字符串创建字符流
48 IntStream charStream = "Hello World".chars();
49 System.out.println("字符流: " + charStream.mapToObj(c -> (char) c).collect(Collectors.toList()));
50
51 // 10. 并行流创建
52 Stream<String> parallelStream = languages.parallelStream();
53 System.out.println("并行流处理: " + parallelStream.map(String::toUpperCase).collect(Collectors.toList()));
54 }
55}

2. Stream中间操作详解

中间操作是Stream处理的核心,它们是惰性的,只有在遇到终端操作时才会执行。

过滤操作详解

过滤操作完整示例
java
1import java.util.*;
2import java.util.stream.*;
3
4public class FilterOperationsDemo {
5
6 static class Employee {
7 private String name;
8 private String department;
9 private int age;
10 private double salary;
11 private List<String> skills;
12
13 public Employee(String name, String department, int age, double salary, String... skills) {
14 this.name = name;
15 this.department = department;
16 this.age = age;
17 this.salary = salary;
18 this.skills = Arrays.asList(skills);
19 }
20
21 // getters
22 public String getName() { return name; }
23 public String getDepartment() { return department; }
24 public int getAge() { return age; }
25 public double getSalary() { return salary; }
26 public List<String> getSkills() { return skills; }
27
28 @Override
29 public String toString() {
30 return String.format("Employee{name='%s', dept='%s', age=%d, salary=%.0f, skills=%s}",
31 name, department, age, salary, skills);
32 }
33 }
34
35 public static void main(String[] args) {
36 List<Employee> employees = Arrays.asList(
37 new Employee("张三", "IT", 28, 8000, "Java", "Spring", "MySQL"),
38 new Employee("李四", "IT", 32, 12000, "Python", "Django", "PostgreSQL"),
39 new Employee("王五", "HR", 29, 6000, "招聘", "培训"),
40 new Employee("赵六", "Finance", 35, 9000, "会计", "审计"),
41 new Employee("钱七", "IT", 26, 7000, "JavaScript", "React", "Node.js"),
42 new Employee("孙八", "Marketing", 31, 7500, "市场分析", "广告策划"),
43 new Employee("周九", "IT", 40, 15000, "Java", "架构设计", "团队管理")
44 );
45
46 System.out.println("=== 原始员工数据 ===");
47 employees.forEach(System.out::println);
48
49 // 1. 基础过滤 - filter()
50 System.out.println("\n=== IT部门员工 ===");
51 List<Employee> itEmployees = employees.stream()
52 .filter(emp -> "IT".equals(emp.getDepartment()))
53 .collect(Collectors.toList());
54 itEmployees.forEach(System.out::println);
55
56 // 2. 复合条件过滤
57 System.out.println("\n=== IT部门且薪资>8000的员工 ===");
58 List<Employee> highSalaryIT = employees.stream()
59 .filter(emp -> "IT".equals(emp.getDepartment()))
60 .filter(emp -> emp.getSalary() > 8000)
61 .collect(Collectors.toList());
62 highSalaryIT.forEach(System.out::println);
63
64 // 3. 使用Predicate组合
65 System.out.println("\n=== 使用Predicate组合条件 ===");
66 Predicate<Employee> isIT = emp -> "IT".equals(emp.getDepartment());
67 Predicate<Employee> isHighSalary = emp -> emp.getSalary() > 10000;
68 Predicate<Employee> isYoung = emp -> emp.getAge() < 30;
69
70 // 年轻的高薪IT员工
71 List<Employee> youngHighSalaryIT = employees.stream()
72 .filter(isIT.and(isHighSalary).and(isYoung))
73 .collect(Collectors.toList());
74 System.out.println("年轻高薪IT员工: " + youngHighSalaryIT);
75
76 // IT或高薪员工
77 List<Employee> itOrHighSalary = employees.stream()
78 .filter(isIT.or(isHighSalary))
79 .collect(Collectors.toList());
80 System.out.println("IT或高薪员工数量: " + itOrHighSalary.size());
81
82 // 4. 基于集合属性过滤
83 System.out.println("\n=== 掌握Java技能的员工 ===");
84 List<Employee> javaEmployees = employees.stream()
85 .filter(emp -> emp.getSkills().contains("Java"))
86 .collect(Collectors.toList());
87 javaEmployees.forEach(System.out::println);
88
89 // 5. 空值过滤
90 List<String> namesWithNulls = Arrays.asList("Alice", null, "Bob", "", "Charlie", null);
91 System.out.println("\n=== 过滤空值和空字符串 ===");
92 List<String> validNames = namesWithNulls.stream()
93 .filter(Objects::nonNull)
94 .filter(name -> !name.trim().isEmpty())
95 .collect(Collectors.toList());
96 System.out.println("有效姓名: " + validNames);
97
98 // 6. 范围过滤
99 System.out.println("\n=== 年龄在25-35之间的员工 ===");
100 List<Employee> ageRangeEmployees = employees.stream()
101 .filter(emp -> emp.getAge() >= 25 && emp.getAge() <= 35)
102 .collect(Collectors.toList());
103 ageRangeEmployees.forEach(emp ->
104 System.out.println(emp.getName() + " - 年龄: " + emp.getAge()));
105
106 // 7. 自定义复杂过滤逻辑
107 System.out.println("\n=== 资深员工(年龄>30且薪资>平均薪资) ===");
108 double avgSalary = employees.stream()
109 .mapToDouble(Employee::getSalary)
110 .average()
111 .orElse(0.0);
112
113 System.out.println("平均薪资: " + String.format("%.2f", avgSalary));
114
115 List<Employee> seniorEmployees = employees.stream()
116 .filter(emp -> emp.getAge() > 30 && emp.getSalary() > avgSalary)
117 .collect(Collectors.toList());
118 seniorEmployees.forEach(System.out::println);
119 }
120}

3. Stream终端操作详解

终端操作会触发流的处理并产生结果。一旦执行了终端操作,流就被消费了,不能再次使用。

  1. 并行流与性能优化

并行流是Stream API的重要特性,可以充分利用多核处理器的优势。

并行流使用注意事项
  1. 数据量要足够大:小数据集使用并行流可能反而降低性能
  2. 避免有状态操作:如sorted、distinct等操作会影响并行性能
  3. 线程安全:确保Lambda表达式中的操作是线程安全的
  4. I/O密集型任务:不适合使用并行流,应该使用异步处理

并行流基础操作

并行流基础示例
java
1import java.util.*;
2import java.util.concurrent.*;
3import java.util.stream.*;
4
5public class ParallelStreamBasics {
6
7 public static void main(String[] args) {
8 // 创建大量测试数据
9 List<Integer> numbers = IntStream.rangeClosed(1, 1_000_000)
10 .boxed()
11 .collect(Collectors.toList());
12
13 System.out.println("数据量: " + numbers.size());
14 System.out.println("可用处理器数量: " + Runtime.getRuntime().availableProcessors());
15
16 // 1. 串行流 vs 并行流性能对比
17 System.out.println("\n=== 性能对比:计算平方和 ===");
18
19 // 串行流
20 long startTime = System.currentTimeMillis();
21 long serialSum = numbers.stream()
22 .mapToLong(n -> (long) n * n)
23 .sum();
24 long serialTime = System.currentTimeMillis() - startTime;
25
26 // 并行流
27 startTime = System.currentTimeMillis();
28 long parallelSum = numbers.parallelStream()
29 .mapToLong(n -> (long) n * n)
30 .sum();
31 long parallelTime = System.currentTimeMillis() - startTime;
32
33 System.out.println("串行结果: " + serialSum + ", 耗时: " + serialTime + "ms");
34 System.out.println("并行结果: " + parallelSum + ", 耗时: " + parallelTime + "ms");
35 System.out.println("性能提升: " + String.format("%.2fx", (double) serialTime / parallelTime));
36
37 // 2. 并行流创建方式
38 System.out.println("\n=== 并行流创建方式 ===");
39
40 // 方式1: 从集合创建并行流
41 Stream<Integer> parallelStream1 = numbers.parallelStream();
42
43 // 方式2: 将串行流转换为并行流
44 Stream<Integer> parallelStream2 = numbers.stream().parallel();
45
46 // 方式3: 直接创建并行流
47 IntStream parallelIntStream = IntStream.range(1, 1000).parallel();
48
49 System.out.println("并行流1是否并行: " + parallelStream1.isParallel());
50 System.out.println("并行流2是否并行: " + parallelStream2.isParallel());
51 System.out.println("并行IntStream是否并行: " + parallelIntStream.isParallel());
52
53 // 3. 并行流转串行流
54 Stream<Integer> sequentialStream = numbers.parallelStream().sequential();
55 System.out.println("转换后是否并行: " + sequentialStream.isParallel());
56
57 // 4. 复杂计算的并行处理
58 System.out.println("\n=== 复杂计算并行处理 ===");
59
60 // 模拟复杂计算函数
61 Function<Integer, Double> complexCalculation = n -> {
62 double result = 0;
63 for (int i = 1; i <= 1000; i++) {
64 result += Math.sin(n * i) * Math.cos(n * i);
65 }
66 return result;
67 };
68
69 List<Integer> smallDataset = IntStream.rangeClosed(1, 1000)
70 .boxed()
71 .collect(Collectors.toList());
72
73 // 串行复杂计算
74 startTime = System.currentTimeMillis();
75 double serialComplexSum = smallDataset.stream()
76 .mapToDouble(complexCalculation::apply)
77 .sum();
78 long serialComplexTime = System.currentTimeMillis() - startTime;
79
80 // 并行复杂计算
81 startTime = System.currentTimeMillis();
82 double parallelComplexSum = smallDataset.parallelStream()
83 .mapToDouble(complexCalculation::apply)
84 .sum();
85 long parallelComplexTime = System.currentTimeMillis() - startTime;
86
87 System.out.println("串行复杂计算: " + String.format("%.2f", serialComplexSum) +
88 ", 耗时: " + serialComplexTime + "ms");
89 System.out.println("并行复杂计算: " + String.format("%.2f", parallelComplexSum) +
90 ", 耗时: " + parallelComplexTime + "ms");
91 System.out.println("复杂计算性能提升: " +
92 String.format("%.2fx", (double) serialComplexTime / parallelComplexTime));
93 }
94}

5. Stream最佳实践与设计模式

掌握Stream的最佳实践和常见设计模式,能够写出更优雅、高效的代码。

最佳实践原则
  1. 优先使用Stream API:相比传统循环,Stream更简洁易读
  2. 合理选择并行流:只在数据量大且CPU密集时使用
  3. 避免副作用:Lambda表达式应该是纯函数
  4. 使用方法引用:提高代码可读性
  5. 链式调用要适度:过长的链式调用影响可读性

通过掌握Stream API的这些核心概念和最佳实践,你可以编写出更加简洁、高效、易维护的Java代码,充分发挥函数式编程的优势。

6. 面试题精选

6.1 什么是Stream API?它有什么优势?

答案: Stream API是Java 8引入的处理集合的API,它允许以声明式方式处理数据集合,支持函数式编程范式。

Stream API的主要优势:

  1. 声明式编程:关注"做什么"而非"怎么做",代码更加简洁、可读
  2. 函数式风格:支持Lambda表达式和方法引用,减少样板代码
  3. 并行处理:轻松切换并行/串行执行,充分利用多核处理能力
  4. 惰性求值:中间操作延迟执行,只有在终端操作时才会执行,提高性能
  5. 链式操作:支持多步骤流水线处理,API设计流畅
  6. 内置丰富操作:提供filter、map、reduce等多种数据处理操作
  7. 无副作用:不修改数据源,符合函数式编程原则

6.2 Stream的中间操作和终端操作有什么区别?常用的操作有哪些?

答案:

  • 中间操作(Intermediate Operations):返回一个新的Stream,可以链式调用多个中间操作,不会立即执行
  • 终端操作(Terminal Operations):触发实际计算,返回一个非Stream的结果,执行后Stream不能再被使用

常用中间操作:

  • filter(Predicate): 过滤元素
  • map(Function): 转换元素
  • flatMap(Function): 扁平化嵌套集合
  • distinct(): 去除重复
  • sorted(): 排序
  • peek(Consumer): 查看元素(调试)
  • limit(n): 限制元素数量
  • skip(n): 跳过元素

常用终端操作:

  • collect(Collector): 收集结果到容器
  • forEach(Consumer): 遍历每个元素
  • reduce(BinaryOperator): 归约操作
  • count(): 计数
  • anyMatch(Predicate): 是否存在匹配元素
  • allMatch(Predicate): 是否全部匹配
  • noneMatch(Predicate): 是否全部不匹配
  • findFirst()/findAny(): 查找元素
  • min()/max(): 查找最小/最大值
java
1// 中间操作示例
2List<String> result = names.stream() // 创建Stream
3 .filter(name -> name.length() > 5) // 中间操作1
4 .map(String::toUpperCase) // 中间操作2
5 .sorted() // 中间操作3
6 .collect(Collectors.toList()); // 终端操作

6.3 Stream与Collection的区别是什么?

答案: Stream与Collection的主要区别:

特性StreamCollection
目的处理数据存储数据
数据存储不存储数据存储数据
执行方式惰性计算立即计算
消费性只能遍历一次可以多次遍历
数据修改不修改数据源可以修改数据
并行处理内置支持并行需手动实现并行
无限大小可以表示无限序列总是有限大小

6.4 如何正确使用并行流(parallelStream)?什么情况下应该避免使用?

答案:

正确使用并行流的原则:

  1. 数据量足够大:对于小数据集,并行处理的开销可能超过收益
  2. 操作足够重:CPU密集型操作更适合并行处理
  3. 数据结构易拆分:ArrayList、数组分解高效,LinkedList分解效率低
  4. 避免共享可变状态:并行操作中避免修改共享变量
  5. 合理设置线程池大小:根据CPU核心数和任务特性调整并行度
java
1// 并行流示例
2long sum = IntStream.rangeClosed(1, 10_000_000)
3 .parallel() // 切换到并行模式
4 .filter(n -> n % 2 == 0)
5 .mapToLong(n -> n * n)
6 .sum();
7
8// 自定义并行度
9System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

避免使用并行流的场景:

  1. I/O绑定操作:如文件读写、网络操作
  2. 顺序敏感操作:结果依赖于元素处理顺序
  3. 很小的数据量:并行开销可能大于收益(通常低于1万元素)
  4. 不可分割的数据源:如LinkedList等难以有效分割的结构
  5. 使用非线程安全的操作或收集器:如普通ArrayList::add

6.5 请解释Stream中的Collector收集器,并列举几个常用的收集器。

答案: Collector是一种终端操作,用于将Stream中的元素累积到结果容器中,如List、Map等。Collectors类提供了多种预定义的收集器。

常用收集器:

  1. 转换为集合
java
1// 转为List
2List<String> list = stream.collect(Collectors.toList());
3// 转为Set
4Set<String> set = stream.collect(Collectors.toSet());
5// 转为特定集合
6TreeSet<String> treeSet = stream.collect(Collectors.toCollection(TreeSet::new));
  1. 字符串连接
java
1// 简单连接
2String joined = stream.collect(Collectors.joining());
3// 带分隔符
4String joined = stream.collect(Collectors.joining(", "));
5// 带前缀和后缀
6String joined = stream.collect(Collectors.joining(", ", "[", "]"));
  1. 分组和分区
java
1// 按属性分组
2Map<Department, List<Employee>> byDept = employees.stream()
3 .collect(Collectors.groupingBy(Employee::getDepartment));
4// 多级分组
5Map<Department, Map<EmployeeType, List<Employee>>> byDeptAndType = employees.stream()
6 .collect(Collectors.groupingBy(Employee::getDepartment,
7 Collectors.groupingBy(Employee::getType)));
8// 分区(特殊分组)
9Map<Boolean, List<Employee>> partitioned = employees.stream()
10 .collect(Collectors.partitioningBy(emp -> emp.getSalary() > 50000));
  1. 统计和汇总
java
1// 求和
2int total = stream.collect(Collectors.summingInt(User::getAge));
3// 平均值
4double avg = stream.collect(Collectors.averagingInt(User::getAge));
5// 汇总统计
6IntSummaryStatistics stats = stream.collect(Collectors.summarizingInt(User::getAge));
  1. 归约和映射
java
1// 归约(类似reduce)
2Optional<User> oldest = users.stream()
3 .collect(Collectors.reducing((u1, u2) -> u1.getAge() > u2.getAge() ? u1 : u2));
4// 映射后收集
5Map<Integer, String> idToName = users.stream()
6 .collect(Collectors.toMap(User::getId, User::getName));

自定义收集器:可以通过实现Collector接口或使用Collector.of()方法创建自定义收集器,满足特殊需求。

参与讨论