本文是大数据系列第 86 篇,介绍 Spark Streaming 三种核心数据源:文件流、Socket 流与 RDD 队列流的原理、使用场景与代码实现。

完整图文版(含截图):CSDN 原文 | 掘金

数据源概览

Spark Streaming 支持多种数据接入方式,按可靠性和生产适用性可分为:

数据源适用场景可靠性
文件流日志轮转、批量文件入流高(无 Receiver)
Socket 流原型验证、本地测试
RDD 队列流单元测试、压测模拟测试专用
Kafka Direct生产环境实时流极高(Exactly-Once)

Maven 依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>

文件流(File Data Stream)

文件流通过持续监控指定目录中的新文件来触发计算,支持 HDFS、S3、本地文件系统等 Hadoop 兼容存储。

特点:

  • 无需 Receiver,不占用额外 CPU 核心
  • 只处理新增文件,已有文件和文件修改不触发
  • 不支持嵌套目录,目录内文件格式须一致
package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("FileDStream")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(5))
    // 监控目录,每 5 秒检查一次新文件
    val lines = ssc.textFileStream("hdfs://h121.wzk.icu:8020/streaming/input/")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

StreamingContext(conf, Seconds(5)) 设置微批间隔为 5 秒,textFileStream() 监控目录并自动处理新增文件。

Socket 数据流

Socket 流通过 TCP 连接接收指定主机和端口的文本数据,适合本地快速验证逻辑。

启动数据发送端:

nc -lk 9999
object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("SocketStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("0.0.0.0", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

批次间隔设为 1 秒,每秒输出一次词频统计。Socket 流简单易用但无容错保证,不推荐用于生产

RDD 队列流

RDD 队列流允许手动将 RDD 放入队列,Spark Streaming 将其作为数据流处理,是编写测试用例和模拟真实流量的最佳方式。

import scala.collection.mutable.Queue

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf()
      .setAppName("RDDQueueStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    ssc.start()

    // 每 2 秒生产一批 RDD,共 5 批
    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}

关键点:

  • rddQueue.synchronized 保证线程安全
  • 对整数取模 10 后统计分组,验证聚合逻辑
  • 完整控制数据内容、数量和生产速率,测试场景灵活

注意事项

使用 local[*] 运行时,若 CPU 核心数为 1,Receiver 线程会占满资源导致无法处理数据。建议明确指定 local[2] 或更高,确保至少有一个核心用于数据处理。

生产环境优先选择 Kafka Direct 模式,原生支持 Exactly-Once 语义,吞吐和可靠性远优于 Socket。