使用 textFile 读取HDFS的数据分区规则
跟着源码走
测试文件:大小 516.06 MB ,54个 block,blockSize 大小是128M,但每个 block 里面的数据只有10M 左右
1. 进入 sc.textFile()
val textFile: RDD[String] = sc.textFile("hdfs://xxxx")
def textFile( path: String, minPartitions: Int = defaultMinPartitions)
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
scheduler.conf.getInt("spark.default.parallelism", totalCores)
|
2. 创建 HadoopRDD
new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }
|
3. 每个RDD 都有一个 getPartitions
函数,由它得到分区号
override def getPartitions: Array[Partition] = { ... try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) ... val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } ... }
|
4. 采用FileInputFormat
里的 getSplits()
划分分区,先计算 splitSize
getPartitions
的 核心是 getSplits()
,下面是计算分区关键步骤
long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
long blockSize = file.getBlockSize();
long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
|
-
假设文件大小为 20M: splitSize = max(1,min(10,128)) = 10M
-
假设文件大小为 516M:splitSize = max(1, min(258,128)) = 128M
(本文)
protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
|
5. 最后,按 splitSize 切分区
for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); } if (bytesRemaining != 0L) { splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } }
|
6 分区结果
每块128M,最后一块略大,符合预期。
hdfs://xxxx:0+134217728 hdfs://xxxx:134217728+134217728 hdfs://xxxx:268435456+134217728 hdfs://xxxx:402653184+138476793
|
小结
-
这里比较奇葩的是 minPartitions 这个设定,它最大只能是2。我觉得之所以这样设定,是防止文件切的过小。假设整个文件大小只有5M,公式:Math.min(goalSize, blockSize)
blockSize假定128M,此时 splitSize 由 minPartitions 决定(不考虑人为设定的那个minSize)。那么它最多只能被切成2份。
-
当文件较大时(大于blockSize两倍),只和 blockSize 有关。尽管我的测试文件中每个 block 实际大小只有10M,然鹅这个并没有什么软用。
-
这是单文件情况,如果是读一个目录下的多文件,那就是单独对每个文件进行切分。(从源码可以发现,其中的 totalSize 是所有文件大小总和)。
-
当然这只是分区划分,实际读取数据没这么简单。假如我们是一条一条读,那么如果该分区最后一条数据没读完,它会接着向下一块继续读,参考它。