/**
 * Writes all records to per-partition temporary files (the bypass-merge-sort shuffle path).
 *
 * <p>Opens one disk writer per reduce partition up front, routes every record to the writer
 * selected by the partitioner, then commits and closes each writer so the resulting file
 * segments can later be concatenated into a single output file.
 *
 * @param records the map-side records to shuffle
 * @throws IOException if writing any partition file fails
 */
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    // Empty input: commit an all-zero index so readers see zero-length partitions,
    // and report the map status immediately.
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  // One DiskBlockObjectWriter per reduce partition: the array index IS the partition id,
  // so numPartitions temporary files are created, one per partition.
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  partitionWriterSegments = new FileSegment[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    // Route the record to the writer chosen by the partitioner.
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    // Flush buffered data to disk and remember the resulting file segment.
    partitionWriterSegments[i] = writer.commitAndGet();
    writer.close();
  }
  // NOTE(review): the method continues past this excerpt in the original source
  // (concatenating the segments and committing the index file); that tail is not
  // shown here, so it is not reconstructed.
private long[] writePartitionedFile(File outputFile) throwsIOException { // Track location of the partition starts in the output file // lengths数组:记录每个分区文件的大小 final long[] lengths = new long[numPartitions]; if (partitionWriters == null) { // We were passed an empty iterator return lengths; }
// 合并文件 finalFileOutputStream out = newFileOutputStream(outputFile, true); final long writeStartTime = System.nanoTime(); boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { finalFile file = partitionWriterSegments[i].file(); if (file.exists()) { finalFileInputStream in = newFileInputStream(file); boolean copyThrewException = true; try { // 把分区文件 拷贝到 合并文件 中,存入文件大小到lengths数组中 lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); copyThrewException = false; } finally { Closeables.close(in, copyThrewException); } // 删除分区文件 if (!file.delete()) { logger.error("Unable to delete file for partition {}", i); } } } threwException = false; } finally { Closeables.close(out, threwException); writeMetrics.incWriteTime(System.nanoTime() - writeStartTime); } partitionWriters = null; return lengths; }
writeIndexFileAndCommit(…)
Generates the index file that records each partition's offset within the shuffle data file
/**
 * Writes the index file recording the offset of each block and, in the same atomic step,
 * commits the data file. Uses a check-then-rename protocol so that if another attempt of
 * the same task has already committed its output, that output is reused instead.
 *
 * @param shuffleId the shuffle this map output belongs to
 * @param mapId the map task that produced the output
 * @param lengths per-partition byte lengths; may be overwritten in place with the lengths
 *                of an existing successful attempt
 * @param dataTmp the temporary data file to commit, or null when there is no data
 */
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor; this synchronization makes
    // sure the following check and rename are atomic.
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block and need to convert them to offsets:
          // each index entry is the running sum of the partition lengths so far.
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // Publish the index file by renaming the temp file into place.
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        // Publish the data file the same way, when one was written.
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    // Best-effort cleanup of the temp index file if it was never renamed.
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}