
Spark issue: RDD partition 2GB limit

When Spark processes large datasets, it runs into the limitation that a shuffle block must be smaller than 2GB. Once a shuffle block exceeds 2GB, Spark throws a Size exceeds Integer.MAX_VALUE exception.

An example stack trace:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

Solutions

  1. Increase the number of RDD partitions.

spark.read.format("parquet").option("inferSchema", "true").load(app.inputDataPath).repartition(300)
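As a sketch of this approach (the app name and input path are placeholders, and 300 is only an illustrative partition count, not a recommendation):

import org.apache.spark.sql.SparkSession

object RepartitionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("repartition-example").getOrCreate()

    // Spread the data over more partitions right after reading it, so that
    // no single shuffle block grows past the 2GB limit.
    val df = spark.read
      .format("parquet")
      .option("inferSchema", "true")
      .load("/data/input.parquet") // placeholder path
      .repartition(300)

    println(s"Partitions after repartition: ${df.rdd.getNumPartitions}")
    spark.stop()
  }
}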

  2. spark.default.parallelism

This setting only affects raw RDD operations; it has no effect on DataFrames.
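A minimal sketch of applying this setting to an RDD job (the app name, input path, and the value 300 are assumptions for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object DefaultParallelismExample {
  def main(args: Array[String]): Unit = {
    // spark.default.parallelism controls the default partition count of RDD
    // shuffle operations such as reduceByKey when no numPartitions is given.
    // It does NOT change DataFrame/Dataset shuffles.
    val conf = new SparkConf()
      .setAppName("default-parallelism-example")
      .set("spark.default.parallelism", "300")
    val sc = new SparkContext(conf)

    val counts = sc.textFile("/data/input.txt") // placeholder path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _) // falls back to spark.default.parallelism when set

    println(s"reduceByKey partitions: ${counts.getNumPartitions}")
    sc.stop()
  }
}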

  3. spark.sql.shuffle.partitions

Sets the number of partitions used in the shuffle when executing joins or aggregations.
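A minimal sketch, assuming a small inline DataFrame and an illustrative value of 300 (the default is 200):

import org.apache.spark.sql.SparkSession

object SqlShufflePartitionsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sql-shuffle-partitions-example").getOrCreate()
    import spark.implicits._

    // spark.sql.shuffle.partitions controls the partition count of shuffles
    // produced by DataFrame joins and aggregations.
    spark.conf.set("spark.sql.shuffle.partitions", "300")

    val orders = Seq((1, 10.0), (2, 20.0), (1, 5.0)).toDF("userId", "amount")
    val totals = orders.groupBy("userId").sum("amount") // aggregation triggers a shuffle

    println(s"Shuffle output partitions: ${totals.rdd.getNumPartitions}")
    spark.stop()
  }
}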

Cause

Spark uses a ByteBuffer to store shuffle blocks.

However, a ByteBuffer is limited to Integer.MAX_VALUE bytes (2GB).

ByteBuffer.allocate(int capacity)
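The following standalone snippet (plain JVM code, not part of Spark) illustrates why this signature caps a single block at roughly 2GB:

import java.nio.ByteBuffer

object ByteBufferLimit {
  def main(args: Array[String]): Unit = {
    // The capacity parameter is an Int, so one ByteBuffer can hold at most
    // Integer.MAX_VALUE (2^31 - 1) bytes, i.e. just under 2GB.
    println(s"Max ByteBuffer capacity: ${Int.MaxValue} bytes")

    // A shuffle block larger than that cannot be read into a single buffer
    // (or memory-mapped via FileChannel.map, which has the same Int limit),
    // which surfaces as "Size exceeds Integer.MAX_VALUE".
    val buf = ByteBuffer.allocate(1024) // fine: the capacity fits in an Int
    println(s"Allocated a buffer with capacity ${buf.capacity()}")
  }
}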

References

RDD partition 2GB limit
top 5 mistakes to avoid when writing spark
SparkSQL shuffle exception