本文是大数据系列第 80 篇,全面介绍 SparkSQL 的设计理念、核心组件和实战用法。

完整图文版(含截图):CSDN 原文 | 掘金

从 Hive 到 SparkSQL 的演进

早期 Hive 将 SQL 翻译为 MapReduce 执行,每个查询都需要冷启动 MR Job,延迟高、迭代慢。Shark(Spark 0.x 时代)通过替换 Hive 的执行引擎来加速查询,保持了 HiveQL 兼容性。

2014 年 Spark 放弃了 Shark,分化为两个方向:

  • Hive on Spark:让 Hive 将执行引擎替换为 Spark,仍由 Hive 主导
  • Spark SQL:Spark 主导的结构化数据处理模块,提供全新 API 体系

Spark SQL 成为主流,相比 Hive 性能提升 10-100x,来源于内存优化(4-5x 内存效率)、Catalyst 查询优化器和代码生成技术。

核心数据抽象

DataFrame

DataFrame 是带 Schema 信息的分布式 Row 集合,等同于 RDD[Row] + Schema。Schema 描述每列的字段名和类型,使引擎能进行列剪裁、谓词下推等优化。

支持的数据类型:

  • 基础类型:String、Boolean、Integer、Long、Double、Decimal、Date、Timestamp
  • 复杂类型:Array、Map、Struct(支持嵌套)
val df = spark.read.json("hdfs://data/users.json")
df.printSchema()
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

df.select("name", "age")
  .filter($"age" > 18)
  .groupBy("age")
  .count()
  .show()

Dataset

Spark 1.6 引入,在 DataFrame 基础上增加编译期类型检查,避免运行时类型错误。Spark 2.0 将两者统一:DataFrame = Dataset[Row]

case class User(name: String, age: Long)

val ds: Dataset[User] = df.as[User]
ds.filter(_.age > 18).map(_.name).show()

Catalyst 优化器

Catalyst 是 SparkSQL 的查询优化引擎,处理流程分四步:

SQL/DataFrame API


① 解析(Parsing)→ 未解析逻辑计划(Unresolved Logical Plan)


② 分析(Analysis)→ 解析逻辑计划(绑定列名、类型推断)


③ 优化(Optimization)→ 优化后的逻辑计划
      │  · 谓词下推(Predicate Pushdown)
      │  · 列裁剪(Column Pruning)
      │  · 常量折叠(Constant Folding)
      │  · 分区裁剪(Partition Pruning)

④ 物理计划生成 → 选择最优执行策略(sort merge join / broadcast join 等)

SQL 查询示例

SparkSQL 支持完整 ANSI SQL,包括窗口函数、子查询等高级特性:

spark.sql("""
  SELECT
    department,
    name,
    salary,
    RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
  FROM employees
  WHERE salary > 50000
""").show()

也可以混用 DataFrame API 和 SQL:

val employeesDF = spark.read.parquet("hdfs://data/employees")
employeesDF.createOrReplaceTempView("employees")

// 之后可以用 spark.sql() 查询这张临时视图
spark.sql("SELECT department, AVG(salary) FROM employees GROUP BY department").show()

多数据源集成

SparkSQL 原生支持多种数据源的统一读写:

// Parquet(推荐格式,列式存储)
val df1 = spark.read.parquet("hdfs://data/events")

// JSON
val df2 = spark.read.json("hdfs://data/users.json")

// CSV
val df3 = spark.read.option("header", "true").csv("hdfs://data/orders.csv")

// JDBC(MySQL、PostgreSQL 等)
val df4 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/mydb")
  .option("dbtable", "orders")
  .option("user", "root")
  .option("password", "password")
  .load()

// 写出到 Parquet,按日期分区
df4.write.partitionBy("date").parquet("hdfs://output/orders")

SparkSQL vs 传统 Hive 对比

维度HiveSparkSQL
执行引擎MapReduceSpark(内存计算)
查询延迟分钟级秒级(小查询毫秒级)
迭代计算低效高效(内存缓存中间结果)
APIHiveQLSQL + DataFrame + Dataset
适用场景离线大批量 ETL交互式查询、实时流处理

SparkSQL 已成为大数据栈中结构化处理的首选方案,结合 Spark Streaming 或 Structured Streaming 还可统一批流处理逻辑。