Spark source code: shuffle - BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter, one of Spark's three ShuffleWriters

Characteristics

  • Only used for shuffle operations that need no map-side aggregation and whose number of reducer tasks is small, i.e. at most spark.shuffle.sort.bypassMergeThreshold (200 by default); see the selection sketch after this list

  • Records are written straight to files, so with larger data volumes the disk I/O and memory overhead become heavy

  • One DiskBlockObjectWriter is opened per partition while writing the partition files, each with its own file buffer, so memory consumption is relatively high (for example, 200 partitions with the default 32 KB spark.shuffle.file.buffer is already roughly 6 MB of write buffers per task)
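
To see when this writer is chosen in the first place: the decision made by the sort-based shuffle roughly follows SortShuffleWriter.shouldBypassMergeSort in Spark 2.x. The paraphrase below is a sketch of that check, not the exact source:

import org.apache.spark.{ShuffleDependency, SparkConf}

// Paraphrased: bypass only when there is no map-side combine and the number of
// reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}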

Overall flow

  1. Create a DiskBlockObjectWriter for every partition

  2. Iterate over the records and write each one into its partition's file via DiskBlockObjectWriter's write method

  3. Flush the partition files to disk

  4. Merge the partition files into one large file and return an array recording each partition file's length

  5. Generate the index file from that partition-length array

  6. Wrap the result in a MapStatus and return it

In short, the output is one large file formed by merging the per-partition files, plus an index file (the steps above are only the main flow; details such as renaming temporary files are omitted).
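
To make the flow concrete, here is a small self-contained Scala sketch of the same idea. None of the names below are Spark APIs; it only mimics steps 1-5: hash records into per-partition temp files, concatenate them into one data file, and keep the per-partition lengths from which the index offsets are derived.

import java.io.FileOutputStream
import java.nio.file.Files

object BypassStyleDemo {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val records = Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4, "e" -> 5)

    // Steps 1-3: one temp file per partition; write every record to its partition's file.
    val partFiles = Array.fill(numPartitions)(Files.createTempFile("part", ".tmp").toFile)
    val outs = partFiles.map(f => new FileOutputStream(f))
    records.foreach { case (k, v) =>
      val p = math.abs(k.hashCode) % numPartitions // stand-in for the Partitioner
      outs(p).write(s"$k=$v\n".getBytes("UTF-8"))
    }
    outs.foreach(_.close())

    // Step 4: concatenate the partition files into one data file, recording each length.
    val dataFile = Files.createTempFile("shuffle", ".data").toFile
    val merged = new FileOutputStream(dataFile, true)
    val lengths = partFiles.map { f =>
      val bytes = Files.readAllBytes(f.toPath)
      merged.write(bytes)
      f.delete()
      bytes.length.toLong
    }
    merged.close()

    // Step 5: the index is just the running sum of the lengths (numPartitions + 1 offsets).
    val offsets = lengths.scanLeft(0L)(_ + _)
    println(s"lengths = ${lengths.mkString(", ")}; offsets = ${offsets.mkString(", ")}")
  }
}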

[Figure: BypassMergeSortShuffleWriter.jpg]

Source code

write(…)

public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  // partitionWriters array: the partition id is the array index. Each writer sends its
  // partition's records to a separate file, so one file is created per partition.
  // numPartitions here is the number of reduce-side (post-shuffle) partitions.
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  partitionWriterSegments = new FileSegment[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    // Route each record to the writer chosen by the partitioner.
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    // Flush the buffered data to disk and record the resulting FileSegment.
    partitionWriterSegments[i] = writer.commitAndGet();
    writer.close();
  }

  // Data file name: "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  try {
    // Concatenate the per-partition files and return each partition's size,
    // which is later used to compute the index offsets.
    partitionLengths = writePartitionedFile(tmp);
    // Write the index file.
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    // Deleting tmp here is safe because writeIndexFileAndCommit has already renamed it.
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
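
A side note on the last line: MapStatus.apply chooses its representation by the number of partitions. In Spark 2.x it looks roughly like the paraphrase below (the classes involved are Spark-internal, so this is only an illustration of the decision, not code to copy):

// Paraphrased from Spark 2.x MapStatus.apply: beyond roughly 2000 partitions the
// per-partition sizes are only tracked approximately (HighlyCompressedMapStatus)
// to keep the MapStatus object that is sent back to the driver small.
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > 2000) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}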

writePartitionedFile(…)

Concatenates the per-partition files generated above and returns the size of each partition file, used to compute the index offsets.

private long[] writePartitionedFile(File outputFile) throws IOException {
  // Track location of the partition starts in the output file
  // lengths array: records the size of each partition file.
  final long[] lengths = new long[numPartitions];
  if (partitionWriters == null) {
    // We were passed an empty iterator
    return lengths;
  }

  // The merged output file, opened in append mode.
  final FileOutputStream out = new FileOutputStream(outputFile, true);
  final long writeStartTime = System.nanoTime();
  boolean threwException = true;
  try {
    for (int i = 0; i < numPartitions; i++) {
      final File file = partitionWriterSegments[i].file();
      if (file.exists()) {
        final FileInputStream in = new FileInputStream(file);
        boolean copyThrewException = true;
        try {
          // Copy the partition file into the merged file and record its size in lengths.
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          Closeables.close(in, copyThrewException);
        }
        // Delete the per-partition file once it has been copied.
        if (!file.delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
    }
    threwException = false;
  } finally {
    Closeables.close(out, threwException);
    writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
  }
  partitionWriters = null;
  return lengths;
}
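
The interesting call above is Utils.copyStream(in, out, false, transferToEnabled): when transferTo is enabled and both ends are file streams, the copy can go through NIO's FileChannel.transferTo instead of a user-space buffer. The following is a minimal sketch of that idea only; Spark's actual Utils.copyStream also provides a buffered fallback and extra checks.

import java.io.{FileInputStream, FileOutputStream}

// Sketch: zero-copy file-to-file transfer via FileChannel.transferTo.
def copyFileToStream(in: FileInputStream, out: FileOutputStream): Long = {
  val inChannel = in.getChannel
  val outChannel = out.getChannel
  val size = inChannel.size()
  var copied = 0L
  while (copied < size) {
    // transferTo may copy fewer bytes than requested, so loop until everything is written.
    copied += inChannel.transferTo(copied, size - copied, outChannel)
  }
  copied
}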

writeIndexFileAndCommit(…)

Generates the index file and atomically commits both the index and data files.

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization makes sure
    // the following check and rename are atomic.
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          // The index is simply the running sum of the partition lengths.
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // Rename indexTmp to the final index file.
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        // Rename dataTmp to the final data file.
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}
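
The resulting index file is just numPartitions + 1 longs of cumulative offsets, so a reader that wants partition reduceId only has to read two of them: that partition occupies bytes [offsets(reduceId), offsets(reduceId + 1)) of the data file. The sketch below illustrates the lookup this layout enables (an illustration only, not the resolver's actual getBlockData code):

import java.io.{DataInputStream, File, FileInputStream}

// Sketch: read the two offsets that bracket partition reduceId and return
// (start offset into the data file, length of the partition's slice).
def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8) // each offset is an 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end - start)
  } finally {
    in.close()
  }
}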