
Spark source code: DiskBlockManager and DiskStore

This post covers how disk files are stored and managed, which mainly involves two classes: DiskBlockManager and DiskStore.

DiskBlockManager

Establishes a one-to-one mapping between BlockIds and the real files on disk, and is responsible for creating and deleting directories.

Members

  • subDirsPerLocalDir: the maximum number of subdirectories under each local dir, set by spark.diskStore.subDirectories (default 64)

  • localDirs: the array of local directories, created by createLocalDirs().

  • subDirs: a two-dimensional array of [localDirs.length][subDirsPerLocalDir], which hash-partitions each local dir into a second level of subdirectories; the two-level layout avoids putting too many inodes under a single top-level directory. Block files live in these subdirectories. (A worked sketch of this hashing follows below the member list.)

The following uses my Spark on YARN cluster as an example:

localDirs: root + "blockmgr" + "-" + UUID.randomUUID, where root is specified by spark.local.dir (multiple paths allowed) and defaults to /tmp. Now I finally understand what that setting is for: clearly this directory should sit on a disk with good performance!

subDirs: you can see that five subdirectories have been hashed into existence so far:

[xxx tmp]$ tree ./blockmgr*
./blockmgr-37c233af-359c-4bcd-8dcc-5f216e55ce7d
├── 0c
├── 0d
├── 0e
├── 11
└── 13
  • shutdownHook: registers a hook with ShutdownHookManager that calls doStop, recursively deleting the files under localDirs
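To make the two-level hashing concrete, here is a minimal self-contained sketch of the dirId/subDirId computation. The nonNegativeHash body is an approximation of Spark's Utils.nonNegativeHash, and the directory counts and block file name are hypothetical:

object DirLayoutDemo {
  // approximation of Utils.nonNegativeHash: abs(hashCode), guarding Int.MinValue
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  def main(args: Array[String]): Unit = {
    val localDirsCount = 2      // assumption: two paths in spark.local.dir
    val subDirsPerLocalDir = 64 // spark.diskStore.subDirectories default

    val filename = "shuffle_0_0_0.data" // hypothetical block file name
    val hash = nonNegativeHash(filename)
    val dirId = hash % localDirsCount
    val subDirId = (hash / localDirsCount) % subDirsPerLocalDir

    // prints e.g. "dirId=1 subDir=0d": the file would land in localDirs(1)/0d/
    println(f"dirId=$dirId subDir=$subDirId%02x")
  }
}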

Key methods

getFile

Returns the on-disk File(subDir, filename) for a given filename or BlockId, creating the subdirectory if it does not exist yet.

def getFile(filename: String): File = {
  // hash the filename; modulo picks the top-level dirId, then the subDirId
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // reuse the subdirectory if it already exists, otherwise create it
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      // subdirectory names are two hex digits, e.g. "0c"
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }
  new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)
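As a quick check of the two overloads, RDDBlockId (a public BlockId subclass) shows how a BlockId maps to a filename; this snippet is purely illustrative:

import org.apache.spark.storage.RDDBlockId

// RDDBlockId(0, 1).name is "rdd_0_1"; getFile(blockId) simply delegates to
// getFile(blockId.name), so both overloads resolve to the same file
assert(RDDBlockId(0, 1).name == "rdd_0_1")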

createTempXXXBlock

  • createTempLocalBlock: creates a block holding intermediate temp files from local computation

    blockId = temp_local_ + randomUUID

  • createTempShuffleBlock: creates a block holding intermediate shuffle temp files (e.g. spill files when a shuffle sort runs out of memory); blockId = temp_shuffle_ + randomUUID

def createTempLocalBlock(): (TempLocalBlockId, File) = {
  var blockId = new TempLocalBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) { // retry until the random name is unused
    blockId = new TempLocalBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId)) // getFile creates the subdirectory
}

def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  var blockId = new TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) {
    blockId = new TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId)) // getFile creates the subdirectory
}
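The while loop guards against the astronomically unlikely case that a fresh UUID collides with an existing file. Here is a generic, self-contained sketch of the same "draw names until unused" pattern (the directory and prefix are hypothetical):

import java.io.File
import java.util.UUID

// keep drawing random names until one does not exist yet
def freshTempFile(dir: File, prefix: String): File = {
  var candidate = new File(dir, prefix + UUID.randomUUID())
  while (candidate.exists()) {
    candidate = new File(dir, prefix + UUID.randomUUID())
  }
  candidate
}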

stop

Calling stop() removes the shutdown hook and deletes the local files:

private[spark] def stop() {
  try {
    ShutdownHookManager.removeShutdownHook(shutdownHook)
  } catch {
    case e: Exception =>
      logError(s"Exception while removing shutdown hook.", e)
  }
  doStop()
}

private def doStop(): Unit = {
  // deleteFilesOnStop is a constructor parameter of DiskBlockManager; it decides
  // whether to recursively delete the files under localDirs. Normally it is
  // false only when the external shuffle service is in use.
  if (deleteFilesOnStop) {
    localDirs.foreach { localDir =>
      if (localDir.isDirectory() && localDir.exists()) {
        try {
          if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
            Utils.deleteRecursively(localDir)
          }
        } catch {
          case e: Exception =>
            logError(s"Exception while deleting local spark dir: $localDir", e)
        }
      }
    }
  }
}
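ShutdownHookManager is Spark's utility around JVM shutdown hooks (adding priorities and removability on top of the JVM primitive). The core pattern can be sketched with plain Scala; the temp directory below is hypothetical:

import java.io.File
import java.nio.file.Files

object ShutdownHookSketch {
  // recursively delete a directory, mirroring Utils.deleteRecursively
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) f.listFiles().foreach(deleteRecursively)
    f.delete()
  }

  def main(args: Array[String]): Unit = {
    val localDir = Files.createTempDirectory("blockmgr-demo").toFile
    // mirrors shutdownHook + doStop: delete the dir when the JVM exits
    sys.addShutdownHook {
      deleteRecursively(localDir)
    }
    println(s"created $localDir; it will be removed on JVM exit")
  }
}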

DiskStore

Responsible for writing blocks to disk, plus reading, updating, and deleting them.

Members

  • minMemoryMapBytes: the minimum size at which memory mapping (a java.nio feature) is used, spark.storage.memoryMapThreshold (default 2 MB)
  • maxMemoryMapBytes: the maximum size of a single memory map
  • blockSizes: a ConcurrentHashMap of BlockId -> blockSize

Key methods

put

def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
  if (contains(blockId)) {
    throw new IllegalStateException(s"Block $blockId is already present in the disk store")
  }
  logDebug(s"Attempting to put block $blockId")
  val startTime = System.currentTimeMillis
  val file = diskManager.getFile(blockId) // create the subdirectory and return the file
  val out = new CountingWritableChannel(openForWrite(file)) // open a counting channel
  var threwException: Boolean = true
  try {
    writeFunc(out) // the caller streams the block's bytes into the channel
    blockSizes.put(blockId, out.getCount) // record the written size in the blockSizes map
    threwException = false
  } finally {
    try {
      out.close()
    } catch {
      case ioe: IOException =>
        if (!threwException) {
          threwException = true
          throw ioe
        }
    } finally {
      if (threwException) {
        remove(blockId) // clean up the partial file on failure
      }
    }
  }
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    file.getName,
    Utils.bytesToString(file.length()),
    finishTime - startTime))
}

Here is one way it gets called:

def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
  put(blockId) { channel =>
    bytes.writeFully(channel)
  }
}
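put wraps the file channel in a CountingWritableChannel so the number of bytes written can be recorded in blockSizes. Below is a minimal sketch of such a counting wrapper; the class body is illustrative, not Spark's actual source:

import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

class CountingChannelSketch(sink: WritableByteChannel) extends WritableByteChannel {
  private var count = 0L
  def getCount: Long = count

  override def write(src: ByteBuffer): Int = {
    val written = sink.write(src) // delegate the write, then accumulate
    count += written
    written
  }

  override def isOpen: Boolean = sink.isOpen
  override def close(): Unit = sink.close()
}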

getBytes

Read: given a blockId, returns the corresponding BlockData.

There are two kinds: DiskBlockData (plain) and EncryptedBlockData (encrypted).

def getBytes(blockId: BlockId): BlockData = {
  val file = diskManager.getFile(blockId.name)
  val blockSize = getSize(blockId)

  // two cases: encrypted and unencrypted
  securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      new EncryptedBlockData(file, blockSize, conf, key)
    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
  }
}
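DiskBlockData is where minMemoryMapBytes comes into play: small blocks are read directly while large ones are memory-mapped. The following is a minimal sketch of that decision under the threshold semantics described above, not Spark's actual implementation:

import java.io.{File, FileInputStream}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

// sketch: read small blocks into a heap buffer, memory-map large ones
def readBlock(file: File, blockSize: Long, minMemoryMapBytes: Long): ByteBuffer = {
  val channel = new FileInputStream(file).getChannel
  try {
    if (blockSize < minMemoryMapBytes) {
      // small block: a plain read avoids mmap setup/teardown cost
      val buf = ByteBuffer.allocate(blockSize.toInt)
      while (buf.hasRemaining && channel.read(buf) >= 0) {}
      buf.flip()
      buf
    } else {
      // large block: mapping avoids copying the bytes through user space
      channel.map(FileChannel.MapMode.READ_ONLY, 0, blockSize)
    }
  } finally {
    channel.close() // a MappedByteBuffer stays valid after the channel closes
  }
}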

remove

def remove(blockId: BlockId): Boolean = {
  blockSizes.remove(blockId)
  val file = diskManager.getFile(blockId.name)
  if (file.exists()) {
    val ret = file.delete()
    if (!ret) {
      logWarning(s"Error deleting ${file.getPath()}")
    }
    ret
  } else {
    false
  }
}

Summary

DiskBlockManager establishes a one-to-one mapping between BlockIds and the real files on disk, and is responsible for creating and deleting directories. While studying it, you can find that directory on a live cluster and take a look inside.

DiskStore is responsible for writing data to disk and for the subsequent reads and deletes. It is simpler than MemoryStore: MemoryStore is a real warehouse and has to define its own data structures, whereas DiskStore's data lives on disk.

TODO: study java.nio at some point, then come back to BlockData.