原创

Kafka源码分析(二七)——Broker:集群管理——分区重分配

当Kafka集群中的一个Broker节点突然宕机,那么这个节点上的分区副本就已经处于功能失效的状态了,Kafka并不会自动将这些失效分区副本迁移到其它Broker节点上;或者说,当集群中新增一个Broker时,这个Broker上没有任何分区副本,Kafka也不会自动将其它Broker上已有的一些分区副本重分配到该新加入的Broker中。

所以,这就是涉及到了分区重分配(reblance)问题,本章,我就对分区重分配的整体流程和底层原理进行讲解。

一、重分配策略

执行分区重分配,第一件事情就是要生成重分配方案。Kafka提供了 kafka-reassign-partitions.sh 脚本,用来生成重分配策略,并执行分区重分配工作,它可以在集群扩容、 Broker节点挂掉的场景下对分区副本进行迁移。

1.1 脚本使用

我们先来看下kafka-reassign-partitions.sh 脚本的基本使用,主要包含三个步骤:

  1. 创建一个包含要进行分区重分配的Topic清单的JSON 文件;
  2. 根据Topic清单和Broker节点清单生成一份重分配方案
  3. 根据方案执行重分配动作。

我通过一个示例来讲解下,假设现有3个Broker组成的Kafka集群,一个名为topic-reassign的主题,包含4个分区,每个分区有2个副本,当前的分区副本分配状态如下:

Topic : topic-reassign    Partition : 0    Leader: 0    Replicas: 0,2    Isr : 0,2
Topic : topic-reassign    Partition : 1    Leader: 1    Replicas: 1,0    Isr : 1,0
Topic : topic-reassign    Partition : 2    Leader: 2    Replicas: 2,1    Isr : 2,1
Topic : topic-reassign    Partition : 3    Leader: 0    Replicas: 0,1    Isr : 0,1

Partition表示分区ID,Leader表示该分区的Leader副本所在的Broker ID,Replicas表示该分区的所有副本分配在哪些Broker上(即值为BrokerID),Isr表示该分区的ISR列表。

假设我现在需要将Broker ID为1的节点下线,那么首先要做的就是将它上面的分区副本迁移出去,使用kafka-reassign-partitions.sh脚本创建一个 JSON 文件,文件内容为要进行分区重分配的Topic清单 :

# reassign.json 
{
    "topics": [
        {
            "topic": "topic-reassign"
        }
    ],
    "version": 1
}

接着,根据这个JSON 文件,然后指定所要分配的Broker节点列表,通过命令生成一份重分配方案 :

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --generate --topics-to-move-json-file reassign.json --broker-list 0,2

执行上述命令后,会生成两串JSON格式的内容。Current partition replica assignment表示当前的分区副本分配情况,Proposed partition reassignment configuration就是生成的一份供参考的重分配方案:

Current partition replica assignment
{
    "version": 1,
    "partitions": [{
        "topic": "topic-reassign",
        "partition": 0,
        "replicas": [0, 2],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 1,
        "replicas": [1, 0],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 2,
        "replicas": [2, 1],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 3,
        "replicas": [0, 1],
        "log_dirs": ["any", "any"]
    }]
}

Proposed partition reassignment configuration
{
    "version": 1,
    "partitions": [{
        "topic": "topic-reassign",
        "partition": 2,
        "replicas": [2, 0],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 1,
        "replicas": [0, 2],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 3,
        "replicas": [0, 2],
        "log_dirs": ["any", "any"]
    }, {
        "topic": "topic-reassign",
        "partition": 0,
        "replicas": [2, 0],
        "log_dirs": ["any", "any"]
    }]
}

注意:这里只是生成一份可行性的方案,并没有真正执行重分配的动作。另外,一般要将Current partition replica assignment的内容保存起来,以备后续的回滚操作。

最后,将第二个 JSON 内容保存在一个 JSON 文件中,假定这个文件的名称为project.json,然后执行以下命令完成分区副本重分配:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --execute

另外,我们还可以验证查看分区重分配的进度:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file project.json --verify

# 输出信息:
Status of partition reassignment:
Reassignment of partition topic-reassign completed successfully
Reassignment of partition topic-reassign-2 completed successfully
Reassignment of partition topic-reassign-1 completed successfully
Reassignment of partition topic-reassign-0 completed successfully
Reassignment of partition topic-reassign-3 completed successfully

二、底层原理

了解了分区副本重分配的流程,我就带大家来看看底层的源码。事实上,分区副本重分配的本质,就是将新分配方案写入到Zookeeper的admin/reassign_partitions节点中,由Controller Broker监听变动并通知各个Broker进行数据复制迁移



2.1 监听重分配

我们已经知道了最终执行分区重分配是通过脚本kafka-reassign-partitions.sh

# kafka-reassign-partitions.sh
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand "$@"

我们看ReassignPartitionsCommand执行重分配的方法executeAssignment

// ReassignPartitionsCommand.scala

object ReassignPartitionsCommand extends Logging {

  def main(args: Array[String]): Unit = {
    val opts = validateAndParseArgs(args)
    val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
    val zkUtils = ZkUtils(zkConnect,
                          30000,
                          30000,
                          JaasUtils.isZkSecurityEnabled())
    try {
      // 验证
      if(opts.options.has(opts.verifyOpt))
        verifyAssignment(zkUtils, opts)
      // 生成方案
      else if(opts.options.has(opts.generateOpt))
        generateAssignment(zkUtils, opts)
      // 执行
      else if (opts.options.has(opts.executeOpt))
        executeAssignment(zkUtils, opts)
    } catch {
      case e: Throwable =>
        println("Partitions reassignment failed due to " + e.getMessage)
        println(Utils.stackTrace(e))
    } finally zkUtils.close()
  }
}

最终调用了reassignPartitions方法,将分区分配方案写入admin/reassign_partitions节点:

// ReassignPartitionsCommand.scala

def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String, throttle: Long = -1) {
  val partitionsToBeReassigned = parseAndValidate(zkUtils, reassignmentJsonString)
  val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, partitionsToBeReassigned.toMap)

  // If there is an existing rebalance running, attempt to change its throttle
  if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
    println("There is an existing assignment running.")
    reassignPartitionsCommand.maybeLimit(throttle)
  }
  else {
    if (throttle >= 0)
    // 执行命令
    if (reassignPartitionsCommand.reassignPartitions(throttle)) {
      println("Successfully started reassignment of partitions.")
    } else
      println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
  }
}

def reassignPartitions(throttle: Long = -1): Boolean = {
  maybeThrottle(throttle)
  try {
    val validPartitions = proposedAssignment.filter { case (p, _) => validatePartition(zkUtils, p.topic, p.partition) }
    if (validPartitions.isEmpty) false
    else {
      val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions)
      // 将分区分配方案写入/admin/reassign_partitions节点
      zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
      true
    }
  } catch {
      //...
  }
}

当Controller Broker监听到/admin/reassign_partitions节点的变动后,就会开始执行分区重分配:

// KafkaController.scala

// 监听到`/admin/reassign_partitions`节点的变动后,执行该方法
def doHandleDataChange(dataPath: String, data: AnyRef) {
  debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
    .format(dataPath, data))
  val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
  val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
    partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
  }
  // 遍历每一个分区,按照方案对该分区的副本进行重分配
  partitionsToBeReassigned.foreach { partitionToBeReassigned =>
    inLock(controllerContext.controllerLock) {
      if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
        error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
          .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
        controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
      } else {
        // 关键是这里,对该分区进行重分配
        val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
      }
    }
  }
}

// 针对指定的分区
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                      reassignedPartitionContext: ReassignedPartitionsContext) {
  val newReplicas = reassignedPartitionContext.newReplicas
  val topic = topicAndPartition.topic
  val partition = topicAndPartition.partition
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOpt match {
      case Some(assignedReplicas) =>
        if (assignedReplicas == newReplicas) {
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          // 将分区的副本分配到各个新的Broker
          info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
          watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
          controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
          deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
          // 关键是这里
          onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
        }
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
        .format(topicAndPartition))
    }
  } catch {
    //...
  }
}

2.2 执行重分配

最核心的其实就是KafkaController.onPartitionReassignment()方法,该方法包含了对某个分区的副本执行重分配的整个流程。

我们回顾一下Topic的创建流程:每次创建完一个Topic,都会将该Topic分区副本的默认分配方案写入Zookeeper的/brokers/topics/[Topic名称]节点中,然后由Controller负责将新的集群元数据广播给其它Broker,其它Broker则开始新建日志段,接受Producer请求等操作……

分区重分配的流程本质其实也是相似的,只不过Controller会去/admin/reassign_partitions目录下获取分区重分配方案,然后广播集群元数据,让各个Broker执行日志复制。

我举个例子来帮助大家理解下分区副本的Reblance流程。假设现在有一个名为reassign的Topic,它的某个分区原先的副本分配方案为{1, 2, 3},最终要执行的新方案{4, 5, 6},那么对于原来Broker1、2、3上的副本,都需要进行重新分配到Broker4、5、6上,具体的过程如下:

  1. 首先,向Zookeeper中写入副本分配方案:{1, 2, 3, 4, 5, 6};
  2. 然后,向每个Broker都发送一个LeaderAndIsr请求,这样各个Broker上的副本都会向Leader副本请求数据进行复制;
  3. 等待直到步骤2中的所有Follower副本都同步完成,此时ISR={1, 2, 3, 4, 5, 6};
  4. 接着,在新分配副本的Broker{4, 5, 6}中选举一个Leader,比如这里是Broker4;
  5. 最后,从ISR中剔除{1, 2, 3}这三个副本,然后在Zookeeper中也删除{1, 2, 3}这三个副本,然后由Controller广播集群元数据。

上述步骤中,有几个比较重要的概念,我这里简单讲解下,关键是理解,对于每一个分区而言,都有下面几个集合:

  • RAR:重分配后的副本状态,比如{1, 2, 3}表示副本分别分布在Broker1、Broker2、Broker3,Leader副本在Broker1上;
  • OAR:初始的副本状态;
  • AR:当前的副本状态,随着重分配过程不断变化;
  • RAR-OAR:RAR与OAR的差集,即需要创建、数据迁移的新副本;
  • OAR-RAR:OAR与RAR的差集,即迁移后需要下线的副本。

可以用下面这张图来理解整个过程:



最后,给出onPartitionReassignment的源码,理解了整个流程,代码略读即可,关键是理解这个思路:

// KafkaController.scala

// 对分区topicAndPartition执行重分配方案
def onPartitionReassignment(topicAndPartition: TopicAndPartition, 
                            reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  // 如果新分配的副本没有全在ISR中
  if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    // 需要创建、数据迁移的新副本
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    // 1.在Zookeeper的“/brokers/topics/[Topic名称]”中写入该主题分区的副本分配方案:OAR + RAR
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    // 2.发送LeaderAndIsr请求给OAR + RAR中每一个副本
    updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    // 3.新分配的副本状态更新为NewReplica,此时新副本会开始创建并同步数据
    startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
  } else { //等待所有的RAR都在ISR中

      // 4.迁移后需要下线的副本: OAR - RAR  
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
    // 5.将副本状态设置为OnlineReplica
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    // 6.将上下文中的AR设置为RAR
    // 7.新加入的副本已经同步完成, LeaderAndIsr都更新到最新的结果
    moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    // 8/9.将旧的副本下线
    stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    // 10.将ZK中的AR设置为RAR
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    // 11.分区重分配完成, 在ZK中删除/admin/reassign_partitions节点中的迁移方案
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    // 12.发送metadata更新请求给所有Broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    //...
  }
}

三、总结

本章,我对Kafka的分区重分配流程和底层实现原理进行了讲解,分区重分配本质在于数据复制,先增加新的副本,然后进行数据同步,最后删除旧的副本来达到最终目的。

数据复制会占用额外的资源,所以,分区重分配一定要在低峰值时期执行。另外,可以减小重分配的粒度,以小批次的方式来操作是一种可行的解决思路 。但是,如果集群中某个分区的流量即使在低峰时期还是特别大,那么就需要采取限流机制,Kafka默认提供了两种复制限流的方式:通过脚本kafka-config. shkafka-reassign-partitions.sh实现。我就不赘述了,读者可以找相关资料了解。

正文到此结束

感谢赞赏~

本文目录