An introduction to Spark's caching feature
Overview
In a complex job, an intermediate transformation result may be needed several times. Spark's caching feature lets you keep such intermediate results in memory or on disk so they can be reused, avoiding unnecessary recomputation.
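A minimal sketch of the intended usage (the application name, local master, and data are purely illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object CacheExample {
      def main(args: Array[String]): Unit = {
        // hypothetical local app, just to illustrate the pattern
        val sc = new SparkContext(new SparkConf().setAppName("cache-example").setMaster("local[*]"))

        // an "expensive" intermediate result needed by two different actions
        val squares = sc.parallelize(1 to 1000000).map(x => x.toLong * x)
        squares.cache()                               // mark for caching; nothing is computed yet

        val sum  = squares.reduce(_ + _)              // 1st action: computes the RDD and fills the cache
        val even = squares.filter(_ % 2 == 0).count() // 2nd action: reads the partitions from the cache

        println(s"sum = $sum, even squares = $even")
        sc.stop()
      }
    }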
Characteristics
- Lazy: an RDD is computed and cached only when an action is triggered on it.
- A storage level is controlled by five parameters: whether to cache in memory, whether to cache on disk, whether to use off-heap memory, whether to keep the data deserialized, and the number of replicas (see the sketch after this list).
- cache() is persist(), which in turn is persist(StorageLevel.MEMORY_ONLY).
- API: rdd.cache() / rdd.persist() / rdd.unpersist() / sc.getPersistentRDDs
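As a sketch of how the five flags map onto the built-in levels (the tuples in the comments follow the order useDisk, useMemory, useOffHeap, deserialized, replication):

    import org.apache.spark.storage.StorageLevel

    // predefined levels are fixed combinations of the five flags
    val memOnly  = StorageLevel.MEMORY_ONLY        // (false, true,  false, true,  1)
    val memSer2  = StorageLevel.MEMORY_ONLY_SER_2  // (false, true,  false, false, 2)
    val diskOnly = StorageLevel.DISK_ONLY          // (true,  false, false, false, 1)

    // a custom level can be assembled from the same five parameters
    val custom = StorageLevel(useDisk = true, useMemory = true, useOffHeap = false,
      deserialized = false, replication = 2)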
Next, taking the MEMORY_ONLY level as an example, we verify all of this in the source code and, along the way, deepen our understanding of Spark memory management.
Flow
cache() or persist(…)
This also settles a common question: if you call rdd1.cache(), derive rdd2 from rdd1, and then call rdd2.cache(), both rdd1 and rdd2 end up cached.
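A short sketch of that scenario (assuming an existing SparkContext sc, e.g. in spark-shell):

    val rdd1 = sc.parallelize(1 to 100)
    rdd1.cache()                        // rdd1 asks to be cached at MEMORY_ONLY

    val rdd2 = rdd1.map(_ * 2)
    rdd2.cache()                        // rdd2 gets its own storageLevel; rdd1's is untouched

    rdd2.count()                        // the action materializes and caches both rdd1 and rdd2
    println(rdd1.getStorageLevel)       // both now report the MEMORY_ONLY level
    println(rdd2.getStorageLevel)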
    def cache(): this.type = persist()

    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
    // MEMORY_ONLY, like every StorageLevel, is a combination of the five flags:
    new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication)
    def persist(newLevel: StorageLevel): this.type = {
      if (isLocallyCheckpointed) {
        persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
      } else {
        persist(newLevel, allowOverride = false)
      }
    }
    private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      // registration for cleanup and in persistentRdds only happens the first time
      if (storageLevel == StorageLevel.NONE) {
        sc.cleaner.foreach(_.registerRDDForCleanup(this))
        sc.persistRDD(this)
      }
      storageLevel = newLevel
      this
    }
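The guard at the top is easy to trip over; a sketch of the behaviour it enforces (again assuming an existing sc):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 10)
    rdd.persist(StorageLevel.MEMORY_ONLY)

    // asking for a *different* level once one has been assigned hits the guard above
    try {
      rdd.persist(StorageLevel.DISK_ONLY)
    } catch {
      case e: UnsupportedOperationException => println(e.getMessage)
    }

    // the level can only be changed after unpersist(), which resets it to NONE
    rdd.unpersist()
    rdd.persist(StorageLevel.DISK_ONLY)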
|
getOrCompute(…)
cache() merely stamps the storageLevel field onto the RDD; the actual caching only happens when an action runs and iterator is invoked:
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        getOrCompute(split, context)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }
    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
      val blockId = RDDBlockId(id, partition.index)
      var readCachedBlock = true
      // ask the BlockManager for the block; the closure recomputes the partition on a miss
      SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
        readCachedBlock = false
        computeOrReadCheckpoint(partition, context)
      }) match {
        case Left(blockResult) =>
          if (readCachedBlock) {
            // served from the cache: account for it in the task's input metrics
            val existingMetrics = context.taskMetrics().inputMetrics
            existingMetrics.incBytesRead(blockResult.bytes)
            new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
              override def next(): T = {
                existingMetrics.incRecordsRead(1)
                delegate.next()
              }
            }
          } else {
            new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
          }
        case Right(iter) =>
          // the block could not be cached: just return the computed iterator
          new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
      }
    }
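The blockId built here is what ties an RDD partition to the storage layer: each cached partition is stored under RDDBlockId(id, partition.index), whose string form is rdd_&lt;rddId&gt;_&lt;partitionIndex&gt;. A small sketch (assuming an existing sc):

    import org.apache.spark.storage.RDDBlockId

    val rdd = sc.parallelize(1 to 4, numSlices = 2).cache()
    rdd.count()                            // getOrCompute runs per partition and fills the cache

    // the names under which the two cached partitions are stored
    println(RDDBlockId(rdd.id, 0).name)    // e.g. "rdd_0_0"
    println(RDDBlockId(rdd.id, 1).name)    // e.g. "rdd_0_1"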
|
getOrElseUpdate(…)
getOrElseUpdate is a method on BlockManager; its makeIterator parameter is the computeOrReadCheckpoint function passed in above:
    def getOrElseUpdate[T](
        blockId: BlockId,
        level: StorageLevel,
        classTag: ClassTag[T],
        makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
      // first try to read the block (locally or from a remote executor)
      get[T](blockId)(classTag) match {
        case Some(block) =>
          return Left(block)
        case _ =>
      }
      // cache miss: compute the iterator and try to store the result
      doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
        case None =>
          // the put succeeded: read the block back and hand it to the caller
          val blockResult = getLocalValues(blockId).getOrElse {
            releaseLock(blockId)
            throw new SparkException(s"get() failed for block $blockId even though we held a lock")
          }
          releaseLock(blockId)
          Left(blockResult)
        case Some(iter) =>
          // the put failed (e.g. not enough space): return the computed iterator directly
          Right(iter)
      }
    }
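Stripped of locks, metrics, and storage levels, the control flow is a plain get-or-compute-and-store pattern. A toy, single-threaded analogue (names are made up; the real method also reports via Either whether the put succeeded):

    import scala.collection.mutable

    // toy stand-in for the block store: blockId -> materialized values
    object ToyBlockStore {
      private val store = mutable.Map.empty[String, Vector[Any]]

      def getOrElseUpdate[T](blockId: String, makeIterator: () => Iterator[T]): Iterator[T] =
        store.get(blockId) match {
          case Some(values) =>                      // hit: serve the cached values
            values.iterator.asInstanceOf[Iterator[T]]
          case None =>                              // miss: compute, store, then serve
            val values = makeIterator().toVector
            store(blockId) = values
            values.iterator
        }
    }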
|
Additional notes
persistentRdds
A map whose keys are rdd.id and whose values are the RDDs themselves:
    private[spark] val persistentRdds = {
      val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
      map.asScala
    }

    def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
    val ds: collection.Map[Int, RDD[_]] = sc.getPersistentRDDs
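For instance, a quick way to see what is currently registered (assuming an existing sc with some cached RDDs):

    sc.getPersistentRDDs.foreach { case (id, rdd) =>
      println(s"rdd $id is registered with level ${rdd.getStorageLevel.description}")
    }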
|
unpersist()
Drops the cached blocks and unregisters the RDD:
    private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
      env.blockManager.master.removeRdd(rddId, blocking)
      persistentRdds.remove(rddId)
      listenerBus.post(SparkListenerUnpersistRDD(rddId))
    }
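Putting the pieces together, a sketch of the observable effect (assuming an existing sc):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.parallelize(1 to 100).cache()
    rdd.count()
    println(sc.getPersistentRDDs.contains(rdd.id))    // true: registered in persistentRdds

    rdd.unpersist(blocking = true)                    // remove its blocks and unregister it
    println(sc.getPersistentRDDs.contains(rdd.id))    // false
    println(rdd.getStorageLevel == StorageLevel.NONE) // true: the level has been reset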
|
Summary
If you already have some understanding of Spark's storage layer, this section is fairly simple. Caching just rewrites the RDD's storageLevel field and adds the RDD to the persistentRdds map. The real work is triggered when iterator is called: if the partition has not been cached yet, it is computed and written to a block; if it has, it is read straight from that block.