spark shuffle 学习起始篇,涉及源码中shuffle的相关概念 加 SortShuffleManager
Spark 里的 Shuffle
shuffle 主要分为:
- ShuffleWrite 阶段:上一个stage 的尾任务
ShuffleMapTask
把数据写入磁盘,也叫ShuffleMap
- ShuffleRead 阶段: 下一个stage 拉取数据,也叫
ShuffleReduce
源码中的一些概念:
-
如果把 spark 整个流程看成一辆火车,那么除了最后一节是ResultStage
,其它每一节车厢就是一个ShuffleMapStage
,连接车厢的部分就是shuffle
。
-
车厢头进行 read,车厢尾进行 write。很容易理解ShuffleMapStage
需要读前一个stage内容,也需要把输出写入下一个stage;而ResultStage
只需要读。这些可以在源码中发现。
-
ShuffleMapStage
对应 ShuffleMapTask
,ResultStage
对应 ResultTask
。
-
read 实质是ShuffleReader
里的 read()
方法;write 是实质是ShuffleWriter
里的 write()
方法。
-
ShuffleReader
和 ShuffleWriter
这两大组件都由ShuffleManager
进行选择。
好了,脑子里有了这些概念,就可以对这3个模块进行仔细研究了。
SortShuffleManager
由于 Spark 2.0以后,ShuffleManager
只提供一种实现:SortShuffleManager
,因此只深入研究它。
以下是ShuffleMapTask
中的写操作。可以发现 shuffleManager
是 SparkEnv
中的属性。
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
|
这里事先预告下,有3种ShuffleWriter
,1种ShuffleReader
。
那么如何选择呢?注意上面,它取决于dep.shuffleHandle
,而它来自shuffleManager
的registerShuffle()
方法:
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.length, this)
|
好的,让我们进入SortShuffleManager
中一探究竟。
registerShuffle(…)
其实就是选择 handle
的过程
-
如果 不需要map端的聚合操作 且 shuffle 后的分区数量小于等于200(spark.shuffle.sort.bypassMergeThreshold
),就选择 BypassMergeSortShuffleHandle
。否则进入第二步
-
如果 序列化器支持重定位 且 不需要map端聚合 且 shuffle 后的分区数目小于等于2^24),就选择 SerializedShuffleHandle
。否则进入第三步
-
选择 BaseShuffleHandle
override def registerShuffle[K, V, C]( shuffleId: Int, numMaps: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { new BypassMergeSortShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) } else { new BaseShuffleHandle(shuffleId, numMaps, dependency) } }
|
getWriter(…)
根据上面得到的handle
,进行模式匹配选择ShuffleWriter
,有3种:
BypassMergeSortHandle
--> BypassMergeSortShuffleWriter
SerializedShuffleHandle
--> UnsafeShuffleWriter
other(BaseShuffleHandle)
--> SortShuffleWriter
override def getWriter[K, V]( handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] = { numMapsForShuffle.putIfAbsent( handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) val env = SparkEnv.get handle match { case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => new UnsafeShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], context.taskMemoryManager(), unsafeShuffleHandle, mapId, context, env.conf) case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => new BypassMergeSortShuffleWriter( env.blockManager, shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], bypassMergeSortHandle, mapId, context, env.conf) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) } }
|
getReader(…)
只有一种ShuffleReader
:BlockStoreShuffleReader
override def getReader[K, C]( handle: ShuffleHandle, startPartition: Int, endPartition: Int, context: TaskContext): ShuffleReader[K, C] = { new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) }
|