本文是大数据系列第 81 篇,全面介绍 Spark 三大核心数据抽象的特性、使用场景与相互转换。
SparkSession:统一入口
Spark 2.0 之前,开发者需要分别使用多个入口:
SQLContext:DataFrame 创建与 SQL 执行的入口HiveContext:继承自 SQLContext,支持 HiveQL 语法
Spark 2.0 之后,SparkSession 将上述入口统一,同时持有 SparkContext 的引用:
val spark = SparkSession.builder()
.appName("Demo")
.master("local[*]")
.getOrCreate()
// 通过 SparkSession 获取 SparkContext
val sc = spark.sparkContext
RDD(Resilient Distributed Dataset)
RDD 是 Spark 最底层的数据抽象,具有以下核心特性:
不可变性
map()、filter() 等操作会生成新的 RDD,而非修改原有数据,这简化了容错机制并支持安全的并行处理。
容错机制
- 血缘追踪(Lineage):记录变换序列,可从任意祖先 RDD 重新计算
- Checkpoint:可将 RDD 持久化到稳定存储,截断血缘链
- 分区恢复:数据分布在集群节点,支持局部恢复
懒执行(Lazy Evaluation)
- Transformation(如
map、filter、reduceByKey):仅记录逻辑,不触发计算 - Action(如
count、collect、saveAsTextFile):触发实际计算
创建方式
// 从集合创建
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
// 从外部存储创建
val rdd = sc.textFile("hdfs://path/to/file")
// 从已有 RDD 转换
val newRdd = rdd.map(x => x * 2)
适用场景
- 非结构化数据处理
- 迭代式机器学习算法
- ETL 管道、图计算(GraphX)
DataFrame
DataFrame 是带有命名列的分布式数据集,类似关系型数据库的二维表,支持多种数据源(JSON、CSV、Parquet、JDBC 等)。
Catalyst 优化器
DataFrame 最大的优势在于自动查询优化,Catalyst 优化器会执行:
- 谓词下推(Predicate Pushdown)
- 列裁剪(Column Pruning)
- 常量折叠(Constant Folding)
- Join 重排序
API 示例
// SQL 风格
df.select("name", "salary").filter(df("salary") > 5000)
// DSL 操作
df.groupBy("department").agg(Map("salary" -> "avg"))
注意事项
DataFrame 只有运行时类型检查,缺乏编译时类型安全。Dataset[Row] 是 DataFrame 的本质。
Dataset
Dataset 在 Spark 1.6 引入,将 RDD 的强类型与 DataFrame 的优化能力合二为一。
核心优势
- 编译时类型安全:类型错误在编译阶段暴露,而非运行时
- 优化执行:复用 Catalyst 和 Tungsten 引擎
- 统一 API:同时支持 RDD 风格的
map/filter与 SQL 风格操作
创建示例
// 从 Range 创建
val numDS = spark.range(5, 100, 5)
numDS.orderBy(desc("id")).show(5)
// 从 case class 集合创建
case class Person(name: String, age: Int, height: Int)
val seq = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144))
val ds = spark.createDataset(seq)
ds.printSchema()
DataFrame 是 Dataset 的特殊形式:
Dataset[Row],不要人为割裂二者。
三者相互转换
| 源 | 目标 | 方法 |
|---|---|---|
| RDD | DataFrame | toDF() 或 createDataFrame(rdd, schema) |
| DataFrame | RDD | .rdd 属性 |
| DataFrame | Dataset | .as[T](T 为 case class) |
| Dataset | DataFrame | .toDF() |
| RDD | Dataset | .toDS()(需类型匹配) |
| Dataset | RDD | .rdd 属性 |
RDD 转 DataFrame 示例
val arr = Array(("Alice", 30, 170), ("Bob", 25, 175))
val rdd = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))
val schema = StructType(Seq(
StructField("name", StringType, false),
StructField("age", IntegerType, false),
StructField("height", IntegerType, false)
))
val df = spark.createDataFrame(rdd, schema)
df.show()
横向对比总结
| 维度 | RDD | DataFrame | Dataset |
|---|---|---|---|
| 类型安全 | 运行时 | 运行时 | 编译时 |
| 查询优化 | 无 | Catalyst | Catalyst |
| 序列化效率 | Java/Kyro | Tungsten | Tungsten |
| 适用场景 | 非结构化数据、图计算 | 结构化查询、ETL | 类型敏感的结构化处理 |
- RDD:适合非结构化数据和迭代计算,但缺乏优化智能
- DataFrame:结构化查询性能最优,但牺牲了类型安全
- Dataset:编译时类型检查 + 运行时查询优化,是类型意识开发者的最佳选择