
Spark Source Code: Partitioner

Study notes on the Partitioner (分区器)

Partitioner

The partitioner is the fifth of an RDD's five core properties (it exists only for (k, v)-typed RDDs). Its core job is to assign every record to a partition via getPartition(key: Any).

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
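To make the contract concrete, here is a minimal custom partitioner (a hypothetical sketch, not from the Spark codebase) that sends even and odd integer keys to different partitions:

import org.apache.spark.Partitioner

// Hypothetical two-partition partitioner for integer keys.
class EvenOddPartitioner extends Partitioner {
  def numPartitions: Int = 2
  def getPartition(key: Any): Int = key match {
    case null   => 0                          // mirror HashPartitioner's null handling
    case i: Int => if (i % 2 == 0) 0 else 1   // evens to partition 0, odds to 1
    case other  => other.hashCode & 1         // fall back to hashing anything else
  }
}
// Usage: pairRdd.partitionBy(new EvenOddPartitioner)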

It has two main implementations: HashPartitioner and RangePartitioner. Partitioning exists to enable parallel processing, so keeping the partitions roughly the same size is the primary goal.

HashPartitioner

This is the simplest one: it assigns each record by taking the key's hashCode modulo the partition count, which spreads data roughly evenly across partitions.

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  // The core logic: null keys go to partition 0, everything else is hashed and modded
  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
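The key's hashCode can be negative, and % in Scala keeps the sign of the dividend, so a plain key.hashCode % numPartitions could return a negative index. Utils.nonNegativeMod folds the result back into [0, numPartitions); a sketch of it (written from memory of the Spark utility, so treat the exact body as approximate):

// Maps x into [0, mod) even when x is negative, unlike plain x % mod.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0) // shift negative remainders into range
}

nonNegativeMod(-7, 3) // 2, whereas -7 % 3 == -1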

RangePartitioner

Given hash partitioning, why do we also need range partitioning? Consider a global sort: with hash partitioning, sorting leaves each partition ordered only internally, and you would then have to merge-sort across partitions, which is a huge amount of extra work. So sorts, such as sortByKey, generally use RangePartitioner. Its guarantee is that every element in one partition is smaller (or larger) than every element in the next, so once each partition is sorted, the data is globally ordered. On top of that, it uses sampling to spread the data fairly evenly across partitions.

The rough flow is: sample every partition (reservoir sampling) -> check whether each partition's sample is adequate, and re-sample any partition whose sample is not -> sort the sampled data; each sample point carries a weight, and the weights are used to compute the partition boundary array rangeBounds -> use the boundaries to assign each record to a partition in getPartition(key: Any).
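In application code you rarely build a RangePartitioner by hand; sortByKey constructs one internally. A minimal usage sketch (assuming sc is a live SparkContext; the data here is made up):

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq((9, "i"), (2, "b"), (5, "e"), (7, "g")), numSlices = 2)

// sortByKey builds a RangePartitioner internally and produces globally ordered output
val sorted = pairs.sortByKey()

// or attach one explicitly with partitionBy
val ranged = pairs.partitionBy(new RangePartitioner(3, pairs))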

rangeBounds

private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    // Cast to double to avoid overflowing ints or longs
    // Total number of sample points, capped at 1e6. Note that `partitions` here is the
    // number of partitions *after* repartitioning.
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    // Sample points per input partition, multiplied by 3 to over-sample.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    // Reservoir-sample each partition (see below); returns the total record count plus
    // per-partition sample info as (partitionId, record count of that partition, sample).
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        // If a partition holds far more than its share, its fixed-size sample is too
        // small relative to `fraction`, so mark it for re-sampling
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          // weight = record count of the partition / number of sample points taken from it
          val weight = (n.toDouble / sample.length).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        // Weight is the reciprocal of the sampling rate
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Return the array of boundary keys (see below); its length is (partition count - 1)
      // (easy to see: cutting a watermelon into 4 pieces takes 3 cuts)
      RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
    }
  }
}
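To put concrete numbers on these formulas (all values invented for illustration): say partitions = 8, samplePointsPerPartitionHint = 20 (the default), and the input RDD has 4 partitions holding 10,000,000 records in total:

val sampleSize = math.min(20.0 * 8, 1e6)                    // 160 target sample points
val sampleSizePerPartition = math.ceil(3.0 * 160 / 4).toInt // 120 points per input partition
val fraction = math.min(160.0 / 10000000, 1.0)              // 1.6e-5 desired sampling rate

// A partition with ~700,000 records: fraction * n ≈ 11 <= 120, so its 120-point
// sample is kept, each key weighted 700000.0 / 120 ≈ 5833.
// A skewed partition with 8,000,000 records: fraction * n = 128 > 120, so it is
// marked imbalanced and re-sampled at `fraction`; its keys get weight 1 / 1.6e-5 = 62500.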

Reservoir Sampling (蓄水池采样)

  • Setting: a data stream whose length N is huge and unknown up front, so it cannot fit in memory at once; we want O(N) time and to pick k elements such that each element is chosen with probability k/N.
  • Steps (why this gives exactly k/N is sketched right after this list):
    1. While fewer than k elements have been seen, put each one into the reservoir. Once the reservoir is full, go to step 2.
    2. On reaching the i-th element, draw a random number d in [0, i]; if d falls in [0, k-1], replace the d-th element of the reservoir with this element.
    3. Repeat step 2 until the stream is exhausted.
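A quick argument of my own for the k/N guarantee, using the 1-based indexing that matches Spark's code below: the l-th element (l > k) enters the reservoir with probability k/l, and once in, the chance it is evicted at a later step j is (k/j) · (1/k) = 1/j. So

P(element l survives) = (k/l) · ∏_{j=l+1..N} (1 − 1/j) = (k/l) · (l/N) = k/N

The first k elements enter with probability 1 and survive with ∏_{j=k+1..N} (1 − 1/j) = k/N as well.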

For a deeper dive into why it works, see the probability sketch above; below is Spark's implementation of the algorithm.

def sketch[K : ClassTag](
    rdd: RDD[K],
    sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
  val shift = rdd.id
  // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
  val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
    val seed = byteswap32(idx ^ (shift << 16))
    val (sample, n) = SamplingUtils.reservoirSampleAndCount(
      iter, sampleSizePerPartition, seed)
    Iterator((idx, n, sample))
  }.collect()
  val numItems = sketched.map(_._2).sum
  (numItems, sketched)
}

The core step is producing each partition's sample via SamplingUtils.reservoirSampleAndCount(...). Note also that sketch ends with collect(), so building a RangePartitioner actually runs a Spark job, which is why sortByKey launches a sampling job before any shuffle happens.

def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Long) = {
  val reservoir = new Array[T](k) // the reservoir that holds the sample points
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) {
    // If input size < k, trim the array to return only an array of input size.
    // Fewer elements than the sample size: keep them all and return.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i)
  } else {
    // If input size > k, continue the sampling process.
    var l = i.toLong
    val rand = new XORShiftRandom(seed)
    // One pass over the rest of the input
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // There are k elements in the reservoir, and the l-th element has been
      // consumed. It should be chosen with probability k/l. The expression
      // below is a random long chosen uniformly from [0,l)
      val replacementIndex = (rand.nextDouble() * l).toLong // random d in [0, l)
      // If d falls within [0, k), replace the d-th element of the reservoir with item.
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    (reservoir, l)
  }
}
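A quick sanity check of the behavior (in Spark this method lives in the private[spark] object org.apache.spark.util.random.SamplingUtils, so calling it directly like this assumes you have it in scope):

// Sample 5 of 1,000 elements in one pass; the total count comes back too.
val (sample, count) = reservoirSampleAndCount((1 to 1000).iterator, k = 5, seed = 42L)
// count == 1000L; sample.length == 5, each element kept with probability 5/1000

// An input shorter than k is returned whole:
val (all, n) = reservoirSampleAndCount((1 to 3).iterator, k = 5)
// n == 3L; all sameElements Array(1, 2, 3)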

Choosing boundaries by weight

/**
 * Determines the bounds for range partitioning from candidates with weights indicating how many
 * items each represents. Usually this is 1 over the probability used to sample this candidate.
 *
 * @param candidates unordered candidates with weights
 * @param partitions number of partitions
 * @return selected bounds
 */
def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  val ordered = candidates.sortBy(_._1) // sort the sample points by key
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values.
      if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}
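A tiny worked example (inputs invented): nine candidate keys with equal weight, cut into three partitions.

import scala.collection.mutable.ArrayBuffer

val candidates = ArrayBuffer((1 to 9).map(k => (k, 1.0f)): _*)
// sumWeights = 9.0, step = 3.0, so the targets are 3.0 and 6.0
val bounds = determineBounds(candidates, partitions = 3)
// cumWeight reaches 3.0 at key 3 and 6.0 at key 6, so bounds == Array(3, 6):
// keys <= 3 go to partition 0, keys in (3, 6] to partition 1, keys > 6 to partition 2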

getPartition(key: Any)

With the boundaries computed, getPartition(key: Any) is straightforward: it is just finding a position within a sorted array of bounds. With no more than 128 boundaries it compares one by one; beyond that it switches to binary search.

def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {
    // If we have less than 128 partitions naive search
    while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
      partition += 1
    }
  } else {
    // Binary search over the boundary array
    partition = binarySearch(rangeBounds, k)
    // binarySearch either returns the match location or -[insertion point]-1
    if (partition < 0) {
      partition = -partition-1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }
  // Return the partition id according to the sort order (ascending or descending).
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}
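Continuing the worked example, with rangeBounds = Array(3, 6) and ascending = true, the assignment plays out like this:

// getPartition(2) -> gt(2, 3) is false             -> partition 0
// getPartition(3) -> gt(3, 3) is false             -> partition 0 (boundary keys stay left)
// getPartition(4) -> gt(4, 3) true, gt(4, 6) false -> partition 1
// getPartition(9) -> gt(9, 3) true, gt(9, 6) true  -> partition 2
// With ascending = false the ids mirror: rangeBounds.length - partition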