原创

RocketMQ源码分析(一)——RocketMQ源码环境搭建

本章是分布式消息中间件(进阶篇)的第一篇,我们开始对RocketMQ的源码进行分析,深入理解RockectMQ的整个架构及核心功能的原理。阅读源码,最核心的一点是:理清系统架构、模块划分、模块关系,然后按照开源框架的工作流程从核心组件开始阅读,在阅读源码的过程中一定要“不求甚解”,抛开细枝末节,抓住核心主干。

一、导入源码

我们先来搭建RocketMQ的源码环境。RocketMQ的开源代码托管在GitHub上:https://github.com/apache/rocketmq,我们clone一份到自己的本地机器:



由于项目采用Maven管理,所以我们可以在根目录执行以下命令进行编译打包:

mvn clean compile -Dmaven.test.skip=true install

然后,我们将Maven项目通过IDE(比如Idea)导入:



可以看到RocketMQ源码的模块结构如下:

  • broker:存放RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程;

  • client:存放RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面;

  • common:存放公共代码;

  • dev:存放开发相关的一些信息;

  • distribution:存放用来部署RocketMQ的一些东西,比如bin目录 、conf目录等等;

  • example:存放RocketMQ的一些例子;

  • filter:存放RocketMQ的与过滤器相关的代码;

  • logappender和logging:存放RocketMQ的日志打印相关的东西;

  • namesvr:存放NameServer的源码;

  • openmessaging:这是开放消息标准,先忽略;

  • remoting:存放RocketMQ的远程网络通信模块的代码,基于netty实现;

  • srvutil:存放一些工具类;

  • store:存放Broker上存储相关的一些源码;

  • style、test、tools:存放checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类。

二、启动工程

接下来,我们来看下如何启动RocketMQ工程。我们在使用RocketMQ使,肯定是先启动NameServer,因为它是RocketMQ的消息路由中心;接着,肯定是启动Broker,它负责接受消息和消息的存储;最后才是启动Producer和Consumer。

2.1 启动NameServer

要启动NameServer,所以我们先在RocketMQ源码目录中找到namesrv这个工程,然后找到NamesvrStartup.java启动类:



NameServer启动时,会去寻找一个名为ROCKETMQ_HOME的环境变量,这个环境变量的值是我们本机上的RocketMQ的运行时根目录。所以,我们需要先建立一个ROCKETMQ_HOME目录,例如,我在自己本机上建立了如下根目录:



注意,我们需要在ROCKETMQ_HOME根目录下创建conf、logs、store三个文件夹,因为后续NameServer运行是需要使用这些目录的。然后我们把RocketMQ源码目录中的distrbution模块下的broker.conf、logback_namesvr.xml两个配置文件拷贝到上述的conf目录中去,接着就需要修改这两个配置文件的内容:

logback_namesvr.xml:修改里面的所有${user.home},替换为你的ROCKETMQ_HOME根目录。

broker.conf:修改为以下内容

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=C:\Users\Ressmix\Desktop\ROCKETMQ\store

# 这是commitLog的存储路径
storePathCommitLog=C:\Users\Ressmix\Desktop\ROCKETMQ\store/commitlog

# consume queue文件的存储路径
storePathConsumeQueue=C:\Users\Ressmix\Desktop\ROCKETMQ\store/consumequeue

# 消息索引文件的存储路径
storePathIndex=C:\Users\Ressmix\Desktop\ROCKETMQ\store/index

# checkpoint文件的存储路径
storeCheckpoint=C:\Users\Ressmix\Desktop\ROCKETMQ\store/checkpoint

# abort文件的存储路径
abortFile=C:\Users\Ressmix\Desktop\ROCKETMQ/abort

最后,启动NameServer,注意启动时,要先配置好ROCKETMQ_HOME环境变量,可以在IDE中进行配置:



NameServer启动时会读取conf里的配置文件,所有的日志都会打印在logs目录里,然后数据都会写在store目录里,启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:

Connected to the target VM, address: '127.0.0.1:54473', transport: 'socket'
The Name Server boot success. serializeType=JSON

2.2 启动Broker

启动Broker的方式和NameServer类似,先在RocketMQ源码目录中找到broker这个工程,然后找到BrokerStartup.java启动类:



Broker启动时,需要指定broker.conf配置,这个就是ROCKETMQ_HOME/conf/broker.conf,主要是配置了NameServer的地址,然后配置了Broker的数据存储路径:包括commitlog文件、consumequeue文件、index文件、checkpoint文件等等。

除此之外,我们还需要将RocketMQ源码目录中的distrbution模块下的logback_broker.xml这个配置文件拷贝到上述的conf目录中去,修改里面的所有${user.home},替换为你的ROCKETMQ_HOME根目录。

最后,启动Broker,注意要在IDE中进行配置:



启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:

Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'
The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON

三、 运行示例

NameServer和Broker都启动成功了,接着我们就可以进行消息投递和消费了。RocketMQ源码里的example模块下提供了生产者和消费者的示例:



3.1 创建Topic

投递/订阅消息肯定要指定消息的Topic,我们先通过RocketMQ的管理控制台创建一个测试用的Topic——TopicTest



我在《RocketMQ性能压测》一章讲过如果搭建RocketMQ的管理控制台,可以参阅该章节内容进行搭建。

3.2 生产者示例

接着,我们来修改一下RocketMQ自带的Producer示例程序:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // 设置NameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");


        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();
    }
}

我们执行运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,如果可以看到下面的内容,就说明我们已经成功的把消息发送到Broker里去了:

SendResult[sendStatus=SEND_OK,msgId=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000,offsetMsgId=C0A8030900002A9F0000000000000000,messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a,queueId=2],queueOffset=0]

3.3 消费者示例

接着,我们来修改一下RocketMQ自带的Consumer示例程序:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        // NameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume.
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

接着运行上述程序,可以看到消费到了1条消息,如下所示:

ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=0, sysFlag=0, bornTimestamp=1580887214424, bornHost=/192.168.3.9:56600, storeTimestamp=1580887214434, storeHost=/192.168.3.9:10911, msgId=C0A8030900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1580887519080, UNIQ_KEY=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]

到此为止,我们的RocketMQ的源码环境彻底搭建完毕了。

四、总结

本章,我们讲解了如何搭建RocketMQ的源码环境,因为RocketMQ源码本身是用Java编写的,所以整个过程还是比较简单的。从下一章开始,我们将基于搭建好的环境,开始分析RocketMQ的源码。

正文到此结束
本文目录