本文是大数据系列第 83 篇,介绍 SparkSQL 的语句编写方式与 Hive 集成配置实践。

完整图文版(含截图):CSDN 原文 | 掘金

SparkSQL 简介

SparkSQL 是 Apache Spark 处理结构化与半结构化数据的核心模块,提供两种查询方式:

  • DataFrame API:命令式操作,类型安全,与 Scala/Java 无缝集成
  • SQL 字符串:声明式查询,与 HiveQL 90% 以上语法兼容

两者底层都经过 Catalyst 优化器处理,性能一致。


DataFrame API 操作

读取数据

// 读取 JSON 文件
val df = spark.read.json("people.json")
df.show()
df.printSchema()

// 读取 CSV 文件(带 header)
val df2 = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("employees.csv")

注册临时视图

DataFrame 可注册为临时视图,之后用 SQL 查询:

df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()

支持的数据源格式

格式特点
Parquet列式存储,高压缩率,支持嵌套结构,Spark 默认格式
JSON轻量格式,支持嵌套,Web 友好
CSV通用平面格式,简单但不支持嵌套
Avro行式序列化,支持 Schema 演进
ORC高压缩,I/O 性能优化,Hive 友好
Hive Table与 Hive 元数据无缝集成,支持 HiveQL 语法

SQL 查询语法实战

基础查询

spark.sql("""
  SELECT name, age, salary
  FROM employees
  WHERE age > 25
  ORDER BY salary DESC
  LIMIT 10
""").show()

聚合与分组

spark.sql("""
  SELECT department,
         COUNT(*) AS headcount,
         AVG(salary) AS avg_sal,
         MAX(salary) AS max_sal
  FROM employees
  GROUP BY department
  HAVING AVG(salary) > 5000
""").show()

窗口函数

spark.sql("""
  SELECT name, department, salary,
         RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rk
  FROM employees
""").show()

高级功能:lateral view explode

lateral view explode 是 SparkSQL 中处理数组/Map 类型列的利器,类似 SQL 中的行展开。

场景示例

有如下数据,每行的 tags 字段是逗号分隔的标签列表:

1 1,2,3
2 2,3
3 1,2

实现代码:

case class Info(id: String, tags: String)

val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
val rdd = sc.makeRDD(arr).map { line =>
  val fields = line.split("\\s+")
  Info(fields(0), fields(1))
}

val df = rdd.toDF()
df.createOrReplaceTempView("t1")

spark.sql(
  """SELECT id, tag
    |FROM t1
    |LATERAL VIEW explode(split(tags, ',')) t2 AS tag
  """.stripMargin
).show()

输出结果:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

Hive 集成

依赖配置

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

启用 Hive 支持

val spark = SparkSession.builder()
  .appName("SparkHiveDemo")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
  .enableHiveSupport()   // 关键配置:启用 Hive 元数据支持
  .getOrCreate()

读写 Hive 表

// 读取 Hive 表
spark.sql("SELECT * FROM ods.user_behavior LIMIT 100").show()

// 写入 Hive 表(追加)
resultDf.write.mode("append").saveAsTable("dwd.user_behavior_clean")

// 分区写入
resultDf.write
  .partitionBy("dt")
  .mode("overwrite")
  .saveAsTable("dwd.user_behavior_partitioned")

SparkSQL vs HiveQL 性能对比

SparkSQL 通过内存计算将延迟从 HiveQL 的分钟级降低到毫秒到秒级:

指标SparkSQLHiveQL(MapReduce)
内存计算支持不支持
延迟毫秒~秒分钟级
语法兼容性90%+ HiveQL 兼容原生 HiveQL
UDF 支持支持 Hive UDF支持

小结

SparkSQL 的三种使用方式在底层执行上是等价的,选择取决于开发场景:

  • DataFrame API:适合在 Scala/Java 程序中进行类型安全的数据处理
  • SQL 字符串:适合数据分析师或需要快速验证逻辑的场景
  • Hive 集成:适合在已有 Hive 数据仓库的企业中使用 Spark 加速查询

对于大多数企业级数据仓库场景,推荐使用 enableHiveSupport() 模式,这样可以复用已有的 Hive 元数据,并通过 SparkSQL 获得数量级的性能提升。