当Broker Server被分配Replica时,该Replica可能成为Leader状态的Replica,也可能成为Follower状态的Replica。Replica的角色是通过处理LeaderAndIsrRequest来确定的,代码逻辑如下:
/**
 * Handles a LeaderAndIsrRequest: delegates the request to
 * ReplicaManager.becomeLeaderOrFollower and sends the resulting
 * LeaderAndIsrResponse back through the request channel.
 *
 * @param request the incoming channel request; its `requestObj` is cast to LeaderAndIsrRequest
 */
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
// Downcast the generic request payload to the concrete request type.
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
try {
// Hand the request to ReplicaManager.becomeLeaderOrFollower, which returns a
// per-partition result map together with an overall error code.
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
// Send the processing result back to the controller.
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
} catch {
// (exception handling is elided in this excerpt)
......
}
}
进入ReplicaManager的becomeLeaderOrFollower。
/**
 * Processes a LeaderAndIsrRequest while holding `replicaStateChangeLock`: rejects requests
 * with a stale controller epoch, filters partitions by leader epoch, and then routes the
 * remaining partitions to makeLeaders or makeFollowers.
 *
 * @param leaderAndISRRequest the request carrying controller epoch and per-partition state
 * @param offsetManager       passed through to makeLeaders / makeFollowers
 * @return a pair of (per-partition error codes keyed by (topic, partitionId), overall error code);
 *         the overall code is StaleControllerEpochCode for a stale request, NoError otherwise
 */
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
offsetManager: OffsetManager):
(collection.Map[(String, Int), Short], Short) = {
// (code elided in this excerpt)
......
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
// Check the request's freshness: if its controller epoch is smaller than the
// current one, the request is stale and is not processed.
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
......
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
val controllerId = leaderAndISRRequest.controllerId
val correlationId = leaderAndISRRequest.correlationId
// Adopt the controller epoch carried by the request.
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{
case ((topic, partitionId), partitionStateInfo) =>
val partition = getOrCreatePartition(topic, partitionId)
val partitionLeaderEpoch = partition.getLeaderEpoch()
// The request is valid for this partition only if it carries a strictly newer leader epoch.
if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
// Keep the partition only when this broker is among its assigned replicas.
if(partitionStateInfo.allReplicas.contains(config.brokerId))
partitionState.put(partition, partitionStateInfo)
else {
......
}
} else {
// Otherwise record a stale-leader-epoch error for this partition.
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
}
}
// Select the partitions whose designated leader is this broker.
val partitionsTobeLeader = partitionState
.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
// Everything that remains becomes a follower.
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
if (!partitionsTobeLeader.isEmpty)
// Enter the become-leader flow.
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!partitionsToBeFollower.isEmpty)
// Enter the become-follower flow.
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!hwThreadInitialized) {
// Start the highwatermark-checkpoint thread (responsible for flushing the high
// watermarks) the first time through; the flag prevents starting it again.
startHighWaterMarksCheckPointThread()
hwThreadInitialized = true
}
// Shut down fetcher threads that are now idle.
replicaFetcherManager.shutdownIdleFetcherThreads()
(responseMap, ErrorMapping.NoError)
}
}
}
当Replica成为Leader时,其处理流程如下:
/**
 * Makes the local replicas of the given partitions leaders: removes their fetchers and
 * then calls Partition.makeLeader for each partition.
 *
 * @param controllerId   id of the controller that issued the request
 * @param epoch          controller epoch supplied by the caller
 * @param partitionState partitions to transition, with their requested state
 * @param correlationId  correlation id of the originating request
 * @param responseMap    per-partition result map supplied by the caller
 * @param offsetManager  forwarded to Partition.makeLeader
 */
private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
correlationId: Int,
responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) = {
// (code elided in this excerpt)
......
try {
// Remove the fetchers for these partitions.
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
......
// Run Partition.makeLeader for every partition.
partitionState.foreach{ case (partition, partitionStateInfo) =>
partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
} catch {
// (exception handling is elided in this excerpt)
......
}
// NOTE(review): the excerpt ends here; the method's closing brace is not shown.
最终通过Partition的makeLeader来完成Leader状态的Replica初始化。
/**
 * Switches this partition's local replica into the leader role using the state carried by
 * `partitionStateInfo`. The whole transition runs inside `inWriteLock(leaderIsrUpdateLock)`.
 *
 * @param controllerId       id of the controller that issued the request (not read in this body)
 * @param partitionStateInfo assigned replicas plus leader / ISR / controller-epoch data
 * @param correlationId      correlation id of the request (not read in this body)
 * @param offsetManager      used to load offsets when this partition belongs to the offsets topic
 * @return always `true`
 */
def makeLeader(controllerId: Int,
               partitionStateInfo: PartitionStateInfo, correlationId: Int,
               offsetManager: OffsetManager): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val assignedBrokerIds = partitionStateInfo.allReplicas
    val epochInfo = partitionStateInfo.leaderIsrAndControllerEpoch
    val leaderState = epochInfo.leaderAndIsr

    controllerEpoch = epochInfo.controllerEpoch

    // Make sure a Replica object exists for every assigned broker and for every ISR member.
    for (brokerId <- assignedBrokerIds) getOrCreateReplica(brokerId)
    val isrReplicas = leaderState.isr.map(brokerId => getOrCreateReplica(brokerId)).toSet

    // Drop replicas that are no longer part of the assignment.
    val staleBrokerIds = assignedReplicas().map(_.brokerId) -- assignedBrokerIds
    staleBrokerIds.foreach(brokerId => removeReplica(brokerId))

    // Record the new ISR, leader epoch and ZooKeeper version, and mark this broker as leader.
    inSyncReplicas = isrReplicas
    leaderEpoch = leaderState.leaderEpoch
    zkVersion = leaderState.zkVersion
    leaderReplicaIdOpt = Some(localBrokerId)

    // Construct the high watermark metadata for the new leader replica.
    val localLeader = getReplica().get
    localLeader.convertHWToLocalOffsetMetadata()

    // Reset the log end offset of every remote replica to the unknown marker.
    for (replica <- assignedReplicas() if replica.brokerId != localBrokerId)
      replica.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata

    // Give the leader a chance to advance its high watermark.
    maybeIncrementLeaderHW(localLeader)

    // Partitions of the offsets topic additionally load their offsets from the log.
    if (topic == OffsetManager.OffsetsTopicName)
      offsetManager.loadOffsetsFromLog(partitionId)

    true
  }
}
当Replica成为Follower时,其处理流程如下:
/**
 * Makes the local replicas of the given partitions followers: calls Partition.makeFollower
 * for partitions whose new leader is found in `leaders`, removes the old fetchers, truncates
 * the local logs to their high watermarks and, unless shutting down, adds fetchers towards
 * the new leaders.
 *
 * NOTE(review): parts of this method are elided in the excerpt and its braces do not
 * balance as shown; the comments below describe only the visible statements.
 *
 * @param controllerId   id of the controller that issued the request
 * @param epoch          controller epoch supplied by the caller
 * @param partitionState partitions to transition, with their requested state
 * @param leaders        brokers that may act as leaders, matched by id against each new leader
 * @param correlationId  correlation id of the originating request
 * @param responseMap    per-partition result map supplied by the caller
 * @param offsetManager  forwarded to Partition.makeFollower
 */
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker],
correlationId: Int,
responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) {
// (code elided in this excerpt)
......
try {
// Partitions for which makeFollower reported a change (returned true).
var partitionsToMakeFollower: Set[Partition] = Set()
partitionState.foreach{ case (partition, partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {
// Only transition partitions whose new leader broker is online, i.e. found in `leaders`.
case Some(leaderBroker) =>
// Run Partition.makeFollower; collect the partition when it returns true.
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
partitionsToMakeFollower += partition
else
......
case None =>
......
// The new leader is not in `leaders`; still make sure the local replica object exists.
partition.getOrCreateReplica()
}
}
// The leader may have changed, so remove the old fetchers for these partitions.
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
// To keep the data consistent, truncate each replica's log to its high watermark.
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
}
if (isShuttingDown.get()) {
......
}
else {
// Pair every partition with its new leader broker and the local replica's log end
// offset, which is passed as the fetcher's initial offset.
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
new TopicAndPartition(partition) -> BrokerAndInitialOffset(
leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
partition.getReplica().get.logEndOffset.messageOffset)).toMap
// Add the new fetchers.
replicaFetcherManager.addFetcherForPartitions(
partitionsToMakeFollowerWithLeaderAndOffset)
......
}
}
} catch {
// (exception handling is elided in this excerpt)
......
}
}
最终通过Partition的makeFollower来完成Follower状态的Replica的初始化。
/**
 * Switches this partition's local replica into the follower role using the state carried by
 * `partitionStateInfo`. The whole transition runs inside `inWriteLock(leaderIsrUpdateLock)`.
 *
 * @param controllerId       id of the controller that issued the request (not read in this body)
 * @param partitionStateInfo assigned replicas plus leader / ISR / controller-epoch data
 * @param correlationId      correlation id of the request (not read in this body)
 * @param offsetManager      used to clear offsets when this broker was the recorded leader of
 *                           an offsets-topic partition
 * @return `false` when the recorded leader already equals the requested leader; otherwise the
 *         recorded leader is updated and `true` is returned
 */
def makeFollower(controllerId: Int,
                 partitionStateInfo: PartitionStateInfo,
                 correlationId: Int, offsetManager: OffsetManager): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val assignedBrokerIds = partitionStateInfo.allReplicas
    val epochInfo = partitionStateInfo.leaderIsrAndControllerEpoch
    val leaderState = epochInfo.leaderAndIsr
    val targetLeaderId: Int = leaderState.leader

    controllerEpoch = epochInfo.controllerEpoch

    // Make sure a Replica object exists for every assigned broker.
    for (brokerId <- assignedBrokerIds) getOrCreateReplica(brokerId)

    // Remove assigned replicas that the controller has taken out of the assignment.
    val staleBrokerIds = assignedReplicas().map(_.brokerId) -- assignedBrokerIds
    staleBrokerIds.foreach(brokerId => removeReplica(brokerId))

    // A follower keeps an empty ISR; record the new leader epoch and ZooKeeper version.
    inSyncReplicas = Set.empty[Replica]
    leaderEpoch = leaderState.leaderEpoch
    zkVersion = leaderState.zkVersion

    // If this broker was the recorded leader of an offsets-topic partition, clear the
    // offsets held for that partition.
    val wasLocalLeader = leaderReplicaIdOpt.exists(_ == localBrokerId)
    if (wasLocalLeader && topic == OffsetManager.OffsetsTopicName)
      offsetManager.clearOffsetsInPartition(partitionId)

    // Report whether the leader actually changed, recording the new leader when it did.
    leaderReplicaIdOpt match {
      case Some(currentLeaderId) if currentLeaderId == targetLeaderId =>
        false
      case _ =>
        leaderReplicaIdOpt = Some(targetLeaderId)
        true
    }
  }
}
以上便是Kafka的副本管理模块ReplicaManager处理LeaderAndIsrRequest请求的全流程,在这个流程之后就确定了Broker Server中Replica的角色,从而保证集群中副本的处理与同步。
版权声明:本文为bao2901203013原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。