本文是大数据系列第 74 篇,通过两个经典 Spark 案例——蒙特卡洛求 π 和共同好友分析,进一步熟悉 RDD 的分布式计算模式和算子组合技巧。
案例一:蒙特卡洛方法估算 π
数学原理
在边长为 1 的正方形中随机投点,单位圆内的点与总点数之比趋近于 π/4:
圆面积 / 正方形面积 = π·r² / (2r)² = π/4
因此 π ≈ 4 × (落在圆内的点数 / 总点数)。样本越多,结果越精确。
Spark 实现
package icu.wzk
import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random
object SparkPi {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("ScalaSparkPi")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// args(0) 传入分区数,默认 15
val slices = if (args.length > 0) args(0).toInt else 15
val N = 100000000 // 1 亿次采样
val count = sc.makeRDD(1 to N, slices)
.map { _ =>
val (x, y) = (random, random)
if (x * x + y * y <= 1) 1 else 0
}
.reduce(_ + _)
println(s"Pi is approximately ${4.0 * count / N}")
sc.stop()
}
}
关键点说明
sc.makeRDD(1 to N, slices):将 1 亿个整数均匀分配到slices个分区,每个 Executor 并行处理一部分map:每个任务独立生成随机点并判断是否在圆内(无依赖,天然适合并行)reduce(_ + _):聚合各分区的命中计数- 分区数越多,并行度越高,但调度开销也随之增加,通常设为 CPU 核数的 2-3 倍
运行方式
# 本地模式,15 个分区
spark-submit --master local[*] \
--class icu.wzk.SparkPi \
spark-wordcount-1.0-SNAPSHOT.jar 15
输出示例:Pi is approximately 3.14159268
案例二:共同好友分析
问题描述
给定社交网络的好友关系数据(每行格式:用户ID 好友1,好友2,好友3,...),找出任意两个用户之间的共同好友列表。
示例数据:
100 200,300,400,500
200 100,300,600
300 100,200,400
方法一:笛卡尔积(直觉解法)
对所有用户两两配对,取好友列表的交集:
val friendsRDD = sc.textFile(input).map { line =>
val parts = line.split("\\s+")
(parts(0).toInt, parts(1).split(",").map(_.toInt).toSet)
}
friendsRDD.cartesian(friendsRDD)
.filter { case ((id1, _), (id2, _)) => id1 < id2 } // 去重
.map { case ((id1, f1), (id2, f2)) =>
(s"$id1 & $id2", f1.intersect(f2))
}
.collect()
缺点:cartesian 产生 O(n²) 的数据量,n 较大时性能极差,不适合生产环境。
方法二:数据变换(推荐)
核心思路:反转视角——不是”两个人的好友有哪些交集”,而是”对于每对朋友,他们的共同认识者是谁”。
val friendsRDD = sc.textFile(input).map { line =>
val parts = line.split("\\s+")
val user = parts(0)
val friends = parts(1).split(",")
(user, friends)
}
friendsRDD
// 为每个用户的好友列表生成所有两两组合
.flatMapValues(friends => friends.combinations(2).toSeq)
// 反转:(用户, 好友对) -> (好友对, 用户)
.map { case (user, pair) => (pair.mkString(" & "), Set(user)) }
// 按好友对合并,得到共同认识该对好友的用户集合
.reduceByKey(_ | _)
.collect()
优势:
- 避免了笛卡尔积,时间复杂度从 O(n²) 降为 O(n·k²)(k 为平均好友数)
- 利用
reduceByKey代替groupByKey,减少 Shuffle 数据量 - 代码更简洁,易于扩展到多跳关系分析
结果示例
(100 & 200, Set(300)) // 100 和 200 的共同好友是 300
(100 & 300, Set(200, 400)) // 100 和 300 的共同好友是 200、400
(200 & 300, Set(100)) // 200 和 300 的共同好友是 100
两个案例的共同思路
| 特性 | Pi 估算 | 共同好友 |
|---|---|---|
| 并行化方式 | 数据分区独立计算 | flatMap 展开后 reduceByKey |
| 关键算子 | makeRDD + map + reduce | flatMapValues + map + reduceByKey |
| 性能关键 | 分区数(并行度) | 避免 cartesian,用 reduceByKey |
| 数学本质 | 大数定律(随机采样) | 集合运算(交集/并集) |
两个案例都体现了 Spark 的核心范式:将问题拆解为独立的局部计算,再通过聚合得到全局结果。