This is article 75 in the Big Data series. Building on the basic WordCount, it adds text preprocessing and database persistence to produce a near-production word-frequency pipeline.

Requirements Analysis

The basic WordCount tokenizes and counts directly, which has several obvious problems:

  • Case inconsistency: The and the are treated as different words
  • Attached punctuation: word. and word are treated as different words
  • Stop-word interference: high-frequency but meaningless words such as the, a, and is pollute the results
  • Results can only be written to a file, which makes them inconvenient to query

Super WordCount addresses these problems one by one and finally writes the results to MySQL.

Processing Pipeline

Input text
  → Lowercase conversion (toLowerCase)
  → Tokenize (split whitespace)
  → Remove punctuation (regex replace)
  → Filter stop words (filter)
  → Word frequency count (reduceByKey)
  → Sort (sortBy descending)
  → Write to MySQL (foreachPartition)
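The steps above can be sketched on a plain Scala collection, without Spark, to make each transformation concrete. This is an illustrative sketch only: the stop-word set here is an abbreviated placeholder for the full list used later.

```scala
object PipelineSketch {
  // Abbreviated stop-word set for illustration; the real job uses a longer list.
  val stopWords = Set("the", "a", "is", "to")
  // Common punctuation characters to strip.
  val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

  def wordCount(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.toLowerCase.split("\\s+"))     // lowercase + tokenize
      .map(_.replaceAll(punctuation, ""))       // remove punctuation
      .filter(w => w.nonEmpty && !stopWords(w)) // drop stop words and empties
      .groupBy(identity)                        // word frequency count
      .map { case (w, ws) => (w, ws.size) }
      .toSeq
      .sortBy(-_._2)                            // sort by frequency, descending

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("The cat sat. the cat ran!")))
}
```

In the Spark versions below, groupBy/map is replaced by reduceByKey, which aggregates within each partition before shuffling.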

Database Preparation

CREATE DATABASE spark_demo;
USE spark_demo;

CREATE TABLE wordcount (
  id      INT AUTO_INCREMENT PRIMARY KEY,
  word    VARCHAR(255) NOT NULL,
  count   INT NOT NULL,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
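Once the job has written its results, they can be spot-checked directly in MySQL. A minimal example, assuming the wordcount table defined above:

```sql
-- Top 10 most frequent words
SELECT word, count
FROM wordcount
ORDER BY count DESC
LIMIT 10;
```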

Basic Version Implementation (SuperWordCount1)

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

  // Stop words list
  private val stopWords = Set(
    "in", "on", "to", "from", "by", "a", "an", "the",
    "is", "are", "were", "was", "be", "been", "being",
    "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall",
    "of", "for", "with", "at", "as", "it", "its"
  )

  // Punctuation characters to remove (regex)
  private val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount1")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))       // Lowercase + tokenize
      .map(_.replaceAll(punctuation, ""))          // Remove punctuation
      .filter(w => w.nonEmpty && !stopWords(w))  // Remove stop words, empty strings
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)          // Sort by frequency descending

    // Note: in local mode this prints to the console; on a cluster, println
    // inside foreach runs on the executors, not the driver.
    result.foreach { case (word, count) =>
      println(s"$word\t$count")
    }

    sc.stop()
  }
}

MySQL Write Version (SuperWordCount3)

Dependency Configuration

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>

Row-by-Row Write Version (Poor Performance, Comparison Only)

result.foreach { case (word, count) =>
  // Anti-pattern: opens and closes a connection for every single record
  val conn = DriverManager.getConnection(url, user, password)
  val stmt = conn.prepareStatement(
    "INSERT INTO wordcount(word, count) VALUES(?, ?)"
  )
  stmt.setString(1, word)
  stmt.setInt(2, count)
  stmt.executeUpdate()
  stmt.close()
  conn.close()  // Extremely costly: connection setup dominates the write time
}

foreachPartition Batch Write Version

package icu.wzk

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount3 {

  private val jdbcUrl  = "jdbc:mysql://localhost:3306/spark_demo?useSSL=false"
  private val jdbcUser = "root"
  private val jdbcPass = "your_password"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount3")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val stopWords = Set("in", "on", "to", "from", "by", "a", "an", "the",
      "is", "are", "were", "was", "of", "for", "with", "at", "as", "it")
    val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.nonEmpty && !stopWords(w))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // foreachPartition: create database connection only once per partition
    result.foreachPartition { iter =>
      val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
      conn.setAutoCommit(false)  // Batch commit
      val stmt = conn.prepareStatement(
        "INSERT INTO wordcount(word, count) VALUES(?, ?)"
      )
      iter.foreach { case (word, count) =>
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.addBatch()
      }
      stmt.executeBatch()
      conn.commit()
      stmt.close()
      conn.close()
    }

    sc.stop()
  }
}

foreach vs foreachPartition Performance Comparison

Dimension                          foreach                       foreachPartition
Connection creation count          Once per record               Once per partition
Connection overhead, 1M records    ~1,000,000                    ~partition count (typically tens)
Applicable scenario                Local debugging, tiny data    Production database writes
Transaction support                Commit per row                Batch commit, better performance

Taking 1 million records across 10 partitions as an example, foreachPartition reduces connection creation from 1,000,000 to 10, and throughput can improve by tens of times.
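The difference can be illustrated with a plain Scala simulation (no Spark): grouped stands in for partitioning, and a counter stands in for opening a JDBC connection. The record and partition counts are made-up numbers.

```scala
object ConnectionCostSketch {
  // Returns (connections opened per-record, connections opened per-partition).
  def simulate(numRecords: Int, numPartitions: Int): (Int, Int) = {
    val records = (1 to numRecords).toVector
    val partitions = records.grouped(numRecords / numPartitions).toVector

    var perRecord = 0
    records.foreach { _ => perRecord += 1 } // foreach-style: open per record

    var perPartition = 0
    partitions.foreach { part =>            // foreachPartition-style: open once,
      perPartition += 1                     // then reuse for the whole batch
      part.foreach(_ => ())
    }
    (perRecord, perPartition)
  }

  def main(args: Array[String]): Unit =
    println(simulate(1000000, 10))
}
```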

Submit and Run

spark-submit \
  --master local[*] \
  --class icu.wzk.SuperWordCount3 \
  --driver-memory 2g \
  target/spark-wordcount-1.0-SNAPSHOT.jar \
  /opt/wzk/input/article.txt

Extension Directions

  • Incremental write: Use INSERT INTO ... ON DUPLICATE KEY UPDATE count = count + VALUES(count) for cumulative statistics across runs (requires a unique index on word)
  • Distributed connection pool: Introduce HikariCP, reuse connections on Executor side
  • Write to HBase/Redis: Replace JDBC write logic, suitable for high concurrency query scenarios
  • Combine with Spark SQL: Use DataFrame API instead of RDD, one-line code to write to MySQL: df.write.jdbc(url, "wordcount", props)
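A minimal sketch of the incremental-write direction, assuming the wordcount table from the Database Preparation section. Note that ON DUPLICATE KEY UPDATE only fires on a PRIMARY KEY or UNIQUE collision, and the original schema's primary key is the auto-increment id, so word needs its own unique index first:

```sql
-- Required so that repeated words trigger the upsert path
ALTER TABLE wordcount ADD UNIQUE KEY uk_word (word);

-- Accumulate counts across runs instead of inserting duplicate rows
INSERT INTO wordcount (word, count)
VALUES (?, ?)
ON DUPLICATE KEY UPDATE count = count + VALUES(count);
```

With this in place, the prepared statement in foreachPartition can use the upsert SQL unchanged.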