This is installment 75 of the big-data series. Building on the basic WordCount, it adds text preprocessing and database persistence, assembling a word-frequency pipeline that is close to production-ready.

Full illustrated version (with screenshots): CSDN original | Juejin

Requirements Analysis

The basic WordCount tokenizes and counts the raw text directly, which has several obvious problems:

  • Inconsistent casing ("The" and "the" are counted as different words)
  • Punctuation stuck to tokens ("word." and "word" are counted as different words)
  • Stop-word noise (high-frequency but meaningless words such as "the", "a", "is" pollute the counts)
  • Results can only be written to files and cannot be queried conveniently

Super WordCount addresses each of these problems in turn and finally writes the results to MySQL.

Processing Pipeline

Input text
  → lowercase (toLowerCase)
  → tokenize (split on whitespace)
  → strip punctuation (regex replace)
  → filter stop words (filter)
  → count frequencies (reduceByKey)
  → sort (sortBy, descending)
  → write to MySQL (foreachPartition)
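The stages above can be exercised on a plain Scala collection before Spark enters the picture. A minimal sketch with made-up sample text; `groupBy` stands in for `reduceByKey`:

```scala
// Same regex as the Spark job below; an abbreviated stop-word set.
val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"
val stopWords = Set("the", "a", "is")

val lines = Seq("The quick fox. the fox!", "A fox is quick")

val counts = lines
  .flatMap(_.toLowerCase.split("\\s+"))      // lowercase + tokenize
  .map(_.replaceAll(punctuation, ""))        // strip punctuation
  .filter(w => w.nonEmpty && !stopWords(w))  // drop stop words and empties
  .groupBy(identity)                         // local stand-in for reduceByKey
  .map { case (w, ws) => (w, ws.size) }
  .toList
  .sortBy(-_._2)                             // descending by frequency
// counts == List(("fox", 3), ("quick", 2))
```

Each stage maps one-to-one onto an RDD transformation in the Spark versions below.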

Database Setup

CREATE DATABASE spark_demo;
USE spark_demo;

CREATE TABLE wordcount (
  id      INT AUTO_INCREMENT PRIMARY KEY,
  word    VARCHAR(255) NOT NULL,
  count   INT NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Basic Implementation (SuperWordCount1)

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

  // Stop-word list
  private val stopWords = Set(
    "in", "on", "to", "from", "by", "a", "an", "the",
    "is", "are", "were", "was", "be", "been", "being",
    "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall",
    "of", "for", "with", "at", "as", "it", "its"
  )

  // Punctuation characters to strip (regex character class)
  private val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount1")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))       // lowercase + tokenize
      .map(_.replaceAll(punctuation, ""))          // strip punctuation
      .filter(w => w.nonEmpty && !stopWords(w))   // drop stop words and empty strings
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)             // descending by frequency

    // Note: foreach runs on the executors; in cluster mode the output
    // appears in executor logs, not on the driver console.
    result.foreach { case (word, count) =>
      println(s"$word\t$count")
    }

    sc.stop()
  }
}
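The punctuation regex used above is worth sanity-checking in isolation. A quick sketch:

```scala
val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"

// Each sample keeps only its letters once punctuation is stripped.
val cleaned = Seq("word.", "(hello)", "it's", "semi;colon")
  .map(_.replaceAll(punctuation, ""))
// cleaned == Seq("word", "hello", "its", "semicolon")
```

Note that the class also matches "-", so hyphenated words like "well-known" collapse to "wellknown"; whether that is desirable depends on the corpus.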

MySQL Writer (SuperWordCount3)

Dependency configuration

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

Per-record write (poor performance, shown only for comparison)

result.foreach { case (word, count) =>
  val conn = DriverManager.getConnection(url, user, password)
  val stmt = conn.prepareStatement(
    "INSERT INTO wordcount(word, count) VALUES(?, ?)"
  )
  stmt.setString(1, word)
  stmt.setInt(2, count)
  stmt.executeUpdate()
  conn.close()  // opening and closing a connection per record is extremely expensive
}

Partitioned batch write (recommended)

package icu.wzk

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount3 {

  private val jdbcUrl  = "jdbc:mysql://localhost:3306/spark_demo?useSSL=false"
  private val jdbcUser = "root"
  private val jdbcPass = "your_password"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount3")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val stopWords = Set("in", "on", "to", "from", "by", "a", "an", "the",
      "is", "are", "were", "was", "of", "for", "with", "at", "as", "it")
    val punctuation = "[\\)\\(\\.,;:!?'\"\\/\\\\\\-]"

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.nonEmpty && !stopWords(w))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // foreachPartition: create one database connection per partition
    result.foreachPartition { iter =>
      val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
      conn.setAutoCommit(false)  // commit as one batch
      val stmt = conn.prepareStatement(
        "INSERT INTO wordcount(word, count) VALUES(?, ?)"
      )
      iter.foreach { case (word, count) =>
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.addBatch()
      }
      stmt.executeBatch()
      conn.commit()
      conn.close()
    }

    sc.stop()
  }
}
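One caveat in SuperWordCount3: if executeBatch or commit throws, the partition's connection is never closed. A small loan-pattern helper (my own sketch, not part of the original code) keeps the cleanup in one place:

```scala
// Loan pattern: run `body` against a resource and close it no matter
// how the body exits (normal return or exception).
def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
  try body(resource)
  finally resource.close()
```

Inside foreachPartition the writer then becomes `withResource(DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)) { conn => ... }` and the explicit `conn.close()` call disappears.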

foreach vs foreachPartition: Performance Comparison

| Dimension | foreach | foreachPartition |
| --- | --- | --- |
| Connections created | one per record | one per partition |
| Connection overhead for 1M records | ~1,000,000 | ~number of partitions (typically dozens) |
| Suited for | local debugging, very small data | production database writes |
| Transactions | committed per record | batched commits, much better performance |

With 1 million records across 10 partitions, foreachPartition cuts connection creation from 1,000,000 down to 10, and throughput can improve by tens of times.

Submitting the Job

spark-submit \
  --master local[*] \
  --class icu.wzk.SuperWordCount3 \
  --driver-memory 2g \
  target/spark-wordcount-1.0-SNAPSHOT.jar \
  /opt/wzk/input/article.txt

Extensions

  • Incremental writes: switch to INSERT INTO ... ON DUPLICATE KEY UPDATE count = count + VALUES(count) to accumulate counts across runs
  • Distributed connection pooling: bring in HikariCP to reuse connections on the executor side
  • Writing to HBase/Redis: swap out the JDBC write logic for high-concurrency query scenarios
  • Spark SQL: replace the RDD API with DataFrames and write to MySQL in one line: df.write.jdbc(url, "wordcount", props)
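For the incremental-write extension, note that ON DUPLICATE KEY UPDATE only fires when a PRIMARY or UNIQUE key collides, and the wordcount table above has no unique constraint on word. A sketch of the required change (the index name uk_word is my own choice):

```sql
ALTER TABLE wordcount ADD UNIQUE KEY uk_word (word);

INSERT INTO wordcount (word, count)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE count = count + VALUES(count);
```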