本文是大数据系列第 70 篇,全面讲解 Spark RDD 的三种创建方式与常用 Transformation 算子实战。

完整图文版(含截图):CSDN 原文 | 掘金

SparkContext:入口对象

SparkContext 是 Spark 应用程序的核心组件,所有 RDD 创建操作都从它开始。它负责:

  • 与集群管理器(Standalone/YARN)建立连接
  • 创建 RDD、广播变量、累加器
  • 提交 Job 并协调执行
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://h121.wzk.icu:7077")
  .setAppName("RDD Demo")
val sc = new SparkContext(conf)

三种 RDD 创建方式

方式一:从集合创建(parallelize / makeRDD)

适合测试和小规模数据,直接将内存中的 Scala 集合并行化为 RDD:

// parallelize:基础方法
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), numSlices = 3)

// makeRDD:parallelize 的封装,支持指定分区位置偏好
val rdd2 = sc.makeRDD(Seq("hello", "world", "spark"))

// 验证分区数量
println(rdd1.getNumPartitions)  // 3

numSlices 控制分区数量,默认值为 sc.defaultParallelism(通常等于 CPU 核心总数)。

方式二:从文件系统创建(textFile)

生产环境最常用的方式,支持多种存储后端:

// 读取本地文件
val localRDD = sc.textFile("file:///opt/data/words.txt")

// 读取 HDFS 文件
val hdfsRDD = sc.textFile("hdfs://h121.wzk.icu:8020/data/input/")

// 读取目录(自动合并目录下所有文件)
val dirRDD = sc.textFile("hdfs:///data/logs/2024/")

// 指定最小分区数(实际分区数取 max(minPartitions, HDFS Block 数))
val rdd = sc.textFile("hdfs:///data/big-file.csv", minPartitions = 10)

textFile 默认按行读取,每行是一个字符串元素。读取整个文件(文件名→内容)用 wholeTextFiles

方式三:从已有 RDD 转换

通过 Transformation 操作将一个 RDD 转换为另一个 RDD,是构建计算流水线的主要方式(见下节)。

Transformation 算子详解

Transformation 是惰性的——调用时不触发计算,只记录转换逻辑。只有遇到 Action 操作(collectcountsaveAsTextFile 等)时,Spark 才会真正执行整个 DAG。

基础转换

map —— 一对一映射,对每个元素应用函数:

val nums = sc.parallelize(1 to 5)
val doubled = nums.map(_ * 2)
// 结果:2, 4, 6, 8, 10

filter —— 按条件过滤元素:

val evens = nums.filter(_ % 2 == 0)
// 结果:2, 4

flatMap —— 一对多展开,先 map 再 flatten:

val lines = sc.parallelize(Seq("hello world", "spark rdd"))
val words = lines.flatMap(_.split(" "))
// 结果:hello, world, spark, rdd

分区级操作

mapPartitions —— 以分区为单位处理,减少函数调用开销(适合需要建立数据库连接等场景):

val result = rdd.mapPartitions { iter =>
  // iter 是当前分区所有元素的迭代器
  iter.map(x => process(x))
}

mapPartitionsWithIndex —— 同上,额外传入分区索引:

rdd.mapPartitionsWithIndex { (partIdx, iter) =>
  iter.map(x => s"Partition $partIdx: $x")
}

聚合与重组

groupBy —— 按函数结果分组(产生 Shuffle):

val words = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val grouped = words.groupBy(_.charAt(0))
// 结果:('a', [apple, avocado]), ('b', [banana, blueberry])

distinct —— 去重(产生 Shuffle):

val deduped = sc.parallelize(Seq(1, 2, 2, 3, 3, 3)).distinct()
// 结果:1, 2, 3

sortBy —— 按指定函数排序(产生 Shuffle):

val sorted = sc.parallelize(Seq(3, 1, 4, 1, 5, 9)).sortBy(x => x)
// 结果:1, 1, 3, 4, 5, 9

repartition / coalesce —— 调整分区数量:

// repartition:增减分区均可,总是触发 Shuffle
val rdd10 = rdd.repartition(10)

// coalesce:主要用于减少分区,默认不触发 Shuffle(合并相邻分区)
val rdd3 = rdd10.coalesce(3)

触发执行:Action 操作

Transformation 构建计算图,Action 触发执行并返回结果:

val wordCounts = sc.textFile("hdfs:///input/")
  .flatMap(_.split("\\s+"))
  .map(w => (w, 1))
  .reduceByKey(_ + _)     // Transformation,产生 Shuffle

// 以下是 Action,触发实际计算
wordCounts.collect()           // 收集所有结果到 Driver
wordCounts.count()             // 统计元素数量
wordCounts.take(10)            // 取前 10 条
wordCounts.saveAsTextFile("hdfs:///output/")  // 写出到文件

性能小结

算子是否 Shuffle建议使用场景
map / filter / flatMap否(窄依赖)元素级转换,优先选择
mapPartitions需要复用连接/资源的批量处理
groupByKey尽量用 reduceByKey 替代
reduceByKey是(本地预聚合)键值对聚合的首选
repartition增加分区数,提高并行度
coalesce否(默认)减少分区数,避免小文件

理解各算子是否产生 Shuffle 是 Spark 性能调优的核心,宽依赖(Shuffle)操作应尽量减少或用等价的窄依赖替代。