
Apache Spark in Depth

Apache Spark is a fast, general-purpose engine for large-scale data processing. Its in-memory computation model makes many workloads 10-100x faster than traditional MapReduce. Spark supports multiple computing paradigms, including batch processing, interactive queries, stream processing, and machine learning.

Core Value

Apache Spark = in-memory computing + unified platform + high performance + ease of use + ecosystem

  • 🚀 In-memory computing: data is kept in memory, dramatically speeding up computation
  • 👨‍💻 Unified platform: supports batch, streaming, machine learning, and other computing modes
  • 🔍 High performance: 10-100x faster than MapReduce, with good support for iterative algorithms
  • 🔗 Ease of use: APIs for Java, Scala, Python, and R
  • 📚 Ecosystem: components such as Spark SQL, Spark Streaming, MLlib, and GraphX

1. Spark Core Concepts

1.1 Spark Architecture

Spark uses a master-slave architecture with the following main components:

  • Driver: runs the application's main() function, creates the SparkContext, and schedules tasks
  • Cluster Manager: allocates cluster resources (Standalone, YARN, Mesos, or Kubernetes)
  • Executor: a worker-node process that runs tasks and caches data for the application

1.2 Spark Core Abstractions

Spark core abstractions

  1. RDD: Resilient Distributed Dataset, an immutable, distributed collection of objects
  2. DataFrame: a distributed data table built on top of RDDs, similar to a relational database table
  3. Dataset: a type-safe DataFrame that combines the advantages of RDDs and DataFrames
  4. SparkContext: the entry point of a Spark application, responsible for communicating with the cluster

All four appear together in the sketch below.
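
A minimal sketch showing the four abstractions side by side; the people.csv path and the Person bean are illustrative placeholders, not part of the original text:

java
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CoreAbstractions {
    // Hypothetical bean backing the typed Dataset
    public static class Person implements Serializable {
        private String name;
        private int age;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CoreAbstractions").master("local[*]").getOrCreate();

        // The SparkContext (wrapped by SparkSession) is the entry point to the cluster
        JavaRDD<String> rdd = spark.sparkContext()
                .textFile("people.csv", 2).toJavaRDD(); // RDD: untyped, low-level

        Dataset<Row> df = spark.read().option("header", "true").csv("people.csv"); // DataFrame

        Dataset<Person> ds = df
                .selectExpr("name", "CAST(age AS INT) AS age")
                .as(Encoders.bean(Person.class)); // Dataset: a typed view of the same data

        System.out.println(rdd.count() + " lines, " + ds.count() + " typed rows");
        spark.stop();
    }
}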

1.2.1 New in Spark 3.x

Spark 3.x introduced Adaptive Query Execution (AQE), dynamic partition pruning, improved ANSI SQL compliance, and accelerator-aware scheduling. AQE is covered in more detail in section 3.1.2.

1.2.2 RDD Dependency Management

RDD dependency example
java
import java.util.List;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import scala.collection.JavaConverters;

public class RDDDependencyExample {
    public void demonstrateDependencies(JavaSparkContext sc) {
        // 1. Narrow dependency - one-to-one mapping between parent and child partitions
        JavaRDD<String> lines = sc.textFile("input.txt");
        JavaRDD<Integer> lengths = lines.map(String::length); // narrow dependency

        // 2. Wide dependency - requires a shuffle
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(line ->
                new Tuple2<>(line.split(" ")[0], 1));
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b); // wide dependency

        // 3. Inspect dependency types (dependencies() lives on the underlying Scala RDD)
        System.out.println("Lines RDD dependencies: " + lines.rdd().dependencies());
        System.out.println("Lengths RDD dependencies: " + lengths.rdd().dependencies());
        System.out.println("Counts RDD dependencies: " + counts.rdd().dependencies());

        // 4. Optimization hint
        if (hasWideDependency(counts)) {
            System.out.println("Warning: Wide dependency detected. Consider repartitioning.");
        }
    }

    private boolean hasWideDependency(JavaPairRDD<String, Integer> rdd) {
        List<?> deps = JavaConverters.seqAsJavaList(rdd.rdd().dependencies());
        return deps.stream().anyMatch(dep -> dep instanceof ShuffleDependency);
    }
}

RDD Characteristics

RDD characteristics example
java
public class RDDFeatures {
    public static void main(String[] args) {
        // 1. Resilient
        System.out.println("RDDs are fault-tolerant and can recover from failures via lineage");

        // 2. Distributed
        System.out.println("RDD data is partitioned across multiple nodes in the cluster");

        // 3. Dataset
        System.out.println("An RDD is a collection of records and supports many data types");

        // 4. Immutable
        System.out.println("Once created, an RDD cannot be modified; transformations produce new RDDs");

        // 5. Lazy evaluation
        System.out.println("Transformations are lazy; nothing runs until an action is invoked");
    }
}

2. Spark Programming Model

2.1 RDD Operation Types

Spark RDDs support two types of operations:

  • Transformations: lazily build a new RDD from an existing one (e.g. map, filter, reduceByKey)
  • Actions: trigger the actual computation and return a result to the driver or write to storage (e.g. count, collect, saveAsTextFile)

The sketch below shows how this laziness plays out in practice.
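
A minimal sketch of lazy evaluation, assuming a local SparkSession and a small in-memory dataset:

java
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class LazyEvaluationDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("LazyEvaluationDemo").master("local[*]").getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> lines = sc.parallelize(
                Arrays.asList("spark is fast", "spark is general", "mapreduce is classic"));

        // Transformations: nothing executes here, Spark only records the lineage
        JavaRDD<String> sparkLines = lines.filter(line -> line.contains("spark"));
        JavaRDD<Integer> lengths = sparkLines.map(String::length);

        // Action: triggers execution of the entire lineage above
        long count = lengths.count();
        System.out.println("Lines mentioning spark: " + count);

        spark.stop();
    }
}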

2.2 Basic RDD Operations

RDD transformation examples
java
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;

public class RDDTransformations {
    public void demonstrateTransformations(JavaRDD<String> lines) {
        // 1. map - one-to-one transformation
        JavaRDD<Integer> lengths = lines.map(String::length);

        // 2. filter - keep only matching records
        JavaRDD<String> longLines = lines.filter(line -> line.length() > 100);

        // 3. flatMap - one-to-many transformation
        JavaRDD<String> words = lines.flatMap(line ->
                Arrays.asList(line.split(" ")).iterator());

        // 4. distinct - deduplicate
        JavaRDD<String> uniqueWords = words.distinct();

        // 5. sample - take a 10% sample without replacement
        JavaRDD<String> sampledLines = lines.sample(false, 0.1);
    }
}

3. Spark SQL and DataFrames

3.1 The DataFrame Concept

The DataFrame is Spark's core abstraction for processing structured data: a distributed collection of rows sharing a known schema, queryable through both a fluent API and SQL.

3.1.1 How the Catalyst Optimizer Works

Catalyst compiles a DataFrame or SQL query in phases: it parses the query into an unresolved logical plan, resolves it against the catalog, applies rule-based logical optimizations (such as predicate pushdown and constant folding), generates candidate physical plans and picks one by cost, and finally emits JVM bytecode via whole-stage code generation.
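
You can inspect these phases for any query. A minimal sketch, assuming an existing SparkSession with a temporary view named people (as created in section 3.2):

java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CatalystPlanInspection {
    public static void showPlans(SparkSession spark) {
        Dataset<Row> adults = spark.sql("SELECT name FROM people WHERE age > 18");
        // Prints the parsed and analyzed logical plans, the optimized logical plan,
        // and the chosen physical plan ("extended" mode is Spark 3.x; use explain(true) on 2.x)
        adults.explain("extended");
    }
}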

3.1.2 Adaptive Query Execution (AQE)

AQE optimization example
java
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class AQEExample {
    public void demonstrateAQE(SparkSession spark) {
        // Enable AQE and its main optimizations
        spark.conf().set("spark.sql.adaptive.enabled", "true");
        spark.conf().set("spark.sql.adaptive.coalescePartitions.enabled", "true");
        spark.conf().set("spark.sql.adaptive.skewJoin.enabled", "true");
        spark.conf().set("spark.sql.adaptive.localShuffleReader.enabled", "true");

        // Create test data
        Dataset<Row> users = spark.createDataFrame(Arrays.asList(
                RowFactory.create("user1", "Alice", 25, "Engineer"),
                RowFactory.create("user2", "Bob", 30, "Manager"),
                RowFactory.create("user3", "Charlie", 35, "Director")
        ), new StructType()
                .add("id", DataTypes.StringType)
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType)
                .add("job", DataTypes.StringType));

        Dataset<Row> orders = spark.createDataFrame(Arrays.asList(
                RowFactory.create("order1", "user1", 100.0),
                RowFactory.create("order2", "user2", 200.0),
                RowFactory.create("order3", "user1", 150.0)
        ), new StructType()
                .add("orderId", DataTypes.StringType)
                .add("userId", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType));

        // A multi-stage query - AQE re-optimizes it at runtime using shuffle statistics
        Dataset<Row> result = users.join(orders, users.col("id").equalTo(orders.col("userId")))
                .groupBy("job")
                .agg(functions.avg("amount").as("avg_amount"))
                .filter(col("avg_amount").gt(100));

        result.explain(true); // show the (re)optimized execution plan
        result.show();
    }
}

3.2 DataFrame Operation Examples

DataFrame operation examples
java
import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class DataFrameOperations {
    public void demonstrateDataFrameOperations(SparkSession spark) {
        // 1. Create a DataFrame
        List<Row> data = Arrays.asList(
                RowFactory.create("Alice", 25, "Engineer"),
                RowFactory.create("Bob", 30, "Manager"),
                RowFactory.create("Charlie", 35, "Director")
        );

        StructType schema = new StructType()
                .add("name", DataTypes.StringType)
                .add("age", DataTypes.IntegerType)
                .add("job", DataTypes.StringType);

        Dataset<Row> df = spark.createDataFrame(data, schema);

        // 2. Show the data
        df.show();

        // 3. Filter rows
        Dataset<Row> youngPeople = df.filter(col("age").lt(30));

        // 4. Select columns
        Dataset<Row> namesAndAges = df.select("name", "age");

        // 5. Group and aggregate
        Dataset<Row> jobCounts = df.groupBy("job").count();

        // 6. SQL queries
        df.createOrReplaceTempView("people");
        Dataset<Row> sqlResult = spark.sql(
                "SELECT job, AVG(age) as avg_age FROM people GROUP BY job"
        );
    }
}
DataFrame advantages

DataFrames offer a SQL-like query interface and optimized execution through Catalyst; they generally outperform equivalent RDD code and are particularly well suited to structured data processing.
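
A small sketch of why this matters: the same aggregation written against an RDD is opaque to Spark, while the DataFrame version goes through Catalyst and can be optimized. The local session and sample data here are illustrative:

java
import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class RddVsDataFrame {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("RddVsDataFrame").master("local[*]").getOrCreate();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<String> jobs = sc.parallelize(
                Arrays.asList("Engineer", "Manager", "Engineer", "Director"));

        // RDD version: the lambdas are black boxes, so no query optimization happens
        JavaPairRDD<String, Integer> rddCounts = jobs
                .mapToPair(job -> new Tuple2<>(job, 1))
                .reduceByKey((a, b) -> a + b);
        rddCounts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));

        // DataFrame version: declarative, so Catalyst can optimize the whole plan
        Dataset<Row> df = spark.createDataset(jobs.collect(), Encoders.STRING()).toDF("job");
        df.groupBy("job").count().show();

        spark.stop();
    }
}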

4. Spark Streaming

4.1 Stream Processing Architecture

Spark Streaming decomposes a streaming computation into a series of small batch jobs (micro-batches): the input stream is sliced into short time intervals, and each slice is processed by the Spark engine as an ordinary batch job.

4.1.1 Structured Streaming

Structured Streaming is the newer, DataFrame-based streaming engine. It treats a stream as an unbounded table that is queried incrementally, supports event-time windows with watermarks for late data, and provides end-to-end exactly-once guarantees with supported sources and sinks.

4.1.2 Structured Streaming Examples

Structured Streaming example
java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

public class StructuredStreamingExample {
    public void buildStructuredStreaming(SparkSession spark) throws Exception {
        // 1. Read the stream from Kafka
        Dataset<Row> streamDF = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "user-events")
                .option("startingOffsets", "latest")
                .load();

        // 2. Parse the JSON payload
        Dataset<Row> parsedDF = streamDF
                .selectExpr("CAST(value AS STRING) as json")
                .select(functions.from_json(col("json"), getUserEventSchema()).as("data"))
                .select("data.*");

        // 3. Event-time windows with a watermark for late data
        Dataset<Row> withWatermark = parsedDF
                .withWatermark("timestamp", "10 minutes")
                .groupBy(
                        functions.window(col("timestamp"), "5 minutes"),
                        col("userId")
                )
                .agg(
                        functions.count("*").as("event_count"),
                        functions.avg("amount").as("avg_amount")
                );

        // 4. Write to the console
        StreamingQuery query = withWatermark
                .writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", false)
                .start();

        // 5. Block until the query terminates
        query.awaitTermination();
    }

    private StructType getUserEventSchema() {
        return new StructType()
                .add("userId", DataTypes.StringType)
                .add("eventType", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType)
                .add("timestamp", DataTypes.TimestampType);
    }
}

// Stateful stream processing example (UserSession is an application-specific bean,
// assumed to track session start, last activity, and completion)
class StatefulStreamingExample {
    public void buildStatefulStreaming(SparkSession spark) throws Exception {
        // 1. Read the user-behavior stream from Kafka
        Dataset<Row> userBehaviorStream = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "user-behavior")
                .load();

        // 2. Parse the behavior events
        Dataset<Row> behaviorDF = userBehaviorStream
                .selectExpr("CAST(value AS STRING) as json")
                .select(functions.from_json(col("json"), getBehaviorSchema()).as("data"))
                .select("data.*");

        // 3. Stateful aggregation - per-user session statistics
        Dataset<UserSession> sessionStats = behaviorDF
                .withWatermark("timestamp", "1 hour")
                .groupByKey((MapFunction<Row, String>) row -> row.getAs("userId"), Encoders.STRING())
                .flatMapGroupsWithState(
                        new UserSessionAggregator(),
                        OutputMode.Append(),
                        Encoders.bean(UserSession.class),   // state encoder
                        Encoders.bean(UserSession.class),   // output encoder
                        GroupStateTimeout.ProcessingTimeTimeout()
                );

        // 4. Write the results
        StreamingQuery query = sessionStats
                .writeStream()
                .outputMode("append")
                .format("console")
                .start();

        query.awaitTermination();
    }

    // Assumed event schema for the user-behavior topic
    private StructType getBehaviorSchema() {
        return new StructType()
                .add("userId", DataTypes.StringType)
                .add("action", DataTypes.StringType)
                .add("timestamp", DataTypes.TimestampType);
    }
}

// Per-user session aggregator
class UserSessionAggregator implements FlatMapGroupsWithStateFunction<String, Row, UserSession, UserSession> {
    @Override
    public Iterator<UserSession> call(String userId, Iterator<Row> events, GroupState<UserSession> state) {
        List<UserSession> results = new ArrayList<>();

        // Fetch the current state, or start a new session
        UserSession currentSession = state.exists() ? state.get() : new UserSession(userId);

        // Fold the new events into the session
        while (events.hasNext()) {
            Row event = events.next();
            currentSession.updateSession(event);
        }

        // Persist the updated state
        state.update(currentSession);

        // Emit and clear the session once it is complete
        if (currentSession.isCompleted()) {
            results.add(currentSession);
            state.remove();
        }

        return results.iterator();
    }
}

4.2 Stream Processing Example

Basic streaming example
java
import java.util.Arrays;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class BasicStreaming {
    public void processStream(SparkSession spark) throws InterruptedException {
        // Create a StreamingContext with 5-second micro-batches
        // (JavaStreamingContext expects a JavaSparkContext, not the Scala SparkContext)
        JavaStreamingContext ssc = new JavaStreamingContext(
                JavaSparkContext.fromSparkContext(spark.sparkContext()),
                Durations.seconds(5));

        // Create a DStream from a socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                "localhost", 9999);

        // Process the stream: classic word count
        JavaDStream<String> words = lines.flatMap(line ->
                Arrays.asList(line.split(" ")).iterator());

        JavaPairDStream<String, Integer> wordCounts = words
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        // Print each batch's results
        wordCounts.print();

        // Start the streaming job and block
        ssc.start();
        ssc.awaitTermination();
    }
}

5. Machine Learning with Spark MLlib

5.1 MLlib Components

MLlib is Spark's machine learning library, providing a rich set of algorithms and tools:

5.1.1 MLlib Algorithm Categories

MLlib's algorithms fall into several broad categories: classification (logistic regression, decision trees, random forests), regression (linear regression, gradient-boosted trees), clustering (k-means, LDA), and collaborative filtering (ALS), alongside utilities for feature engineering, pipelines, and model evaluation.

5.1.2 Feature Engineering Pipeline Example

Feature engineering pipeline example
java
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FeatureEngineeringPipeline {
    // Returns an unfitted Pipeline; calling fit() on it produces a PipelineModel
    public Pipeline buildFeaturePipeline(SparkSession spark) {
        // 1. Index string categories
        StringIndexer stringIndexer = new StringIndexer()
                .setInputCol("category")
                .setOutputCol("categoryIndex")
                .setHandleInvalid("skip");

        // 2. One-hot encode the indexed categories
        OneHotEncoder oneHotEncoder = new OneHotEncoder()
                .setInputCol("categoryIndex")
                .setOutputCol("categoryOneHot");

        // 3. Standardize numerical features
        StandardScaler standardScaler = new StandardScaler()
                .setInputCol("numericalFeatures")
                .setOutputCol("scaledFeatures")
                .setWithStd(true)
                .setWithMean(true);

        // 4. Assemble the feature vector
        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[]{"categoryOneHot", "scaledFeatures", "otherFeatures"})
                .setOutputCol("features");

        // 5. Feature selection
        ChiSqSelector featureSelector = new ChiSqSelector()
                .setNumTopFeatures(20)
                .setFeaturesCol("features")
                .setLabelCol("label")
                .setOutputCol("selectedFeatures");

        // 6. Build the pipeline
        return new Pipeline()
                .setStages(new PipelineStage[]{
                        stringIndexer, oneHotEncoder, standardScaler, assembler, featureSelector
                });
    }
}

// A complete ML pipeline example
class CompleteMLPipeline {
    public void buildCompletePipeline(SparkSession spark, Dataset<Row> trainingData) throws Exception {
        // 1. Feature engineering pipeline (unfitted; a Pipeline is itself a PipelineStage)
        Pipeline featurePipeline = new FeatureEngineeringPipeline().buildFeaturePipeline(spark);

        // 2. ML algorithm
        RandomForestClassifier classifier = new RandomForestClassifier()
                .setLabelCol("label")
                .setFeaturesCol("selectedFeatures")
                .setNumTrees(100)
                .setMaxDepth(10)
                .setMaxBins(32)
                .setSeed(42);

        // 3. Evaluator
        MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("accuracy");

        // 4. Cross-validation
        CrossValidator crossValidator = new CrossValidator()
                .setEstimator(new Pipeline().setStages(new PipelineStage[]{
                        featurePipeline, classifier
                }))
                .setEvaluator(evaluator)
                .setNumFolds(5)
                .setParallelism(2);

        // 5. Train the model
        CrossValidatorModel cvModel = crossValidator.fit(trainingData);

        // 6. Evaluate (on the training data here; use a held-out set in practice)
        Dataset<Row> predictions = cvModel.transform(trainingData);
        double accuracy = evaluator.evaluate(predictions);
        System.out.println("Cross-validation accuracy: " + accuracy);

        // 7. Save the model
        cvModel.save("models/random_forest_model");

        // 8. Feature importance analysis
        PipelineModel bestModel = (PipelineModel) cvModel.bestModel();
        RandomForestClassificationModel rfModel = (RandomForestClassificationModel)
                bestModel.stages()[bestModel.stages().length - 1];

        System.out.println("Feature importances:");
        double[] importances = rfModel.featureImportances().toArray();
        for (int i = 0; i < importances.length; i++) {
            System.out.println("Feature " + i + ": " + importances[i]);
        }
    }
}

5.2 Machine Learning Example

Machine learning pipeline example
java
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MLPipelineExample {
    public void buildMLPipeline(SparkSession spark, Dataset<Row> data) {
        // 1. Feature engineering
        StringIndexer indexer = new StringIndexer()
                .setInputCol("category")
                .setOutputCol("categoryIndex");

        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(new String[]{"feature1", "feature2", "categoryIndex"})
                .setOutputCol("features");

        // 2. ML algorithm
        RandomForestClassifier classifier = new RandomForestClassifier()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setNumTrees(10);

        // 3. Build the pipeline
        Pipeline pipeline = new Pipeline()
                .setStages(new PipelineStage[]{indexer, assembler, classifier});

        // 4. Train the model
        PipelineModel model = pipeline.fit(data);

        // 5. Predict
        Dataset<Row> predictions = model.transform(data);
        predictions.show();

        // 6. Evaluate the model
        MulticlassClassificationEvaluator evaluator =
                new MulticlassClassificationEvaluator()
                        .setLabelCol("label")
                        .setPredictionCol("prediction")
                        .setMetricName("accuracy");

        double accuracy = evaluator.evaluate(predictions);
        System.out.println("Accuracy: " + accuracy);
    }
}
MLlib advantages

MLlib provides distributed implementations of common machine learning algorithms, scales to large datasets, and integrates seamlessly with the rest of the Spark ecosystem.

6. Spark Performance Tuning

6.1 Memory Management

Memory management is central to Spark performance tuning:

6.1.1 Memory Management Architecture

Since Spark 1.6, executors use a unified memory model: a single region (sized by spark.memory.fraction) is shared between execution memory (shuffles, joins, sorts, aggregations) and storage memory (cached blocks and broadcasts). Either side can borrow from the other, with spark.memory.storageFraction protecting a minimum share for storage.

6.1.2 Memory Tuning Strategies

Memory tuning example
java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.util.LongAccumulator;

public class SparkMemoryOptimization {
    public void optimizeMemory(SparkConf conf) {
        // 1. Executor memory settings
        conf.set("spark.executor.memory", "8g");
        conf.set("spark.executor.memoryOverhead", "2g"); // off-heap overhead
        conf.set("spark.memory.fraction", "0.8"); // share for execution + storage
        conf.set("spark.memory.storageFraction", "0.3"); // protected storage share

        // 2. Serialization
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrationRequired", "false");
        conf.set("spark.kryo.registrator", "com.example.MyKryoRegistrator");

        // 3. Compression
        conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true");
        conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000");

        // 4. Broadcast joins
        conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760"); // 10MB

        // 5. Dynamic allocation
        conf.set("spark.dynamicAllocation.enabled", "true");
        conf.set("spark.dynamicAllocation.minExecutors", "2");
        conf.set("spark.dynamicAllocation.maxExecutors", "20");
        conf.set("spark.dynamicAllocation.initialExecutors", "5");

        System.out.println("Memory optimization configured");
    }

    public void optimizeDataStructures(JavaRDD<String> data) {
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(data.context());

        // 1. Broadcast small lookup data instead of shipping it with every task
        List<String> stopWords = Arrays.asList("the", "a", "an", "and", "or", "but");
        Broadcast<List<String>> stopWordsBroadcast = jsc.broadcast(stopWords);

        // 2. Count with accumulators (the old Accumulator API was removed in Spark 3)
        LongAccumulator totalWords = jsc.sc().longAccumulator("TotalWords");
        LongAccumulator filteredWords = jsc.sc().longAccumulator("FilteredWords");

        // 3. Optimize RDD operations
        JavaRDD<String> optimizedData = data
                .mapPartitions(iterator -> {
                    List<String> batch = new ArrayList<>();
                    while (iterator.hasNext()) {
                        String line = iterator.next();
                        if (!stopWordsBroadcast.value().contains(line.toLowerCase())) {
                            batch.add(line);
                            filteredWords.add(1L);
                        }
                        totalWords.add(1L);
                    }
                    return batch.iterator();
                })
                .cache(); // cache the intermediate result for reuse

        // 4. mapPartitions processes a whole partition per call, reducing per-record overhead
        JavaRDD<String> processedData = optimizedData.mapPartitions(iterator -> {
            List<String> results = new ArrayList<>();
            while (iterator.hasNext()) {
                results.add(iterator.next().toUpperCase());
            }
            return results.iterator();
        });

        // 5. Accumulators are only populated once an action runs (lazy evaluation)
        long processed = processedData.count();
        System.out.println("Processed: " + processed);
        System.out.println("Total words: " + totalWords.value());
        System.out.println("Filtered words: " + filteredWords.value());
    }
}

// Custom Kryo registrator (User, Product, Order stand in for application-specific classes)
class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        // Register domain classes
        kryo.register(User.class);
        kryo.register(Product.class);
        kryo.register(Order.class);

        // Register common collection classes
        kryo.register(ArrayList.class);
        kryo.register(HashMap.class);
        kryo.register(HashSet.class);
    }
}

6.2 Tuning Strategies

java
// A sensible baseline configuration
SparkConf conf = new SparkConf()
        .set("spark.executor.memory", "8g")
        .set("spark.memory.fraction", "0.6") // replaces the legacy spark.storage.memoryFraction key
        .set("spark.sql.adaptive.enabled", "true");

7. Spark Deployment and Configuration

7.1 Deployment Modes

Spark supports multiple deployment modes, compared in the table at the end of this subsection:

7.1.1 Kubernetes Deployment Architecture

On Kubernetes, spark-submit talks directly to the Kubernetes API server, which schedules the driver as a pod; the driver then requests executor pods for the lifetime of the application, and Kubernetes handles scheduling, isolation, and cleanup.

7.1.2 Cloud-Native Deployment Configuration

Kubernetes deployment configuration example
java
import org.apache.spark.SparkConf;

public class KubernetesDeployment {
    public void configureKubernetesDeployment(SparkConf conf) {
        // 1. Kubernetes master and image
        conf.set("spark.master", "k8s://https://kubernetes.default.svc");
        conf.set("spark.kubernetes.container.image", "spark:3.4.0");
        conf.set("spark.kubernetes.namespace", "spark-jobs");

        // 2. Resources
        conf.set("spark.executor.instances", "5");
        conf.set("spark.executor.memory", "4g");
        conf.set("spark.executor.cores", "2");
        conf.set("spark.driver.memory", "2g");
        conf.set("spark.driver.cores", "1");

        // 3. Storage (volume properties are prefixed with driver/executor)
        conf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName", "spark-local-dir-1");
        conf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path", "/tmp");
        conf.set("spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly", "false");

        // 4. Service accounts for driver and executor pods
        conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark");
        conf.set("spark.kubernetes.authenticate.executor.serviceAccountName", "spark");

        System.out.println("Kubernetes deployment configured");
    }

    public void configureResourceQuotas() {
        // Resource quota manifest for the spark-jobs namespace
        String resourceQuota =
                "apiVersion: v1\n" +
                "kind: ResourceQuota\n" +
                "metadata:\n" +
                "  name: spark-quota\n" +
                "  namespace: spark-jobs\n" +
                "spec:\n" +
                "  hard:\n" +
                "    requests.cpu: \"20\"\n" +
                "    requests.memory: 40Gi\n" +
                "    limits.cpu: \"40\"\n" +
                "    limits.memory: 80Gi\n" +
                "    persistentvolumeclaims: \"10\"";

        System.out.println("Resource quota configuration:");
        System.out.println(resourceQuota);
    }
}

// Monitoring and logging configuration
class MonitoringConfiguration {
    public void configureMonitoring(SparkConf conf) {
        // 1. Metrics collection
        conf.set("spark.metrics.conf", "/opt/spark/conf/metrics.properties");
        conf.set("spark.sql.streaming.metricsEnabled", "true");

        // 2. Event logs for the History Server
        conf.set("spark.eventLog.enabled", "true");
        conf.set("spark.eventLog.dir", "hdfs://namenode:9000/spark-events");
        conf.set("spark.history.fs.logDirectory", "hdfs://namenode:9000/spark-events");

        // 3. Runtime query optimization
        conf.set("spark.sql.adaptive.enabled", "true");
        conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true");
        conf.set("spark.sql.adaptive.skewJoin.enabled", "true");

        // 4. Dynamic resource allocation
        conf.set("spark.dynamicAllocation.enabled", "true");
        conf.set("spark.dynamicAllocation.minExecutors", "2");
        conf.set("spark.dynamicAllocation.maxExecutors", "20");
        conf.set("spark.dynamicAllocation.initialExecutors", "5");

        System.out.println("Monitoring and logging configured");
    }
}

Deployment mode | Characteristics | Typical use
--- | --- | ---
Local | Single-machine execution for development and testing | Local development and debugging
Standalone | Spark's built-in cluster manager | Small clusters
YARN | Resources managed by Hadoop YARN | Production, Hadoop integration
Mesos | Resources managed by Apache Mesos | Large clusters, multi-framework support
Kubernetes | Containerized deployment managed by K8s | Cloud-native environments

7.2 Configuration Example

Spark configuration example
java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SparkConfiguration {
    public SparkSession createSparkSession() {
        SparkConf conf = new SparkConf()
                .setAppName("MySparkApp")
                .setMaster("yarn")
                .set("spark.executor.memory", "8g")
                .set("spark.executor.cores", "4")
                .set("spark.driver.memory", "4g")
                .set("spark.sql.adaptive.enabled", "true")
                .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
                .set("spark.sql.adaptive.skewJoin.enabled", "true");

        return SparkSession.builder()
                .config(conf)
                .enableHiveSupport()
                .getOrCreate();
    }
}
Configuration advice

Tune Spark parameters to your cluster resources and workload; memory and CPU settings in particular have a large impact on performance.

8. Best Practices

8.1 Development Best Practices

  1. Cache deliberately: cache RDDs and DataFrames that are reused (as sketched below)
  2. Avoid shuffles: eliminate unnecessary shuffle operations
  3. Use broadcast variables: cut data-transfer overhead for small lookup tables
  4. Partition sensibly: set partition counts appropriate to the data volume
  5. Monitor performance: use the Spark UI to watch application behavior
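
A short sketch combining practices 1, 3, and 4, assuming an existing SparkSession and two hypothetical Parquet inputs (a large events table and a small countries lookup):

java
import static org.apache.spark.sql.functions.broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BestPracticesSketch {
    public static void run(SparkSession spark) {
        Dataset<Row> events = spark.read().parquet("/data/events");       // hypothetical path
        Dataset<Row> countries = spark.read().parquet("/data/countries"); // small lookup table

        // 1. Cache a dataset that is used more than once
        Dataset<Row> cleaned = events.filter("userId IS NOT NULL").cache();

        // 3. Hint a broadcast join so the small table ships to every executor
        Dataset<Row> joined = cleaned.join(broadcast(countries), "countryCode");

        // 4. Repartition to a sensible parallelism before an expensive wide operation
        Dataset<Row> result = joined.repartition(200, joined.col("countryCode"))
                .groupBy("countryCode").count();

        result.show();
        cleaned.unpersist(); // release the cache when done
    }
}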

8.2 Solving Common Problems

java
// Addressing out-of-memory errors
// 1. Increase executor memory
.set("spark.executor.memory", "16g")

// 2. Shrink the protected storage share so execution gets more room
//    (spark.storage.memoryFraction is the legacy pre-1.6 key)
.set("spark.memory.storageFraction", "0.3")

// 3. Adjust the partition count so each partition fits comfortably in memory
data.repartition(100)

9. Summary

Apache Spark is a powerful, high-performance engine for big data processing. Through in-memory computing, a unified platform, and a rich ecosystem, it offers a complete solution for large-scale data work.

Learning suggestions

  1. Understand the core concepts: develop a solid grasp of RDDs, DataFrames, and Datasets
  2. Master the programming model: become fluent with transformations and actions
  3. Learn the advanced components: Spark SQL, Structured Streaming, and MLlib
  4. Tune for performance: study memory management, partitioning strategies, and other optimization techniques
  5. Build real projects: accumulate experience through hands-on work

Spark is a cornerstone of the big data stack; mastering it will greatly strengthen your data processing skills.
