Skip to content

Spark性能调优

1. 常规性能调优

1.1 最优资源配置

在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的 性能调优策略。资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如下所示:

sh
bin/spark-submit \
--class com.rocket.spark.Analysis \
--master yarn
--deploy-mode cluster
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \

调节原则:尽量将任务分配的资源调节到可以使用的资源的最大限度。
对于具体资源的分配,我们分别讨论Spark的两种Cluster运行模式:

  • 第一种是Spark Standalone模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写submit脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPUcore。
  • 第二种是Spark Yarn模式,由于Yarn使用资源队列进行资源的分配和调度,在编写submit脚本的时候,就根据 Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个 Executor,每个Executor分配8G内存,2个CPU core。

1.2 RDD优化

  1. RDD复用
    在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算 Alt text 对上图中的RDD计算架构进行修改,得到如下图所示的优化结果(上图中RDD2和RDD3一样的可以合并): Alt text
  2. RDD持久化
    在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化,通过持久化将公共 RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。
  3. RDD尽可能早的filter操作 获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。

1.3 并行度调节

Spark作业中的并行度指各个stage的task的数量。如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的 task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。
Spark作业并行度的设置如下所示:

scala
val conf = new SparkConf().set("spark.default.parallelism", "500")

1.4 广播大变量

默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的副本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task 中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager 就会从 Driver 或者其他节点的BlockManager上远程拉取变量的副本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。

1.5 Kryo序列化

默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs已经默认使用Kryo序列化方式了。 详见RDD序列化

1.6 调节本地化等待时长

Spark作业运行过程中,Driver会对每一个stage的task进行分配。根据Spark的task分配算法,Spark希望task能够运行在它要计算的数据算在的节点(数据本地化思想),这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,尝试将task分配到比较差的本地化级别所对应的节点上,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。
当task要处理的数据不在task所在节点上时,会发生数据的传输。task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager处获取数据。
网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能。
Spark的本地化等级如表所示:

名称解析
PROCESS_LOCAL进程本地化,task 和数据在同一个Executor中,
性能最好。
NODE_LOCAL节点本地化,task和数据在同一个节点中,
但是task和数据不在同一个 Executor 中,
数据需要在进程间进行传输。
RACK_LOCAL机架本地化,task 和数据在同一个机架的两个节点上,
数据需要通过网络在节点之间进行传输。
NO_PREF对于task来说,从哪里获取都一样,没有好坏之分。
ANYtask 和数据可以在集群的任何地方,
而且不在一个机架中,性能最差。

在Spark项目开发阶段,可以使用client模式对程序进行测试,此时可以在本地看到比较全的日志信息,日志信息中有明确的task数据本地化的级别,如果大部分都是PROCESS_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是 NODE_LOCAL、ANY,那么需要对本地化的等待时长进行调节,通过延长本地化等待时长,看看task的本地化级别有没有提升,并观察Spark作业的运行时间有没有缩短。
注意,过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。
Spark本地化等待时长的设置如代码所示:

scala
val conf = new SparkConf().set("spark.locality.wait", "6")

2. 算子调优

2.1 mapPartitions

普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。如果是普通的 map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。
如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。比如当要把RDD中的所有数据通过JDBC写入数据库,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。
mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。 因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM) 在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。

2.2 foreachPartition优化数据库操作

在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子。 与mapPartitions算子非常相似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如图所示:
Alt text 使用了foreachPartition算子后,可以获得以下的性能提升:
➢ 对于我们写的function函数,一次处理一整个分区的数据;
➢ 对于一个分区内的数据,创建唯一的数据库连接;
➢ 只需要向数据库发送一次SQL语句和多组参数;
在生产环境中,全部都会使用foreachPartition算子完成数据库操作。foreachPartition算子存在一个问题,与 mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。

2.3 filter与coalesce的配合使用

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如图所示:
Alt text 根据图中信息我们可以发现两个问题:
➢ 每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源; ➢ 每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据倾斜问题。
如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。
针对上述的两个问题,我们分别进行分析:

  • 针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,避免了资源的浪费。
  • 针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配, 让每个partition(分区)中的数据量差不多,这就避免了数据倾斜问题。
    那么具体应该如何实现上面的解决思路?我们需要coalesce算子。 repartition与coalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。假设我们希望将原本的分区个数A 通过重新分区变为B,那么有以下几种情况:
  1. A > B(多数分区合并为少数分区)
  • A与B相差值不大: 此时使用coalesce即可,无需shuffle过程。
  • A与B相差值很大: 此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置 coalesce的第二个参数为true,即启动shuffle过程。
  1. A < B(少数分区分解为多数分区)
    此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition(分区)的数量,而且让每个partition(分区)的数据量尽量均匀紧凑,以便于后面的task(任务)进行计算操作,在某种程度上能够在一定程度上提升性能。
    注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

2.4 repartition提高并行度

在第一节的常规性能调优中我们讲解了并行度的调节策略,但是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效。 Spark SQL的并行度不允许用户自己指定,Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置 Spark SQL所在的那个stage的并行度,用户自己通过spark.default.parallelism参数指定的并行度,只会在没Spark SQL的stage中生效。 由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行 速度非常快。为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子。
Alt text Spark SQL这一步的并行度和task数量肯定是没有办法去改变了,但是对于Spark SQL查询出来的RDD,立即使用 repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再设计Spark SQL,因此stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。

2.5 reduceByKey预聚合

reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行 combine 操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。reduceByKey算子的执行过程如图所示:
Alt text 使用reduceByKey对性能的提升如下:
➢ 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
➢ 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
➢ 本地聚合后,在reduce端进行数据缓存的内存占用减少;
➢ 本地聚合后,在reduce端进行聚合的数据量减少。
基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey。reduceByKey与groupByKey的运行原理如图所示: Alt textAlt text 根据上图可知,groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行数据的聚合操作。由于reduceByKey有map端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于 groupByKey。

3. Shuffle调优

3.1 调节map端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现 map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32=20次溢写,如果每个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。 map端缓冲的配置方法如代码清单所示:

scala
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

3.2 调节reduce端拉取数据缓冲区大小

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB,该参数的设置方法如代码清单所示:

scala
val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")