
Spark source code: Dependency

Every RDD has a list of dependencies, which come in two kinds: narrow dependencies and wide dependencies (ShuffleDependency). Wherever a ShuffleDependency is encountered, a new stage is created.
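
A minimal sketch of the distinction (assuming a local SparkContext named sc): a map only adds a narrow dependency, while reduceByKey introduces a ShuffleDependency, which is exactly where the DAGScheduler cuts a new stage.

val pairs   = sc.parallelize(1 to 100, 4).map(i => (i % 10, i)) // narrow: OneToOneDependency
val reduced = pairs.reduceByKey(_ + _)                          // wide: ShuffleDependency

pairs.dependencies.foreach(d => println(d.getClass.getSimpleName))   // OneToOneDependency
reduced.dependencies.foreach(d => println(d.getClass.getSimpleName)) // ShuffleDependency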

Narrow dependency

A narrow dependency is a NarrowDependency.

Each partition of the parent RDD is used by at most one partition of the child RDD. The two common cases are OneToOneDependency and RangeDependency.

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

  // Get the partition ids of the parent RDD that a given child partition depends on
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

Through a narrow dependency you can:

  • get the parent RDD

  • get the parent partition ids that a given child partition depends on
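
A small illustration of this API (a sketch, assuming a SparkContext named sc): coalesce without shuffle builds a custom NarrowDependency in which one child partition reads several parent partitions, while each parent partition still feeds at most one child partition.

import org.apache.spark.NarrowDependency

val parent    = sc.parallelize(1 to 100, 4)
val coalesced = parent.coalesce(2, shuffle = false)

val narrowDep = coalesced.dependencies.head.asInstanceOf[NarrowDependency[Int]]
(0 until 2).foreach { childId =>
  // e.g. child 0 <- parents 0,1 and child 1 <- parents 2,3 (the exact grouping may vary)
  println(s"child partition $childId <- parent partitions ${narrowDep.getParents(childId)}")
}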

OneToOneDependency

The child RDD's partitions correspond one-to-one to the parent RDD's partitions, so the partitionId naturally stays the same.

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  // One-to-one: the child partition depends on the parent partition with the same id
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

// RDD.scala constructor: a single-parent RDD gets a OneToOneDependency by default
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
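
A quick check (a sketch, assuming a SparkContext named sc): mapping over a 3-partition RDD yields a single OneToOneDependency, and every child partition maps back to the parent partition with the same id.

import org.apache.spark.OneToOneDependency

val parent = sc.parallelize(Seq(1, 2, 3), 3)
val child  = parent.map(_ * 2)

val dep = child.dependencies.head.asInstanceOf[OneToOneDependency[Int]]
dep.rdd eq parent    // true: the dependency points back at the parent RDD
dep.getParents(2)    // List(2): partition ids are unchanged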

RangeDependency

It is used only for the union operation: when two RDDs are unioned, the resulting UnionRDD's dependencies_ field contains two RangeDependency instances.

getParents is computed as follows:

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  // Map a partition id of the child (the UnionRDD) back to the parent's partition id
  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

// UnionRDD.scala
override def getDependencies: Seq[Dependency[_]] = {
  val deps = new ArrayBuffer[Dependency[_]]
  var pos = 0
  for (rdd <- rdds) {
    // One RangeDependency per parent RDD, offset by the partitions already assigned
    deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
    pos += rdd.partitions.length
  }
  deps
}
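
A small sketch (assuming a SparkContext named sc): rddA has 2 partitions and rddB has 3, so the UnionRDD has 5 partitions backed by two RangeDependency instances, one per parent.

import org.apache.spark.RangeDependency

val rddA = sc.parallelize(Seq(1, 2), numSlices = 2)     // becomes union partitions 0..1
val rddB = sc.parallelize(Seq(3, 4, 5), numSlices = 3)  // becomes union partitions 2..4
val unioned = rddA.union(rddB)

val depA = unioned.dependencies(0).asInstanceOf[RangeDependency[Int]] // inStart = 0, outStart = 0, length = 2
val depB = unioned.dependencies(1).asInstanceOf[RangeDependency[Int]] // inStart = 0, outStart = 2, length = 3

depA.getParents(1)   // List(1): union partition 1 -> rddA partition 1
depB.getParents(2)   // List(0): union partition 2 -> rddB partition 0 (2 - outStart + inStart)
depB.getParents(1)   // Nil: partition 1 is outside rddB's range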

Wide dependency

A wide dependency is a ShuffleDependency.

A partition of the parent RDD is used by multiple partitions of the child RDD, which involves a shuffle.


ShuffleDependency

The contents here should all look familiar. Through a wide dependency you can get:

  • the parent RDD
  • partitioner, serializer
  • keyOrdering
  • aggregator, mapSideCombine
  • shuffleId, shuffleHandle
  • keyClassName, valueClassName, combinerClassName
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]], // the RDD must consist of key-value pairs
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  if (mapSideCombine) {
    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
  }
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  // Note: It's possible that the combiner class tag is null, if the combineByKey
  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  // A shuffle id unique within the SparkContext
  val shuffleId: Int = _rdd.context.newShuffleId()

  // Register this shuffle with the ShuffleManager and keep the returned handle
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

// ShuffledRDD.scala
override def getDependencies: Seq[Dependency[_]] = {
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
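
A sketch of what this looks like from user code (assuming a SparkContext named sc): reduceByKey builds a ShuffledRDD whose single dependency is a ShuffleDependency carrying the partitioner, the aggregator and the registered shuffle id.

import org.apache.spark.ShuffleDependency

val counts = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1)).reduceByKey(_ + _)

val shuffleDep = counts.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
shuffleDep.partitioner.numPartitions  // number of reduce partitions (from the default HashPartitioner)
shuffleDep.mapSideCombine             // true: reduceByKey combines values on the map side
shuffleDep.aggregator.isDefined       // true: the reduce function is wrapped in an Aggregator
shuffleDep.shuffleId                  // unique id under which this shuffle is registered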

Usage

The first call: dependencies -> getDependencies -> dependencies_

Subsequent calls: dependencies -> dependencies_ (the cached value)

// dependencies calls getDependencies and caches the result in the dependencies_ field
// (user code may call this method as well).
// Clearly, getDependencies is invoked only once!
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
  }
}

/**
 * The third of the RDD's five core properties: the list of dependencies on parent RDDs.
 * This method is only called once!
 */
// Defaults to deps (the deps passed to the ordinary RDD constructor), but it can be
// overridden, as in UnionRDD and ShuffledRDD above
protected def getDependencies: Seq[Dependency[_]] = deps

// The actual dependencies field
private var dependencies_ : Seq[Dependency[_]] = _
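
A small check of the caching behaviour (assuming a SparkContext named sc): the first call to dependencies evaluates getDependencies and stores the result in dependencies_; later calls return that same cached Seq, so the ShuffleDependency (and its shuffleId) is created only once.

val grouped = sc.parallelize(Seq(1 -> "a", 2 -> "b")).groupByKey()

val first  = grouped.dependencies
val second = grouped.dependencies
first eq second   // true: getDependencies was evaluated a single time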

Summary

The base class Dependency has only one method, def rdd: RDD[T], which is used to get the parent RDD.

Its subclasses NarrowDependency and ShuffleDependency each add a little functionality on top of that.

When dividing stages, the boundaries are drawn at ShuffleDependency instances.
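
To make that last point concrete, here is a simplified sketch of the idea (not the actual DAGScheduler code; the helper name shuffleBoundaries is made up for illustration, and it does not de-duplicate shared ancestors): walk the lineage backwards and treat every ShuffleDependency as a stage boundary, while narrow dependencies keep the parent in the same stage.

import org.apache.spark.rdd.RDD
import org.apache.spark.{NarrowDependency, ShuffleDependency}

def shuffleBoundaries(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] =
  rdd.dependencies.flatMap {
    case shuffle: ShuffleDependency[_, _, _] =>
      // stage boundary: everything above this dependency belongs to earlier stages
      shuffle +: shuffleBoundaries(shuffle.rdd)
    case narrow: NarrowDependency[_] =>
      // same stage: keep walking up through the narrow dependency
      shuffleBoundaries(narrow.rdd)
  }

// number of stages = number of shuffle boundaries + 1 (the final result stage)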