大数据技术之Spark性能优化攻略

发布时间:2026/5/26 20:37:28

大数据技术之Spark性能优化攻略 一、常规性能调优1.1 最优资源配置Spark 性能调优的第一步就是为任务分配更多的资源。在一定范围内增加资源的分配与性能的提升是成正比的。标准的 Spark 任务提交脚本bin/spark-submit\--classcom.atguigu.spark.Analysis\--masteryarn\--deploy-mode cluster\--num-executors80\--driver-memory 6g\--executor-memory 6g\--executor-cores3\/usr/opt/modules/spark/jar/spark.jar可配置资源参数参数说明--num-executors配置 Executor 的数量--driver-memory配置 Driver 内存影响不大--executor-memory配置每个 Executor 的内存大小--executor-cores配置每个 Executor 的 CPU core 数量生产环境推荐配置bin/spark-submit\--classcom.atguigu.spark.WordCount\--masteryarn\--deploy-mode cluster\--num-executors80\--driver-memory 6g\--executor-memory 6g\--executor-cores3\--queueroot.default\--confspark.yarn.executor.memoryOverhead2048\--confspark.core.connection.ack.wait.timeout300\/path/to/your.jar参数参考值--num-executors50~100--driver-memory1G~5G--executor-memory6G~10G--executor-cores3--master实际生产环境一定使用 yarn增加资源带来的性能提升资源效果增加 Executor 个数提高执行 task 的并行度增加每个 Executor 的 CPU core 个数提高执行 task 的并行度增加每个 Executor 的内存量1. 缓存更多数据减少磁盘 IO2. 为 Shuffle 提供更多内存3. 避免频繁 GC1.2 RDD 优化1.2.1 RDD 复用在对 RDD 进行算子时要避免相同的算子和计算逻辑之下对 RDD 进行重复的计算。优化前RDD1 被重复计算两次分别生成 RDD2 和 RDD3优化后先计算 RDD2然后 RDD2 复用生成 RDD3 和 RDD41.2.2 RDD 持久化当多次对同一个 RDD 执行算子操作时每一次都会对这个 RDD 以之前的父 RDD 重新计算一次这是对资源的极大浪费。持久化策略对多次使用的 RDD 进行持久化将公共 RDD 的数据缓存到内存/磁盘中可以进行序列化减小数据体积可以使用副本机制提高数据可靠性1.2.3 尽早执行 filter 操作获取到初始 RDD 后应该考虑尽早地过滤掉不需要的数据进而减少对内存的占用提升 Spark 作业的运行效率。1.3 并行度调节Spark 作业中的并行度指各个 stage 的 task 的数量。如果并行度设置不合理而导致并行度过低会导致资源的极大浪费。Spark 官方推荐task 数量应该设置为 Spark 作业总 CPU core 数量的2~3 倍。valconfnewSparkConf().set(spark.default.parallelism,500)1.4 广播大变量默认情况下task 中的算子如果使用了外部变量每个 task 都会获取一份变量的复本造成内存的极大消耗。示例20 个 Executor500 个 task20M 的变量不使用广播500 个副本耗费集群10G内存使用广播每个 Executor 保存一个副本一共消耗400M内存内存消耗减少25 倍广播变量在每个 Executor 保存一个副本此 Executor 的所有 task 共用此广播变量。1.5 Kryo 序列化默认情况下Spark 使用 Java 的序列化机制。Kryo 序列化机制比 Java 序列化机制性能提高10 倍左右。从 Spark 2.0.0 版本开始简单类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式。valconfnewSparkConf().set(spark.serializer,org.apache.spark.serializer.KryoSerializer).registerKryoClasses(Array(classOf[StartupReportLogs]))1.6 调节本地化等待时长Spark 希望 task 能够运行在它要计算的数据所在的节点数据本地化思想。如果目标节点资源用尽Spark 会等待一段时间默认 3s然后自动降级。本地化级别说明PROCESS_LOCAL进程本地化性能最好NODE_LOCAL节点本地化数据需要在进程间传输RACK_LOCAL机架本地化数据需要通过网络传输NO_PREF没有好坏之分ANY性能最差valconfnewSparkConf().set(spark.locality.wait,6)二、算子调优2.1 mapPartitions 替代 map普通的 map 算子对 RDD 中的每一个元素进行操作而 mapPartitions 算子对 RDD 中每一个分区进行操作。map 算子假设一个 partition 有 1 万条数据function 要执行 1 万次mapPartitions 算子一个 task 处理一个 RDD 的 partitionfunction 一次接收所有的 partition 数据适用场景将 RDD 中的所有数据通过 JDBC 写入数据库时使用 mapPartitions 可以每个分区只建立一个数据库连接注意数据量非常大时可能导致 OOM2.2 foreachPartition 优化数据库操作生产环境中通常使用 foreachPartition 算子来完成数据库的写入。foreach 算子每条数据都会建立一个数据库连接资源浪费严重foreachPartition 算子一个分区的数据只需要创建一次数据库连接rdd.foreachPartition(partitionOfRecords{valconnectioncreateNewConnection()partitionOfRecords.foreach(record{connection.write(record)})connection.close()})2.3 filter 与 coalesce 的配合使用filter 过滤后每个分区的数据量可能会存在较大差异导致数据倾斜问题。解决方案使用 coalesce 算子压缩 partition 数量让每个 partition 的数据量尽量均匀紧凑。场景操作A B多数分区合并为少数分区相差不大使用 coalesce无需 shuffleA B多数分区合并为少数分区相差很大coalesce 并启用 shuffleA B少数分区分解为多数分区使用 repartition2.4 repartition 解决 SparkSQL 低并行度问题Spark SQL 的并行度不允许用户自己指定而是根据 Hive 表对应的 HDFS 文件的 split 个数自动设置。解决方案对 Spark SQL 查询出来的 RDD立即使用 repartition 算子重新进行分区。valsqlRDDspark.sql(SELECT * FROM big_table).rddvalrepartitionedRDDsqlRDD.repartition(500)2.5 reduceByKey 预聚合reduceByKey 相较于普通的 shuffle 操作一个显著的特点就是会进行map 端的本地聚合。reduceByKey 原理map 端先对本地的数据进行 combine 操作然后将数据写入给下个 stage 的每个 task 创建的文件中。使用 reduceByKey 对性能的提升本地聚合后map 端的数据量变少减少了磁盘 IO下一个 stage 拉取的数据量变少减少了网络传输reduce 端进行数据缓存的内存占用减少reduce 端进行聚合的数据量减少reduceByKey vs groupByKeygroupByKey不会进行 map 端聚合将所有数据 shuffle 到 reduce 端再聚合reduceByKey有 map 端聚合特性网络传输数据量减小效率明显高于 groupByKey三、Shuffle 调优3.1 调节 map 端缓冲区大小如果 shuffle 的 map 端处理的数据量比较大但是 map 端缓冲的大小是固定的可能会出现 map 端缓冲数据频繁 spill 溢写到磁盘文件中的情况。默认配置32KBvalconfnewSparkConf().set(spark.shuffle.file.buffer,64)3.2 调节 reduce 端拉取数据缓冲区大小reduce 端数据拉取缓冲区的大小决定了 reduce task 每次能够缓冲的数据量。默认配置48MBvalconfnewSparkConf().set(spark.reducer.maxSizeInFlight,96)3.3 调节 reduce 端拉取数据重试次数对于包含了特别耗时的 shuffle 操作的作业建议增加重试最大次数以避免由于 JVM 的 full gc 或者网络不稳定等因素导致的数据拉取失败。默认配置3 次valconfnewSparkConf().set(spark.shuffle.io.maxRetries,6)3.4 调节 reduce 端拉取数据等待间隔一次失败后会等待一定的时间间隔再进行重试。可以通过加大间隔时长以增加 shuffle 操作的稳定性。默认配置5svalconfnewSparkConf().set(spark.shuffle.io.retryWait,60s)3.5 调节 SortShuffle 排序操作阈值对于 SortShuffleManager如果 shuffle reduce task 的数量小于某一阈值则 shuffle write 过程中不会进行排序操作。触发条件shuffle reduce task 数量 spark.shuffle.sort.bypassMergeThreshold默认 200不是聚合类的 shuffle 算子valconfnewSparkConf().set(spark.shuffle.sort.bypassMergeThreshold,400)四、JVM 调优4.1 降低 cache 操作的内存占比静态内存管理机制根据 Spark 静态内存管理机制堆内存被划分为了 Storage 和 Execution 两块Storage主要用于缓存 RDD 数据和 broadcast 数据占系统内存的 60%Execution主要用于缓存在 shuffle 过程中产生的中间数据占系统内存的 20%如果在某些情况下 cache 操作内存不是很紧张而 task 的算子中创建的对象很多Execution 内存又相对较小会导致频繁的 minor gc甚至于频繁的 full gc。valconfnewSparkConf().set(spark.storage.memoryFraction,0.5)统一内存管理机制Spark 统一内存管理机制下Storage 和 Execution 各占统一内存的 50%由于动态占用机制的实现shuffle 过程需要的内存过大时会自动占用 Storage 的内存区域因此无需手动进行调节。4.2 调节 Executor 堆外内存Executor 的堆外内存主要用于程序的共享库、Perm Space、线程 Stack 和一些 Memory mapping 等。当 Spark 作业处理的数据量非常大时可能会出现shuffle output file cannot find、executor lost、task lost、out of memory等错误。默认堆外内存上限大概为 300 多 MB生产环境建议调节到至少 1G甚至于 2G、4G--confspark.yarn.executor.memoryOverhead20484.3 调节连接等待时长如果 task 在运行过程中创建大量对象或者创建的对象较大会导致频繁的垃圾回收。垃圾回收会导致工作现场全部停止无法提供响应此时无法建立网络连接会导致网络连接超时。--confspark.core.connection.ack.wait.timeout300五、数据倾斜解决方案Spark 中的数据倾斜问题主要指 shuffle 过程中出现的数据倾斜问题是由于不同的 key 对应的数据量不同导致的不同 task 所处理的数据量不同的问题。数据倾斜的表现大部分 task 执行迅速只有有限的几个 task 执行非常慢大部分 task 执行迅速但有的 task 在运行过程中突然报出 OOM定位数据倾斜问题查阅代码中的 shuffle 算子reduceByKey、countByKey、groupByKey、join 等查看 Spark 作业的 log 文件根据异常定位代码位置5.1 解决方案一聚合原数据1) 避免 shuffle 过程如果 Spark 作业的数据来源于 Hive 表可以先在 Hive 表中对数据进行聚合例如按照 key 进行分组将同一 key 对应的所有 value 用一种特殊的格式拼接到一个字符串里去这样一个 key 就只有一条数据了。2) 缩小 key 粒度key 的数量增加可能使数据倾斜更严重。3) 增大 key 粒度将 key 的粒度扩大例如从(省, 城市, 区, 日期)扩大为(省, 城市, 日期)key 的数量会减少数据量差异也有可能会减少。5.2 解决方案二过滤导致倾斜的 key如果在 Spark 作业中允许丢弃某些数据那么可以考虑将可能导致数据倾斜的 key 进行过滤。5.3 解决方案三提高 shuffle 操作中的 reduce 并行度增加 reduce 端并行度就增加了 reduce 端 task 的数量那么每个 task 分配到的数据量就会相应减少。// 对 RDD 操作rdd.reduceByKey(500)// 对 Spark SQLspark.conf.set(spark.sql.shuffle.partitions,500)缺陷并没有从根本上改变数据倾斜的本质只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力。5.4 解决方案四使用随机 key 实现双重聚合首先通过 map 算子给每个数据的 key 添加随机数前缀对 key 进行打散然后进行第一次聚合随后去除掉每个 key 的前缀再次进行聚合。适用场景由 groupByKey、reduceByKey 这类算子造成的数据倾斜不适用场景join 类的 shuffle 操作5.5 解决方案五将 reduce join 转换为 map join核心思路不使用 join 算子进行连接操作而使用 Broadcast 变量与 map 类算子实现 join 操作进而完全规避掉 shuffle 类的操作。将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来然后对其创建一个 Broadcast 变量接着对另外一个 RDD 执行 map 类算子在算子函数内从 Broadcast 变量中获取较小 RDD 的全量数据与当前 RDD 的每一条数据按照连接 key 进行比对。不适用场景如果两个 RDD 数据量都比较大将一个数据量比较大的 RDD 做成广播变量可能会造成内存溢出。5.6 解决方案六sample 采样对倾斜 key 单独进行 join当由单个 key 导致数据倾斜时可以将发生数据倾斜的 key 单独提取出来组成一个 RDD然后用这个原本会导致倾斜的 key 组成的 RDD 跟其他 RDD 单独 join。根据 Spark 的运行机制此 RDD 中的数据会在 shuffle 阶段被分散到多个 task 中去进行 join 操作。5.7 解决方案七使用随机数扩容进行 join如果在进行 join 操作时RDD 中有大量的 key 导致数据倾斜那么进行分拆 key 也没什么意义此时就只能对整个 RDD 进行数据扩容。核心思想选择一个 RDD使用 flatMap 进行扩容对每条数据的 key 添加数值前缀1~N将一条数据映射为多条数据扩容选择另外一个 RDD进行 map 映射操作每条数据的 key 都打上一个随机数作为前缀1~N 的随机数稀释将两个处理后的 RDD 进行 join 操作局限性如果两个 RDD 都很大将 RDD 进行 N 倍的扩容显然行不通只能缓解数据倾斜不能彻底解决六、故障排除6.1 控制 reduce 端缓冲大小以避免 OOM在 Shuffle 过程reduce 端 task 能够拉取多少数据由 reduce 拉取数据的缓冲区 buffer 来决定默认 48MB。如果 map 端的数据量非常大reduce 端的所有 task 在拉取时有可能全部达到缓冲的最大极限值再加上 reduce 端执行的聚合函数创建大量对象可能会导致内存溢出OOM。解决方案减小 reduce 端拉取数据缓冲区的大小例如减少为 12MB。valconfnewSparkConf().set(spark.reducer.maxSizeInFlight,12)6.2 JVM GC 导致的 shuffle 文件拉取失败有时会出现shuffle file not found的错误可能的原因是后面 stage 的 task 想要去上一个 stage 的 task 所在的 Executor 拉取数据结果对方正在执行 GC导致 Executor 内所有的工作现场全部停止。解决方案调整 reduce 端拉取数据重试次数和时间间隔。valconfnewSparkConf().set(spark.shuffle.io.maxRetries,60).set(spark.shuffle.io.retryWait,60s)6.3 解决各种序列化导致的报错当报错信息中含有Serializable等类似词汇可能是序列化问题导致的报错。注意三点作为 RDD 的元素类型的自定义类必须是可以序列化的算子函数里可以使用的外部的自定义变量必须是可以序列化的不可以在 RDD 的元素类型、算子函数里使用第三方的不支持序列化的类型例如 Connection6.4 解决算子函数返回 NULL 导致的问题在算子函数里如果直接返回 NULL会报错例如 Scala.Math(NULL) 异常。解决方案返回特殊值不返回 NULL例如 “-1”对 RDD 执行 filter 操作将数值为 -1 的数据过滤掉使用完 filter 算子后继续调用 coalesce 算子进行优化6.5 解决 YARN-Client 模式导致的网卡流量激增问题在 YARN-client 模式下Driver 启动在本地机器上负责所有的任务调度需要与 YARN 集群上的多个 Executor 进行频繁的通信。假设有 100 个 Executor1000 个 taskDriver 要频繁地跟 Executor 上运行的 1000 个 task 进行通信导致本地机器的网卡流量会激增。解决方案生产环境下使用YARN-cluster 模式。6.6 解决 YARN-Cluster 模式的 JVM 栈内存溢出当 Spark 作业中包含 SparkSQL 的内容时可能会碰到 YARN-client 模式下可以运行但是 YARN-cluster 模式下无法提交运行报出 OOM 错误的情况。原因YARN-cluster 模式下Driver 运行在 YARN 集群的某个节点上使用的是没有经过配置的默认设置PermGen 永久代大小为 82MB。SparkSQL 的内部要进行很复杂的 SQL 的语义解析、语法树转换等对 PermGen 的占用会比较大。解决方案增加 PermGen 的容量。--confspark.driver.extraJavaOptions-XX:PermSize128M -XX:MaxPermSize256M6.7 解决 SparkSQL 导致的 JVM 栈内存溢出当 SparkSQL 的 SQL 语句有成百上千的or关键字时就可能会出现 Driver 端的 JVM 栈内存溢出。原因SparkSQL 有大量or语句的时候在解析 SQL 时对于or的处理是递归or非常多时会发生大量的递归超出了 JVM 栈深度限制。解决方案将一条 SQL 语句拆分为多条 SQL 语句来执行每条 SQL 语句尽量保证 100 个以内的子句。6.8 持久化与 checkpoint 的使用Spark 持久化在大部分情况下是没有问题的但是有时数据可能会丢失。为了避免数据的丢失可以选择对这个 RDD 进行 checkpoint也就是将数据持久化一份到容错的文件系统上比如 HDFS。一个 RDD 缓存并 checkpoint 后如果一旦发现缓存丢失就会优先查看 checkpoint 数据存不存在如果有就会使用 checkpoint 数据而不用重新计算。优点提高了 Spark 作业的可靠性一旦缓存出现问题不需要重新计算数据缺点checkpoint 时需要将数据写入 HDFS 等文件系统对性能的消耗较大七、总结优化维度核心策略资源调优合理配置 Executor 数量、内存、CPU coreRDD 优化RDD 复用、持久化、尽早 filter算子优化mapPartitions、foreachPartition、reduceByKey 替代 groupByKeyShuffle 调优调节缓冲区大小、重试次数、等待间隔JVM 调优降低 cache 内存占比、调节堆外内存、连接等待时长数据倾斜聚合原数据、过滤倾斜 key、随机 key 双重聚合、map join故障排除控制 reduce 缓冲、调节 GC 参数、避免返回 NULL、使用 cluster 模式Spark 性能优化是一个系统工程需要从资源、代码、配置多个层面综合考虑。在实际生产环境中建议遵循以下原则先保证任务能够运行再考虑性能的优化遇到 OOM 先减小内存占用而非增大内存数据倾斜是性能杀手需要优先解决生产环境一定使用 YARN-cluster 模式合理使用 checkpoint 作为 cache 的保障机制

相关新闻