This is article 86 in the Big Data series, introducing three core Spark Streaming data sources: file stream, Socket stream, and RDD queue stream—principles, use cases, and code implementation.

Full illustrated version: CSDN Original | Juejin

Data Sources Overview

Spark Streaming supports multiple data ingestion methods, categorized by reliability and production suitability:

Data Source       Use Case                               Reliability
File Stream       Log rotation, batch file ingestion     High (no Receiver)
Socket Stream     Prototype validation, local testing    Low
RDD Queue Stream  Unit testing, load-test simulation     Test-only
Kafka Direct      Production real-time streaming         Very high (exactly-once)

Maven Dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>
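
The `${spark.version}` property is assumed to be defined elsewhere in the POM; a minimal sketch (the version number is illustrative — use the release that matches your cluster):

```xml
<properties>
  <!-- Illustrative version; align with your cluster's Spark release -->
  <spark.version>3.1.2</spark.version>
</properties>
```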

File Data Stream

A file stream monitors a specified directory and triggers computation whenever new files appear, supporting HDFS, S3, the local filesystem, and other Hadoop-compatible storage.

Characteristics:

  • No Receiver needed, so it doesn’t occupy an extra CPU core
  • Only new files trigger processing; existing files and modifications to them are ignored
  • Nested directories are not supported, and all files in the directory must share the same format

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("FileDStream")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(5))
    // Monitor directory, check for new files every 5 seconds
    val lines = ssc.textFileStream("hdfs://h121.wzk.icu:8020/streaming/input/")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

StreamingContext(conf, Seconds(5)) sets the micro-batch interval to 5 seconds; textFileStream() monitors the directory and automatically processes each new file.
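
One caveat worth knowing: textFileStream only picks up files that appear in the monitored directory atomically — a file still being written may be missed or read incomplete. A common pattern is to write the file elsewhere and then move it in (a sketch using a local directory and illustrative paths; on HDFS, `hdfs dfs -mv` plays the same role):

```shell
# Write the file outside the monitored directory first
mkdir -p /tmp/streaming/input
echo "hello spark streaming" > /tmp/words.txt

# mv is atomic within a filesystem, so the file becomes
# visible to textFileStream all at once
mv /tmp/words.txt /tmp/streaming/input/words.txt
```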

Socket Data Stream

Socket stream receives text data from a specified host and port via TCP connection, suitable for quick local logic validation.

Start data sender:

nc -lk 9999

package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("SocketStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Connect to the nc sender started above
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

The batch interval is set to 1 second, so word-frequency counts are printed every second. The Socket stream is simple but offers no fault-tolerance guarantee, so it is not recommended for production.

RDD Queue Stream

RDD queue stream allows manually placing RDDs into a queue, which Spark Streaming treats as data streams—best for writing test cases and simulating real traffic.

package icu.wzk

import scala.collection.mutable.Queue

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf()
      .setAppName("RDDQueueStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()

    ssc.start()

    // Push one RDD into the queue every 2 seconds, 5 in total
    // (with a 1-second batch interval, some batches will be empty)
    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }

    ssc.stop()
  }
}

Key Points:

  • rddQueue.synchronized guards the queue against concurrent access from the streaming thread
  • Mapping each integer to (value % 10, 1) and reducing by key verifies the aggregation logic
  • Full control over data content, volume, and production rate enables flexible test scenarios
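
Note that by default queueStream dequeues one RDD per batch interval. If you want every RDD currently in the queue consumed in a single batch, pass oneAtATime = false (a sketch reusing the ssc and rddQueue from the example above):

```scala
// Default: one RDD per batch. With oneAtATime = false,
// all queued RDDs are merged into a single batch.
val allAtOnce = ssc.queueStream(rddQueue, oneAtATime = false)
```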

Notes

When running a Receiver-based source (such as the Socket stream) with local[*] on a single-core machine, the Receiver thread occupies the only core, leaving none for batch processing, so no data is ever processed. It’s recommended to explicitly specify local[2] or higher, ensuring at least one core remains for processing.
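
A minimal sketch of the safer configuration for Receiver-based jobs:

```scala
import org.apache.spark.SparkConf

// Reserve one core for the Receiver and at least one for batch processing
val conf = new SparkConf()
  .setAppName("SocketStream")
  .setMaster("local[2]")
```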

For production, prefer Kafka Direct mode: it natively supports exactly-once semantics, and its throughput and reliability far exceed those of the Socket stream.