0%

spark源码-BlockManager

每台节点(driver and executors)的block的总管理者,主要功能就是在本地或者远程的store中(堆内内存、磁盘、堆外内存) put、get、 block。

作为总管理者,BlockManager 依赖众多对象。

重要成员

从成员可以大体4类:RPC、传输、内存磁盘的 get or put(本文侧重)、shuffle

RPC 相关(todo)

  • master:BlockManagerMaster,RPC相关

  • rpcEnv

  • blockManagerId:该blockManager的Id,分布式系统

  • slaveEndpoint

块及其传输相关(todo)

  • serializerManager:序列化管理者

  • mapOutputTracker:跟踪 shuffle write 的输出

  • blockTransferService:块传输服务(netty)

  • securityManager:块加密

  • remoteReadNioBufferConversion:spark.network.remoteReadNioBufferConversion

  • futureExecutionContext

  • maxFailuresBeforeLocationRefresh

  • remoteBlockTempFileManager

内存磁盘相关

  • memoryManager

  • memoryStore

  • diskBlockManager

  • diskStore

  • blockInfoManager

shuffle 相关

  • shuffleManager:使用的shuffleManager

  • externalShuffleServiceEnabled:spark.shuffle.service.enabled,是否使用外部排序(todo),默认false

  • externalShuffleServicePort

  • shuffleServerId

  • shuffleClient:

重点功能

initialize

只有掉用了initialize方法,该BlockManager才可用,看看就行,pass

def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId)

blockReplicationPolicy = {
val priorityClass = conf.get(
"spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
ret
}

val id =
BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

val idFromMaster = master.registerBlockManager(
id,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)

blockManagerId = if (idFromMaster != null) idFromMaster else id

shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}

logInfo(s"Initialized BlockManager: $blockManagerId")
}

Put

block就是文件,所以只要有spark计算有写文件操作(内存和磁盘),那么就要用到 put。

内容较多,我的学习目标是:先思考清楚spark计算流程,至于BlockManager内部弄明白调用流程和store的选择即可。能力有限,下面的方式并不是全部。

RDD cache

RDD 的缓存 参见我写的spark cache:第一次执行时,自然是存。并且存完后也要读,因为该函数就是要get,这里就不提读步骤。

// 高层调用BlockManager
ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> getOrElseUpdate

// BlockManager内部
getOrElseUpdate -> doPutIterator -> doput

// 细看下doPutIterator的流程
level.useMemory -true--> level.deserialized -true--> memoryStore.putIteratorAsValues 注意
-false-> memoryStore.putIteratorAsBytes 注意
-false-> diskStore.put
//注意: 当level为内存和磁盘时,会先存内存,内存不足再存磁盘

Broadcast

广播变量的存储。这只是简版,想看详细的参照

// 高层调用BlockManager
TorrentBroadcast.writeBlocks -> putSingle

// BlockManager内部
putSingle -> putIterator -> doPutIterator -> doput

// doPutIterator 同理

RDD block Replication

当RDD的storage level中的_replication大于1时,BlockManager需要将block数据发到另一个远程结点以备份,此时BlockManager会向远程结点发送UploadBlock消息,远程结点在收到该消息后会申请存储内存以存放收到的block数据。

// 高层调用BlockManage,分2类
NettyBlockRpcServer.receive -> match uploadBlock -> putBlockData
NettyBlockRpcServer.receiveStream -> putBlockDataAsStream

// BlockManager内部
putBlockData -> putBytes -> doPutBytes -> doPut
putBlockDataAsStream -> 看不懂 -> putBytes -> doPutBytes -> doPut

// doPutBytes 存储逻辑和 doPutIterator 基本一样
// 区别是 doPutIterator 输入是 Iterator[T];doPutBytes 输入是 ChunkedByteBuffer,它需要先反序列化
// 分析:doPutIterator 是缓存步骤计算得到的自然是java对象; doPutBytes 是节点传自然是序列化的

Shuffle 相关

Shuffle时有许多写磁盘的操作,它们都是使用了一个专门的直接将数据写入磁盘的类:

DiskWriter:DiskBlockObjectWriter,而它通过BlockManaged得到。

def getDiskWriter(
blockId: BlockId,
file: File,
serializerInstance: SerializerInstance,
bufferSize: Int,
writeMetrics: ShuffleWriteMetrics): DiskBlockObjectWriter = {
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(file, serializerManager, serializerInstance, bufferSize,
syncWrites, writeMetrics, blockId)
}
// 通用流程
// 1. 得到 DiskWriter
val writer = blockManager.getDiskWriter(xxx)

// 2. 将数据写入DiskWriter
writer.write(xxx)

// 3. 提交并返回偏移和大小
val fileSegment = writer.commitAndGet()

主要有下列情况

  • ShuffleSpill:BlockId:temp_shuffle_ + “randomUUID”

  • BypassShuffle的前3步:BlockId:temp_shuffle_ + “randomUUID”

  • ShuffleWrite生成的合并大文件:BlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

get

有写就有读,与写不同的是:写只能写本地,读可以读本地和远程,这个我想应该很好理解。

RDD cache

// 高层调用BlockManager
ShuffleMapTask/ResultTask.runTask -> RDD.iterator -> RDD.getOrCompute -> getOrElseUpdate

// BlockManager内部
getOrElseUpdate -> get -> getLocalValues
-> getRemoteValues -> getRemoteBytes -> 反序列

// 细看下 getLocalValues 的流程
level.useMemory -true--> level.deserialized -true--> memoryStore.getValues
-false-> memoryStore.getBytes -> 反序列
-false-> level.useDisk -true--> diskStore.getBytes -> 反序列

Broadcast

广播变量的读取。这只是简版,想看详细的参照

// 高层调用BlockManager
TorrentBroadcast.readBroadcastBlock -> getLocalValues

RDD block Replication

// 高层调用BlockManage,只有一类
NettyBlockRpcServer.receive -> match openBlocks -> getBlockData

// BlockManager内部
getBlockData -> getLocalBytes

Shuffle 相关

shuffle read 阶段:BlockStoreShuffleReader 核心是ShuffleBlockFetcherIterator

现在可以理解它们名字的含义了,就是去fetch write阶段写下的block,

分本地localBlocks 和远程 remoteBlocks

这部分内容较复杂,需结合rpc,先pass

小结

BlockManager 块管理者,使用它时,直接get、put即可;而之前对其底层结构的了解这一过程还是很有意思的,同时使我对spark有了更深的认识。

至此,spark文件系统告一段落,以后学了rpc再回来看文件系统的rpc。