本文是大数据系列第 73 篇,通过大数据界的 “Hello World”——WordCount,掌握 Spark RDD 的核心处理思路,并给出 Scala 和 Java 两个完整实现。
为什么从 WordCount 开始
WordCount 是分布式计算的入门经典,不只是因为简单,更因为它完整地体现了 Spark “分而治之” 的核心思想:
- 数据加载:从文件系统读取原始文本
- 切分:将行切分为单词(flatMap)
- 映射:每个单词映射为
(word, 1)键值对 - 聚合:按 key 汇总计数(reduceByKey)
- 输出:写出到文件或打印到控制台
这五步与 Hadoop MapReduce 的 Map/Shuffle/Reduce 一一对应,但代码量大幅减少。
Maven 项目配置
<properties>
<scala.version>2.12.10</scala.version>
<spark.version>2.4.5</spark.version>
</properties>
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Scala 标准库 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>
Scala 实现
package icu.wzk
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
// 1. 创建 SparkConf 和 SparkContext
val conf = new SparkConf().setAppName("ScalaWordCount")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// 2. 读取输入文件,生成 RDD[String](每个元素是一行)
val lines: RDD[String] = sc.textFile(args(0))
// 3. 按空白字符切分,生成 RDD[String](每个元素是一个单词)
val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))
// 4. 映射为 (word, 1) 键值对
val wordMap: RDD[(String, Int)] = words.map(x => (x, 1))
// 5. 按 key 聚合
val result: RDD[(String, Int)] = wordMap.reduceByKey(_ + _)
// 6. 触发 Action,输出结果
result.foreach(println)
sc.stop()
}
}
Scala 版本简洁直观,reduceByKey(_ + _) 利用函数式语法省去了大量样板代码。
Java 实现
package icu.wzk;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;
public class JavaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("JavaWordCount")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setLogLevel("WARN");
// 读取文本文件
JavaRDD<String> lines = sc.textFile(args[0]);
// 切分单词
JavaRDD<String> words = lines
.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
// 映射为 (word, 1)
JavaPairRDD<String, Integer> wordsMap = words
.mapToPair(word -> new Tuple2<>(word, 1));
// 按 key 聚合
JavaPairRDD<String, Integer> results =
wordsMap.reduceByKey((x, y) -> x + y);
results.foreach(elem -> System.out.println(elem));
sc.stop();
}
}
Java 版本使用 JavaSparkContext、JavaRDD、JavaPairRDD 等 Java 友好 API,逻辑一致但代码更冗长。
编译与提交
打包
mvn clean package -DskipTests
生成 target/spark-wordcount-1.0-SNAPSHOT.jar。
本地模式运行(测试)
spark-submit \
--master local[*] \
--class icu.wzk.WordCount \
target/spark-wordcount-1.0-SNAPSHOT.jar \
/opt/wzk/input.txt
提交到 YARN 集群
spark-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 2g \
--num-executors 3 \
--class icu.wzk.WordCount \
target/spark-wordcount-1.0-SNAPSHOT.jar \
hdfs://namenode:9000/input/data.txt
关键概念回顾
| 概念 | 说明 |
|---|---|
| RDD | 弹性分布式数据集,Spark 核心抽象,支持容错和并行计算 |
| 惰性求值 | Transformation 只记录操作描述,Action 触发真正执行 |
| Shuffle | reduceByKey 等算子会触发跨节点数据重分布,是性能瓶颈点 |
| DAG | Spark 将作业表示为有向无环图,Stage 以 Shuffle 为边界划分 |
实际应用场景
WordCount 的模式可直接迁移到:
- 日志分析:统计 ERROR/WARN 出现频次
- 关键词统计:电商评论热词提取
- 文本特征提取:TF-IDF 的词频计算步骤
- 推荐系统:用户行为序列中 item 出现频率统计