This is article 86 in the Big Data series, introducing three core Spark Streaming data sources: the file stream, the Socket stream, and the RDD queue stream, covering their principles, use cases, and code implementations.
Full illustrated version: CSDN Original | Juejin
Data Sources Overview
Spark Streaming supports multiple data ingestion methods, categorized by reliability and production suitability:
| Data Source | Use Case | Reliability |
|---|---|---|
| File Stream | Log rotation, batch file ingestion | High (No Receiver) |
| Socket Stream | Prototype validation, local testing | Low |
| RDD Queue Stream | Unit testing, load testing simulation | Test-only |
| Kafka Direct | Production real-time streaming | Very High (Exactly-Once) |
Maven Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
File Data Stream
File stream triggers computation by continuously monitoring new files in a specified directory, supporting HDFS, S3, local filesystem, and other Hadoop-compatible storage.
Characteristics:
- No Receiver is needed, so no extra CPU core is occupied
- Only newly created files are processed; pre-existing files and modifications to already-seen files do not trigger computation
- Nested directories are not supported, and all files in the directory must share the same format
package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("FileDStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Monitor the directory, checking for new files every 5 seconds
    val lines = ssc.textFileStream("hdfs://h121.wzk.icu:8020/streaming/input/")
    val words = lines.flatMap(_.split("\\s+"))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
StreamingContext(conf, Seconds(5)) sets the micro-batch interval to 5 seconds; textFileStream() monitors the directory and automatically processes each newly arrived file.
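textFileStream() is a convenience wrapper around the more general fileStream(), which additionally accepts a path filter and explicit key/value input types. A sketch, assuming the same HDFS directory as above and an illustrative rule of ingesting only .log files:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FilteredFileDStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("FilteredFileDStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // fileStream is the general form behind textFileStream:
    // it takes a Path => Boolean filter and the Hadoop input types.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        "hdfs://h121.wzk.icu:8020/streaming/input/",
        (path: Path) => path.getName.endsWith(".log"), // only ingest *.log files
        newFilesOnly = true)
      .map(_._2.toString) // keep only the line content, drop the byte offset
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```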
Socket Data Stream
Socket stream receives text data from a specified host and port via TCP connection, suitable for quick local logic validation.
Start data sender:
nc -lk 9999
package icu.wzk

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("SocketStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Connect to the nc sender started above
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split("\\s+"))
    val wordCount = words.map(x => (x.trim, 1)).reduceByKey(_ + _)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
With the batch interval set to 1 second, word-frequency statistics are printed every second. The Socket stream is simple but offers no fault-tolerance guarantee, so it is not recommended for production.
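If nc is unavailable, a small stand-in sender using only java.net can play the same role. This is a sketch, not part of the original example; it assumes port 9999 and serves a single client, pushing one line per second:

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Minimal stand-in for `nc -lk 9999`: listens on the port,
// accepts one connection, and emits a line of text every second.
object SocketSender {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val socket = server.accept() // blocks until the Spark job connects
    val out = new PrintWriter(socket.getOutputStream, true) // autoFlush = true
    val words = Seq("hello spark", "hello streaming", "hello world")
    var i = 0
    while (true) {
      out.println(words(i % words.length))
      i += 1
      Thread.sleep(1000)
    }
  }
}
```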
RDD Queue Stream
RDD queue stream allows manually placing RDDs into a queue, which Spark Streaming consumes as a data stream. It is best suited to writing test cases and simulating input without an external source.
package icu.wzk

import scala.collection.mutable.Queue

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RDDQueueDStream {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sparkConf = new SparkConf()
      .setAppName("RDDQueueStream")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    val queueStream = ssc.queueStream(rddQueue)
    val mappedStream = queueStream.map(r => (r % 10, 1))
    val reducedStream = mappedStream.reduceByKey(_ + _)
    reducedStream.print()
    ssc.start()
    // Push one RDD into the queue every 2 seconds, 5 batches in total
    for (i <- 1 to 5) {
      rddQueue.synchronized {
        val range = (1 to 100).map(_ * i)
        rddQueue += ssc.sparkContext.makeRDD(range, 2)
      }
      Thread.sleep(2000)
    }
    ssc.stop()
  }
}
Key Points:
- rddQueue.synchronized guards queue access, keeping the producer loop thread-safe
- Taking each integer modulo 10 and counting per group verifies the aggregation logic
- You fully control the data content, volume, and production rate, which makes test scenarios flexible
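queueStream() also takes an oneAtATime flag: with the default true, one queued RDD is consumed per batch interval; with false, everything currently in the queue is merged into a single batch. A self-contained sketch of the latter mode (the demo data here is illustrative):

```scala
import scala.collection.mutable.Queue

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamModes {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("QueueStreamModes")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))
    val rddQueue = new Queue[RDD[Int]]()
    // oneAtATime = false: the whole queue is drained into each batch;
    // defaultRDD is used when the queue is empty at a batch boundary.
    val drained = ssc.queueStream(rddQueue, oneAtATime = false,
      defaultRDD = ssc.sparkContext.emptyRDD[Int])
    drained.count().print()
    ssc.start()
    rddQueue.synchronized {
      // Three RDDs queued at once: with oneAtATime = false
      // they all land in the same batch
      for (_ <- 1 to 3) rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    }
    Thread.sleep(3000)
    ssc.stop()
  }
}
```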
Notes
A Receiver-based source such as the Socket stream permanently occupies one thread. When running with local[*] on a single-core machine, the Receiver consumes the only core and no batches are ever processed. It is recommended to explicitly specify local[2] or higher, ensuring at least one core remains for data processing. (The file stream and RDD queue stream use no Receiver and are unaffected.)
For production, prefer the Kafka Direct approach: it natively supports Exactly-Once semantics, and its throughput and reliability are far superior to the Socket stream.
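As a pointer, a minimal sketch of the Kafka Direct API from the spark-streaming-kafka-0-10 integration module. The broker address, group id, and topic below are placeholders, not values from this article:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaDirectDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KafkaDirectDemo")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Consumer configuration; broker and group id are placeholders
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-broker:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streaming-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    // Direct stream: no Receiver, offsets tracked by Spark itself
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams))
    stream.map(record => (record.key, record.value)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```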