本文是大数据系列第 73 篇,通过大数据界的 “Hello World”——WordCount,掌握 Spark RDD 的核心处理思路,并给出 Scala 和 Java 两个完整实现。

完整图文版(含截图):CSDN 原文 | 掘金

为什么从 WordCount 开始

WordCount 是分布式计算的入门经典,不只是因为简单,更因为它完整地体现了 Spark “分而治之” 的核心思想:

  1. 数据加载:从文件系统读取原始文本
  2. 切分:将行切分为单词(flatMap)
  3. 映射:每个单词映射为 (word, 1) 键值对
  4. 聚合:按 key 汇总计数(reduceByKey)
  5. 输出:写出到文件或打印到控制台

这五步与 Hadoop MapReduce 的 Map/Shuffle/Reduce 一一对应,但代码量大幅减少。

Maven 项目配置

<properties>
    <scala.version>2.12.10</scala.version>
    <spark.version>2.4.5</spark.version>
</properties>

<dependencies>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Scala 标准库 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
</dependencies>

Scala 实现

package icu.wzk

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. 创建 SparkConf 和 SparkContext
    val conf = new SparkConf().setAppName("ScalaWordCount")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 2. 读取输入文件,生成 RDD[String](每个元素是一行)
    val lines: RDD[String] = sc.textFile(args(0))

    // 3. 按空白字符切分,生成 RDD[String](每个元素是一个单词)
    val words: RDD[String] = lines.flatMap(line => line.split("\\s+"))

    // 4. 映射为 (word, 1) 键值对
    val wordMap: RDD[(String, Int)] = words.map(x => (x, 1))

    // 5. 按 key 聚合
    val result: RDD[(String, Int)] = wordMap.reduceByKey(_ + _)

    // 6. 触发 Action,输出结果
    result.foreach(println)

    sc.stop()
  }
}

Scala 版本简洁直观,reduceByKey(_ + _) 利用函数式语法省去了大量样板代码。

Java 实现

package icu.wzk;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Arrays;

public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");

        // 读取文本文件
        JavaRDD<String> lines = sc.textFile(args[0]);

        // 切分单词
        JavaRDD<String> words = lines
                .flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());

        // 映射为 (word, 1)
        JavaPairRDD<String, Integer> wordsMap = words
                .mapToPair(word -> new Tuple2<>(word, 1));

        // 按 key 聚合
        JavaPairRDD<String, Integer> results =
                wordsMap.reduceByKey((x, y) -> x + y);

        results.foreach(elem -> System.out.println(elem));
        sc.stop();
    }
}

Java 版本使用 JavaSparkContextJavaRDDJavaPairRDD 等 Java 友好 API,逻辑一致但代码更冗长。

编译与提交

打包

mvn clean package -DskipTests

生成 target/spark-wordcount-1.0-SNAPSHOT.jar

本地模式运行(测试)

spark-submit \
  --master local[*] \
  --class icu.wzk.WordCount \
  target/spark-wordcount-1.0-SNAPSHOT.jar \
  /opt/wzk/input.txt

提交到 YARN 集群

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 2g \
  --num-executors 3 \
  --class icu.wzk.WordCount \
  target/spark-wordcount-1.0-SNAPSHOT.jar \
  hdfs://namenode:9000/input/data.txt

关键概念回顾

概念说明
RDD弹性分布式数据集,Spark 核心抽象,支持容错和并行计算
惰性求值Transformation 只记录操作描述,Action 触发真正执行
ShufflereduceByKey 等算子会触发跨节点数据重分布,是性能瓶颈点
DAGSpark 将作业表示为有向无环图,Stage 以 Shuffle 为边界划分

实际应用场景

WordCount 的模式可直接迁移到:

  • 日志分析:统计 ERROR/WARN 出现频次
  • 关键词统计:电商评论热词提取
  • 文本特征提取:TF-IDF 的词频计算步骤
  • 推荐系统:用户行为序列中 item 出现频率统计