本文是大数据系列第 83 篇,介绍 SparkSQL 的语句编写方式与 Hive 集成配置实践。
SparkSQL 简介
SparkSQL 是 Apache Spark 处理结构化与半结构化数据的核心模块,提供两种查询方式:
- DataFrame API:命令式操作,类型安全,与 Scala/Java 无缝集成
- SQL 字符串:声明式查询,与 HiveQL 90% 以上语法兼容
两者底层都经过 Catalyst 优化器处理,性能一致。
DataFrame API 操作
读取数据
// 读取 JSON 文件
val df = spark.read.json("people.json")
df.show()
df.printSchema()
// 读取 CSV 文件(带 header)
val df2 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employees.csv")
注册临时视图
DataFrame 可注册为临时视图,之后用 SQL 查询:
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()
支持的数据源格式
| 格式 | 特点 |
|---|---|
| Parquet | 列式存储,高压缩率,支持嵌套结构,Spark 默认格式 |
| JSON | 轻量格式,支持嵌套,Web 友好 |
| CSV | 通用平面格式,简单但不支持嵌套 |
| Avro | 行式序列化,支持 Schema 演进 |
| ORC | 高压缩,I/O 性能优化,Hive 友好 |
| Hive Table | 与 Hive 元数据无缝集成,支持 HiveQL 语法 |
SQL 查询语法实战
基础查询
spark.sql("""
SELECT name, age, salary
FROM employees
WHERE age > 25
ORDER BY salary DESC
LIMIT 10
""").show()
聚合与分组
spark.sql("""
SELECT department,
COUNT(*) AS headcount,
AVG(salary) AS avg_sal,
MAX(salary) AS max_sal
FROM employees
GROUP BY department
HAVING AVG(salary) > 5000
""").show()
窗口函数
spark.sql("""
SELECT name, department, salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rk
FROM employees
""").show()
高级功能:lateral view explode
lateral view explode 是 SparkSQL 中处理数组/Map 类型列的利器,类似 SQL 中的行展开。
场景示例
有如下数据,每行的 tags 字段是逗号分隔的标签列表:
1 1,2,3
2 2,3
3 1,2
实现代码:
case class Info(id: String, tags: String)
val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
val rdd = sc.makeRDD(arr).map { line =>
val fields = line.split("\\s+")
Info(fields(0), fields(1))
}
val df = rdd.toDF()
df.createOrReplaceTempView("t1")
spark.sql(
"""SELECT id, tag
|FROM t1
|LATERAL VIEW explode(split(tags, ',')) t2 AS tag
""".stripMargin
).show()
输出结果:
+---+---+
| id|tag|
+---+---+
| 1| 1|
| 1| 2|
| 1| 3|
| 2| 2|
| 2| 3|
| 3| 1|
| 3| 2|
+---+---+
Hive 集成
依赖配置
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
启用 Hive 支持
val spark = SparkSession.builder()
.appName("SparkHiveDemo")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport() // 关键配置:启用 Hive 元数据支持
.getOrCreate()
读写 Hive 表
// 读取 Hive 表
spark.sql("SELECT * FROM ods.user_behavior LIMIT 100").show()
// 写入 Hive 表(追加)
resultDf.write.mode("append").saveAsTable("dwd.user_behavior_clean")
// 分区写入
resultDf.write
.partitionBy("dt")
.mode("overwrite")
.saveAsTable("dwd.user_behavior_partitioned")
SparkSQL vs HiveQL 性能对比
SparkSQL 通过内存计算将延迟从 HiveQL 的分钟级降低到毫秒到秒级:
| 指标 | SparkSQL | HiveQL(MapReduce) |
|---|---|---|
| 内存计算 | 支持 | 不支持 |
| 延迟 | 毫秒~秒 | 分钟级 |
| 语法兼容性 | 90%+ HiveQL 兼容 | 原生 HiveQL |
| UDF 支持 | 支持 Hive UDF | 支持 |
小结
SparkSQL 的三种使用方式在底层执行上是等价的,选择取决于开发场景:
- DataFrame API:适合在 Scala/Java 程序中进行类型安全的数据处理
- SQL 字符串:适合数据分析师或需要快速验证逻辑的场景
- Hive 集成:适合在已有 Hive 数据仓库的企业中使用 Spark 加速查询
对于大多数企业级数据仓库场景,推荐使用 enableHiveSupport() 模式,这样可以复用已有的 Hive 元数据,并通过 SparkSQL 获得数量级的性能提升。