原创

RocketMQ源码分析(三)——Broker启动流程

上一章,我们分析完了NameServer的启动流程,本章我们就来看下Broker的启动流程。整体上,Broker也是通过脚本启动,最终还是执行broker模块下的BrokerStartup.main()方法。

一、启动入口

1.1 BrokerStartup

我们先来看下BrokerStartup的main方法,其实和NamesrvStartup几乎是一个模子,都是先创建了一个Controller,然后启动它:

public static void main(String[] args) {
    // 创建BrokerController对象并启动
    start(createBrokerController(args));
}

public static BrokerController start(BrokerController controller) {
    try {

        // 启动BrokerController
        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }

        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

上面关键是createBrokerController方法,我们就来分析下BrokerController是如何创建的。

二、创建BrokerController

createBrokerController方法里面的代码非常长,我就省略一些无关紧要的代码了:

public static BrokerController createBrokerController(String[] args) {

    // ...省略Netty缓存区配置的代码

    try {
        // 解析命令行参数,和NamesrvController里面的操作类似
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // 这里是关键,构建了Broker自身的配置对象、底层的Netty服务端配置、Netty客户端配置
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));

        // Netty服务端监听10911端口
        nettyServerConfig.setListenPort(10911);

        // Broker用来存储消息的配置
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        // Slave-Broker的参数设置
        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }

        // ...省略解析命令行文件配置的代码

        // 解析配置中的NameServer地址
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }

        // 判断Broker的角色,不同角色用不同的特性配置值
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }

        // 判断是否开启DLeger机制
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }

        // 设置HA监听端口号,这个暂时不管,我们后面会专门讲
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);

        // ...省略日志配置的代码

        // 创建一个BrokerController对象
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        // 初始化BrokerController对象
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // ...省略shutdownhook代码

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

从上述代码可以看出,创建BrokerController时,核心就做了两件事情:

  1. 解析各种配置,创建BrokerController需要的各种配置对象:BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig;
  2. 调用BrokerController对象的initialize方法,进行初始化。

2.1 BrokerController配置

从创建BrokerController的代码中,我们可以看出,BrokerController依赖的四个核心配置如下:



这些配置其实没什么好说的,就是一些普通的POJO类。所以此时,Broker的整个组件架构应该是这样的:



Broker这个JVM进程运行期间,都是由BrokerController这个管控组件去管理Broker的请求处理、后台线程以及磁盘数据。

2.3 BrokerController对象

在我们继续研究BrokerController如何进行初始化之前,我们先要来看下BrokerController内部到底是什么样的。BrokerController其实就是一个Broker管控组件,它控制着当前运行的这个Broker的行为,包括接收网络请求、包括管理磁盘上的消息数据,以及一大堆的后台线程的运行。

我们来看下BrokerController的构造函数,里面创建了各种各样的组件:

public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // BrokerController的一些核心配置
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;

    // 下面是Broker内部的各个功能组件,这些组件全部由BrokerController掌控
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    this.topicConfigManager = new TopicConfigManager(this);
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    this.consumerFilterManager = new ConsumerFilterManager(this);
    this.producerManager = new ProducerManager();
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    this.broker2Client = new Broker2Client(this);
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    this.filterServerManager = new FilterServerManager(this);
    this.slaveSynchronize = new SlaveSynchronize(this);

    // 各种线程池队列
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());

    // metric统计组件
    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());

    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));

    // 处理Broker故障的组件
    this.brokerFastFailure = new BrokerFastFailure(this);
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}

上面的各种组件没必要在阅读BrokerController源码时一下子扎进去读,只要大体知道它们是做什么的的就可以了,后面可以针对Broker的某项功能再去深入研究:



2.4 初始化BrokerController

现在BrokerController相关的配置都解析好了,BrokerController对象也创建好了,接下来就要对BrokerController进行初始化了:

public boolean initialize() throws CloneNotSupportedException {
    // 加载Topic配置
    boolean result = this.topicConfigManager.load();
    // 加载Consumer的消费ofset
    result = result && this.consumerOffsetManager.load();
    // 加载Consumer订阅组
    result = result && this.subscriptionGroupManager.load();
    // 加载过滤器
    result = result && this.consumerFilterManager.load();

    if (result) {
        try {
            // 创建消息存储管理组件
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);

            // 如果启用了DLeger机制,就初始化一堆DLeger相关组件
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }

            // 创建Broker统计组件
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);

            // 下面这坨暂时不用管
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    result = result && this.messageStore.load();

    if (result) {
        // 这里很关键,创建了NettyRemotingServer,作为底层的Netty服务器,接受客户端请求
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

        // 下面是创建各种线程池,主要有两类:第一类负责处理别人发过来的请求,第二类负责处理自己的一些后台任务
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));

        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));

        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));

        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));

        // 这是一个管理Broker命令执行的线程池
        this.adminBrokerExecutor =
 Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));

        // 管理客户端的线程池
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        // 这个线程池负责给NameServer发送心跳
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        // 这是一个跟事务消息有关的线程池
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        // 管理Consumer的线程池
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        this.registerProcessor();

        // 下面都是定时调度线程池的后台执行
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        // 定时调度Metric统计任务
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        // ...省略其它各种定时调度任务

        // 下面是设置NameServer地址,支持各种配置方式
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        // 下面这些代码跟DLeger机制有关,暂时也不用管
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        // ...省略其它无关代码

        // 初始化事务消息相关的东西
        initialTransaction();
        // 初始化权限控制相关的东西
        initialAcl();
        // 不用管
        initialRpcHooks();
    }
    return result;
}

很多童鞋看到上面的代码估计都要晕了,其实里面很多东西我们没必要去深究,我们要弄清楚的就是BrokerController的整个启动流程,上述代码的核心逻辑就是:

  1. 对BrokerController内部的各种组件进行配置加载;
  2. 创建一些额外组件(包括核心的NettyRemotingServer);
  3. 创建各种线程池

最核心的,就是BrokerController一旦初始化完成后,就准备好了Netty服务器,可以用于接收网络请求,然后准备好了处理各种请求的线程池,以及各种用于执行后台定时任务的线程池。



三、启动BrokerController

到这里,BrokerController已经完成了初始化:

  • 实现各种功能的核心组件都已经初始化完毕;
  • 底层的Netty服务器也初始化完毕;
  • 负责处理请求的线程池以及执行定时调度任务的线程池也都初始化完毕。

最后,就要调用BrokerController对象的start方法进行启动了。BrokerController的启动,会正式完成内部Netty服务器的启动,然后就可以接收客户端请求了,同时Broker也可以作为Netty客户端向NameServer进行注册以及保持心跳。

从start方法可以看到,其实就是对BrokerController内部的各个组件执行它们自己的start方法,挨个去启动它们:

public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    // 启动Netty服务器
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    // 这里比较关键,因为Broker要发送心跳、注册请求到NameServer,所以它自身也必须是一个Netty客户端,所以这里其实就是启动一个Netty客户端组件
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }

    // 这里很关键,用于向线程池提交一个任务,这个任务的作用就是将当前Broker注册到NameServer
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }
}

上面的代码核心就是三块:

  1. 启动Netty服务器,用于接收各种客户端的网络请求;
  2. 启动一个BrokerOuterAPI组件,这个组件可以基于Netty客户端发送请求给别人;
  3. 启动一个线程,将当前Broker注册到NameServer。


四、总结

本章,我们详细讲解了Broker的整个启动流程,在阅读Broker和Nameserver的源码过程中,我们也可以体会到,一定要抓住开源框架运行的主体流程和核心组件,而不要陷入各种组件的细节里去。

对于各种细节代码,可以后续从各种场景驱动去翻阅源码,比如Broker的注册和心跳,Producer从NameServer拉取路由信息,Producer根据负载均衡算法选择一个Broker机器,Broker把消息存储到磁盘等等。这样,在阅读源码的过程中才不会半途而废。

正文到此结束

感谢赞赏~

本文目录