This is article 83 in the Big Data series, covering SparkSQL query writing and Hive integration configuration in practice.
SparkSQL Overview
SparkSQL is Apache Spark’s core module for processing structured and semi-structured data, providing two query styles:
- DataFrame API: programmatic, composable operations that integrate seamlessly with Scala/Java (the typed Dataset API additionally gives compile-time type safety)
- SQL strings: declarative queries, 90%+ compatible with HiveQL syntax
Both styles are compiled through the Catalyst optimizer, so their performance is equivalent.
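As a quick sketch of that equivalence (assuming a `SparkSession` named `spark`, an existing `employees` DataFrame, and `import spark.implicits._` for the `$` column syntax), the same query can be written both ways:

```scala
// DataFrame API: programmatic, composable
val byAge = employees.filter($"age" > 25).select("name", "age")

// SQL string: declarative, over a registered temporary view
employees.createOrReplaceTempView("employees")
val byAgeSql = spark.sql("SELECT name, age FROM employees WHERE age > 25")

// Both compile to the same Catalyst plan; explain() lets you verify this
byAge.explain()
byAgeSql.explain()
```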
DataFrame API Operations
Reading Data
// Read JSON file
val df = spark.read.json("people.json")
df.show()
df.printSchema()
// Read CSV file (with header)
val df2 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("employees.csv")
Register Temporary Views
DataFrames can be registered as temporary views for SQL queries:
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 20").show()
Supported Data Source Formats
| Format | Characteristics |
|---|---|
| Parquet | Columnar storage, high compression, nested structures, Spark default |
| JSON | Lightweight format, nested support, Web-friendly |
| CSV | Common flat format, simple but no nested support |
| Avro | Row-based serialization, schema evolution |
| ORC | High compression, I/O optimization, Hive-compatible |
| Hive Table | Seamless Hive metadata integration, HiveQL syntax support |
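Since Parquet is Spark’s default format, it needs no explicit format option. A minimal round trip (the path is illustrative) looks like:

```scala
// Write as Parquet: columnar layout, snappy compression by default
df.write.mode("overwrite").parquet("/tmp/people.parquet")

// Read it back; spark.read.load(path) would also default to Parquet
val parquetDf = spark.read.parquet("/tmp/people.parquet")
parquetDf.printSchema()
```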
SQL Query Syntax Practice
Basic Queries
spark.sql("""
SELECT name, age, salary
FROM employees
WHERE age > 25
ORDER BY salary DESC
LIMIT 10
""").show()
Aggregation and Grouping
spark.sql("""
SELECT department,
COUNT(*) AS headcount,
AVG(salary) AS avg_sal,
MAX(salary) AS max_sal
FROM employees
GROUP BY department
HAVING AVG(salary) > 5000
""").show()
Window Functions
spark.sql("""
SELECT name, department, salary,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rk
FROM employees
""").show()
Advanced Feature: lateral view explode
LATERAL VIEW explode is a powerful SparkSQL construct (inherited from HiveQL) for handling array/map columns: it flattens one input row into multiple output rows, one per element.
Scenario Example
Given the data below, where each row’s tags field is a comma-separated tag list:
1 1,2,3
2 2,3
3 1,2
Implementation:
case class Info(id: String, tags: String)
import spark.implicits._ // required for rdd.toDF()
val arr = Array("1 1,2,3", "2 2,3", "3 1,2")
val rdd = sc.makeRDD(arr).map { line =>
  val fields = line.split("\\s+") // split id from the tag list on whitespace
  Info(fields(0), fields(1))
}
val df = rdd.toDF()
df.createOrReplaceTempView("t1")
spark.sql(
"""SELECT id, tag
|FROM t1
|LATERAL VIEW explode(split(tags, ',')) t2 AS tag
""".stripMargin
).show()
Output:
+---+---+
| id|tag|
+---+---+
| 1| 1|
| 1| 2|
| 1| 3|
| 2| 2|
| 2| 3|
| 3| 1|
| 3| 2|
+---+---+
Hive Integration
Dependency Configuration
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
Enable Hive Support
val spark = SparkSession.builder()
.appName("SparkHiveDemo")
.master("local[*]")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport() // Key: Enable Hive metadata support
.getOrCreate()
Read/Write Hive Tables
// Read Hive table
spark.sql("SELECT * FROM ods.user_behavior LIMIT 100").show()
// Write to Hive table (append)
resultDf.write.mode("append").saveAsTable("dwd.user_behavior_clean")
// Partitioned write
resultDf.write
.partitionBy("dt")
.mode("overwrite")
.saveAsTable("dwd.user_behavior_partitioned")
SparkSQL vs HiveQL Performance Comparison
Through in-memory computing, SparkSQL reduces query latency from the minute level typical of MapReduce-based HiveQL down to the second level (sub-second for cached data):
| Metric | SparkSQL | HiveQL (MapReduce) |
|---|---|---|
| In-memory computing | Supported | Not supported |
| Typical latency | Sub-second to seconds | Minutes |
| Syntax compatibility | 90%+ HiveQL compatible | Native HiveQL |
| UDF support | Hive UDFs supported | Native UDF support |
Summary
SparkSQL’s three usage methods are equivalent at the execution level; the choice depends on the development scenario:
- DataFrame API: Suitable for type-safe data processing in Scala/Java programs
- SQL Strings: Suitable for data analysts or scenarios needing quick logic verification
- Hive Integration: Suitable for enterprises with existing Hive data warehouses using Spark to accelerate queries
For most enterprise data warehouse scenarios, the enableHiveSupport() mode is recommended: it reuses existing Hive metadata while letting SparkSQL deliver a substantial speedup over MapReduce execution.