/**
 * Upper bounds used to split keys into ranges; empty when `partitions <= 1` or when the
 * input contains no items.
 *
 * The bounds are computed by sampling keys from `rdd`, weighting each sampled key by the
 * number of input items it stands for, and handing the weighted candidates to
 * `RangePartitioner.determineBounds`.
 *
 * NOTE(review): `partitions`, `samplePointsPerPartitionHint` and `rdd` are defined outside
 * this block; `partitions` is presumably the requested number of output partitions.
 */
private var rangeBounds: Array[K] = {
  if (partitions <= 1) {
    Array.empty
  } else {
    // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
    // Cast to double to avoid overflowing ints or longs.
    // This is the total number of sample points across all input partitions.
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit
    // (hence the factor of 3) when deriving the per-partition sample size.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    // Sketch the keys: yields the total item count plus, for each input partition, a
    // (partition index, item count, sample) triple. Per the original author's note the
    // sketch uses reservoir sampling; the sketch implementation is not part of this block.
    val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
    if (numItems == 0L) {
      Array.empty
    } else {
      // If a partition contains much more than the average number of items, we re-sample from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
      val candidates = ArrayBuffer.empty[(K, Float)]
      val imbalancedPartitions = mutable.Set.empty[Int]
      sketched.foreach { case (idx, n, sample) =>
        if (fraction * n > sampleSizePerPartition) {
          // At the desired sampling fraction this partition should contribute more points
          // than the sketch collected, so mark it for re-sampling below.
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability, i.e. the partition's item count
          // divided by the number of points sampled from it.
          val weight = (n.toDouble / sample.length).toFloat
          for (key <- sample) {
            candidates += ((key, weight))
          }
        }
      }
      if (imbalancedPartitions.nonEmpty) {
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
        val seed = byteswap32(-rdd.id - 1)
        val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
        // Each re-sampled key is weighted by the inverse of the sampling fraction.
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (x, weight))
      }
      // Select the boundary keys. determineBounds emits at most (requested partitions - 1)
      // bounds: cutting a range into n pieces takes n - 1 cuts.
      RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
    }
  }
}
/**
 * Reservoir-samples up to `k` items from `input` in a single pass and counts the items seen.
 *
 * @param input iterator to sample from; it is fully consumed
 * @param k     reservoir size, i.e. the maximum number of items returned
 * @param seed  seed for the random number generator used for replacement decisions
 * @return a pair of (sampled items, total number of items read from `input`); when the
 *         input holds fewer than `k` items the array contains all of them
 */
def reservoirSampleAndCount[T: ClassTag](
    input: Iterator[T],
    k: Int,
    seed: Long = Random.nextLong())
  : (Array[T], Long) = {
  // The reservoir that holds the sampled items.
  val reservoir = new Array[T](k)
  // Put the first k elements in the reservoir.
  var i = 0
  while (i < k && input.hasNext) {
    val item = input.next()
    reservoir(i) = item
    i += 1
  }

  // If we have consumed all the elements, return them. Otherwise do the replacement.
  if (i < k) {
    // If input size < k, trim the array to return only an array of input size:
    // every input item ends up in the sample.
    val trimReservoir = new Array[T](i)
    System.arraycopy(reservoir, 0, trimReservoir, 0, i)
    (trimReservoir, i.toLong)
  } else {
    // If input size >= k, continue the sampling process over the rest of the input.
    var l = i.toLong
    // NOTE(review): XORShiftRandom is not defined in this block — presumably the
    // project's seeded pseudo-random generator; confirm against the file's imports.
    val rand = new XORShiftRandom(seed)
    while (input.hasNext) {
      val item = input.next()
      l += 1
      // There are k elements in the reservoir, and the l-th element has been
      // consumed. It should be chosen with probability k/l. The expression
      // below is a random long chosen uniformly from [0,l)
      val replacementIndex = (rand.nextDouble() * l).toLong
      // When the drawn index falls inside the reservoir, the item replaces that slot.
      if (replacementIndex < k) {
        reservoir(replacementIndex.toInt) = item
      }
    }
    (reservoir, l)
  }
}
// Selecting the bounds by weight (按权重选择边界)
/**
 * Determines the bounds for range partitioning from candidates with weights indicating how many
 * items each represents. Usually this is 1 over the probability used to sample this candidate.
 *
 * Candidates are sorted by key and walked in order while accumulating their weights; a key
 * becomes a bound each time the running weight reaches the next multiple of
 * `sum of weights / partitions`. Keys that are not strictly greater than the previous bound
 * are skipped, so fewer than `partitions - 1` bounds may be returned.
 *
 * @param candidates unordered candidates with weights
 * @param partitions number of partitions
 * @return selected bounds, in ascending order, at most `partitions - 1` of them
 */
def determineBounds[K : Ordering : ClassTag](
    candidates: ArrayBuffer[(K, Float)],
    partitions: Int): Array[K] = {
  val ordering = implicitly[Ordering[K]]
  // Sort the sampled candidates by key.
  val ordered = candidates.sortBy(_._1)
  val numCandidates = ordered.size
  val sumWeights = ordered.map(_._2.toDouble).sum
  // Weight each output partition should cover.
  val step = sumWeights / partitions
  var cumWeight = 0.0
  var target = step
  val bounds = ArrayBuffer.empty[K]
  var i = 0
  var j = 0
  var previousBound = Option.empty[K]
  while ((i < numCandidates) && (j < partitions - 1)) {
    val (key, weight) = ordered(i)
    cumWeight += weight
    if (cumWeight >= target) {
      // Skip duplicate values: only accept a key strictly greater than the last bound
      // (or any key when no bound has been chosen yet).
      if (previousBound.forall(ordering.gt(key, _))) {
        bounds += key
        target += step
        j += 1
        previousBound = Some(key)
      }
    }
    i += 1
  }
  bounds.toArray
}