0%

spark杂谈-使用textFile读取HDFS的分区规则

使用 textFile 读取HDFS的数据分区规则

跟着源码走

测试文件:大小 516.06 MB ,54个 block,blockSize 大小是128M,但每个 block 里面的数据只有10M 左右

1. 进入 sc.textFile()

val textFile: RDD[String] = sc.textFile("hdfs://xxxx")

// textFile() 有个默认值:minPartitions
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions)

// 它是取 defaultParallelism 和 2 的最小值
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

// 这个 defaultParallelism 不指定就是 totalCores,我这里是4
scheduler.conf.getInt("spark.default.parallelism", totalCores)

// 所以 defaultMinPartitions 最终为2

2. 创建 HadoopRDD

new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}

3. 每个RDD 都有一个 getPartitions 函数,由它得到分区号

override def getPartitions: Array[Partition] = {
...
try {
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
...
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
...
}

4. 采用FileInputFormat 里的 getSplits() 划分分区,先计算 splitSize

getPartitions 的 核心是 getSplits(),下面是计算分区关键步骤

// 总大小除2,为258M
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);

// 这是人为设定的分区最小值,这个很好理解
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);

// HDFS 文件的块大小,128M
long blockSize = file.getBlockSize();

// 计算 splitSize
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
  • 假设文件大小为 20M: splitSize = max(1,min(10,128)) = 10M

  • 假设文件大小为 516M:splitSize = max(1, min(258,128)) = 128M (本文)

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}

5. 最后,按 splitSize 切分区

// 可以发现为了防止最后一个分区过小的问题,引入了数字 1.1,保证最后一个分区的大小大于 splitSize  的 10%
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
}
if (bytesRemaining != 0L) {
splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
}
}

6 分区结果

每块128M,最后一块略大,符合预期。

hdfs://xxxx:0+134217728
hdfs://xxxx:134217728+134217728
hdfs://xxxx:268435456+134217728
hdfs://xxxx:402653184+138476793

小结

  • 这里比较奇葩的是 minPartitions 这个设定,它最大只能是2。我觉得之所以这样设定,是防止文件切的过小。假设整个文件大小只有5M,公式:Math.min(goalSize, blockSize) blockSize假定128M,此时 splitSize 由 minPartitions 决定(不考虑人为设定的那个minSize)。那么它最多只能被切成2份。

  • 当文件较大时(大于blockSize两倍),只和 blockSize 有关。尽管我的测试文件中每个 block 实际大小只有10M,然鹅这个并没有什么软用。

  • 这是单文件情况,如果是读一个目录下的多文件,那就是单独对每个文件进行切分。(从源码可以发现,其中的 totalSize 是所有文件大小总和)。

  • 当然这只是分区划分,实际读取数据没这么简单。假如我们是一条一条读,那么如果该分区最后一条数据没读完,它会接着向下一块继续读,参考