This is part 75 of the Big Data series. Building on the basic WordCount, it adds text preprocessing and database persistence to construct a word-frequency pipeline that is close to production-ready.
Requirements Analysis
The basic WordCount tokenizes and counts directly, which has several obvious problems:
- Inconsistent casing: "The" and "the" are treated as different words
- Punctuation attached to words: "word." and "word" are treated as different words
- Stop-word noise: high-frequency but meaningless words such as "the", "a", "is" pollute the results
- Results can only be written to files, with no convenient way to query them
Super WordCount addresses these problems one by one and finally writes the results to MySQL.
Processing Pipeline
Input text
→ lowercase conversion (toLowerCase)
→ tokenization (split on whitespace)
→ punctuation removal (regex replace)
→ stop-word filtering (filter)
→ frequency counting (reduceByKey)
→ sorting (sortBy, descending)
→ write to MySQL (foreachPartition)
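Before building the RDD version, here is a Spark-free sketch of the per-line stages in plain Scala; the sample sentence and the trimmed stop-word set are made up for illustration:

// Walk one line through the same stages, without Spark.
val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"
val stopWords = Set("the", "is", "a")

val line = "The word. The word is a WORD!"
val tokens = line.toLowerCase // "the word. the word is a word!"
  .split("\\s+") // Array("the", "word.", "the", "word", "is", "a", "word!")
  .map(_.replaceAll(punctuation, "")) // "word." and "word!" both become "word"
  .filter(w => w.nonEmpty && !stopWords(w)) // drops "the", "is", "a"

val counts = tokens.groupBy(identity).map { case (w, ws) => (w, ws.length) }
println(counts) // Map(word -> 3)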
Database Preparation
CREATE DATABASE spark_demo;
USE spark_demo;
CREATE TABLE wordcount (
id INT AUTO_INCREMENT PRIMARY KEY,
word VARCHAR(255) NOT NULL,
count INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
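Before wiring Spark into the mix, it can be worth sanity-checking connectivity with plain JDBC. The sketch below is a hypothetical throwaway helper (JdbcSmokeTest is not part of the project), using the same URL and placeholder credentials as the job later on:

import java.sql.DriverManager

object JdbcSmokeTest {
  def main(args: Array[String]): Unit = {
    // Connect with the same URL and credentials the Spark job will use.
    val conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/spark_demo?useSSL=false", "root", "your_password")
    try {
      val rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM wordcount")
      rs.next()
      println(s"wordcount rows: ${rs.getInt(1)}")
    } finally conn.close()
  }
}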
Basic Version (SuperWordCount1)
package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
object SuperWordCount1 {
// Stop-word list
private val stopWords = Set(
"in", "on", "to", "from", "by", "a", "an", "the",
"is", "are", "were", "was", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will",
"would", "could", "should", "may", "might", "shall",
"of", "for", "with", "at", "as", "it", "its"
)
// Punctuation characters to strip (regex character class)
private val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SuperWordCount1")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val result = sc.textFile(args(0))
.flatMap(_.toLowerCase.split("\\s+")) // lowercase + tokenize
.map(_.replaceAll(punctuation, "")) // strip punctuation
.filter(w => w.nonEmpty && !stopWords(w)) // drop stop words and empty strings
.map(w => (w, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false) // sort by frequency, descending
result.foreach { case (word, count) =>
println(s"$word\t$count")
}
sc.stop()
}
}
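One caveat on the final step: foreach runs on the executors, so in cluster mode the println output lands in the executor logs rather than the driver console. For spot-checking, a common pattern is to pull a small sample back to the driver first (the 20 below is arbitrary):

// Bring only the top rows back to the driver before printing.
result.take(20).foreach { case (word, count) =>
  println(s"$word\t$count")
}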
MySQL Output Version (SuperWordCount3)
Dependency configuration
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
Per-record writes (slow; for comparison only)
result.foreach { case (word, count) =>
val conn = DriverManager.getConnection(url, user, password)
val stmt = conn.prepareStatement(
"INSERT INTO wordcount(word, count) VALUES(?, ?)"
)
stmt.setString(1, word)
stmt.setInt(2, count)
stmt.executeUpdate()
conn.close() // opening and closing a connection per record is extremely expensive
}
Partitioned batch writes (recommended)
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
object SuperWordCount3 {
private val jdbcUrl = "jdbc:mysql://localhost:3306/spark_demo?useSSL=false"
private val jdbcUser = "root"
private val jdbcPass = "your_password"
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SuperWordCount3")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val stopWords = Set("in", "on", "to", "from", "by", "a", "an", "the",
"is", "are", "were", "was", "of", "for", "with", "at", "as", "it")
val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"
val result = sc.textFile(args(0))
.flatMap(_.toLowerCase.split("\\s+"))
.map(_.replaceAll(punctuation, ""))
.filter(w => w.nonEmpty && !stopWords(w))
.map(w => (w, 1))
.reduceByKey(_ + _)
.sortBy(_._2, ascending = false)
// foreachPartition: create only one database connection per partition
result.foreachPartition { iter =>
val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
conn.setAutoCommit(false) // batch the inserts into one commit
val stmt = conn.prepareStatement(
"INSERT INTO wordcount(word, count) VALUES(?, ?)"
)
iter.foreach { case (word, count) =>
stmt.setString(1, word)
stmt.setInt(2, count)
stmt.addBatch()
}
stmt.executeBatch()
conn.commit()
conn.close()
}
sc.stop()
}
}
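The version above works, but it leaks the connection if any insert throws, and it opens a connection even for empty partitions. A hardened sketch of the same write, with an arbitrary 1,000-row flush threshold:

result.foreachPartition { iter =>
  if (iter.hasNext) { // skip empty partitions entirely
    val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
    try {
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(
        "INSERT INTO wordcount(word, count) VALUES(?, ?)"
      )
      try {
        var pending = 0
        iter.foreach { case (word, count) =>
          stmt.setString(1, word)
          stmt.setInt(2, count)
          stmt.addBatch()
          pending += 1
          if (pending >= 1000) { // flush in chunks to keep the JDBC batch bounded
            stmt.executeBatch()
            pending = 0
          }
        }
        if (pending > 0) stmt.executeBatch()
        conn.commit()
      } finally stmt.close()
    } finally conn.close()
  }
}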
foreach vs. foreachPartition: Performance Comparison
| Dimension | foreach | foreachPartition |
|---|---|---|
| Connections created | one per record | one per partition |
| Connection overhead at 1M records | ~1,000,000 | ~number of partitions (typically dozens) |
| Suitable for | local debugging, very small data | production database writes |
| Transactions | per-record commits | batched commits, much faster |
With 1 million records across 10 partitions, foreachPartition cuts connection creation from 1,000,000 down to 10, and throughput gains of tens of times are achievable.
Submitting the Job
spark-submit \
--master local[*] \
--class icu.wzk.SuperWordCount3 \
--driver-memory 2g \
target/spark-wordcount-1.0-SNAPSHOT.jar \
/opt/wzk/input/article.txt
Note that the MySQL driver must be on the classpath at runtime: either shade mysql-connector-java into the application jar (for example with the Maven Shade Plugin) or pass the driver jar via --jars when submitting.
Extensions
- Incremental writes: switch to INSERT INTO ... ON DUPLICATE KEY UPDATE count = count + VALUES(count) to accumulate counts across runs (note this requires a UNIQUE index on word, which the table above does not define)
- Distributed connection pooling: bring in HikariCP and reuse connections on the Executor side
- Write to HBase/Redis: swap out the JDBC write logic for scenarios with high-concurrency reads
- Spark SQL: replace the RDD API with the DataFrame API and write to MySQL in one line: df.write.jdbc(url, "wordcount", props) (see the sketch below)
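For the last point, a minimal sketch of the DataFrame route, assuming the result RDD and the jdbcUrl/jdbcUser/jdbcPass values from SuperWordCount3 (the app name is illustrative):

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SuperWordCountSql").getOrCreate()
import spark.implicits._

// Turn the (word, count) pairs into a two-column DataFrame.
val df = result.toDF("word", "count")

val props = new Properties()
props.setProperty("user", jdbcUser)
props.setProperty("password", jdbcPass)

// Append into the existing table; Spark batches the inserts internally.
df.write.mode("append").jdbc(jdbcUrl, "wordcount", props)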