override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
  val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)

  // Each reducer's offset pair is stored at a fixed position in the index
  // file, so seek straight to reduceId * 8 bytes before reading.
  val channel = Files.newByteChannel(indexFile.toPath)
  channel.position(blockId.reduceId * 8L)
  val in = new DataInputStream(Channels.newInputStream(channel))
  try {
    val offset = in.readLong()
    val nextOffset = in.readLong()
    // Sanity check (SPARK-22982): after reading two longs the channel must sit
    // exactly 16 bytes past the seek position, otherwise the reads were corrupt.
    val actualPosition = channel.position()
    val expectedPosition = blockId.reduceId * 8L + 16
    if (actualPosition != expectedPosition) {
      throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " +
        s"expected $expectedPosition but actual position was $actualPosition.")
    }
    // The reducer's data occupies the byte range [offset, nextOffset) of the
    // map output data file.
    new FileSegmentManagedBuffer(
      transportConf,
      getDataFile(blockId.shuffleId, blockId.mapId),
      offset,
      nextOffset - offset)
  } finally {
    // Closing the stream also closes the underlying channel.
    in.close()
  }
}
Create a serializer instance and deserialize the fetched streams into a single key/value record iterator.
// Deserialize every fetched block stream and flatten the results into one
// continuous iterator of key/value records.
val serializerInstance = dep.serializer.newInstance()
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
  serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
Attach readMetrics, then wrap the iterator in an InterruptibleIterator.
readMetrics holds bookkeeping counters that are surfaced for monitoring/display.
InterruptibleIterator adds support for task cancellation between records.
// Update the context task metrics for each record read: every element pulled
// through the iterator bumps the temp shuffle-read metrics, and the
// CompletionIterator merges them into the task metrics once fully consumed.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
  recordIter.map { record =>
    readMetrics.incRecordsRead(1) // record count shown in the Spark UI
    record
  },
  context.taskMetrics().mergeShuffleReadMetrics())
// Allow the task to be killed while iterating over the records.
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
Perform aggregation. There are two cases — with or without an aggregator; when one is defined, the behavior further depends on whether map-side combine already ran in the map task.
// Apply the dependency's aggregator, if any. When map-side combine already
// ran, the incoming records are (K, C) pairs whose combiners only need to be
// merged; otherwise they are raw (K, V) pairs and must be combined from
// scratch. Without an aggregator the records pass through unchanged.
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
  if (dep.mapSideCombine) {
    // Values were pre-combined on the map side: merge the (K, C) combiners.
    val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
  } else {
    // No map-side combine: aggregate the raw (K, V) records here.
    val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
  }
} else {
  // No aggregator defined: forward the records as-is.
  interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
Perform sorting. Whether to sort depends on the key ordering: if one is required, an ExternalSorter sorts the records within each partition (spilling to disk when needed).
// Sort within the partition when a key ordering is requested, spilling to
// disk through an ExternalSorter; otherwise return the aggregated iterator.
val resultIter = dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    // Create an ExternalSorter to sort the data.
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    // Record the sorter's memory/disk footprint in the task metrics.
    context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    // Use completion callback to stop sorter if task was finished/cancelled.
    context.addTaskCompletionListener[Unit](_ => {
      sorter.stop()
    })
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}