本文是大数据系列第 70 篇,全面讲解 Spark RDD 的三种创建方式与常用 Transformation 算子实战。
SparkContext:入口对象
SparkContext 是 Spark 应用程序的核心组件,所有 RDD 创建操作都从它开始。它负责:
- 与集群管理器(Standalone/YARN)建立连接
- 创建 RDD、广播变量、累加器
- 提交 Job 并协调执行
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf()
.setMaster("spark://h121.wzk.icu:7077")
.setAppName("RDD Demo")
val sc = new SparkContext(conf)
三种 RDD 创建方式
方式一:从集合创建(parallelize / makeRDD)
适合测试和小规模数据,直接将内存中的 Scala 集合并行化为 RDD:
// parallelize:基础方法
val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5), numSlices = 3)
// makeRDD:parallelize 的封装,支持指定分区位置偏好
val rdd2 = sc.makeRDD(Seq("hello", "world", "spark"))
// 验证分区数量
println(rdd1.getNumPartitions) // 3
numSlices 控制分区数量,默认值为 sc.defaultParallelism(通常等于 CPU 核心总数)。
方式二:从文件系统创建(textFile)
生产环境最常用的方式,支持多种存储后端:
// 读取本地文件
val localRDD = sc.textFile("file:///opt/data/words.txt")
// 读取 HDFS 文件
val hdfsRDD = sc.textFile("hdfs://h121.wzk.icu:8020/data/input/")
// 读取目录(自动合并目录下所有文件)
val dirRDD = sc.textFile("hdfs:///data/logs/2024/")
// 指定最小分区数(实际分区数取 max(minPartitions, HDFS Block 数))
val rdd = sc.textFile("hdfs:///data/big-file.csv", minPartitions = 10)
textFile 默认按行读取,每行是一个字符串元素。读取整个文件(文件名→内容)用 wholeTextFiles。
方式三:从已有 RDD 转换
通过 Transformation 操作将一个 RDD 转换为另一个 RDD,是构建计算流水线的主要方式(见下节)。
Transformation 算子详解
Transformation 是惰性的——调用时不触发计算,只记录转换逻辑。只有遇到 Action 操作(collect、count、saveAsTextFile 等)时,Spark 才会真正执行整个 DAG。
基础转换
map —— 一对一映射,对每个元素应用函数:
val nums = sc.parallelize(1 to 5)
val doubled = nums.map(_ * 2)
// 结果:2, 4, 6, 8, 10
filter —— 按条件过滤元素:
val evens = nums.filter(_ % 2 == 0)
// 结果:2, 4
flatMap —— 一对多展开,先 map 再 flatten:
val lines = sc.parallelize(Seq("hello world", "spark rdd"))
val words = lines.flatMap(_.split(" "))
// 结果:hello, world, spark, rdd
分区级操作
mapPartitions —— 以分区为单位处理,减少函数调用开销(适合需要建立数据库连接等场景):
val result = rdd.mapPartitions { iter =>
// iter 是当前分区所有元素的迭代器
iter.map(x => process(x))
}
mapPartitionsWithIndex —— 同上,额外传入分区索引:
rdd.mapPartitionsWithIndex { (partIdx, iter) =>
iter.map(x => s"Partition $partIdx: $x")
}
聚合与重组
groupBy —— 按函数结果分组(产生 Shuffle):
val words = sc.parallelize(Seq("apple", "banana", "avocado", "blueberry"))
val grouped = words.groupBy(_.charAt(0))
// 结果:('a', [apple, avocado]), ('b', [banana, blueberry])
distinct —— 去重(产生 Shuffle):
val deduped = sc.parallelize(Seq(1, 2, 2, 3, 3, 3)).distinct()
// 结果:1, 2, 3
sortBy —— 按指定函数排序(产生 Shuffle):
val sorted = sc.parallelize(Seq(3, 1, 4, 1, 5, 9)).sortBy(x => x)
// 结果:1, 1, 3, 4, 5, 9
repartition / coalesce —— 调整分区数量:
// repartition:增减分区均可,总是触发 Shuffle
val rdd10 = rdd.repartition(10)
// coalesce:主要用于减少分区,默认不触发 Shuffle(合并相邻分区)
val rdd3 = rdd10.coalesce(3)
触发执行:Action 操作
Transformation 构建计算图,Action 触发执行并返回结果:
val wordCounts = sc.textFile("hdfs:///input/")
.flatMap(_.split("\\s+"))
.map(w => (w, 1))
.reduceByKey(_ + _) // Transformation,产生 Shuffle
// 以下是 Action,触发实际计算
wordCounts.collect() // 收集所有结果到 Driver
wordCounts.count() // 统计元素数量
wordCounts.take(10) // 取前 10 条
wordCounts.saveAsTextFile("hdfs:///output/") // 写出到文件
性能小结
| 算子 | 是否 Shuffle | 建议使用场景 |
|---|---|---|
map / filter / flatMap | 否(窄依赖) | 元素级转换,优先选择 |
mapPartitions | 否 | 需要复用连接/资源的批量处理 |
groupByKey | 是 | 尽量用 reduceByKey 替代 |
reduceByKey | 是(本地预聚合) | 键值对聚合的首选 |
repartition | 是 | 增加分区数,提高并行度 |
coalesce | 否(默认) | 减少分区数,避免小文件 |
理解各算子是否产生 Shuffle 是 Spark 性能调优的核心,宽依赖(Shuffle)操作应尽量减少或用等价的窄依赖替代。