原创

Kafka源码分析(二六)——Broker:集群管理——Topic创建

了解了Broker集群的选举以及整体的集群管理机制,我们来看下Kafka创建Topic,以及对分区副本进行管理的流程。通常来说,我们会通过Kafka自带的kafka-topics.sh脚本来创建Topic。那么,当我们指定了一个Topic的分区数、每个分区的副本数之后,Controller(Leader Broker)是如何选择Leader副本?又是如何分配在Broker集群中分配这些副本的呢?

本章,我就对Topic的分区副本分配原理进行讲解。

如果Producer发送消息时指定了一个不存在的Topic,也会默认创建(分区1,副本1),可以通过Broker端的参数auto.create.topics.enable禁止默认创建的行为,生产环境建议禁止掉。

一、创建Topic

通过上一章的讲解,我们应该已经明白,集群中的每个Broker都知道整个集群的元数据信息。所谓元数据信息就是:集群中的每个Broker上有哪些Topic分区,每个Topic的分区信息,这些分区的Leader副本在哪,Follower副本在哪……

1.1 脚本使用

所以,Controller需要对这些Topic的分区进行管理,我以一个Topic的创建作为示例进行讲解,便于大家理解。首先,我们来看Topic的创建流程:



创建Topic通过脚本kafka-topics.sh

# kafka-topics.sh

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

本质是执行了TopicCommand命令:

// TopicCommand.scala

def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  // Topic名称
  val topic = opts.options.valueOf(opts.topicOpt)
  // 配置
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  try {
    // 1.手动指定分区
    if (opts.options.has(opts.replicaAssignmentOpt)) {
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else {
      // 2.自动分配分区
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      // 配置的分区数
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      // 配置的副本数
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      // 创建主题
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch  {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}

我们重点看它的自动分配分区分支,调用了AdminUtils.createTopic()来创建Topic并对分区副本进行分配:

// AdminUtils.scala

def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties,
                rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  // 1.从Zookeeper中获取Broker集群的元数据信息
  val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  // 2.基于一定的算法,将分区副本分配给各个Broker
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 
                                                             partitions, replicationFactor)’
  // 3.将分配好最终策略,直接写入Zookeeper中的/brokers/topics/[Topic名称]节点中
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}

可以看到,创建Topic的本质就是根据Topic的分区数、每个分区的副本数,基于一定的算法把它们分配给各个Broker,然后把分配策略写入到Zookeeper中

所谓分区副本分配策略,我这里简单解释下,假设有个Topic设置3个分区,每个分区2个副本,那么分配结果可能就是下面这个样子:

partition1 -> [0,1]        #分区1的Leader副本分配在Broker0,Follower副本分配在Broker1
partition2 -> [2,0]        #分区2的Leader副本分配在Broker2,Follower副本分配在Broker0
partition3 -> [1,2]        #分区3的Leader副本分配在Broker1,Follower副本分配在Broker2

至于具体的分区副本分配算法,我就不赘述了,读者可以自己去AdminUtils.assignReplicasToBrokers方法里瞅一瞅,无非就是类似负载均衡之类的策略,我重点关注分区副本分配的整体流程。

二、副本管理

创建Topic只是将分区副本分配策略写入到了Zookeeper的/brokers/topics/[Topic名称]节点中,那么接下来Controller如何根据策略来进行执行副本分配?如何对副本进行管理呢?

2.1 监听Topic创建

显然,Controller是可以感知到新Topic的创建的,也就是说它会去监听/brokers/topics节点的变化,整个监听的过程我用下面这张图来表示:



我们来看下底层的源码:

// KafkaController.scala

// Broker选举成为Leader后,会调用该方法
def onControllerFailover() {
  if(isRunning) {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    readControllerEpochFromZookeeper()
    incrementControllerEpoch(zkUtils.zkClient)

    registerReassignedPartitionsListener()
    registerIsrChangeNotificationListener()
    registerPreferredReplicaElectionListener()
    // 关键看这里
    partitionStateMachine.registerListeners()
    //...
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}

onControllerFailover方法中调用了PartitionStateMachine.registerListeners(),它会去监听/brokers/topics/节点的变化:

// PartitionStateMachine.scala

def registerListeners() {
  registerTopicChangeListener()
  registerDeleteTopicListener()
}

private def registerTopicChangeListener() = {
  // 监听“/brokers/topics”节点的变化
  zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
}

private def registerDeleteTopicListener() = {
  // 监听“/admin/delete_topics”节点的变化
  zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}

我们来看下TopicChangeListener这个监听器,它的内部就根据“/brokers/topics”节点下的子节点变化,筛选出新增的Topic,然后按照分区维度维护成一个Map[TopicAndPartition, Seq[Int]]

// PartitionStateMachine.scalaSS

class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

  protected def logName = "TopicChangeListener"

  // 当“/brokers/topics”节点下的子节点发生变化时,会触发Controller调用该方法
  def doHandleChildChange(parentPath: String, children: Seq[String]) {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val currentChildren = {
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            children.toSet
          }
          // 新创建的分区
          val newTopics = currentChildren -- controllerContext.allTopics
          // 删除的分区
          val deletedTopics = controllerContext.allTopics -- currentChildren
          controllerContext.allTopics = currentChildren
          // 获取分区副本分配策略
          val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
          controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
            !deletedTopics.contains(p._1.topic))
          controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
          info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
            deletedTopics, addedPartitionReplicaAssignment))
          if (newTopics.nonEmpty)
            // 关键看这里,
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
        } catch {
          case e: Throwable => error("Error while handling new topic", e)
        }
      }
    }
  }
}

最后,上述代码会调用KafkaController的onNewTopicCreation方法,发送Topic的元数据信息给各个Broker,这个Broker就可以进行一些初始化操作,比如新建分区日志段,准备接受Producer发送过来的消息等等:

// KafkaController.scala

def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  // subscribe to partition changes
  topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  // 按照分配策略,发送Topic的元数据信息给各个Broker
  onNewPartitionCreation(newPartitions)
}

三、总结

本章,我对Topic创建的整体流程和底层原理进行了讲解,Controller会监听新Topic的创建,同时对分区副本进行管理,向新的元数据信息发送给集群中的其它Broker。

正文到此结束

感谢赞赏~

本文目录