Apache Spark 第 6 章 附加篇:Tungsten 引擎深度解析

发布时间:2026/6/26 9:26:31

Apache Spark 第 6 章 附加篇:Tungsten 引擎深度解析 摘要Tungsten 是 Apache Spark 于 2015 年发布的底层执行引擎旨在突破 JVM 的内存管理与 CPU 效率瓶颈通过堆外内存、缓存感知计算、向量化执行和 WholeStage CodeGen 四大机制将 Spark 的执行性能提升了数倍乃至数十倍。本文从背景出发逐层剖析各项技术的实现原理并提供新老方案的详细对比。目录背景JVM 的天花板Tungsten 整体架构核心机制一堆外内存与 UnsafeRow核心机制二缓存感知计算核心机制三向量化执行与 SIMD核心机制四WholeStage CodeGen新老方案对比实际使用效果与调优指南总结背景JVM 的天花板Spark 早期构建在 JVM 之上这带来了跨平台能力和丰富的生态却也继承了 JVM 在大数据场景下的三个根本性瓶颈① 内存效率低下Java 对象模型为每个对象附加了 12~16 字节的对象头Mark Word 类指针一个仅包含两个int字段的对象实际需要占用 24 字节而其真实数据只有 8 字节。在处理数十亿行数据时这种膨胀意味着大量内存被元数据消耗。② GC 停顿不可控Spark 作业大量创建和销毁 Java 对象频繁触发 JVM 的垃圾回收。Full GC 时整个 JVM 暂停Stop-The-World在生产环境中常见秒级甚至十几秒的 GC 停顿严重影响作业稳定性。③ CPU 利用率不足传统的火山模型Volcano Model每次从下层算子拉取一行数据产生大量虚函数调用CPU 分支预测器无法有效工作流水线频繁被清空数据以行式存储无法利用 CPU 的 SIMD 向量化指令批量处理。Tungsten 项目正是为了系统性地解决这三个问题而生。Tungsten 整体架构Tungsten 的设计哲学是让 Spark 贴近硬件而不是依赖 JVM 这个中间层。它的四大机制分别从内存管理、数据访问模式、指令级并行和代码结构四个维度系统性地消除 JVM 带来的性能损耗。核心机制一堆外内存与 UnsafeRow传统 JVM 对象模型的问题UnsafeRow 的内存格式UnsafeRow 是一块连续的堆外或堆内内存区域其格式严格定义为三部分区域大小说明Null 位图ceil(N/64) × 8字节每个 bit 表示对应字段是否为 null固定宽度字段区N × 8字节所有字段统一占 8 字节int/long/double 直接存值string/array 存偏移量长度变长字段区按需字符串、数组等变长数据紧跟其后这种布局的核心优势是随机访问任意字段的时间复杂度为 O(1)只需一次指针加法计算偏移量即可完全不依赖 JVM 的对象图遍历。堆外内存的配置# 启用堆外内存默认关闭sparkSparkSession.builder \.config(spark.memory.offHeap.enabled,true)\.config(spark.memory.offHeap.size,4g)\.getOrCreate()堆外内存完全由 Tungsten 的MemoryManager管理通过sun.misc.Unsafe的allocateMemory/freeMemory直接申请和释放操作系统内存JVM GC 无法感知也不会对其进行扫描和回收。核心机制二缓存感知计算CPU Cache 与内存访问延迟现代 CPU 的性能不仅仅取决于时钟频率数据访问模式同样关键。如果程序的内存访问是随机的CPU 需要频繁从主内存加载数据带来大量缓存缺失Cache Miss性能急剧下降。缓存友好的排序算法传统 Spark 在排序时需要移动完整的行数据Row Object这些对象分散在堆的各处排序过程会产生大量随机内存访问。Tungsten 的策略是只排序指针传统方式移动完整行 ┌──────────────────────────────────────────────────┐ │ [Row A: 200B] → swap → [Row B: 200B] → swap ... │ 大量内存移动 └──────────────────────────────────────────────────┘ Tungsten 方式排序指针 key ┌──────────────────────────────────────────────┐ │ [(ptr_A, key_A), (ptr_B, key_B), ...] │ 64B 指针数组完整放入 L1 Cache └──────────────────────────────────────────────┘ 排序完成后按指针顺序顺序读取完整行 → 顺序访问CPU prefetch 全部命中Tungsten 还将 QuickSort 替换为RadixSort基数排序基数排序的访问模式是顺序的非常适合缓存感知场景在整数类型的 key 上比 QuickSort 快 3~5 倍。核心机制三向量化执行与 SIMD逐行处理 vs 向量化批处理SIMDSingle Instruction Multiple Data单指令多数据是现代 CPU 的核心特性。Intel AVX2 寄存器宽 256 位可同时处理 8 个 32 位整数AVX-512 宽 512 位可处理 16 个。列式存储使同一列的数据在内存中连续排列天然与向量寄存器对齐是实现 SIMD 的前提。Tungsten 的ColumnarBatch格式将数据以列式内存布局组织每列是一个连续的原始数组配合 JVM 的 JIT 编译器生成 SIMD 指令实现向量化的 filter、scan、hash 操作。核心机制四WholeStage CodeGen这是 Tungsten 中影响最深远的机制也是性能提升最显著的部分。火山模型的根本问题CodeGen 的编译流程WholeStage CodeGen 使用Janino一个轻量级的 Java 编译器在运行时动态生成并编译 Java 代码。生成的代码经过 JVM 的 JITJust-In-Time编译器进行二次优化最终生成包含 SIMD 指令的本地机器码。整个编译过程在每个 Stage 开始执行前触发一次编译耗时通常在 100ms 以内对长时间运行的作业几乎没有影响。识别 CodeGen 是否生效-- 使用 explain 查看执行计划df.explain(formatted)输出示例 Physical Plan *(1) HashAggregate(keys[dept#1], functions[avg(salary#2)]) ← *(1) 表示 CodeGen 已启用 - *(1) Filter (age#3 30) - *(1) Project [dept#1, salary#2, age#3] - *(1) FileScan parquet [dept#1,salary#2,age#3] Exchange hashpartitioning(dept#1, 200) ← Shuffle 边界CodeGen 断开 *(2) HashAggregate(keys[dept#1], functions[partial_avg(...)] ← 新的 CodeGen Stage*(N)星号前缀表示该算子已被 CodeGen 融合数字相同表示属于同一个融合块。ExchangeShuffle是天然的 CodeGen 边界下一个 Stage 重新开始编译。新老方案对比实际使用效果与调优指南验证 Tungsten 是否启用# 默认全部开启可通过以下方式验证spark.conf.get(spark.sql.tungsten.enabled)# 已废弃默认 truespark.conf.get(spark.sql.codegen.wholeStage)# WholeStage CodeGenspark.conf.get(spark.sql.codegen.maxFields)# 最大列数限制默认 100spark.conf.get(spark.memory.offHeap.enabled)# 堆外内存# 查看 CodeGen 生成的代码调试用spark.conf.set(spark.sql.codegen.comments,true)spark.conf.set(spark.sql.codegen.logging.maxLines,1000)影响 Tungsten 效果的常见场景场景一宽表导致 CodeGen 降级# 列数超过 codegen.maxFields默认 100时 CodeGen 自动关闭wide_dfspark.read.parquet(wide_table_with_200_cols/)# 解决方案只选择需要的列narrow_dfwide_df.select(col1,col2,col3,col4,col5)# 或调大阈值需权衡生成代码的大小spark.conf.set(spark.sql.codegen.maxFields,200)场景二Python UDF 打断 CodeGen 链frompyspark.sql.functionsimportudffrompyspark.sql.typesimportDoubleType# 不推荐Python UDF 跨语言调用CodeGen 完全失效udf(returnTypeDoubleType())defcalc_tax_python(salary):returnsalary*0.2df.withColumn(tax,calc_tax_python(salary))# 无法 CodeGen# 推荐方案一改用内置函数完整 CodeGenfrompyspark.sql.functionsimportcol df.withColumn(tax,col(salary)*0.2)# 完整 CodeGen ✓# 推荐方案二Pandas UDFArrow 列式传输批量处理frompyspark.sql.functionsimportpandas_udfimportpandasaspdpandas_udf(DoubleType())defcalc_tax_pandas(salary:pd.Series)-pd.Series:returnsalary*0.2df.withColumn(tax,calc_tax_pandas(salary))# 批量传输效率高 ✓场景三堆外内存减少 GC 停顿# 适用于大量 cache() / Shuffle 操作GC 停顿频繁的场景sparkSparkSession.builder \.config(spark.executor.memory,8g)\.config(spark.memory.offHeap.enabled,true)\.config(spark.memory.offHeap.size,4g)\.getOrCreate()# 监控 GC 停顿在 Spark UI 的 Executor 页面查看 GC Time 列# 如果 GC Time / Task Time 10%考虑开启堆外内存性能调优决策流程关键配置速查# ── WholeStage CodeGen ──spark.conf.set(spark.sql.codegen.wholeStage,true)# 默认 truespark.conf.set(spark.sql.codegen.maxFields,100)# 最大列数超出自动关闭spark.conf.set(spark.sql.codegen.fallback.enabled,true)# 失败时优雅降级# ── 堆外内存 ──spark.conf.set(spark.memory.offHeap.enabled,true)spark.conf.set(spark.memory.offHeap.size,4g)# 按 executor 内存的 50% 设置# ── 向量化读取Parquet / ORC──spark.conf.set(spark.sql.parquet.enableVectorizedReader,true)# 默认 truespark.conf.set(spark.sql.orc.enableVectorizedReader,true)# 默认 truespark.conf.set(spark.sql.columnVector.offheap.enabled,true)# 列向量也用堆外# ── AQE配合 Tungsten 效果更好──spark.conf.set(spark.sql.adaptive.enabled,true)# Spark 3.0 默认 truespark.conf.set(spark.sql.adaptive.advisoryPartitionSizeInBytes,64mb)总结Tungsten 本质上是一次从上到下的硬件感知重构机制解决的问题性能收益UnsafeRow 堆外内存JVM 对象膨胀 GC 停顿内存节省 50~70%GC 从秒级降至毫秒级缓存感知计算CPU Cache Miss 频繁排序性能提升 3~5xSIMD 向量化CPU 利用率低逐行处理扫描 / Filter 吞吐提升 2~8xWholeStage CodeGen火山模型虚调用开销聚合类操作提升 10x对使用者而言这四个机制默认全部开启无需手动干预。需要关注的场景是列数超过 100导致 CodeGen 降级Python UDF打断执行链以及 GC 停顿影响稳定性时考虑开启堆外内存。df.explain(formatted)中的*(N)星号是判断 Tungsten 是否充分发挥作用最直接的指标。参考资料Tungsten: Bringing Apache Spark Closer to Bare MetalDatabricks BlogApache Spark 源码sql/core/src/main/scala/org/apache/spark/sql/execution/Apache Spark 4.1 官方文档https://spark.apache.org/docs/latest/sql-performance-tuning.html本文基于 Apache Spark 3.x / 4.x部分实现细节因版本而异。

相关新闻