本文是大数据系列第 86 篇,介绍 Spark Streaming 三种核心数据源:文件流、Socket 流与 RDD 队列流的原理、使用场景与代码实现。
数据源概览
Spark Streaming 支持多种数据接入方式,按可靠性和生产适用性可分为:
| 数据源 | 适用场景 | 可靠性 |
|---|---|---|
| 文件流 | 日志轮转、批量文件入流 | 高(无 Receiver) |
| Socket 流 | 原型验证、本地测试 | 低 |
| RDD 队列流 | 单元测试、压测模拟 | 测试专用 |
| Kafka Direct | 生产环境实时流 | 极高(Exactly-Once) |
Maven 依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
文件流(File Data Stream)
文件流通过持续监控指定目录中的新文件来触发计算,支持 HDFS、S3、本地文件系统等 Hadoop 兼容存储。
特点:
- 无需 Receiver,不占用额外 CPU 核心
- 只处理新增文件,已有文件和文件修改不触发
- 不支持嵌套目录,目录内文件格式须一致
package icu.wzk
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FileDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("FileDStream")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
// 监控目录,每 5 秒检查一次新文件
val lines = ssc.textFileStream("hdfs://h121.wzk.icu:8020/streaming/input/")
val words = lines.flatMap(_.split("\\s+"))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
StreamingContext(conf, Seconds(5)) 设置微批间隔为 5 秒,textFileStream() 监控目录并自动处理新增文件。
Socket 数据流
Socket 流通过 TCP 连接接收指定主机和端口的文本数据,适合本地快速验证逻辑。
启动数据发送端:
nc -lk 9999
object SocketDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("SocketStream")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("0.0.0.0", 9999)
val words = lines.flatMap(_.split("\\s+"))
val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
wordCount.print()
ssc.start()
ssc.awaitTermination()
}
}
批次间隔设为 1 秒,每秒输出一次词频统计。Socket 流简单易用但无容错保证,不推荐用于生产。
RDD 队列流
RDD 队列流允许手动将 RDD 放入队列,Spark Streaming 将其作为数据流处理,是编写测试用例和模拟真实流量的最佳方式。
import scala.collection.mutable.Queue
object RDDQueueDStream {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
val sparkConf = new SparkConf()
.setAppName("RDDQueueStream")
.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)
val mappedStream = queueStream.map(r => (r % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
// 每 2 秒生产一批 RDD,共 5 批
for (i <- 1 to 5) {
rddQueue.synchronized {
val range = (1 to 100).map(_ * i)
rddQueue += ssc.sparkContext.makeRDD(range, 2)
}
Thread.sleep(2000)
}
ssc.stop()
}
}
关键点:
rddQueue.synchronized保证线程安全- 对整数取模 10 后统计分组,验证聚合逻辑
- 完整控制数据内容、数量和生产速率,测试场景灵活
注意事项
使用 local[*] 运行时,若 CPU 核心数为 1,Receiver 线程会占满资源导致无法处理数据。建议明确指定 local[2] 或更高,确保至少有一个核心用于数据处理。
生产环境优先选择 Kafka Direct 模式,原生支持 Exactly-Once 语义,吞吐和可靠性远优于 Socket。