This is article 75 in the Big Data series. Building on the basic WordCount, this installment adds text preprocessing and database persistence to construct a near-production word-frequency pipeline.
Requirements Analysis
The basic WordCount tokenizes and counts directly, which has several obvious problems:
- Case inconsistency: `The` and `the` are treated as different words
- Attached punctuation: `word.` and `word` are treated as different words
- Stop-word interference: high-frequency but meaningless words such as `the`, `a`, and `is` pollute the results
- Results can only be written to a file, which makes them inconvenient to query
Super WordCount addresses these problems one by one and finally writes the results to MySQL.
Processing Pipeline
Input text
→ Lowercase conversion (toLowerCase)
→ Tokenize (split whitespace)
→ Remove punctuation (regex replace)
→ Filter stop words (filter)
→ Word frequency count (reduceByKey)
→ Sort (sortBy descending)
→ Write to MySQL (foreachPartition)
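The preprocessing steps above can be sketched on plain Scala collections without Spark, using `groupBy` as a local stand-in for `reduceByKey`. The stop-word set here is a small illustrative subset, not the full list used later:

```scala
object PipelineSketch {
  // Illustrative subset of the stop-word list
  private val stopWords = Set("the", "a", "is", "to", "of")
  // Same punctuation character class as the Spark jobs below
  private val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

  def process(lines: Seq[String]): Seq[(String, Int)] =
    lines
      .flatMap(_.toLowerCase.split("\\s+"))      // lowercase + tokenize
      .map(_.replaceAll(punctuation, ""))        // remove punctuation
      .filter(w => w.nonEmpty && !stopWords(w))  // drop stop words and empties
      .groupBy(identity)                         // local stand-in for reduceByKey
      .map { case (w, ws) => (w, ws.size) }
      .toSeq
      .sortBy(-_._2)                             // sort by frequency, descending

  def main(args: Array[String]): Unit =
    process(Seq("The cat sat.", "A cat is a cat!")).foreach(println)
    // → (cat,3) then (sat,1)
}
```

Running the same chain of operations on an RDD instead of a `Seq` gives the Spark versions that follow.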
Database Preparation
CREATE DATABASE spark_demo;
USE spark_demo;
CREATE TABLE wordcount (
    id INT AUTO_INCREMENT PRIMARY KEY,
    word VARCHAR(255) NOT NULL,
    count INT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Basic Version Implementation (SuperWordCount1)
package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {

  // Stop words list
  private val stopWords = Set(
    "in", "on", "to", "from", "by", "a", "an", "the",
    "is", "are", "were", "was", "be", "been", "being",
    "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall",
    "of", "for", "with", "at", "as", "it", "its"
  )

  // Punctuation characters to remove (regex character class)
  private val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount1")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))      // lowercase + tokenize
      .map(_.replaceAll(punctuation, ""))        // remove punctuation
      .filter(w => w.nonEmpty && !stopWords(w))  // drop stop words and empty strings
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)           // sort by frequency, descending

    // Note: in local mode this prints to the console; in cluster mode
    // the println runs on the executors, not the driver
    result.foreach { case (word, count) =>
      println(s"$word\t$count")
    }

    sc.stop()
  }
}
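It is worth seeing exactly what the punctuation character class does, including two side effects: apostrophes are deleted rather than split on, and hyphenated words merge. A standalone sketch using the same regex:

```scala
object PunctuationDemo {
  // Same character class as in SuperWordCount1
  private val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

  def clean(w: String): String = w.replaceAll(punctuation, "")

  def main(args: Array[String]): Unit = {
    println(clean("word."))      // word
    println(clean("don't"))      // dont       -- apostrophe removed, not split
    println(clean("well-known")) // wellknown  -- hyphenated words merge
  }
}
```

Whether these merges are acceptable depends on the corpus; for rough frequency statistics they usually are.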
MySQL Write Version (SuperWordCount3)
Dependency Configuration
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
</dependency>
Row-by-Row Write Version (Poor Performance, Comparison Only)
result.foreach { case (word, count) =>
  // Opens and closes a connection for every single record -- extremely expensive
  val conn = DriverManager.getConnection(url, user, password)
  val stmt = conn.prepareStatement(
    "INSERT INTO wordcount(word, count) VALUES(?, ?)"
  )
  stmt.setString(1, word)
  stmt.setInt(2, count)
  stmt.executeUpdate()
  stmt.close()
  conn.close()
}
Partition Batch Write Version (Recommended)
package icu.wzk

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount3 {

  private val jdbcUrl  = "jdbc:mysql://localhost:3306/spark_demo?useSSL=false"
  private val jdbcUser = "root"
  private val jdbcPass = "your_password"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SuperWordCount3")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val stopWords = Set("in", "on", "to", "from", "by", "a", "an", "the",
      "is", "are", "were", "was", "of", "for", "with", "at", "as", "it")
    val punctuation = "[\\)\\(.,;:!?'\"\\/\\\\\\-]"

    val result = sc.textFile(args(0))
      .flatMap(_.toLowerCase.split("\\s+"))
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.nonEmpty && !stopWords(w))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // foreachPartition: create one database connection per partition
    result.foreachPartition { iter =>
      val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
      conn.setAutoCommit(false) // commit as one batch
      val stmt = conn.prepareStatement(
        "INSERT INTO wordcount(word, count) VALUES(?, ?)"
      )
      iter.foreach { case (word, count) =>
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.addBatch()
      }
      stmt.executeBatch()
      conn.commit()
      stmt.close()
      conn.close()
    }

    sc.stop()
  }
}
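One caveat with the version above: if `executeBatch()` throws, the connection leaks. A common hardening is to wrap the partition write in try/finally. The `FakeConnection` below is a hypothetical stand-in for `java.sql.Connection`, used only so the sketch runs without a database; in the real job the same pattern applies to the JDBC connection:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for java.sql.Connection, so the sketch runs without a database
class FakeConnection {
  val batch = ArrayBuffer[(String, Int)]()
  var closed = false
  def addBatch(w: String, c: Int): Unit = batch += ((w, c))
  def close(): Unit = closed = true
}

object SafePartitionWrite {
  // Pattern: acquire the connection once, always release it, even if the write throws
  def writePartition(iter: Iterator[(String, Int)], conn: FakeConnection): Unit =
    try {
      iter.foreach { case (w, c) => conn.addBatch(w, c) }
      // executeBatch() + commit() would go here in the JDBC version
    } finally {
      conn.close() // guaranteed, so a failed task does not leak a connection
    }
}
```

The `finally` block ensures that even when a task fails and Spark retries it, the previous attempt's connection has been returned.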
foreach vs foreachPartition Performance Comparison
| Dimension | foreach | foreachPartition |
|---|---|---|
| Connection creation count | Once per record | Once per partition |
| 1 million records connection overhead | ~1 million | ~partition count (typically tens) |
| Applicable scenario | Local debugging, extremely small data | Production database write |
| Transaction support | Commit per row | Batch commit, better performance |
Taking 1 million records and 10 partitions as an example, foreachPartition reduces connection creation from 1 million to 10, and throughput can improve by tens of times.
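The arithmetic can be simulated directly on a local collection, counting how many times a connection would be opened under each strategy (`grouped` serves as a rough local analogue of partitioning):

```scala
object ConnectionCostDemo {
  // Count simulated connection "opens" for foreach vs foreachPartition
  def simulate(records: Seq[Int], partitions: Int): (Int, Int) = {
    var foreachOpens = 0
    records.foreach(_ => foreachOpens += 1)     // foreach: open once per record

    var partitionOpens = 0
    val size = math.max(1, records.size / partitions)
    records.grouped(size).foreach { part =>     // foreachPartition: open once per group
      partitionOpens += 1
      part.foreach(_ => ())                     // all records share that one connection
    }
    (foreachOpens, partitionOpens)
  }

  def main(args: Array[String]): Unit =
    println(simulate(1 to 100, 10))             // → (100,10)
}
```

Scaling the same ratio to 1 million records makes the gap in the table concrete.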
Submit and Run
spark-submit \
  --master local[*] \
  --class icu.wzk.SuperWordCount3 \
  --driver-memory 2g \
  target/spark-wordcount-1.0-SNAPSHOT.jar \
  /opt/wzk/input/article.txt
Extension Directions
- Incremental writes: use `INSERT INTO ... ON DUPLICATE KEY UPDATE count = count + VALUES(count)` for append-style statistics (this requires a unique key on `word`)
- Connection pooling: introduce HikariCP and reuse connections on the executor side
- Write to HBase/Redis: replace the JDBC write logic; suitable for high-concurrency query scenarios
- Combine with Spark SQL: use the DataFrame API instead of RDDs, and writing to MySQL becomes one line: `df.write.jdbc(url, "wordcount", props)`