This post covers how Spark stores and manages files on disk, focusing on two classes: DiskBlockManager and DiskStore.
DiskBlockManager
Maintains a one-to-one mapping between BlockIds and the actual files on disk, and is also responsible for creating and deleting the directories that hold them.
Members
- subDirsPerLocalDir: the maximum number of subdirectories under each local dir (LOCAL_DIRS), set by spark.diskStore.subDirectories (default 64).
- localDirs: the array of local directories, created by createLocalDirs().
- subDirs: a 2-D array of shape [localDirs.length][subDirsPerLocalDir = 64]. It spreads files across a second directory level by hash; this two-level layout keeps any single top-level directory from accumulating too many inodes. Block files live in these subdirectories.
Taking my Spark on YARN cluster as an example:
localDirs: root + "blockmgr" + "-" + UUID.randomUUID, where root is specified by spark.local.dir (multiple paths allowed) and defaults to /tmp. Now it is finally clear what that setting is for: all block I/O lands in these directories, so the disks backing them should be fast!
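If you want to point these directories at fast disks, spark.local.dir can be set via SparkConf. A minimal sketch with hypothetical paths (note that on YARN this setting is overridden by the NodeManager's local dirs):

```scala
import org.apache.spark.SparkConf

// Hypothetical paths: spread block files across two SSDs.
// On YARN, yarn.nodemanager.local-dirs wins over this setting.
val conf = new SparkConf()
  .set("spark.local.dir", "/mnt/ssd1/spark,/mnt/ssd2/spark")
```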
subDirs: you can see that five subdirectories have been hashed out here:
```
[xxx tmp]$ tree ./blockmgr*
./blockmgr-37c233af-359c-4bcd-8dcc-5f216e55ce7d
├── 0c
├── 0d
├── 0e
├── 11
└── 13
```
- shutdownHook: registers a hook with ShutdownHookManager that calls doStop, recursively deleting the files under localDirs at JVM exit (see the sketch below).
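For reference, the hook registration looks roughly like this in the Spark 2.x source (paraphrased; the exact priority constant may differ across versions):

```scala
// Paraphrased from DiskBlockManager: the hook fires at JVM exit
// and delegates to doStop() to clean up the local directories.
private def addShutdownHook(): AnyRef = {
  ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
    logInfo("Shutdown hook called")
    DiskBlockManager.this.doStop()
  }
}
```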
Key methods
getFile
Given a filename or a BlockId, returns the on-disk File(subDir, filename), creating the subdirectory on first use:
```scala
def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which
  // subdirectory within that local directory.
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist.
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }

  new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
```
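To make the placement arithmetic concrete, here is a worked trace with made-up numbers (2 local dirs, 64 subdirs per dir):

```scala
// Hypothetical trace of getFile's hashing; not Spark code.
val hash = 1000003                       // Utils.nonNegativeHash(filename)
val dirId = hash % 2                     // = 1 -> localDirs(1)
val subDirId = (hash / 2) % 64           // = 500001 % 64 = 33
val subDirName = "%02x".format(subDirId) // = "21"
// The file therefore lands at localDirs(1)/21/<filename>.
```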
createTempXXXBlock
- createTempLocalBlock: creates a Block for intermediate temp files from local computation; blockId = temp_local_ + "randomUUID".
- createTempShuffleBlock: creates a Block for intermediate shuffle temp files (e.g., a spill when a shuffle sort runs out of memory); blockId = temp_shuffle_ + "randomUUID". A usage sketch follows the code below.
```scala
def createTempLocalBlock(): (TempLocalBlockId, File) = {
  var blockId = new TempLocalBlockId(UUID.randomUUID())
  // Regenerate the UUID until the file doesn't already exist.
  while (getFile(blockId).exists()) {
    blockId = new TempLocalBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId))
}

def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  var blockId = new TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) {
    blockId = new TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId))
}
```
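A hedged usage sketch of the spill side (only createTempShuffleBlock is the real API here; diskBlockManager and serializedBytes are assumed to be in scope):

```scala
import java.io.FileOutputStream

// Hypothetical spill path: grab a unique temp shuffle file and
// dump the serialized records into it.
val (tempBlockId, tempFile) = diskBlockManager.createTempShuffleBlock()
val out = new FileOutputStream(tempFile)
try {
  out.write(serializedBytes) // serializedBytes: Array[Byte], assumed
} finally {
  out.close()
}
```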
stop
stop() first removes the shutdown hook, then calls doStop() to delete the files:
```scala
private[spark] def stop() {
  // Remove the shutdown hook first; it's no longer needed once
  // we're stopping explicitly.
  try {
    ShutdownHookManager.removeShutdownHook(shutdownHook)
  } catch {
    case e: Exception =>
      logError(s"Exception while removing shutdown hook.", e)
  }
  doStop()
}

private def doStop(): Unit = {
  if (deleteFilesOnStop) {
    localDirs.foreach { localDir =>
      if (localDir.isDirectory() && localDir.exists()) {
        try {
          if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
            Utils.deleteRecursively(localDir)
          }
        } catch {
          case e: Exception =>
            logError(s"Exception while deleting local spark dir: $localDir", e)
        }
      }
    }
  }
}
```
DiskStore
Responsible for writing blocks to disk, along with the subsequent read, update, and delete operations.
Members
- minMemoryMapBytes: the minimum block size at which memory mapping (a java.nio feature) kicks in; set by spark.storage.memoryMapThreshold, default 2M (see the sketch after this list).
- maxMemoryMapBytes: the maximum size of a single memory-mapped region.
- blockSizes: a ConcurrentHashMap of BlockId -> block size.
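To make the role of minMemoryMapBytes concrete, here is a standalone sketch of the small-block/large-block read strategy that DiskBlockData follows; this is my own simplification, not Spark's code:

```scala
import java.io.File
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Blocks below the threshold are copied into a heap buffer;
// larger blocks are memory mapped so the OS pages them in lazily.
def readBlock(file: File, blockSize: Long, minMemoryMapBytes: Long): ByteBuffer = {
  val channel = FileChannel.open(file.toPath, StandardOpenOption.READ)
  try {
    if (blockSize < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(blockSize.toInt)
      while (buf.hasRemaining && channel.read(buf) >= 0) {}
      buf.flip()
      buf
    } else {
      channel.map(FileChannel.MapMode.READ_ONLY, 0, blockSize)
    }
  } finally {
    channel.close()
  }
}
```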
Key methods
put
Create:
```scala
def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId)
  val out = new CountingWritableChannel(openForWrite(file))
  var threwException: Boolean = true
  try {
    // The caller streams the bytes; the channel counts them for blockSizes.
    writeFunc(out)
    blockSizes.put(blockId, out.getCount)
    threwException = false
  } finally {
    try {
      out.close()
    } catch {
      case ioe: IOException =>
        if (!threwException) {
          threwException = true
          throw ioe
        }
    } finally {
      if (threwException) {
        // Clean up the partially written file on failure.
        remove(blockId)
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}
```
Here is one way it gets called:
```scala
def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  put(blockId) { channel =>
    bytes.writeFully(channel)
  }
}
```
getBytes
Read: given a blockId, returns the block as BlockData. There are two kinds: DiskBlockData (plain) and EncryptedBlockData (encrypted):
```scala
def getBytes(blockId: BlockId): BlockData = {
  val file = diskManager.getFile(blockId.name)
  val blockSize = getSize(blockId)

  securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      new EncryptedBlockData(file, blockSize, conf, key)
    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
  }
}
```
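A hedged sketch of the read path from a caller's perspective (toInputStream() and dispose() are on BlockData in Spark 2.3+; the surrounding wiring is illustrative):

```scala
// Hypothetical caller: fetch a block back from the disk store
// and consume it as a stream.
val data = diskStore.getBytes(blockId)
val in = data.toInputStream()
try {
  // ... read from in ...
} finally {
  in.close()
  data.dispose() // releases any underlying mmap'ed buffers
}
```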
remove
Delete:
```scala
def remove(blockId: BlockId): Boolean = {
  blockSizes.remove(blockId)
  val file = diskManager.getFile(blockId.name)
  if (file.exists()) {
    val ret = file.delete()
    if (!ret) {
      logWarning(s"Error deleting ${file.getPath()}")
    }
    ret
  } else {
    false
  }
}
```
Summary
DiskBlockManager establishes a one-to-one mapping between BlockIds and real files on disk, and creates and deletes the directories that hold them. While studying it, it is worth finding that directory on a live cluster and poking around inside.
DiskStore writes data to disk and handles the later reads and deletes; it is simpler than MemoryStore. MemoryStore is the actual warehouse and has to define its own data structures, whereas DiskStore's data lives on disk and the filesystem does that bookkeeping.
TODO: learn java.nio at some point, then study BlockData.