本文是大数据系列第 74 篇,通过两个经典 Spark 案例——蒙特卡洛求 π 和共同好友分析,进一步熟悉 RDD 的分布式计算模式和算子组合技巧。

完整图文版(含截图):CSDN 原文 | 掘金

案例一:蒙特卡洛方法估算 π

数学原理

在边长为 1 的正方形中随机投点,单位圆内的点与总点数之比趋近于 π/4:

圆面积 / 正方形面积 = π·r² / (2r)² = π/4

因此 π ≈ 4 × (落在圆内的点数 / 总点数)。样本越多,结果越精确。

Spark 实现

package icu.wzk

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object SparkPi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaSparkPi")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // args(0) 传入分区数,默认 15
    val slices = if (args.length > 0) args(0).toInt else 15
    val N = 100000000  // 1 亿次采样

    val count = sc.makeRDD(1 to N, slices)
      .map { _ =>
        val (x, y) = (random, random)
        if (x * x + y * y <= 1) 1 else 0
      }
      .reduce(_ + _)

    println(s"Pi is approximately ${4.0 * count / N}")
    sc.stop()
  }
}

关键点说明

  • sc.makeRDD(1 to N, slices):将 1 亿个整数均匀分配到 slices 个分区,每个 Executor 并行处理一部分
  • map:每个任务独立生成随机点并判断是否在圆内(无依赖,天然适合并行)
  • reduce(_ + _):聚合各分区的命中计数
  • 分区数越多,并行度越高,但调度开销也随之增加,通常设为 CPU 核数的 2-3 倍

运行方式

# 本地模式,15 个分区
spark-submit --master local[*] \
  --class icu.wzk.SparkPi \
  spark-wordcount-1.0-SNAPSHOT.jar 15

输出示例:Pi is approximately 3.14159268

案例二:共同好友分析

问题描述

给定社交网络的好友关系数据(每行格式:用户ID 好友1,好友2,好友3,...),找出任意两个用户之间的共同好友列表

示例数据:

100 200,300,400,500
200 100,300,600
300 100,200,400

方法一:笛卡尔积(直觉解法)

对所有用户两两配对,取好友列表的交集:

val friendsRDD = sc.textFile(input).map { line =>
  val parts = line.split("\\s+")
  (parts(0).toInt, parts(1).split(",").map(_.toInt).toSet)
}

friendsRDD.cartesian(friendsRDD)
  .filter { case ((id1, _), (id2, _)) => id1 < id2 }  // 去重
  .map { case ((id1, f1), (id2, f2)) =>
    (s"$id1 & $id2", f1.intersect(f2))
  }
  .collect()

缺点cartesian 产生 O(n²) 的数据量,n 较大时性能极差,不适合生产环境。

方法二:数据变换(推荐)

核心思路:反转视角——不是”两个人的好友有哪些交集”,而是”对于每对朋友,他们的共同认识者是谁”。

val friendsRDD = sc.textFile(input).map { line =>
  val parts = line.split("\\s+")
  val user = parts(0)
  val friends = parts(1).split(",")
  (user, friends)
}

friendsRDD
  // 为每个用户的好友列表生成所有两两组合
  .flatMapValues(friends => friends.combinations(2).toSeq)
  // 反转:(用户, 好友对) -> (好友对, 用户)
  .map { case (user, pair) => (pair.mkString(" & "), Set(user)) }
  // 按好友对合并,得到共同认识该对好友的用户集合
  .reduceByKey(_ | _)
  .collect()

优势

  • 避免了笛卡尔积,时间复杂度从 O(n²) 降为 O(n·k²)(k 为平均好友数)
  • 利用 reduceByKey 代替 groupByKey,减少 Shuffle 数据量
  • 代码更简洁,易于扩展到多跳关系分析

结果示例

(100 & 200, Set(300))        // 100 和 200 的共同好友是 300
(100 & 300, Set(200, 400))   // 100 和 300 的共同好友是 200、400
(200 & 300, Set(100))        // 200 和 300 的共同好友是 100

两个案例的共同思路

特性Pi 估算共同好友
并行化方式数据分区独立计算flatMap 展开后 reduceByKey
关键算子makeRDD + map + reduceflatMapValues + map + reduceByKey
性能关键分区数(并行度)避免 cartesian,用 reduceByKey
数学本质大数定律(随机采样)集合运算(交集/并集)

两个案例都体现了 Spark 的核心范式:将问题拆解为独立的局部计算,再通过聚合得到全局结果