Spark(四):SparkStreaming

本文为学习笔记,对应视频教程来自尚硅谷大数据Spark教程从入门到精通

SparkStreaming 概述

Spark Streaming 是什么

Spark Streaming 用于流式数据的处理,是准实时(秒,分钟),微批次的数据处理框架。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语,如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

image-20220530002409739

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来讲,DStream 就是对 RDD 在实时数据处理场景的一种封装。

Spark Streaming 架构

image-20220530002443200

背压机制

Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。

为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据 JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值 false,即不启用。

Dstream 快速入门

WordCount 案例实操

1
2
3
4
5
6
<!-- 添加依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.2.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
object SparkStreaming01_WordCount {

def main(args: Array[String]): Unit = {
// TODO 创建环境对象
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

val ssc = new StreamingContext(sparkConf, Seconds(3))

// TODO 逻辑处理
// 获取端口数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))

val wordToOne: DStream[(String, Int)] = words.map((_, 1))

val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

wordToCount.print()


// 由于 SparkStreaming 是长期执行的的任务,所以不能直接关闭
// 1.启动采集器
ssc.start()
// 2.等待采集器的关闭
ssc.awaitTermination()
}
}
1
2
3
4
# 启动程序并通过 netcat 发送数据
C:\Users\xxx>nc -lp 9999
Hello World
Hello Spark

WordCount 解析

在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据

image-20220530100504036

对数据的操作也是按照 RDD 为单位来进行的

image-20220530100535991

计算过程由 Spark Engine 来完成

image-20220530100553497

DStream 创建

测试过程中,可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

RDD 队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object SparkStreaming02_Queue {

def main(args: Array[String]): Unit = {
//1.初始化 Spark 配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化 SparkStreamingContext
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动任务
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}

自定义数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 自定义数据源,实现监控某个端口号,获取该端口号内容
*/
object SparkStreaming03_CustomerReceiver {

def main(args: Array[String]): Unit = {

val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
val ssc = new StreamingContext(conf, Seconds(4))

val lineStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomerReceiver("localhost", 9999))

val wordStream: DStream[String] = lineStream.flatMap(_.split(" "))

val wordToOne: DStream[(String, Int)] = wordStream.map((_, 1))

val wordCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

wordCount.print()

ssc.start()
ssc.awaitTermination()
}

//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

@volatile private var flag = true

override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}

//读数据并将数据发送给 Spark
def receive(): Unit = {
//创建一个 Socket
var socket: Socket = new Socket(host, port)
//定义一个变量,用来接收端口传过来的数据
var input: String = null
//创建一个 BufferedReader 用于读取端口传来的数据
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//读取数据
input = reader.readLine()
//当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
while (flag && input != null) {
store(input)
input = reader.readLine()
}
//跳出循环则关闭资源
reader.close()
socket.close()
}

override def onStop(): Unit = {
flag = false
}
}
}

Kafka 数据源

1
2
3
4
5
6
7
8
9
10
11
<!-- 导入依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台
*/
object SparkStreaming04_DirectKafka {

def main(args: Array[String]): Unit = {
//1.创建 SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
//2.创建 StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.定义 Kafka 参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "spark151:9092,spark152:9092,spark153:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
)
//4.读取 Kafka 数据创建 DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
//5.将每条消息的 KV 取出
val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
//6.计算 WordCount
valueDStream.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//7.开启任务
ssc.start()
ssc.awaitTermination()
}
}

DStream 转换

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。例如:reduceByKey() 会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。

函数名称 目的 Scala示例 用来操作DStream[T]的用户自定义函数的函数签名
map() 对 DStream 中的每个元素应用给定函数,返回由各个元素输出的元素组成的 DStream ds.map(x => x +1) f: T -> U
flatMap() 对 DStream 中的每个元素应用给定函数,返回由各个元素输出的迭代器组成的 DStream ds.flatMap(x => x.split(“ “)) f: T -> Iterable[U]
filter() 返回由给定 DStream 中通过筛选的元素组成的 DStream ds.filter(x => x != 1) f: T -> Boolean
repartition() 改变 DStream 的分区数 ds.repartition(10) N/A
reduceByKey() 将每个批次中键相同的记录归约 ds.reduceByKey((x, y) => x + y) f: (T, T) -> T
groupByKey() 将每个批次中的记录根据键分组 ds.groupByKey N/A
Transform
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
object SparkStreaming06_Transform {

def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

// transform 将底层的 RDD 获取后进行操作
// 使用场景:
// 1.DStream功能不完善
// 2.需要代码周期性执行

// Code:Driver端执行
val newDS: DStream[String] = lines.transform(
rdd => {
// Code:Driver端周期性执行
rdd.map(
str => {
// Code:Executor端
str
}
)
}
)

// Code:Driver端
val newDStream: DStream[String] = lines.map(
data => {
// Code:Executor端
data
}
)

ssc.start()
ssc.awaitTermination()
}
}
join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。
* 计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同
*/
object SparkStreaming07_Join {

def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(5))

val data9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val data8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)

val map9999: DStream[(String, Int)] = data9999.map((_, 9))
val map8888: DStream[(String, Int)] = data8888.map((_, 8))

val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

joinDS.print()

ssc.start()
ssc.awaitTermination()
}
}

有状态转化操作

UpdateStateByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 跨批次累加版 WordCount
*/
object SparkStreaming05_UpdateStateByKey {

def main(args: Array[String]): Unit = {


val sparkConf: SparkConf = new SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("checkpoint")

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val words: DStream[String] = lines.flatMap(_.split(" "))

val wordToOne: DStream[(String, Int)] = words.map((_, 1))

// updateStateByKey:根据 key 对数据状态进行更新
// 第一个参数表示相同的 key 对应的 value 数据
// 第二个参数表示缓存区相同 key 的 value 数据
val wordToCount: DStream[(String, Int)] = wordToOne.updateStateByKey(
(seq: Seq[Int], buff: Option[Int]) => {
val newCnt: Int = buff.getOrElse(0) + seq.sum
Option(newCnt)
}
)

wordToCount.print()

ssc.start()
ssc.awaitTermination()
}
}
WindowOperations
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 窗口时长:计算内容的时间范围;
* 滑动步长:隔多久触发一次计算。
* 注意:这两者都必须为采集周期大小的整数倍
*/
object SparkStreaming08_Window {

def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(3))

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val words: DStream[String] = lines.window(Seconds(6)).flatMap(_.split(" "))

val wordToOne: DStream[(String, Int)] = words.map((_, 1))

val wordToCount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

wordToCount.print()

ssc.start()
ssc.awaitTermination()
}
}

关于 Window 的操作有如下方法:

  1. window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
  2. countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
  3. reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
  4. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。

DStream 输出

  1. print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print();
  2. saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”;
  3. saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”;
  4. saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为”prefix-TIME_IN_MS[.suffix]”;
  5. foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将 RDD 存入文件或者通过网络将其写入数据库。

优雅关闭与恢复

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
object SparkStreaming09_StopAndResume {

def createSSC(): () => StreamingContext = {
() => {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("checkpoint")

ssc
}
}

def main(args: Array[String]): Unit = {

val ssc: StreamingContext = StreamingContext.getActiveOrCreate("checkpoint", createSSC())

val wordToCount: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _) |

wordToCount.print()

ssc.start()

new Thread(new MonitorStop(ssc)).start()

ssc.awaitTermination()
}

class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI("hdfs://spark152:9000"), new Configuration(), "eitan")

while (true) {
try {
Thread.sleep(5000)
} catch {
case e: Exception => e.printStackTrace()
}

val state: StreamingContextState = ssc.getState()

val bool: Boolean = fs.exists(new Path("hdfs://spark152:9000/stopSpark"))

if (bool && state == StreamingContextState.ACTIVE) {
ssc.stop(true, true)
System.exit(0)
}
}
}
}
}