每台节点(driver and executors)的block的总管理者,主要功能就是在本地或者远程的store中(堆内内存、磁盘、堆外内存) put、get、 block。
作为总管理者,BlockManager 依赖众多对象。
重要成员
从成员可以大体4类:RPC、传输、内存磁盘的 get or put(本文侧重)、shuffle
RPC 相关 (todo)
块及其传输相关 (todo)
serializerManager:序列化管理者
mapOutputTracker:跟踪 shuffle write 的输出
blockTransferService:块传输服务(netty)
securityManager:块加密
remoteReadNioBufferConversion:spark.network.remoteReadNioBufferConversion
futureExecutionContext
maxFailuresBeforeLocationRefresh
remoteBlockTempFileManager
内存磁盘相关 :
memoryManager
memoryStore
diskBlockManager
diskStore
blockInfoManager
shuffle 相关
shuffleManager:使用的shuffleManager
externalShuffleServiceEnabled:spark.shuffle.service.enabled,是否使用外部排序(todo),默认false
externalShuffleServicePort
shuffleServerId
shuffleClient:
重点功能
initialize
只有掉用了initialize方法,该BlockManager才可用,看看就行,pass
def initialize (appId: String ): Unit = { blockTransferService.init(this ) shuffleClient.init(appId) blockReplicationPolicy = { val priorityClass = conf.get( "spark.storage.replication.policy" , classOf[RandomBlockReplicationPolicy ].getName) val clazz = Utils .classForName(priorityClass) val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy ] logInfo(s"Using $priorityClass for block replication policy" ) ret } val id = BlockManagerId (executorId, blockTransferService.hostName, blockTransferService.port, None ) val idFromMaster = master.registerBlockManager( id, maxOnHeapMemory, maxOffHeapMemory, slaveEndpoint) blockManagerId = if (idFromMaster != null ) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { logInfo(s"external shuffle service port = $externalShuffleServicePort " ) BlockManagerId (executorId, blockTransferService.hostName, externalShuffleServicePort) } else { blockManagerId } if (externalShuffleServiceEnabled && !blockManagerId.isDriver) { registerWithExternalShuffleServer() } logInfo(s"Initialized BlockManager: $blockManagerId " ) }
Put
block就是文件,所以只要有spark计算有写文件操作(内存和磁盘),那么就要用到 put。
内容较多,我的学习目标是:先思考清楚spark计算流程 ,至于BlockManager内部弄明白调用流程和store的选择 即可。能力有限,下面的方式并不是全部。
RDD cache
RDD 的缓存 参见我写的spark cache:第一次执行时,自然是存。并且存完后也要读,因为该函数就是要get,这里就不提读步骤。
ShuffleMapTask /ResultTask .runTask -> RDD .iterator -> RDD .getOrCompute -> getOrElseUpdategetOrElseUpdate -> doPutIterator -> doput level.useMemory -true --> level.deserialized -true --> memoryStore.putIteratorAsValues 注意 -false -> memoryStore.putIteratorAsBytes 注意 -false -> diskStore.put
Broadcast
广播变量的存储。这只是简版,想看详细的参照 。
TorrentBroadcast .writeBlocks -> putSingleputSingle -> putIterator -> doPutIterator -> doput
RDD block Replication
当RDD的storage level中的_replication大于1时,BlockManager需要将block数据发到另一个远程结点以备份,此时BlockManager会向远程结点发送UploadBlock消息,远程结点在收到该消息后会申请存储内存以存放收到的block数据。
NettyBlockRpcServer .receive -> match uploadBlock -> putBlockDataNettyBlockRpcServer .receiveStream -> putBlockDataAsStreamputBlockData -> putBytes -> doPutBytes -> doPut putBlockDataAsStream -> 看不懂 -> putBytes -> doPutBytes -> doPut
Shuffle 相关
Shuffle时有许多写磁盘的操作,它们都是使用了一个专门的直接将数据写入磁盘的类:
DiskWriter:DiskBlockObjectWriter,而它通过BlockManaged得到。
def getDiskWriter ( blockId: BlockId , file: File , serializerInstance: SerializerInstance , bufferSize: Int , writeMetrics: ShuffleWriteMetrics ): DiskBlockObjectWriter = { val syncWrites = conf.getBoolean("spark.shuffle.sync" , false ) new DiskBlockObjectWriter (file, serializerManager, serializerInstance, bufferSize, syncWrites, writeMetrics, blockId) } val writer = blockManager.getDiskWriter(xxx)writer.write(xxx) val fileSegment = writer.commitAndGet()
主要有下列情况
ShuffleSpill :BlockId:temp_shuffle_ + “randomUUID”
BypassShuffle 的前3步:BlockId:temp_shuffle_ + “randomUUID”
ShuffleWrite 生成的合并大文件:BlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
get
有写就有读,与写不同的是:写只能写本地,读可以读本地和远程 ,这个我想应该很好理解。
RDD cache
ShuffleMapTask /ResultTask .runTask -> RDD .iterator -> RDD .getOrCompute -> getOrElseUpdategetOrElseUpdate -> get -> getLocalValues -> getRemoteValues -> getRemoteBytes -> 反序列 level.useMemory -true --> level.deserialized -true --> memoryStore.getValues -false -> memoryStore.getBytes -> 反序列 -false -> level.useDisk -true --> diskStore.getBytes -> 反序列
Broadcast
广播变量的读取。这只是简版,想看详细的参照 。
TorrentBroadcast .readBroadcastBlock -> getLocalValues
RDD block Replication
NettyBlockRpcServer .receive -> match openBlocks -> getBlockDatagetBlockData -> getLocalBytes
Shuffle 相关
shuffle read 阶段:BlockStoreShuffleReader 核心是ShuffleBlockFetcherIterator
现在可以理解它们名字的含义了,就是去fetch write阶段写下的block,
分本地localBlocks 和远程 remoteBlocks
这部分内容较复杂,需结合rpc,先pass
小结
BlockManager 块管理者,使用它时,直接get、put即可;而之前对其底层结构的了解这一过程还是很有意思的,同时使我对spark有了更深的认识。
至此,spark文件系统告一段落,以后学了rpc再回来看文件系统的rpc。