
Spark Source Code: Block

First, be clear that Spark has its own internal file system, and a block is simply a file in it. For example: a cached RDD partition is a block; temporary files produced during computation are blocks; anything Spark needs to store is a block.

Since a block is a file, it has a file name, metadata, and data. Understand those three and you understand blocks!

BlockId

Every block has a BlockId in one-to-one correspondence with it; this BlockId is the file's name.

sealed abstract class BlockId {
  def name: String

  // RDDBlockId, ShuffleBlockId, etc. are all subclasses of this
  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}

Once you understand the naming rules, you understand a good deal about Spark's computation flow. For example:

TODO: work through every BlockId subclass

Plain RDDs

  • RDDBlockId "rdd_" + rddId + "_" + splitIndex

Simply identified by rddId and partition index.
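As a sketch (the Demo* classes below are stand-ins invented here, not the real org.apache.spark.storage classes), the naming rule amounts to:

```scala
// Toy stand-ins for Spark's BlockId hierarchy; only the naming rule is modeled.
sealed abstract class DemoBlockId {
  def name: String
  override def toString: String = name
}

case class DemoRDDBlockId(rddId: Int, splitIndex: Int) extends DemoBlockId {
  // same format string as the RDDBlockId rule above
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

// Partition 3 of RDD #2 is stored under the file name "rdd_2_3".
assert(DemoRDDBlockId(2, 3).name == "rdd_2_3")
```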

The shuffle process

  • ShuffleBlockId "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
  • ShuffleDataBlockId "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
  • ShuffleIndexBlockId "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"

ShuffleBlockId is the single big file a task produces during shuffle write; ShuffleDataBlockId is the on-disk name of that same data file (with a .data suffix), and ShuffleIndexBlockId is its index file. All three are identified by the combination of shuffleId, mapId, and reduceId.

Note: reduceId here is 0 (NOOP_REDUCE_ID). Since sort-based shuffle produces one consolidated file per map task, the reduceId in the name carries no information. Understanding the difference between SortShuffleManager and the retired HashShuffleManager makes this clear.
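A sketch of the shuffle naming rule, again with invented Demo* stand-ins mirroring the format strings above:

```scala
// Toy stand-ins mirroring the shuffle block format strings.
case class DemoShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
  def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

case class DemoShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) {
  def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}

// Sort-based shuffle writes one data file per map task, so the on-disk
// data/index files always use the placeholder reduceId 0.
assert(DemoShuffleBlockId(0, 5L, 0).name == "shuffle_0_5_0")
assert(DemoShuffleIndexBlockId(0, 5L, 0).name == "shuffle_0_5_0.index")
```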

Broadcast

  • BroadcastBlockId "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)

Broadcast variables are stored in two forms: the block holding the original object, named simply "broadcast_" + broadcastId, and the blocks holding the chunks the object was split into, which append a chunk identifier as the field.
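Sketched with an invented stand-in (the "piece0" field below follows the chunk-naming convention TorrentBroadcast uses, to the best of my knowledge):

```scala
// Toy stand-in for BroadcastBlockId; models the field-suffix rule above.
case class DemoBroadcastBlockId(broadcastId: Long, field: String = "") {
  def name: String =
    "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

assert(DemoBroadcastBlockId(0).name == "broadcast_0")                  // whole object
assert(DemoBroadcastBlockId(0, "piece0").name == "broadcast_0_piece0") // one chunk
```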

Temporary files

  • TempLocalBlockId "temp_local_" + randomUUID

  • TempShuffleBlockId "temp_shuffle_" + randomUUID

TempLocalBlockId names intermediate temporary files of local computation.

TempShuffleBlockId names intermediate temporary files of shuffle computation: (1) the per-partition temporary files of bypass shuffle; (2) files spilled to disk when memory runs out while sorting shuffle data.
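A sketch of the temp naming with invented stand-ins: embedding a random UUID yields a unique file name with no coordination between tasks.

```scala
import java.util.UUID

// Toy stand-ins: a random UUID in the name makes every temp file unique
// without any coordination between tasks.
case class DemoTempLocalBlockId(id: UUID) {
  def name: String = "temp_local_" + id
}

case class DemoTempShuffleBlockId(id: UUID) {
  def name: String = "temp_shuffle_" + id
}

assert(DemoTempLocalBlockId(UUID.randomUUID()).name.startsWith("temp_local_"))
```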

BlockInfo

A block's metadata looks like this.

Constructor parameters:

  • StorageLevel: the storage level
  • classTag: the block's class tag, needed for serialization
  • tellMaster: whether the master must be told when the block's state changes; true for most blocks, but not for broadcast blocks

Internal properties (with getters and setters); the latter two are used to implement the lock mechanism:

  • size: the block's size in bytes
  • readerCount: how many tasks are currently reading the block
  • writerTask: the ID of the task holding the block's write lock; the default, BlockInfo.NO_WRITER, means nobody is writing

Whenever a value is set, the checkInvariants method runs to verify the new state is legal.

private def checkInvariants(): Unit = {
  // multiple readers are allowed
  assert(_readerCount >= 0)
  // reading and writing are mutually exclusive
  assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
}
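A standalone model of this setter-plus-invariant pattern (DemoBlockInfo is invented here; in Spark, NO_WRITER is, as far as I recall, the sentinel value -1):

```scala
// Minimal model of BlockInfo's guarded setters; not Spark's class.
object DemoBlockInfo {
  val NO_WRITER: Long = -1 // sentinel: nobody holds the write lock
}

class DemoBlockInfo {
  private var _readerCount: Int = 0
  private var _writerTask: Long = DemoBlockInfo.NO_WRITER

  def readerCount: Int = _readerCount
  def readerCount_=(c: Int): Unit = { _readerCount = c; checkInvariants() }

  def writerTask: Long = _writerTask
  def writerTask_=(t: Long): Unit = { _writerTask = t; checkInvariants() }

  private def checkInvariants(): Unit = {
    assert(_readerCount >= 0)                                           // readers never negative
    assert(_readerCount == 0 || _writerTask == DemoBlockInfo.NO_WRITER) // readers and writer never coexist
  }
}

val info = new DemoBlockInfo
info.readerCount = 2   // several readers: fine
info.readerCount = 0
info.writerTask = 42L  // a writer once readers are gone: fine
// info.readerCount = 1 would now throw an AssertionError
```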

BlockInfoManager

It manages block metadata in order to implement the block locking mechanism: a classic file read-write lock.

Requirements:

  • The same file may be read by several tasks at once, provided nobody is writing; the read lock is reentrant
  • Only one task may write a file, and only while nobody is reading it; the write lock is not reentrant
  • When the lock cannot be taken, the caller may choose to wait until it can (implementation: wait() when the lock is unavailable, notify when it is released)
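These three requirements can be modeled standalone with the same synchronized/wait()/notifyAll() monitor pattern (a toy lock, not Spark's code):

```scala
// Toy reader-writer lock built on the monitor pattern the manager uses.
class TinyRWLock {
  private var readers = 0
  private var writing = false

  def lockRead(): Unit = synchronized {
    while (writing) wait() // readers only wait for a writer
    readers += 1
  }

  def unlockRead(): Unit = synchronized {
    readers -= 1
    notifyAll()            // wake every waiter; each re-checks its condition
  }

  def lockWrite(): Unit = synchronized {
    while (writing || readers > 0) wait() // a writer needs full exclusivity
    writing = true
  }

  def unlockWrite(): Unit = synchronized {
    writing = false
    notifyAll()
  }

  def state: (Int, Boolean) = synchronized((readers, writing))
}

val lock = new TinyRWLock
lock.lockRead(); lock.lockRead()  // two simultaneous readers are allowed
assert(lock.state == (2, false))
lock.unlockRead(); lock.unlockRead()
lock.lockWrite()                  // exclusive writer
assert(lock.state == (0, true))
lock.unlockWrite()
```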

Members

// An alias for readability.
// Understand it as: one task runs in one thread, and that thread is what the lock targets.
private type TaskAttemptId = Long

// Mapping from BlockId to BlockInfo
@GuardedBy("this")
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

// Mapping from task ID to the write locks it holds
@GuardedBy("this")
private[this] val writeLocksByTask =
  new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
    with mutable.MultiMap[TaskAttemptId, BlockId]

// Mapping from task ID to the read locks it holds
@GuardedBy("this")
private[this] val readLocksByTask =
  new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]

The data structures here are worth a second look: readLocksByTask maps each task to a multiset (a set that keeps duplicate elements), so re-acquiring a read lock just adds another copy of the BlockId; that duplicate count is what makes the read lock reentrant. writeLocksByTask, a HashMap with the MultiMap mixin, is an unusual but handy construction.
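The effect of the duplicate-counting set can be modeled with a plain counted map (a sketch; Spark actually uses Guava's ConcurrentHashMultiset):

```scala
import scala.collection.mutable

// A counted map models the multiset: each re-acquisition bumps the pin count.
val readPins = mutable.Map.empty[String, Int].withDefaultValue(0)

def acquireRead(id: String): Unit = readPins(id) += 1

def releaseRead(id: String): Unit = {
  assert(readPins(id) > 0, s"released $id more times than acquired")
  readPins(id) -= 1
}

acquireRead("rdd_0_0")
acquireRead("rdd_0_0")           // reentrant: the same task reads the block twice
releaseRead("rdd_0_0")
assert(readPins("rdd_0_0") == 1) // still pinned once
```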

Key methods

registerTask

Adds an entry to readLocksByTask; the value is naturally empty at first.

def registerTask(taskAttemptId: TaskAttemptId): Unit = synchronized {
  require(!readLocksByTask.contains(taskAttemptId),
    s"Task attempt $taskAttemptId is already registered")
  readLocksByTask(taskAttemptId) = ConcurrentHashMultiset.create()
}

Note the special task ID NON_TASK_WRITER: it is registered when the BlockInfoManager is initialized and stands for non-task threads, such as the driver thread.

lockNewBlockForWriting

Creating a new block naturally requires taking the write lock; this method builds on lockForReading and lockForWriting.

def lockNewBlockForWriting(
    blockId: BlockId,
    newBlockInfo: BlockInfo): Boolean = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
  // With multiple threads around we cannot just insert: first try a read
  // lock, in case another thread already won the race to create this block
  lockForReading(blockId) match {
    case Some(info) =>
      false
    case None =>
      // add the mapping to infos
      infos(blockId) = newBlockInfo
      // take the write lock
      lockForWriting(blockId)
      true
  }
}

lockForWriting

Acquires the write lock.

def lockForWriting(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // the block can only be locked, and returned, when nobody is
        // reading or writing it
        if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
          info.writerTask = currentTaskAttemptId
          writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
          logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
          return Some(info)
        }
    }
    // if the lock was not acquired, wait(); `blocking` controls whether to wait
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}

lockForReading

Acquires the read lock.

def lockForReading(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // as long as nobody is writing, the read lock can be taken
        if (info.writerTask == BlockInfo.NO_WRITER) {
          info.readerCount += 1
          readLocksByTask(currentTaskAttemptId).add(blockId)
          logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
          return Some(info)
        }
    }
    // same as above
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}

unlock

Releases a lock.

def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  // release the write lock
  if (info.writerTask != BlockInfo.NO_WRITER) {
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    // release one read lock
    info.readerCount -= 1
    val countsForTask = readLocksByTask(taskId)
    // remove(blockId, n) removes n copies from the multiset; here just one
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  notifyAll()
}

downgradeLock

Downgrades a write lock to a read lock. Typical use: after a task finishes writing a block it usually goes on to read the data it just wrote, so it trades the write lock for a read lock without ever leaving a gap in which another task could take the write lock.

def downgradeLock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  val info = get(blockId).get
  require(info.writerTask == currentTaskAttemptId,
    s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
    s" block $blockId")
  // release the write lock
  unlock(blockId)
  // take a read lock; note blocking = false
  val lockOutcome = lockForReading(blockId, blocking = false)
  assert(lockOutcome.isDefined)
}
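The non-blocking lockForReading cannot fail because everything happens inside one synchronized region, so no other thread can grab the write lock between the unlock and the read acquisition. A toy model of this atomicity (not Spark's code):

```scala
// Toy model of a lock downgrade: release the write lock and take a read pin
// atomically, inside one synchronized region.
class DemoDowngradeLock {
  private var readers = 0
  private var writing = false

  def lockWrite(): Unit = synchronized {
    while (writing || readers > 0) wait()
    writing = true
  }

  def downgrade(): Unit = synchronized {
    require(writing, "must hold the write lock to downgrade")
    writing = false
    readers += 1 // guaranteed to succeed: we just released the only write lock
    notifyAll()  // other readers may now enter as well
  }

  def unlockRead(): Unit = synchronized {
    readers -= 1
    notifyAll()
  }

  def pinCount: Int = synchronized(readers)
}

val dl = new DemoDowngradeLock
dl.lockWrite()
dl.downgrade()           // writer becomes a reader with no exploitable gap
assert(dl.pinCount == 1)
dl.unlockRead()
```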

BlockResult

Returned when getLocalValues(blockId) or getRemoteValues(blockId) is called: the block, deserialized into Java objects.

Note that the data itself lives in a store such as memoryStore; its API is called to pull the data out, which is then wrapped in a BlockResult.

private[spark] class BlockResult(
    val data: Iterator[Any], // the data, fetched from e.g. memoryStore
    val readMethod: DataReadMethod.Value, // Memory, Disk, Hadoop, Network
    val bytes: Long) // the size, taken from the block's info

BlockData

Returned when getLocalBytes(blockId) or getRemoteBytes(blockId) is called: the block as serialized bytes.

private[spark] trait BlockData {
  def toInputStream(): InputStream
  def toNetty(): Object
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
  def toByteBuffer(): ByteBuffer
  def size: Long
  def dispose(): Unit // the extra method: release the underlying buffer
}

It has three implementations: ByteBufferBlockData, DiskBlockData, and EncryptedBlockData.

Taking ByteBufferBlockData as an example:

private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer, // the data, fetched from memoryStore
    val shouldDispose: Boolean) extends BlockData {

  // the four methods below choose the form in which to expose the data
  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
  override def toNetty(): Object = buffer.toNetty
  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    buffer.copy(allocator)
  }
  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer

  override def size: Long = buffer.size
  override def dispose(): Unit = {
    if (shouldDispose) {
      buffer.dispose()
    }
  }
}

Summary

I consider a solid grasp of Block to be extremely important: it is the cornerstone of Spark's internal file system.

To organize the concepts:

Store: actually holds the data, e.g. MemoryStore, DiskStore

MemoryManager: carves up the memory

Block: a file, with a file name (BlockId), metadata (BlockInfo, managed by BlockInfoManager), and data (fetched from a Store)

BlockManager: manages blocks (next section)

RPC: the various masters and managers (to study later)

Taken together, this really is a complete distributed multi-user file system.