RocketMQ源码分析(一)——RocketMQ源码环境搭建
本章是分布式消息中间件(进阶篇)的第一篇,我们开始对RocketMQ的源码进行分析,深入理解RockectMQ的整个架构及核心功能的原理。阅读源码,最核心的一点是:理清系统架构、模块划分、模块关系,然后按照开源框架的工作流程从核心组件开始阅读,在阅读源码的过程中一定要“不求甚解”,抛开细枝末节,抓住核心主干。
一、导入源码
我们先来搭建RocketMQ的源码环境。RocketMQ的开源代码托管在GitHub上:https://github.com/apache/rocketmq,我们clone一份到自己的本地机器:

由于项目采用Maven管理,所以我们可以在根目录执行以下命令进行编译打包:
mvn clean compile -Dmaven.test.skip=true install
然后,我们将Maven项目通过IDE(比如Idea)导入:

可以看到RocketMQ源码的模块结构如下:
broker:存放RocketMQ的Broker相关的代码,这里的代码可以用来启动Broker进程;
client:存放RocketMQ的Producer、Consumer这些客户端的代码,生产消息、消费消息的代码都在里面;
common:存放公共代码;
dev:存放开发相关的一些信息;
distribution:存放用来部署RocketMQ的一些东西,比如bin目录 、conf目录等等;
example:存放RocketMQ的一些例子;
filter:存放RocketMQ的与过滤器相关的代码;
logappender和logging:存放RocketMQ的日志打印相关的东西;
namesvr:存放NameServer的源码;
openmessaging:这是开放消息标准,先忽略;
remoting:存放RocketMQ的远程网络通信模块的代码,基于netty实现;
srvutil:存放一些工具类;
store:存放Broker上存储相关的一些源码;
style、test、tools:存放checkstyle代码检查的东西,一些测试相关的类,还有就是tools里放的一些命令行监控工具类。
二、启动工程
接下来,我们来看下如何启动RocketMQ工程。我们在使用RocketMQ使,肯定是先启动NameServer,因为它是RocketMQ的消息路由中心;接着,肯定是启动Broker,它负责接受消息和消息的存储;最后才是启动Producer和Consumer。
2.1 启动NameServer
要启动NameServer,所以我们先在RocketMQ源码目录中找到namesrv
这个工程,然后找到NamesvrStartup.java
启动类:

NameServer启动时,会去寻找一个名为ROCKETMQ_HOME
的环境变量,这个环境变量的值是我们本机上的RocketMQ的运行时根目录。所以,我们需要先建立一个ROCKETMQ_HOME
目录,例如,我在自己本机上建立了如下根目录:

注意,我们需要在ROCKETMQ_HOME
根目录下创建conf、logs、store三个文件夹,因为后续NameServer运行是需要使用这些目录的。然后我们把RocketMQ源码目录中的distrbution
模块下的broker.conf、logback_namesvr.xml两个配置文件拷贝到上述的conf目录中去,接着就需要修改这两个配置文件的内容:
logback_namesvr.xml:修改里面的所有${user.home},替换为你的ROCKETMQ_HOME
根目录。
broker.conf:修改为以下内容
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# 这是nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 这是存储路径,你设置为你的rocketmq运行目录的store子目录
storePathRootDir=C:\Users\Ressmix\Desktop\ROCKETMQ\store
# 这是commitLog的存储路径
storePathCommitLog=C:\Users\Ressmix\Desktop\ROCKETMQ\store/commitlog
# consume queue文件的存储路径
storePathConsumeQueue=C:\Users\Ressmix\Desktop\ROCKETMQ\store/consumequeue
# 消息索引文件的存储路径
storePathIndex=C:\Users\Ressmix\Desktop\ROCKETMQ\store/index
# checkpoint文件的存储路径
storeCheckpoint=C:\Users\Ressmix\Desktop\ROCKETMQ\store/checkpoint
# abort文件的存储路径
abortFile=C:\Users\Ressmix\Desktop\ROCKETMQ/abort
最后,启动NameServer,注意启动时,要先配置好ROCKETMQ_HOME
环境变量,可以在IDE中进行配置:

NameServer启动时会读取conf
里的配置文件,所有的日志都会打印在logs
目录里,然后数据都会写在store
目录里,启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:
Connected to the target VM, address: '127.0.0.1:54473', transport: 'socket'
The Name Server boot success. serializeType=JSON
2.2 启动Broker
启动Broker的方式和NameServer类似,先在RocketMQ源码目录中找到broker
这个工程,然后找到BrokerStartup.java
启动类:

Broker启动时,需要指定broker.conf
配置,这个就是ROCKETMQ_HOME/conf/broker.conf
,主要是配置了NameServer的地址,然后配置了Broker的数据存储路径:包括commitlog文件、consumequeue文件、index文件、checkpoint文件等等。
除此之外,我们还需要将RocketMQ源码目录中的distrbution
模块下的logback_broker.xml
这个配置文件拷贝到上述的conf目录中去,修改里面的所有${user.home},替换为你的ROCKETMQ_HOME
根目录。
最后,启动Broker,注意要在IDE中进行配置:

启动成功之后,在Intellij IDEA的命令行里就会看到下面的提示:
Connected to the target VM, address: '127.0.0.1:55275', transport: 'socket'
The broker[broker-a, 192.168.3.9:10911] boot success. serializeType=JSON
三、 运行示例
NameServer和Broker都启动成功了,接着我们就可以进行消息投递和消费了。RocketMQ源码里的example
模块下提供了生产者和消费者的示例:

3.1 创建Topic
投递/订阅消息肯定要指定消息的Topic,我们先通过RocketMQ的管理控制台创建一个测试用的Topic——TopicTest
。

我在《RocketMQ性能压测》一章讲过如果搭建RocketMQ的管理控制台,可以参阅该章节内容进行搭建。
3.2 生产者示例
接着,我们来修改一下RocketMQ自带的Producer示例程序:
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
/*
* Launch the instance.
*/
producer.start();
for (int i = 0; i < 1; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
}
}
我们执行运行上面的程序就可以了,他会发送1条消息到Broker里去,我们观察一下控制台的日志打印,如果可以看到下面的内容,就说明我们已经成功的把消息发送到Broker里去了:
SendResult[sendStatus=SEND_OK,msgId=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000,offsetMsgId=C0A8030900002A9F0000000000000000,messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a,queueId=2],queueOffset=0]
3.3 消费者示例
接着,我们来修改一下RocketMQ自带的Consumer示例程序:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
接着运行上述程序,可以看到消费到了1条消息,如下所示:
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=225, queueOffset=0, sysFlag=0, bornTimestamp=1580887214424, bornHost=/192.168.3.9:56600, storeTimestamp=1580887214434, storeHost=/192.168.3.9:10911, msgId=C0A8030900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1580887519080, UNIQ_KEY=240E03A24CD1B7A0B066027402ACC71F000018B4AAC217E3F1580000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
到此为止,我们的RocketMQ的源码环境彻底搭建完毕了。
四、总结
本章,我们讲解了如何搭建RocketMQ的源码环境,因为RocketMQ源码本身是用Java编写的,所以整个过程还是比较简单的。从下一章开始,我们将基于搭建好的环境,开始分析RocketMQ的源码。
感谢赞赏~