Spark(二):SparkCore

本文为学习笔记,对应视频教程来自尚硅谷大数据Spark教程从入门到精通

Spark运行架构

运行架构

image-20220522095950890

核心组件

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。Driver 在 Spark 作业执行时主要负责:

  • 将用户程序转化为作业(job)
  • 在 Executor 之间调度任务(task)
  • 跟踪 Executor 的执行情况
  • 通过 UI 展示查询运行情况

Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。Executor 有两个核心功能:

  • 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程
  • 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。

核心概念

Executor 与 与 Core

Spark Executor 是集群中运行在工作节点(Worker)中的一个 JVM 进程,是整个集群中的专门用于计算的节点。在提交应用中,可以提供参数指定计算节点的个数,以及对应的资源。这里的资源一般指的是工作节点 Executor 的内存大小和使用的虚拟 CPU 核(Core)数量。应用程序相关启动参数如下:

名称 说明
–num-executors 配置 Executor 的数量
–executor-memory 配置每个 Executor 的内存大小
–executor-cores 配置每个 Executor 的虚拟 CPU core 数量

并行度(Parallelism )

在分布式计算框架中一般都是多个任务同时执行,由于任务分布在不同的计算节点进行计算,所以能够真正地实现多任务并行执行。这里我们将整个集群并行执行任务的数量称之为并行度。

提交流程

image-20220522105532798

Yarn Client 模式

Client 模式将用于监控和调度的 Driver 模块在客户端执行,而不是在 Yarn 中,所以一般用于测试。

  • Driver 在任务提交的本地机器上运行
  • Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存
  • ResourceManager 接到 ApplicationMaster 的资源申请后会分配 container,然后 ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数
  • 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行

Yarn Cluster 模式

Cluster 模式将用于监控和调度的 Driver 模块启动在 Yarn 集群资源中执行。一般应用于实际生产环境。

  • 在 YARN Cluster 模式下,任务提交后会和 ResourceManager 通讯申请启动 ApplicationMaster
  • 随后 ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
  • Driver 启动后向 ResourceManager 申请 Executor 内存,ResourceManager 接到 ApplicationMaster 的资源申请后会分配container,然后在合适的 NodeManager 上启动 Executor 进程
  • Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行 main 函数
  • 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,之后将 task 分发到各个 Executor 上执行

Spark 核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

  • RDD : 弹性分布式数据集
  • 累加器:分布式共享只写变量
  • 广播变量:分布式共享只读变量

RDD

RDD 概述

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • 弹性
  • 存储的弹性:内存与磁盘的自动切换;
  • 容错的弹性:数据丢失可以自动恢复;
  • 计算的弹性:计算出错重试机制;
  • 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑

核心属性

1
2
3
4
5
6
7
8
9
10
/**
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*/

分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

1
2
3
4
5
6
7
8
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]

分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

1
2
3
4
5
6
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]

RDD 之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

1
2
3
4
5
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps

分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

1
2
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None

首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

1
2
3
4
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

RDD 的执行流程

启动 Yarn 集群环境

image-20220522162731363

Spark 通过申请资源创建调度节点和计算节点

image-20220522163016911

Spark 框架根据需求将计算逻辑根据分区划分成不同的任务

image-20220522163528738

调度节点将任务根据计算节点状态发送到对应的计算节点进行计算

image-20220522164117888

基础编程

RDD 的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object Spark02_RDD_File {
def main(args: Array[String]): Unit = {
// TODO 准备环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sparkContext = new SparkContext(sparkConf)

// TODO 创建RDD
// 本地文件
val localRdd: RDD[String] = sparkContext.textFile("data/1.txt")
// 分布式文件
val hdfsRdd: RDD[String] = sparkContext.textFile("hdfs://spark152:8020/data/1.txt")
// textFile:以行为单位读取数据
// wholeTextFiles:以文件为单位读取数据
val wholeTextRdd: RDD[(String, String)] = sparkContext.wholeTextFiles("hdfs://spark152:8020/data")

println("================ localRdd ================")
localRdd.collect().foreach(println)
println("================ hdfsRdd ================")
hdfsRdd.collect().foreach(println)
println("================ wholeTextRdd ================")
wholeTextRdd.collect().foreach(println)

// TODO 关闭环境
sparkContext.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
================ localRdd ================
Hello World
Hello Spark
================ hdfsRdd ================
Hello World
Hello Spark
================ wholeTextRdd ================
(hdfs://spark152:8020/data/1.txt,Hello World
Hello Spark)
(hdfs://spark152:8020/data/2.txt,Hello World
Hello Spark)

RDD 并行度与分区

Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。这里的并行执行的任务数量,并不是指的切分任务的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
  // Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}

override def slice(from: Int, until: Int): Array[T] = {
val reprVal = repr
val lo = math.max(from, 0)
val hi = math.min(math.max(until, 0), reprVal.length)
val size = math.max(hi - lo, 0)
val result = java.lang.reflect.Array.newInstance(elementClass, size)
if (size > 0) {
Array.copy(reprVal, lo, result, 0, size)
}
result.asInstanceOf[Array[T]]
}

内存数据的分区规则源码:

  1. length = 集合长度,numSlices = 分区个数
  2. 该函数将返回大小为 numSlices 的元组
  3. 元组的每一项开始位置 start = ((i * length) / numSlices).toInt
  4. 元组的每一项结束位置 end = (((i + 1) * length) / numSlices).toInt
  5. 将元组的每一项传入 slice(from: Int, until: Int) 实现左闭右开的分区规则
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
StopWatch sw = new StopWatch().start();
FileStatus[] stats = listStatus(job);

// Save the number of input files for metrics/loadgen
job.setLong(NUM_INPUT_FILES, stats.length);
long totalSize = 0; // compute total size
boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
&& job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);

List<FileStatus> files = new ArrayList<>(stats.length);
for (FileStatus file: stats) { // check we have valid files
if (file.isDirectory()) {
if (!ignoreDirs) {
throw new IOException("Not a file: "+ file.getPath());
}
} else {
files.add(file);
totalSize += file.getLen();
}
}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}

文件数据的分区规则源码:

  1. 这段代码只能分析出部分实现逻辑,但是并不能完全解释文件数据的分区行为
  2. totalSize = 数据的总长度,goalSize = totalSize / numSplits 为每片分区的目标长度
  3. splitSize = Math.max(minSize, Math.min(goalSize, blockSize)) 实际分片大小
  4. start = length-bytesRemaining,bytesRemaining -= splitSize

RDD 转换算子

Value 类型
map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 从服务器日志数据 apache.log 中获取用户请求 URL 资源路径
* 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
* 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
*/
object Spark01_RDD_Operator_Transform_Example {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.textFile("data/apache.log")

val mapRdd: RDD[String] = rdd.map(
line => {
val array: Array[String] = line.split(" ")
array(6)
}
)

mapRdd.collect().foreach(println)

sparkContext.stop()
}
}
mapPartitions
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 获取每个分区的最大值
*/
object Spark01_RDD_Operator_Transform_MapPartitions {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4),2)

val mapPartitionsRdd: RDD[Int] = rdd.mapPartitions(
iterator => List(iterator.max).iterator
)

mapPartitionsRdd.collect().foreach(println)

sparkContext.stop()
}
}

map 和 mapPartitions 的区别:

  1. 数据处理角度

    Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子是以分区为单位进行批处理操作。

  2. 功能的角度

    Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

  3. 性能的角度

    Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。

mapPartitionsWithIndex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 获取第二个数据分区的数据
*/
object Spark04_RDD_Operator_Transform_MapPartitionsWithIndex {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

val mapPartitionWithIndexRdd: RDD[Int] = rdd.mapPartitionsWithIndex(
(index, iterator) => {
if (index == 1) {
iterator
} else
Nil.iterator
}
)
mapPartitionWithIndexRdd.collect().foreach(println)

sparkContext.stop()
}
}
flatMap
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 对字符串扁平化处理为单词集合
*/
object Spark05_RDD_Operator_Transform_FlatMap {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(List("Hello World", "Hello Spark"))

val flatMapRdd: RDD[String] = rdd.flatMap(_.split(" "))

flatMapRdd.collect().foreach(println)

sparkContext.stop()
}
}
glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 计算所有分区最大值求和(分区内取最大值,分区间最大值求和)
*/
object Spark06_RDD_Operator_Transform_Glom {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 2)

val glomRdd: RDD[Array[Int]] = rdd.glom()
val mapRdd: RDD[Int] = glomRdd.map(_.max)

println(mapRdd.collect().sum)

sparkContext.stop()
}
}
groupBy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 将 List("Hello", "hive", "hbase", "Hadoop") 根据单词首写字母进行分组
*/
object Spark07_RDD_Operator_Transform_GroupBy {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(List("Hello", "hive", "hbase", "Hadoop"))

val groupRdd: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))

groupRdd.collect().foreach(println)

sparkContext.stop()
}
}

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle;

一个组的数据被放到一个分区当中,但是并不是说一个分区之中只有一个组。

filter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的数据
* 83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
* 83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
*/
object Spark08_RDD_Operator_Transform_Filter {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.textFile("data/apache.log")

val filterRdd: RDD[String] = rdd.filter(
line => {
line.split(" ")(3).startsWith("17/05/2015")
}
)

filterRdd.collect().foreach(println)

sparkContext.stop()
}
}
sample
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong)
distinct
1
2
3
4
5
6
7
8
9
10
11
12
13
14
object Spark09_RDD_Operator_Transform_Distinct {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))

// 去重通过 RDD 的 partitioner 逻辑:map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
rdd.distinct().collect().foreach(println)

sparkContext.stop()
}
}
coalesce

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @note With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner. The optional partition coalescer
* passed in must be serializable.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
repartition

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Return a new RDD that has exactly numPartitions partitions.
*
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
sortBy

该操作用于排序数据,默认为升序排列,第二个参数可以改变排序方式。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致,中间存在 shuffle 的过。

1
2
3
4
5
6
7
8
9
10
11
12
13
object Spark10_RDD_Operator_Transform_SortBy {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(List((1, 9), (2, 8), (3, 7)))

rdd.sortBy(_._1).collect().foreach(println)

sparkContext.stop()
}
}
双 Value 类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
object Spark11_RDD_Operator_Transform {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd1: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sparkContext.makeRDD(List(3, 4, 5, 6))

// intersection 交集
println(rdd1.intersection(rdd2).collect().mkString(","))

// union 并集
println(rdd1.union(rdd2).collect().mkString(","))

// subtract 差集
println(rdd1.subtract(rdd2).collect().mkString(","))

// zip 拉链:将两个 RDD 中的元素,以键值对的形式进行合并。
println(rdd1.zip(rdd2).collect().mkString(","))

sparkContext.stop()
}
}
Key - Value 类型
partitionBy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
object Spark12_RDD_Operator_Transform_PartitionBy {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4), 4)

rdd.map((_, 1)).
partitionBy(new HashPartitioner(2)).
saveAsTextFile("output")

sparkContext.stop()
}
}
reduceByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 将数据按照相同的 Key 对 Value 进行聚合
*/
object Spark13_RDD_Operator_Transform_ReduceByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))

// reduceByKey 中如果 Key 的数据只有一个,是不会参与运算的
rdd.reduceByKey((x, y) => {
println(s"x = ${x}, y = ${y}")
x + y
}).collect.foreach(println)

sparkContext.stop()
}
}
groupByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 将数据按照相同的 Key 对 Value 进行聚合
*/
object Spark14_RDD_Operator_Transform_GroupByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("a", 3), ("b", 4)
))

// groupByKey:将数据源中,相同 Key 的数据分在一个组中,形成一个对偶元组
val groupRdd: RDD[(String, Iterable[Int])] = rdd.groupByKey()

groupRdd.collect().foreach(println)

sparkContext.stop()
}
}
aggregateByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 将数据根据不同的规则进行分区内计算和分区间计算
* 需求:分区内求最大值,分区间相加
*/
object Spark15_RDD_Operator_Transform_AggregateByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)

// aggregateByKey 存在两个柯里化,有两个参数列表
// 第一个参数列表需要传入一个参数表示初始值
// 第二个参数列表需要传递两个参数
// 第一个参数表示分区内计算规则
// 第二个参数表示分区间计算规则
rdd.aggregateByKey(Int.MinValue)(
math.max(_, _),
_ + _
).collect.foreach(println)

sparkContext.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 需求:求不同 Key 的 Value 平均值
*/
object Spark15_RDD_Operator_Transform_AggregateByKey_Average {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)

// (K, V) -> (K, (V, 1)) -> (K, (sum(V), sum(1))) -> (Key, sum(V)/ sum(1))
println("---------- method one ----------")
rdd.map(
tuple => {
(tuple._1, (tuple._2, 1))
}
).reduceByKey((t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}).map(
tuple => {
(tuple._1, tuple._2._1 / tuple._2._2)
}
).collect().foreach(println)

// (K, V) -> (K, (sum(1), sum(V)) -> (K, sum(V) / sum(1)
println("---------- method two ----------")
val aggRdd: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
(t, v) => {
(t._1 + 1, t._2 + v)
},
(t1, t2) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
aggRdd.map(
t => {
(t._1, t._2._2 / t._2._1)
}
).collect().foreach(println)

sparkContext.stop()
}
}
foldByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
*/
object Spark16_RDD_Operator_Transform_FoldByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)

rdd.foldByKey(Int.MinValue)(_ + _).collect.foreach(println)

sparkContext.stop()
}
}
combineByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 将数据 List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6))求每个 key 的平均值
*/
object Spark17_RDD_Operator_Transform_CombineByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)

val combineRdd: RDD[(String, (Int, Int))] = rdd.combineByKey(
(_, 1),
(x: (Int, Int), y) => {
(x._1 + y, x._2 + 1)
},
(x: (Int, Int), y: (Int, Int)) => {
(x._1 + y._1, x._2 + y._2)
}
)

combineRdd.map(
x => {
(x._1, x._2._1 / x._2._2)
}
).collect().foreach(println)

sparkContext.stop()
}
}

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别?

  • reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
sortByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序
*/
object Spark18_RDD_Operator_Transform_SortByKey {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 2), ("b", 3),
("b", 4), ("b", 5), ("a", 6)
), 2)

println("ascending = true")
rdd.sortByKey(true).collect().foreach(println)

println("ascending = false")
rdd.sortByKey(false).collect().foreach(println)

sparkContext.stop()
}
}
join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* -------- join --------
* (a,(1,4))
* (b,(2,5))
* -------- leftOuterJoin --------
* (a,(1,Some(4)))
* (b,(2,Some(5)))
* (c,(3,None))
* -------- rightOuterJoin --------
* (a,(Some(1),4))
* (b,(Some(2),5))
* (d,(None,6))
* (d,(None,7))
* -------- cogroup --------
* (a,(CompactBuffer(1),CompactBuffer(4)))
* (b,(CompactBuffer(2),CompactBuffer(5)))
* (c,(CompactBuffer(3),CompactBuffer()))
* (d,(CompactBuffer(),CompactBuffer(6, 7)))
*/
object Spark19_RDD_Operator_Transform_Join {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd1: RDD[(String, Int)] = sparkContext.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sparkContext.makeRDD(List(("a", 4), ("b", 5), ("d", 6), ("d", 7)))

println("-------- join --------")
rdd1.join(rdd2).collect().foreach(println)
println("-------- leftOuterJoin --------")
rdd1.leftOuterJoin(rdd2).collect().foreach(println)
println("-------- rightOuterJoin --------")
rdd1.rightOuterJoin(rdd2).collect().foreach(println)
println("-------- cogroup --------")
rdd1.cogroup(rdd2).collect().foreach(println)

sparkContext.stop()
}
}
案例实操
数据准备

agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔

1
2
3
4
5
6
7
8
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
1516609143869 3 3 36 16
需求描述

统计出每一个省份 每个广告被点击数量排行的 Top3

功能实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
object Spark20_RDD_Operator_Transform_Case {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.textFile("data/agent.log")
rdd.map(
line => {
val data: Array[String] = line.split(" ")
((data(1), data(4)), 1)
}
).reduceByKey(_ + _)
.map(
data => {
(data._1._1, (data._1._2, data._2))
}
).groupByKey()
.mapValues(
_.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
).collect().foreach(println)

sparkContext.stop()
}
}

RDD 行动算子

reduce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据
*/
object Spark01_RDD_Operator_Action_Reduce {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

// TODO - 行动算子
// 所谓的行动算子,其实就是触发作业(Job)执行的方法
// 底层调用的是环境对象的 runJob 方法
// 底层代码中会创建 ActiveJob,并提交执行
println(rdd.reduce(_ + _))

sparkContext.stop()
}
}
collect
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 以数组 Array 的形式返回数据集的所有元素
*/
object Spark02_RDD_Operator_Action_Collect {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

rdd.collect().foreach(println)

sparkContext.stop()
}
}
count
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 返回 RDD 中元素的个数
*/
object Spark03_RDD_Operator_Action_Count {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

println(rdd.count())

sparkContext.stop()
}
}
first
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 返回 RDD 中的第一个元素
*/
object Spark04_RDD_Operator_Action_First {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

println(rdd.first())

sparkContext.stop()
}
}
take
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 返回一个由 RDD 的前 n 个元素组成的数组
*/
object Spark05_RDD_Operator_Action_Take {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

rdd.take(3).foreach(println)

sparkContext.stop()
}
}
takeOrdered
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 返回该 RDD 排序后的前 n 个元素组成的数组
*/
object Spark05_RDD_Operator_Action_TakeOrdered {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(4, 2, 3, 1))

rdd.takeOrdered(3)(Ordering.Int.reverse).foreach(println)

sparkContext.stop()
}
}
aggregate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
*/
object Spark06_RDD_Operator_Action_Aggregate {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(4, 2, 3, 1), 8)

// aggregateByKey:初始值只会参与分区内计算
// aggregate:初始值会参与分区内计算,也会参与分区间计算
println(rdd.aggregate(0)(_ + _, _ + _))
println(rdd.aggregate(10)(_ + _, _ + _))

sparkContext.stop()
}
}
fold
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 折叠操作,aggregate 的简化版操作
*/
object Spark06_RDD_Operator_Action_Fold {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(4, 2, 3, 1), 8)

println(rdd.fold(0)(_ + _))

sparkContext.stop()
}
}
countByKey
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 统计每种 key 的个数
*/
object Spark08_RDD_Operator_Action_CountByKey {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

println("---------- countByValue ----------")
val rdd1: RDD[Int] = sparkContext.makeRDD(List(1, 1, 3, 4))
val intToLong: collection.Map[Int, Long] = rdd1.countByValue()
println(intToLong)

println("---------- countByKey ----------")
val rdd2: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("a", 1), ("b", 3), ("d", 4)
))
val stringToLong: collection.Map[String, Long] = rdd2.countByKey()
println(stringToLong)

sparkContext.stop()
}
}
save 相关算子
1
2
3
4
5
6
// 保存成 Text 文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成 Sequencefile 文件
rdd.map((_,1)).saveAsSequenceFile("output2")
foreach
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

/**
* 分布式遍历 RDD 中的每一个元素,调用指定函数
*/
object Spark09_RDD_Operator_Action_Foreach {
def main(args: Array[String]): Unit = {

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 1, 3, 4))

// Driver 端内存集合的循环遍历方法
rdd.collect().foreach(println)
println("================")
// Executor 端内存数据的循环遍历
rdd.foreach(println)

sparkContext.stop()
}
}

RDD 序列化

闭包检查

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

序列化方法和属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
object Spark01_RDD_Serial {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Serial")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))

val search = new Search("h")

search.getMatch1(rdd).collect().foreach(println)

sparkContext.stop()
}

// 类的构造参数其实是类的属性,所以需要闭包检测
class Search(query: String) extends Serializable {
def isMatch(s: String): Boolean = {
s.contains(query)
}

// 函数序列化案例
def getMatch1(rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}

// 属性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
rdd.filter(x => x.contains(query))
}
}

}
Kryo 序列化框架

参考地址: https://github.com/EsotericSoftware/kryo
Java 的序列化能够序列化任何的类。但是比较重(字节多),序列化后,对象的提交也比较大。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。
注意:即使使用 Kryo 序列化,也要继承 Serializable 接口

RDD 依赖关系

RDD 血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* ---------------- lines ----------------
* (2) data\1.txt,data\2.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_Dependence.scala:11 []
* | data\1.txt,data\2.txt HadoopRDD[0] at textFile at Spark01_RDD_Dependence.scala:11 []
* ---------------- words ----------------
* (2) MapPartitionsRDD[2] at flatMap at Spark01_RDD_Dependence.scala:15 []
* | data\1.txt,data\2.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_Dependence.scala:11 []
* | data\1.txt,data\2.txt HadoopRDD[0] at textFile at Spark01_RDD_Dependence.scala:11 []
* ---------------- wordToOne ----------------
* (2) MapPartitionsRDD[3] at map at Spark01_RDD_Dependence.scala:19 []
* | MapPartitionsRDD[2] at flatMap at Spark01_RDD_Dependence.scala:15 []
* | data\1.txt,data\2.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_Dependence.scala:11 []
* | data\1.txt,data\2.txt HadoopRDD[0] at textFile at Spark01_RDD_Dependence.scala:11 []
* ---------------- wordToSum ----------------
* (2) ShuffledRDD[4] at reduceByKey at Spark01_RDD_Dependence.scala:23 []
* +-(2) MapPartitionsRDD[3] at map at Spark01_RDD_Dependence.scala:19 []
* | MapPartitionsRDD[2] at flatMap at Spark01_RDD_Dependence.scala:15 []
* | data\1.txt,data\2.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_Dependence.scala:11 []
* | data\1.txt,data\2.txt HadoopRDD[0] at textFile at Spark01_RDD_Dependence.scala:11 []
* (Hello,4)
* (World,2)
* (Spark,2)
*/
object Spark01_RDD_Dependence {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("Dependence")
val sparkContext = new SparkContext(sparkConf)

val lines: RDD[String] = sparkContext.textFile("data\\1.txt,data\\2.txt")
println("---------------- lines ----------------")
println(lines.toDebugString)

val words: RDD[String] = lines.flatMap(_.split(" "))
println("---------------- words ----------------")
println(words.toDebugString)

val wordToOne: RDD[(String, Int)] = words.map((_, 1))
println("---------------- wordToOne ----------------")
println(wordToOne.toDebugString)

val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println("---------------- wordToSum ----------------")
println(wordToSum.toDebugString)

val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)

sparkContext.stop()
}
}
RDD 依赖关系

这里所谓的依赖关系,其实就是两个相邻 RDD 之间的关系

RDD 窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用。

1
2
3
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
RDD 宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
* explicitly then the default serializer, as specified by `spark.serializer`
* config option, will be used.
* @param keyOrdering key ordering for RDD's shuffles
* @param aggregator map/reduce-side aggregator for RDD's shuffle
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
* @param shuffleWriterProcessor the processor to control the write behavior in ShuffleMapTask
*/
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
extends Dependency[Product2[K, V]] with Logging
RDD 阶段划分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Create a ResultStage associated with the provided jobId.
*/
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd, resourceProfile)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(shuffleDeps, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
callSite, resourceProfile.id)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}

/**
* Get or create the list of parent stages for the given shuffle dependencies. The new
* Stages will be created with the provided firstJobId.
*/
private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
firstJobId: Int): List[Stage] = {
shuffleDeps.map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
  1. 提交 Job 时,会查找当前 RDD 有无 shuffleDeps
  2. 为每个 shuffleDeps 创建对应的 shuffleMapStage
  3. 不管有没有 shuffleDeps 都会创建 ResultStage
RDD 任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

RDD 持久化

RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* RDD 不存储数据,如果一个 RDD 需要重复使用,那么需要从头再次执行来获取数据
* // mapRdd.cache() mapRdd.cache()
* ------ map ------ ------ map ------
* ------ map ------ ------ map ------
* ------ map ------ ------ map ------
* ------ map ------ ------ map ------
* (Hello,2) (Hello,2)
* (Spark,1) (Spark,1)
* (World,1) (Spark,1)
* ==================== ====================
* ------ map ------ (Hello,2)
* ------ map ------ (Spark,1)
* ------ map ------ (Spark,1)
* ------ map ------
* (Hello,2)
* (Spark,1)
* (World,1)
*/
object Spark01_RDD_Cache {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Cache")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(Array("Hello World", "Hello Spark"))

val flatMapRdd: RDD[String] = rdd.flatMap(_.split(" "))

val mapRdd: RDD[(String, Int)] = flatMapRdd.map(
word => {
println("------ map ------")
(word, 1)
}
)

mapRdd.cache()

mapRdd.reduceByKey(_ + _).collect().foreach(println)

println("====================")

mapRdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

sparkContext.stop()
}
}
persist 更改存储级别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object Spark02_RDD_Persist {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Persist")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(Array("Hello World", "Hello Spark"))

val flatMapRdd: RDD[String] = rdd.flatMap(_.split(" "))

val mapRdd: RDD[(String, Int)] = flatMapRdd.map(
word => {
println("------ map ------")
(word, 1)
}
)

// 可以更改存储级别
mapRdd.persist(StorageLevel.DISK_ONLY)

mapRdd.reduceByKey(_ + _).collect().foreach(println)

println("====================")

mapRdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

sparkContext.stop()
}
}
级别 使用的空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分· 如果数据在内存中放不下,则溢写到磁盘上。在内存中存放序列化后的数据
DISK_ONLY
RDD CheckPoint 检查点

所谓的检查点其实就是通过将 RDD 中间结果写入磁盘由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
object Spark03_RDD_CheckPoint {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("CheckPoint")
val sparkContext = new SparkContext(sparkConf)
sparkContext.setCheckpointDir("checkpoint")

val rdd: RDD[String] = sparkContext.makeRDD(Array("Hello World", "Hello Spark"))

val flatMapRdd: RDD[String] = rdd.flatMap(_.split(" "))

val mapRdd: RDD[(String, Int)] = flatMapRdd.map(
word => {
println("------ map ------")
(word, 1)
}
)

// checkpoint 需要落盘,需要指定检查点保存路径
// 检查点路径保存的文件,当作业执行完毕后不会被删除
mapRdd.checkpoint()

mapRdd.reduceByKey(_ + _).collect().foreach(println)

println("====================")

mapRdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)

sparkContext.stop()
}
}
缓存和检查点区别
  1. Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖;
  2. Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在 HDFS 等容错、高可用的文件系统,可靠性高;
  3. 建议对 checkpoint() 的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次 RDD。

RDD 分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

  • 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None;
  • 每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
object Spark01_RDD_Partitioner {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Partitioner")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, String)] = sparkContext.makeRDD(Array(
("湖北", "武汉"), ("福建", "厦门"), ("上海", "上海"),
("福建", "福州"), ("福建", "三明"), ("湖北", "十堰")
),3)

rdd.partitionBy(new MyHashPartitioner).saveAsTextFile("output")

sparkContext.stop()
}

/**
* 自定义分区器
* 1. 继承 Partitioner
* 2. 重写方法
*/
class MyPartitioner extends Partitioner {
override def numPartitions: Int = 3

override def getPartition(key: Any): Int = {
key match {
case "福建" => 0
case "湖北" => 1
case "上海" => 2
}
}
}

}

文件读取与保存

text 文件
1
2
3
4
// 读取输入文件
val inputRDD: RDD[String] = sc.textFile("input/1.txt")
// 保存数据
inputRDD.saveAsTextFile("output")
sequence 文件
1
2
3
4
// 保存数据为 SequenceFile
dataRDD.saveAsSequenceFile("output")
// 读取 SequenceFile 文件
sc.sequenceFile[Int,Int]("output").collect().foreach(println)
object 对象
1
2
3
4
// 保存数据
dataRDD.saveAsObjectFile("output")
// 读取数据
sc.objectFile[Int]("output").collect().foreach(println)

累加器

实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

基础编程

系统累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
object Spark01_Accumulator {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4))

// 获取系统累加器
val sumAcc: LongAccumulator = sparkContext.longAccumulator("sum")

rdd.foreach(sumAcc.add(_))

// 获取累加器的值
println(sumAcc.value)

sparkContext.stop()
}
}

注意细节:

  • 少加:转换算子中调用累加器,如果没用调用行动算子,那么变不会执行,导致累加器少加
  • 多加:行动算子多次调用,则累加器多次被调用造成多加
  • 一般情况下,累加器放在行动算子中进行操作
自定义累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
object Spark02_Accumulator_WordCount {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Accumulator")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[String] = sparkContext.makeRDD(List("Hello Spark", "Hello World", "Hello Scala"))

// 创建累加器
val wordCountAccumulator = new WordCountAccumulator
// 注册累加器到 sparkContext
sparkContext.register(wordCountAccumulator, "wordCountAccumulator")

rdd.flatMap(_.split(" ")).foreach(wordCountAccumulator.add(_))

println(wordCountAccumulator.value)

sparkContext.stop()
}

class WordCountAccumulator() extends AccumulatorV2[String, mutable.Map[String, Int]] {
private var wcMap: mutable.Map[String, Int] = mutable.Map[String, Int]()

// 判断是否为初始状态
override def isZero: Boolean = {
wcMap.isEmpty
}

// 复制累加器
override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
val copy = new WordCountAccumulator()
copy.wcMap = this.wcMap
copy
}

// 重置累加器
override def reset(): Unit = {
wcMap.clear()
}

// 获取累加器要计算的值
override def add(v: String): Unit = {
val newCount: Int = wcMap.getOrElse(v, 0) + 1
wcMap.update(v, newCount)
}

// Driver合并多个累加器
override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
val map1: mutable.Map[String, Int] = other.value
val map2: mutable.Map[String, Int] = this.value

map1.foreach(
t => {
val newCount: Int = map2.getOrElse(t._1, 0) + t._2
map2.update(t._1, newCount)
}
)
}

// 累加器结果
override def value: mutable.Map[String, Int] = {
wcMap
}
}
}

广播变量

实现原理

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务分别发送。

image-20220526155535133

基础编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
object Spark03_Broadcast {

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Broadcast")
val sparkContext = new SparkContext(sparkConf)

val rdd: RDD[(String, Int)] = sparkContext.makeRDD(List(
("a", 1), ("b", 2), ("c", 3)
))

val map: mutable.Map[String, Int] = mutable.Map(
("a", 3), ("b", 4), ("c", 5)
)
// 声明广播变量
val broadcast: Broadcast[mutable.Map[String, Int]] = sparkContext.broadcast(map)

rdd.map {
case (k, v) => {
// 使用广播变量
val value: Int = broadcast.value.getOrElse(k, 0)
(k, (v, value))
}
}.collect().foreach(println)

sparkContext.stop()
}
}

Spark 案例实操

数据准备

image-20220526165850309

上面的数据图是从数据文件中截取的一部分内容,表示为电商网站的用户行为数据,主要包含用户的 4 种行为:搜索,点击,下单,支付。数据规则如下:

  1. 数据文件中每行数据采用下划线分隔数据
  2. 每一行数据表示用户的一次行为,这个行为只能是 4 种行为的一种
  3. 如果搜索关键字为 null,表示数据不是搜索数据
  4. 如果点击的品类 ID 和产品 ID 为-1,表示数据不是点击数据
  5. 针对于下单行为,一次可以下单多个商品,所以品类 ID 和产品 ID 可以是多个,id 之间采用逗号分隔,如果本次不是下单行为,则数据采用 null 表示
  6. 支付行为和下单行为类似

详细字段说明

编码 字段名称 字段类型 字段含义
1 date String 用户点击行为的日期
2 user_id Long 用户的 ID
3 session_id String Session 的 ID
4 page_id Long 某个页面的 ID
5 action_time String 动作的时间点
6 search_key String 用户搜索的关键词
7 click_category_id Long 某一个商品品类的 ID
8 click_product_id Long 某一个商品的 ID
9 order_category_ids String 一次订单中所有品类的 ID 集合
10 order_product_ids String 一次订单中所有商品的 ID 集合
11 pay_category_ids String 一次支付中所有品类的 ID 集合
12 pay_product_ids String 一次支付中所有商品的 ID 集合
13 city_id Long 城市 ID

需求一 :Top10 热门品类

需求说明

先按照点击数排名,靠前的就排名高;如果点击数相同,再比较下单数;下单数再相同,就比较支付数

实现一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
object Spark01_HotCategoryTop10 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

// 1. 读取原始日志数据
val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")

// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRdd: RDD[String] = actionRDD.filter(_.split("_")(6) != "-1")

val clickCountRdd: RDD[(String, Int)] = clickActionRdd.map(
action => {
val datas: Array[String] = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)

// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRdd: RDD[String] = actionRDD.filter(_.split("_")(8) != "null")

val orderCountRdd: RDD[(String, Int)] = orderActionRdd.flatMap(
action => {
val datas: Array[String] = action.split("_")
val cidStr: String = datas(8)
val cids: Array[String] = cidStr.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)

// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRdd: RDD[String] = actionRDD.filter(_.split("_")(10) != "null")

val payCountRdd: RDD[(String, Int)] = payActionRdd.flatMap(
action => {
val datas: Array[String] = action.split("_")
val cidStr: String = datas(10)
val cids: Array[String] = cidStr.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)

// 5. 将品类进行排序
// 元组排序:先比较第一个,再比较第二个,再比较第三,以此类推
// (品类ID,(点击数量,下单数量,支付数量))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = clickCountRdd.cogroup(orderCountRdd, payCountRdd)

cogroupRdd.mapValues {
case (clickIter, orderIter, payIter) => {
val clickCount: Int = if (clickIter.iterator.hasNext) clickIter.iterator.next() else 0
val orderCount: Int = if (orderIter.iterator.hasNext) orderIter.iterator.next() else 0
val payCount: Int = if (payIter.iterator.hasNext) payIter.iterator.next() else 0
(clickCount, orderCount, payCount)
}
}.sortBy(_._2, false).take(10).foreach(println)

sparkContext.stop()
}
}
实现二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
object Spark02_HotCategoryTop10 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

// 1. 读取原始日志数据
val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")
actionRDD.cache()

// 2. 统计品类的点击数量:(品类ID,点击数量)
val clickActionRdd: RDD[String] = actionRDD.filter(_.split("_")(6) != "-1")

val clickCountRdd: RDD[(String, Int)] = clickActionRdd.map(
action => {
val datas: Array[String] = action.split("_")
(datas(6), 1)
}
).reduceByKey(_ + _)

// 3. 统计品类的下单数量:(品类ID,下单数量)
val orderActionRdd: RDD[String] = actionRDD.filter(_.split("_")(8) != "null")

val orderCountRdd: RDD[(String, Int)] = orderActionRdd.flatMap(
action => {
val datas: Array[String] = action.split("_")
val cidStr: String = datas(8)
val cids: Array[String] = cidStr.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)

// 4. 统计品类的支付数量:(品类ID,支付数量)
val payActionRdd: RDD[String] = actionRDD.filter(_.split("_")(10) != "null")

val payCountRdd: RDD[(String, Int)] = payActionRdd.flatMap(
action => {
val datas: Array[String] = action.split("_")
val cidStr: String = datas(10)
val cids: Array[String] = cidStr.split(",")
cids.map((_, 1))
}
).reduceByKey(_ + _)

val clickCountFormatRdd: RDD[(String, (Int, Int, Int))] = clickCountRdd.map(x => (x._1, (x._2, 0, 0)))
val orderCountFormatRdd: RDD[(String, (Int, Int, Int))] = orderCountRdd.map(x => (x._1, (0, x._2, 0)))
val payCountFormatRdd: RDD[(String, (Int, Int, Int))] = payCountRdd.map(x => (x._1, (0, 0, x._2)))

// 5. 将品类进行排序
// 元组排序:先比较第一个,再比较第二个,再比较第三,以此类推
// (品类ID,(点击数量,下单数量,支付数量))
clickCountFormatRdd.union(orderCountFormatRdd).union(payCountFormatRdd).reduceByKey(
(v1, v2) => {
(v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3)
}
).sortBy(_._2, false).take(10).foreach(println)

sparkContext.stop()
}
}
实现三
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
object Spark03_HotCategoryTop10 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

// 1. 读取原始日志数据
val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")

// 2. 将数据转换结构
// 点击:(品类ID,(1,0,0))
// 下单:(品类ID,(0,1,0))
// 支付:(品类ID,(0,0,1))
val flatmapRdd: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
line => {
val data: Array[String] = line.split("_")
if (data(6) != "-1") {
List((data(6), (1, 0, 0)))
} else if (data(8) != "null") {
data(8).split(",").map((_, (0, 1, 0)))
} else if (data(10) != "null") {
data(10).split(",").map((_, (0, 0, 1)))
} else {
Nil
}
}
)

// 3. 将相同的品类ID数据进行分组聚合
val analysisRdd: RDD[(String, (Int, Int, Int))] = flatmapRdd.reduceByKey(
(v1, v2) => {
(v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3)
}
)

// 4. 将统计数据排序并打印
analysisRdd.sortBy(_._2, false).take(10).foreach(println)

sparkContext.stop()
}
}
实现四
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
object Spark04_HotCategoryTop10 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

// 1. 读取原始日志数据
val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")

// 2. 声明累加器
val accumulator = new HotCategoryAccumulator
sparkContext.register(accumulator, "hotCategoryAccumulator")

// 3. 遍历累加
actionRDD.foreach(
line => {
val data: Array[String] = line.split("_")
if (data(6) != "-1") {
accumulator.add(data(6), "click")
} else if (data(8) != "null") {
data(8).split(",").foreach(
accumulator.add(_, "order")
)
} else if (data(10) != "null") {
data(10).split(",").foreach(
accumulator.add(_, "pay")
)
}
}
)

// 4. 使用累加器
val accVal: mutable.Map[String, HotCategory] = accumulator.value
val list: List[HotCategory] = accVal.map(_._2).toList

list.sortWith(
(left, right) => {
if (left.clickCnt > right.clickCnt) {
true
} else if (left.clickCnt == right.clickCnt && left.orderCnt > right.orderCnt) {
true
} else if (left.clickCnt == right.clickCnt && left.orderCnt == right.orderCnt && left.payCnt > right.payCnt) {
true
} else {
false
}
}
).take(10).foreach(println)

sparkContext.stop()
}

case class HotCategory(cid: String, var clickCnt: Int, var orderCnt: Int, var payCnt: Int)

/**
* 自定义累加器
* 1. 继承 AccumulatorV2,定义泛型
* IN:(品类ID,行为类型)
* OUT:mutable.Map[String, HotCategory]
* 2. 重写方法
*
*/
class HotCategoryAccumulator extends AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] {

private val hcMap = mutable.Map[String, HotCategory]()

override def isZero: Boolean = {
hcMap.isEmpty
}

override def copy(): AccumulatorV2[(String, String), mutable.Map[String, HotCategory]] = {
new HotCategoryAccumulator
}

override def reset(): Unit = {
hcMap.clear()
}

override def add(v: (String, String)): Unit = {
val cid: String = v._1
val actionType: String = v._2
val category: HotCategory = hcMap.getOrElse(cid, new HotCategory(cid, 0, 0, 0))
if (actionType == "click") {
category.clickCnt += 1;
} else if (actionType == "order") {
category.orderCnt += 1;
} else if (actionType == "pay") {
category.payCnt += 1;
}
hcMap.update(cid, category)
}

override def merge(other: AccumulatorV2[(String, String), mutable.Map[String, HotCategory]]): Unit = {
val map1: mutable.Map[String, HotCategory] = this.value
val map2: mutable.Map[String, HotCategory] = other.value

map2.foreach {
case (cid, hc) => {
val category: HotCategory = map1.getOrElse(cid, new HotCategory(cid, 0, 0, 0))
category.clickCnt += hc.clickCnt
category.orderCnt += hc.orderCnt
category.payCnt += hc.payCnt
map1.update(cid, category)
}
}
}

override def value: mutable.Map[String, HotCategory] = hcMap
}
}

需求二:Top10 热门品类中每个品类的 的 Top10 活跃 Session 统计

需求说明

在需求一的基础上,增加每个品类用户 session 的点击统计

实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
object Spark05_HotCategoryAndSessionTop10 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

// 1.读取原始日志数据
val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")
actionRDD.cache()

// 2. 获取热门品类Top10的ID
val ids: Array[String] = hotCategoryIDsTop10(actionRDD)

// 3.过滤原始数据保留前10品类的点击数据
val filterActionRdd: RDD[String] = actionRDD.filter(
line => {
val data: Array[String] = line.split("_")
ids.contains(data(6))
}
)

// 4.根据品类ID和session进行点击量统计
val reduceRdd: RDD[((String, String), Int)] = filterActionRdd.map(
line => {
val data: Array[String] = line.split("_")
((data(6), data(2)), 1)
}
).reduceByKey(_ + _)

// 5.将统计结果结构转换
val mapRdd: RDD[(String, (String, Int))] = reduceRdd.map {
case ((cid, sid), sum) => {
(cid, (sid, sum))
}
}

// 6.相同品类分组
val groupRdd: RDD[(String, Iterable[(String, Int)])] = mapRdd.groupByKey()

// 7.将分组后的数据取前10
val resultRdd: RDD[(String, List[(String, Int)])] = groupRdd.mapValues(
iter => {
iter.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
}
)

resultRdd.collect().foreach(println)

sparkContext.stop()
}

def hotCategoryIDsTop10(actionRDD: RDD[String]) = {
val flatmapRdd: RDD[(String, (Int, Int, Int))] = actionRDD.flatMap(
line => {
val data: Array[String] = line.split("_")
if (data(6) != "-1") {
List((data(6), (1, 0, 0)))
} else if (data(8) != "null") {
data(8).split(",").map((_, (0, 1, 0)))
} else if (data(10) != "null") {
data(10).split(",").map((_, (0, 0, 1)))
} else {
Nil
}
}
)

val analysisRdd: RDD[(String, (Int, Int, Int))] = flatmapRdd.reduceByKey(
(v1, v2) => {
(v1._1 + v2._1, v1._2 + v2._2, v1._3 + v2._3)
}
)

analysisRdd.sortBy(_._2, false).take(10).map(_._1)
}
}

需求三:页面单跳转换率统计

需求说明

计算页面单跳转化率,什么是页面单跳转换率,比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率。
比如:计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率。

需求分析

需求实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
object Spark07_PageFlowAnalysis_Filter {

def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Case")
val sparkContext = new SparkContext(sparkConf)

val actionRDD: RDD[String] = sparkContext.textFile("data\\user_visit_action.txt")

val actionDataRdd: RDD[UserVisitAction] = actionRDD.map(
castToUserVisitAction(_)
)
actionDataRdd.cache()

// TODO 对指定页面连续跳转进行统计
// 1-2,2-3,3-4,4-5,5-6,6-7
val ids: List[Long] = List[Long](1, 2, 3, 4, 5, 6, 7)
val wantFlowIds: List[(Long, Long)] = ids.zip(ids.tail)

// TODO 计算分母
val pageIdToCountMap: Map[Long, Int] = actionDataRdd.filter(
action => {
ids.init.contains(action.page_id)
}
).map(
action => {
(action.page_id, 1)
}
).reduceByKey(_ + _).collect().toMap

// TODO 计算分子
// 根据session进行分组
val sessionRdd: RDD[(String, Iterable[UserVisitAction])] = actionDataRdd.groupBy(_.session_id)

// 分组后,根据访问时间从小到大排序
val mvRdd: RDD[(String, List[((Long, Long), Int)])] = sessionRdd.mapValues(
iter => {
val sortList: List[UserVisitAction] = iter.toList.sortBy(_.action_time)
val flowIds: List[Long] = sortList.map(_.page_id)
// Sliding:滑窗
// Zip:拉链
val pageFlowIds: List[(Long, Long)] = flowIds.zip(flowIds.tail)
// 将不合法的页面过滤
pageFlowIds.filter(
tuple => {
wantFlowIds.contains(tuple)
}
).map(
tuple => {
(tuple, 1)
}
)
}
)

val dataRdd: RDD[((Long, Long), Int)] = mvRdd.map(_._2).flatMap(list => list).reduceByKey(_ + _)

// TODO 计算单调转换率
dataRdd.foreach {
case ((pageId1, pageId2), sum) => {
val denominator: Int = pageIdToCountMap.get(pageId1).get
println(s"页面【${pageId1}】到页面【${pageId2}】单跳转换率为:" + sum.toDouble / denominator)
denominator
}
}

sparkContext.stop()
}

def castToUserVisitAction(str: String): UserVisitAction = {
val data: Array[String] = str.split("_")
UserVisitAction(
data(0),
data(1).toLong,
data(2),
data(3).toLong,
data(4),
data(5),
data(6).toLong,
data(7).toLong,
data(8),
data(9),
data(10),
data(11),
data(12).toLong
)
}

//用户访问动作表
case class UserVisitAction(
date: String, //用户点击行为的日期
user_id: Long, //用户的 ID
session_id: String, //SessionID
page_id: Long, //某个页面的 ID
action_time: String, //动作的时间点
search_keyword: String, //用户搜索的关键词
click_category_id: Long, //某一个商品品类的 ID
click_product_id: Long, //某一个商品的 ID
order_category_ids: String, //一次订单中所有品类的 ID 集合
order_product_ids: String, //一次订单中所有商品的 ID 集合
pay_category_ids: String, //一次支付中所有品类的 ID 集合
pay_product_ids: String, //一次支付中所有商品的 ID 集合
city_id: Long //城市 id
)

}

工程化代码

image-20220527103546600

application
1
2
3
4
5
6
7
8
9
10
11
package com.eitan.bigdata.spark.core.framework.application

import com.eitan.bigdata.spark.core.framework.common.TApplication
import com.eitan.bigdata.spark.core.framework.controller.WordCountController

object WordCountApplication extends App with TApplication {
start() {
val controller = new WordCountController
controller.dispatch()
}
}
common
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.eitan.bigdata.spark.core.framework.common

import com.eitan.bigdata.spark.core.framework.util.EnvUtil
import org.apache.spark.{SparkConf, SparkContext}

trait TApplication {
def start(master: String = "local[*]", app: String = "Application")(op: => Unit) = {
val sparkConf: SparkConf = new SparkConf().setMaster(master).setAppName(app)
val sc = new SparkContext(sparkConf)
EnvUtil.put(sc)

try {
op
} catch {
case ex => println(ex.getMessage)
}

sc.stop()
EnvUtil.clear()
}

}
1
2
3
4
5
package com.eitan.bigdata.spark.core.framework.common

trait TController {
def dispatch(): Unit
}
1
2
3
4
5
6
7
8
9
package com.eitan.bigdata.spark.core.framework.common

import com.eitan.bigdata.spark.core.framework.util.EnvUtil

trait TDao {
def readFile(path: String) = {
EnvUtil.take().textFile(path)
}
}
1
2
3
4
5
package com.eitan.bigdata.spark.core.framework.common

trait TService {
def dataAnalysis(): Any
}
controller
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.eitan.bigdata.spark.core.framework.controller

import com.eitan.bigdata.spark.core.framework.common.TController
import com.eitan.bigdata.spark.core.framework.service.WordCountService

/**
* 控制层
*/
class WordCountController extends TController {
private val wordCountService = new WordCountService

def dispatch(): Unit = {
val array = wordCountService.dataAnalysis()
array.foreach(println)
}
}
dao
1
2
3
4
5
6
7
8
9
10
11
package com.eitan.bigdata.spark.core.framework.dao

import com.eitan.bigdata.spark.core.framework.common.TDao


/**
* 持久层
*/
class WordCountDao extends TDao {

}
service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.eitan.bigdata.spark.core.framework.service

import com.eitan.bigdata.spark.core.framework.common.TService
import com.eitan.bigdata.spark.core.framework.dao.WordCountDao
import org.apache.spark.rdd.RDD

/**
* 服务层
*/
class WordCountService extends TService {
private val wordCountDao = new WordCountDao

// 数据分析
def dataAnalysis() = {
// 读取文件数据
val lines: RDD[String] = wordCountDao.readFile("data\\1.txt,data\\2.txt")
// 将文件中的数据进行分词
val words: RDD[String] = lines.flatMap(_.split(" "))

val wordToOne: RDD[(String, Int)] = words.map(
word => (word, 1)
)

// Spark 框架提供了更多功能,可以将分组和聚合使用一个方法实现
// reduceByKey: 相同Key的数据,可以对value进行reduce聚合
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)

// 将转换结果采集到控制台打印出来
wordToCount.collect()
}
}
util
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.eitan.bigdata.spark.core.framework.util

import org.apache.spark.SparkContext

object EnvUtil {
private val scLocal = new ThreadLocal[SparkContext]

def put(sc: SparkContext): Unit = {
scLocal.set(sc)
}

def take(): SparkContext = {
scLocal.get()
}

def clear(): Unit = {
scLocal.remove()
}
}