本文是大数据系列第 81 篇,全面介绍 Spark 三大核心数据抽象的特性、使用场景与相互转换。

完整图文版(含截图):CSDN 原文 | 掘金

SparkSession:统一入口

Spark 2.0 之前,开发者需要分别使用多个入口:

  • SQLContext:DataFrame 创建与 SQL 执行的入口
  • HiveContext:继承自 SQLContext,支持 HiveQL 语法

Spark 2.0 之后SparkSession 将上述入口统一,同时持有 SparkContext 的引用:

val spark = SparkSession.builder()
  .appName("Demo")
  .master("local[*]")
  .getOrCreate()

// 通过 SparkSession 获取 SparkContext
val sc = spark.sparkContext

RDD(Resilient Distributed Dataset)

RDD 是 Spark 最底层的数据抽象,具有以下核心特性:

不可变性

map()filter() 等操作会生成新的 RDD,而非修改原有数据,这简化了容错机制并支持安全的并行处理。

容错机制

  • 血缘追踪(Lineage):记录变换序列,可从任意祖先 RDD 重新计算
  • Checkpoint:可将 RDD 持久化到稳定存储,截断血缘链
  • 分区恢复:数据分布在集群节点,支持局部恢复

懒执行(Lazy Evaluation)

  • Transformation(如 mapfilterreduceByKey):仅记录逻辑,不触发计算
  • Action(如 countcollectsaveAsTextFile):触发实际计算

创建方式

// 从集合创建
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))

// 从外部存储创建
val rdd = sc.textFile("hdfs://path/to/file")

// 从已有 RDD 转换
val newRdd = rdd.map(x => x * 2)

适用场景

  • 非结构化数据处理
  • 迭代式机器学习算法
  • ETL 管道、图计算(GraphX)

DataFrame

DataFrame 是带有命名列的分布式数据集,类似关系型数据库的二维表,支持多种数据源(JSON、CSV、Parquet、JDBC 等)。

Catalyst 优化器

DataFrame 最大的优势在于自动查询优化,Catalyst 优化器会执行:

  • 谓词下推(Predicate Pushdown)
  • 列裁剪(Column Pruning)
  • 常量折叠(Constant Folding)
  • Join 重排序

API 示例

// SQL 风格
df.select("name", "salary").filter(df("salary") > 5000)

// DSL 操作
df.groupBy("department").agg(Map("salary" -> "avg"))

注意事项

DataFrame 只有运行时类型检查,缺乏编译时类型安全。Dataset[Row] 是 DataFrame 的本质。


Dataset

Dataset 在 Spark 1.6 引入,将 RDD 的强类型与 DataFrame 的优化能力合二为一。

核心优势

  • 编译时类型安全:类型错误在编译阶段暴露,而非运行时
  • 优化执行:复用 Catalyst 和 Tungsten 引擎
  • 统一 API:同时支持 RDD 风格的 map/filter 与 SQL 风格操作

创建示例

// 从 Range 创建
val numDS = spark.range(5, 100, 5)
numDS.orderBy(desc("id")).show(5)

// 从 case class 集合创建
case class Person(name: String, age: Int, height: Int)
val seq = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144))
val ds = spark.createDataset(seq)
ds.printSchema()

DataFrame 是 Dataset 的特殊形式:Dataset[Row],不要人为割裂二者。


三者相互转换

目标方法
RDDDataFrametoDF()createDataFrame(rdd, schema)
DataFrameRDD.rdd 属性
DataFrameDataset.as[T](T 为 case class)
DatasetDataFrame.toDF()
RDDDataset.toDS()(需类型匹配)
DatasetRDD.rdd 属性

RDD 转 DataFrame 示例

val arr = Array(("Alice", 30, 170), ("Bob", 25, 175))
val rdd = sc.makeRDD(arr).map(f => Row(f._1, f._2, f._3))

val schema = StructType(Seq(
  StructField("name", StringType, false),
  StructField("age", IntegerType, false),
  StructField("height", IntegerType, false)
))

val df = spark.createDataFrame(rdd, schema)
df.show()

横向对比总结

维度RDDDataFrameDataset
类型安全运行时运行时编译时
查询优化CatalystCatalyst
序列化效率Java/KyroTungstenTungsten
适用场景非结构化数据、图计算结构化查询、ETL类型敏感的结构化处理
  • RDD:适合非结构化数据和迭代计算,但缺乏优化智能
  • DataFrame:结构化查询性能最优,但牺牲了类型安全
  • Dataset:编译时类型检查 + 运行时查询优化,是类型意识开发者的最佳选择