// Alias for a task attempt id.
// Author's note: one task runs on one thread, and that task is what holds the lock.
private type TaskAttemptId = Long

// Maps each BlockId to its BlockInfo (the per-block lock state read and updated below).
@GuardedBy("this")
private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

// Maps each task attempt id to the set of blocks it currently holds write locks on.
@GuardedBy("this")
private[this] val writeLocksByTask =
  new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
    with mutable.MultiMap[TaskAttemptId, BlockId]

// Maps each task attempt id to the blocks it holds read locks on. A multiset is used so
// the same block can be counted more than once for one task (unlock removes a single
// occurrence at a time).
@GuardedBy("this")
private[this] val readLocksByTask =
  new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
/**
 * Try to acquire the write lock on a block and return its metadata.
 *
 * The write lock is granted only when the block currently has no writer and no readers.
 * When the lock cannot be granted and `blocking` is true, the caller waits on this
 * object's monitor and retries after being woken up (`unlock` calls `notifyAll()`).
 *
 * @param blockId  the block to lock.
 * @param blocking if true (the default), keep waiting until the lock is acquired or the
 *                 block is no longer tracked; if false, make a single attempt.
 * @return `None` if the block is not present in `infos`, or if `blocking` is false and the
 *         lock is unavailable; otherwise `Some(info)` with the write lock held by the
 *         current task.
 */
def lockForWriting(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // The block can be write-locked only when nobody is reading or writing it.
        if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
          info.writerTask = currentTaskAttemptId
          writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
          logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
          return Some(info)
        }
    }
    // The lock was not acquired on this attempt: wait for a notification before retrying.
    // In non-blocking mode we skip the wait and the loop condition ends the loop.
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
lockForReading
加读锁
/**
 * Try to acquire a read lock on a block and return its metadata.
 *
 * A read lock is granted whenever the block has no writer; existing readers do not
 * prevent it. When the lock cannot be granted and `blocking` is true, the caller waits on
 * this object's monitor and retries after being woken up (`unlock` calls `notifyAll()`).
 *
 * @param blockId  the block to lock.
 * @param blocking if true (the default), keep waiting until the lock is acquired or the
 *                 block is no longer tracked; if false, make a single attempt.
 * @return `None` if the block is not present in `infos`, or if `blocking` is false and the
 *         block is being written; otherwise `Some(info)` with the reader count incremented.
 */
def lockForReading(
    blockId: BlockId,
    blocking: Boolean = true): Option[BlockInfo] = synchronized {
  logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
  do {
    infos.get(blockId) match {
      case None => return None
      case Some(info) =>
        // A read lock only requires that nobody is writing the block.
        if (info.writerTask == BlockInfo.NO_WRITER) {
          info.readerCount += 1
          // NOTE(review): this lookup assumes readLocksByTask already has an entry for the
          // current task; where that entry is created is not visible here -- confirm.
          readLocksByTask(currentTaskAttemptId).add(blockId)
          logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
          return Some(info)
        }
    }
    // Same retry scheme as lockForWriting: wait only in blocking mode, otherwise the loop
    // condition ends the loop after this single attempt.
    if (blocking) {
      wait()
    }
  } while (blocking)
  None
}
unlock
释放锁(读锁或写锁)
/**
 * Release a lock on the given block and wake up every thread waiting on this monitor.
 *
 * If the block currently has a writer, the write lock is released; otherwise one read
 * lock is released.
 *
 * @param blockId       the block whose lock is released.
 * @param taskAttemptId the task releasing the lock; defaults to the current task attempt
 *                      when `None`.
 * @throws IllegalStateException if `get(blockId)` finds no info for the block.
 */
def unlock(blockId: BlockId, taskAttemptId: Option[TaskAttemptId] = None): Unit = synchronized {
  val taskId = taskAttemptId.getOrElse(currentTaskAttemptId)
  logTrace(s"Task $taskId releasing lock for $blockId")
  val info = get(blockId).getOrElse {
    throw new IllegalStateException(s"Block $blockId not found")
  }
  if (info.writerTask != BlockInfo.NO_WRITER) {
    // Release the write lock.
    // NOTE(review): this branch is taken whenever the block has any writer; it does not
    // check that the writer is `taskId`.
    info.writerTask = BlockInfo.NO_WRITER
    writeLocksByTask.removeBinding(taskId, blockId)
  } else {
    // Release one read lock.
    assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
    info.readerCount -= 1
    val countsForTask = readLocksByTask(taskId)
    // remove(blockId, 1) removes a single occurrence of blockId from the multiset. The
    // code treats its result as the count before removal, hence the "- 1" to get the
    // task's remaining count for this block.
    val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
    assert(newPinCountForTask >= 0,
      s"Task $taskId release lock on block $blockId more times than it acquired it")
  }
  // Wake up threads blocked in wait() so they can retry acquiring their locks.
  notifyAll()
}
downgradeLock
由写锁降级为读锁。TODO:弄清楚锁降级的使用场景
/**
 * Downgrade the write lock held by the current task on a block to a read lock.
 *
 * The write lock is released and a read lock is taken immediately afterwards. Both steps
 * run inside this method's `synchronized` block, so the non-blocking read-lock attempt is
 * expected to succeed; the final assertion checks that it did.
 *
 * @param blockId the block whose write lock is downgraded.
 * @throws IllegalArgumentException (via `require`) if the current task is not the
 *                                  block's writer.
 */
def downgradeLock(blockId: BlockId): Unit = synchronized {
  logTrace(s"Task $currentTaskAttemptId downgrading write lock for $blockId")
  // NOTE(review): `.get` on the Option throws if the block has no info.
  val info = get(blockId).get
  require(info.writerTask == currentTaskAttemptId,
    s"Task $currentTaskAttemptId tried to downgrade a write lock that it does not hold on" +
      s" block $blockId")
  // Release the write lock.
  unlock(blockId)
  // Take a read lock; note blocking = false, so this is a single attempt with no waiting.
  val lockOutcome = lockForReading(blockId, blocking = false)
  assert(lockOutcome.isDefined)
}