0%

spark源码-shuffle之ShuffleManager

spark shuffle 学习起始篇,涉及源码中shuffle的相关概念 加 SortShuffleManager

Spark 里的 Shuffle

shuffle 主要分为:

  • ShuffleWrite 阶段:上一个stage 的尾任务ShuffleMapTask 把数据写入磁盘,也叫ShuffleMap
  • ShuffleRead 阶段: 下一个stage 拉取数据,也叫ShuffleReduce

源码中的一些概念:

  • 如果把 spark 整个流程看成一辆火车,那么除了最后一节是ResultStage ,其它每一节车厢就是一个ShuffleMapStage ,连接车厢的部分就是shuffle

  • 车厢头进行 read,车厢尾进行 write。很容易理解ShuffleMapStage需要读前一个stage内容,也需要把输出写入下一个stage;而ResultStage只需要读。这些可以在源码中发现。

  • ShuffleMapStage对应 ShuffleMapTaskResultStage 对应 ResultTask

  • read 实质是ShuffleReader里的 read()方法;write 是实质是ShuffleWriter里的 write()方法。

  • ShuffleReaderShuffleWriter 这两大组件都由ShuffleManager进行选择。

好了,脑子里有了这些概念,就可以对这3个模块进行仔细研究了。

SortShuffleManager

由于 Spark 2.0以后,ShuffleManager只提供一种实现:SortShuffleManager,因此只深入研究它。

以下是ShuffleMapTask中的写操作。可以发现 shuffleManagerSparkEnv 中的属性。

// 从 SparkEnv 中 得到 shuffleManager
val manager = SparkEnv.get.shuffleManager
// 从 shuffleManager 中得到 ShuffleWriter
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 执行 ShuffleWriter 里的 write 方法
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// 成功写入,收尾工作
writer.stop(success = true).get

这里事先预告下,有3种ShuffleWriter,1种ShuffleReader

那么如何选择呢?注意上面,它取决于dep.shuffleHandle,而它来自shuffleManagerregisterShuffle()方法:

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)

好的,让我们进入SortShuffleManager中一探究竟。

registerShuffle(…)

其实就是选择 handle 的过程

  1. 如果 不需要map端的聚合操作shuffle 后的分区数量小于等于200spark.shuffle.sort.bypassMergeThreshold),就选择 BypassMergeSortShuffleHandle。否则进入第二步

  2. 如果 序列化器支持重定位不需要map端聚合shuffle 后的分区数目小于等于2^24),就选择 SerializedShuffleHandle。否则进入第三步

  3. 选择 BaseShuffleHandle

override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}

getWriter(…)

根据上面得到的handle,进行模式匹配选择ShuffleWriter,有3种:

BypassMergeSortHandle --> BypassMergeSortShuffleWriter

SerializedShuffleHandle --> UnsafeShuffleWriter

other(BaseShuffleHandle) --> SortShuffleWriter

override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}

getReader(…)

只有一种ShuffleReaderBlockStoreShuffleReader

override def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C] = {
new BlockStoreShuffleReader(
handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}