本文是大数据系列第 80 篇,全面介绍 SparkSQL 的设计理念、核心组件和实战用法。
从 Hive 到 SparkSQL 的演进
早期 Hive 将 SQL 翻译为 MapReduce 执行,每个查询都需要冷启动 MR Job,延迟高、迭代慢。Shark(Spark 0.x 时代)通过替换 Hive 的执行引擎来加速查询,保持了 HiveQL 兼容性。
2014 年 Spark 放弃了 Shark,分化为两个方向:
- Hive on Spark:让 Hive 将执行引擎替换为 Spark,仍由 Hive 主导
- Spark SQL:Spark 主导的结构化数据处理模块,提供全新 API 体系
Spark SQL 成为主流,相比 Hive 性能提升 10-100x,来源于内存优化(4-5x 内存效率)、Catalyst 查询优化器和代码生成技术。
核心数据抽象
DataFrame
DataFrame 是带 Schema 信息的分布式 Row 集合,等同于 RDD[Row] + Schema。Schema 描述每列的字段名和类型,使引擎能进行列剪裁、谓词下推等优化。
支持的数据类型:
- 基础类型:String、Boolean、Integer、Long、Double、Decimal、Date、Timestamp
- 复杂类型:Array、Map、Struct(支持嵌套)
val df = spark.read.json("hdfs://data/users.json")
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
df.select("name", "age")
.filter($"age" > 18)
.groupBy("age")
.count()
.show()
Dataset
Spark 1.6 引入,在 DataFrame 基础上增加编译期类型检查,避免运行时类型错误。Spark 2.0 将两者统一:DataFrame = Dataset[Row]。
case class User(name: String, age: Long)
val ds: Dataset[User] = df.as[User]
ds.filter(_.age > 18).map(_.name).show()
Catalyst 优化器
Catalyst 是 SparkSQL 的查询优化引擎,处理流程分四步:
SQL/DataFrame API
│
▼
① 解析(Parsing)→ 未解析逻辑计划(Unresolved Logical Plan)
│
▼
② 分析(Analysis)→ 解析逻辑计划(绑定列名、类型推断)
│
▼
③ 优化(Optimization)→ 优化后的逻辑计划
│ · 谓词下推(Predicate Pushdown)
│ · 列裁剪(Column Pruning)
│ · 常量折叠(Constant Folding)
│ · 分区裁剪(Partition Pruning)
▼
④ 物理计划生成 → 选择最优执行策略(sort merge join / broadcast join 等)
SQL 查询示例
SparkSQL 支持完整 ANSI SQL,包括窗口函数、子查询等高级特性:
spark.sql("""
SELECT
department,
name,
salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees
WHERE salary > 50000
""").show()
也可以混用 DataFrame API 和 SQL:
val employeesDF = spark.read.parquet("hdfs://data/employees")
employeesDF.createOrReplaceTempView("employees")
// 之后可以用 spark.sql() 查询这张临时视图
spark.sql("SELECT department, AVG(salary) FROM employees GROUP BY department").show()
多数据源集成
SparkSQL 原生支持多种数据源的统一读写:
// Parquet(推荐格式,列式存储)
val df1 = spark.read.parquet("hdfs://data/events")
// JSON
val df2 = spark.read.json("hdfs://data/users.json")
// CSV
val df3 = spark.read.option("header", "true").csv("hdfs://data/orders.csv")
// JDBC(MySQL、PostgreSQL 等)
val df4 = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("dbtable", "orders")
.option("user", "root")
.option("password", "password")
.load()
// 写出到 Parquet,按日期分区
df4.write.partitionBy("date").parquet("hdfs://output/orders")
SparkSQL vs 传统 Hive 对比
| 维度 | Hive | SparkSQL |
|---|---|---|
| 执行引擎 | MapReduce | Spark(内存计算) |
| 查询延迟 | 分钟级 | 秒级(小查询毫秒级) |
| 迭代计算 | 低效 | 高效(内存缓存中间结果) |
| API | HiveQL | SQL + DataFrame + Dataset |
| 适用场景 | 离线大批量 ETL | 交互式查询、实时流处理 |
SparkSQL 已成为大数据栈中结构化处理的首选方案,结合 Spark Streaming 或 Structured Streaming 还可统一批流处理逻辑。