
Kafka Source Code Analysis (23): Broker Replica Synchronization, the Overall Flow on the Leader Side

After the Broker receives a Fetch request from a Follower, the request is first handled by the Broker's Kafka API layer and then dispatched to the corresponding Leader partitions for processing:

// KafkaApis.scala

class KafkaApis(val requestChannel: RequestChannel,
                val replicaManager: ReplicaManager,
                val adminManager: AdminManager,
                val coordinator: GroupCoordinator,
                val controller: KafkaController,
                val zkUtils: ZkUtils,
                val brokerId: Int,
                val config: KafkaConfig,
                val metadataCache: MetadataCache,
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers,
                val clusterId: String,
                time: Time) extends Logging {

def handle(request: RequestChannel.Request) {
  try {
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      // Handle Fetch requests
      case ApiKeys.FETCH => handleFetchRequest(request)
      //...
    }
  } catch {
    //...
  }
}

In this chapter, we will look at how the Leader partition handles a Fetch request, covering:

  1. how the Leader partition reads the data after a given offset from the local disk;
  2. how the Leader partition updates its HW and LEO;
  3. how the Leader partition maintains the ISR list;
  4. how the Leader partition uses the time wheel to delay the execution of a Fetch.

1. Message Reading

Let's look at the handleFetchRequest method of KafkaApis. At its core, reading messages comes down to a call to ReplicaManager.fetchMessages():

// KafkaApis.scala

def handleFetchRequest(request: RequestChannel.Request) {
  //...

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Seq.empty)
  else {
    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchRequest.minBytes,
      fetchRequest.maxBytes,
      versionId <= 2,
      authorizedRequestInfo,
      replicationQuota(fetchRequest),
      sendResponseCallback)
  }
}

The whole flow can be summarized by the following diagram:

(figure: the overall Fetch handling flow on the Leader side)

1.1 LogSegment

I won't belabor the message-reading flow inside ReplicaManager.fetchMessages(); just follow the source code down layer by layer. As I covered in the log subsystem chapters, logs are ultimately read and written by LogSegment, so the most critical code is the following:

// LogSegment.scala
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  // The key is this translateOffset method: it uses the sparse index to locate the starting physical position of the messages to read
  val startOffsetAndSize = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position.toInt
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  // return a log segment but with zero size in the case below
  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length = maxOffset match {
    case None =>
      // no max offset, just read until the max position
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }

  FetchDataInfo(offsetMetadata, log.read(startPosition, length),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
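
For context, a hypothetical invocation might look like this (segment here is a LogSegment instance; all parameter values are assumed for illustration): read at most 64 KB starting at offset 5000, with no upper offset bound:

// Hypothetical usage sketch, not from the Kafka source
val fetchInfo = segment.read(startOffset = 5000L, maxOffset = None, maxSize = 64 * 1024)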

LogSegment.translateOffset() binary-searches the sparse index to find the physical position of the data:

// LogSegment.scala
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogEntryPosition = {
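    // lookup() binary-searches the sparse offset index for the largest indexed
    // offset <= the target; searchForOffsetWithSize() then scans the log forward
    // from that physical position to the exact record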
    val mapping = index.lookup(offset)
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}

The lookup method of the offset index, OffsetIndex:

// OffsetIndex.scala
def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    val idx = mmap.duplicate
    val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      parseEntry(idx, slot).asInstanceOf[OffsetPosition]
  }
}
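
indexSlotFor is where the binary search happens. For intuition, here is a minimal sketch of that search (a simplified stand-in, not Kafka's actual implementation, which operates on the memory-mapped buffer): the index entries are (offset, position) pairs sorted by offset, and we want the slot of the largest entry whose offset does not exceed the target:

// Simplified sketch; Kafka's indexSlotFor works over the mmap'ed index file
def largestLowerBoundSlot(entries: IndexedSeq[(Long, Int)], target: Long): Int = {
  if (entries.isEmpty || entries.head._1 > target) return -1 // target precedes all entries
  var lo = 0
  var hi = entries.length - 1
  while (lo < hi) {
    val mid = (lo + hi + 1) / 2 // round up so the loop always terminates
    if (entries(mid)._1 <= target) lo = mid else hi = mid - 1
  }
  lo
}

// e.g. with entries (0,0), (4000,1024), (8000,2048) and target offset 5000,
// the result is slot 1, i.e. start scanning the log from physical position 1024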

1.2 Response

After the messages are read, KafkaApis ultimately puts the response onto the RequestChannel's responseQueue; the callback then delivers the result to the requester:

// KafkaApis.scala

def fetchResponseCallback(delayTimeMs: Int) {
  trace(s"Sending fetch response to client $clientId of " +
    s"${convertedPartitionData.map { case (_, v) => v.records.sizeInBytes }.sum} bytes")
  val fetchResponse = if (delayTimeMs > 0) new FetchResponse(versionId, fetchedPartitionData, delayTimeMs) else response
  // Hand the response over to RequestChannel
  requestChannel.sendResponse(new RequestChannel.Response(request, fetchResponse))
}

2. Maintaining the LEO

Every Fetch request a Follower sends carries the Follower's own LEO, so the Leader partition locally tracks the LEO of every Follower. The Leader then checks whether the Followers' LEOs have all moved past the current HW; if so, it advances its own HW. Once the Leader's HW has advanced, the new HW is returned to the Followers in the Fetch responses, which in turn drives the Followers' HWs forward.
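
To make the rule concrete, here is a minimal sketch (all values assumed for illustration, not Kafka code): the HW is effectively the minimum LEO across the ISR, so it only advances once every in-sync replica has fetched past the old value:

// Sketch with assumed values: the Leader plus two in-sync Followers
val leaderLeo = 105L
val followerLeos = Map(1 -> 103L, 2 -> 101L) // replicaId -> LEO carried on its Fetch request
val newHw = (followerLeos.values.toSeq :+ leaderLeo).min // 101: bounded by the slowest ISR member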

Let's look at the concrete implementation, in ReplicaManager's fetchMessages method:

// ReplicaManager.scala

def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota = UnboundedQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
  val isFromFollower = replicaId >= 0
  val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
  val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)

  // Read the messages from the local log
  val logReadResults = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota)

  // The key part: update the locally maintained LEO of each Follower replica
  if (Request.isValidBrokerId(replicaId))
    updateFollowerLogReadResults(replicaId, logReadResults)

  //...
}

private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
  readResults.foreach { case (topicPartition, readResult) =>
    getPartition(topicPartition) match {
      case Some(partition) =>
        // Update this Follower replica's LEO for the partition
        partition.updateReplicaLogReadResult(replicaId, readResult)
        tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
      case None =>
        warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
    }
  }
}

As you can see, this ends up calling Partition's updateReplicaLogReadResult method:

// Partition.scala

def updateReplicaLogReadResult(replicaId: Int, logReadResult: LogReadResult) {
  getReplica(replicaId) match {
    case Some(replica) =>
      // Update this replica's LEO
      replica.updateLogReadResult(logReadResult)
      // Check whether this replica should be added back into the ISR
      maybeExpandIsr(replicaId, logReadResult)
    case None =>
        //...
  }
}

Call after call after call... this is why source-analysis columns are so hard to write... Eventually, Replica.updateLogReadResult() is invoked:

// Replica.scala

def updateLogReadResult(logReadResult : LogReadResult) {
  // Record when this replica last caught up with the Leader's LEO
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  // Update the LEO
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}

3. Maintaining the ISR and HW

After ReplicaManager starts up, it creates two periodic tasks to maintain the ISR list:

// ReplicaManager.scala
def startup() {
  // Periodically check the ISR list, every replicaLagTimeMaxMs/2 (5s by default)
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  // Periodically propagate ISR changes via ZooKeeper, every 2.5s
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}

The key one is the first task, maybeShrinkIsr, which internally calls Partition's maybeShrinkIsr method for every partition:

private def maybeShrinkIsr(): Unit = {
    trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
    allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}

3.1 Kicking a Replica out of the ISR

Let's first look at the circumstances under which a replica is kicked out of the ISR. The code of Partition.maybeShrinkIsr is shown below. In general, two situations cause a replica to be deemed out of sync and removed from the ISR:

  1. the Follower replica process is stuck and has sent no sync request to the Leader for a while, for example due to frequent Full GCs;
  2. the Follower replica syncs too slowly and fails to catch up with the Leader within a time window (10s by default), for example because of heavy I/O.

// Partition.scala

def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // The replicas that have fallen out of sync (OSR)
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if(outOfSyncReplicas.nonEmpty) {
          // The new ISR list
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.nonEmpty)
          // update ISR in zk and in cache
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1
          replicaManager.isrShrinkRate.mark()
          // Try to advance the HW
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }

      case None => false // do nothing if no longer leader
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

// Collect the out-of-sync replicas (OSR)
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  val candidateReplicas = inSyncReplicas - leaderReplica
  // Keep the replicas that have been lagging for more than maxLagMs
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas for partition %s are %s".format(topicPartition, laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}
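
Notice that both failure modes listed above are caught by this single time-based check: a stuck Follower never fetches, and a slow Follower keeps fetching but never reaches the Leader's LEO, so in both cases lastCaughtUpTimeMs stops advancing. A minimal sketch with assumed values:

// Sketch with assumed values, not Kafka code
case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)
val replicaLagTimeMaxMs = 10000L // replica.lag.time.max.ms, 10s by default
val now = 100000L
val followers = Seq(FollowerState(1, 95000L), FollowerState(2, 85000L))
val outOfSync = followers.filter(f => now - f.lastCaughtUpTimeMs > replicaLagTimeMaxMs)
// outOfSync contains only broker 2 (15s behind), which would be removed from the ISR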

3.2 Rejoining the ISR

Now for the case where a Follower rejoins the ISR. The core check is whether the replica's LEO has caught up to the Leader's HW; if it has, the replica is added back:

// Partition.scala

def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val replica = getReplica(replicaId).get
        val leaderHW = leaderReplica.highWatermark
        if(!inSyncReplicas.contains(replica) &&
           assignedReplicas.map(_.brokerId).contains(replicaId) &&
           // The core check: has this replica's LEO caught up to (reached) the Leader's HW
           replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
          val newInSyncReplicas = inSyncReplicas + replica
          updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }
        // Try to advance the Leader's HW
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)
      case None => false // nothing to do if no longer leader
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

3.3 Maintaining the HW

If the ISR changes or any Follower's LEO changes, the Leader's HW may change as well. The algorithm is simple: take the minimum LEO across all replicas that are in the ISR (or have caught up within replicaLagTimeMaxMs):

// Partition.scala

private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  // Collect the LEOs of all replicas that are in the ISR or have recently caught up
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffset)
  // Take the minimum of these LEOs as the new HW
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    true
  } else {
    false
  }
}

4. Summary

In this chapter, I walked through the overall flow of replica synchronization on the Leader side, which involves the following core points:

  1. the message read flow on the Leader side;
  2. LEO and HW maintenance on the Leader side;
  3. ISR list maintenance on the Leader side.