原创

分布式消息中间件(四)——RocketMQ性能压测

上一章,我们介绍了RocketMQ的生产部署,由于在正式生产部署前一般都需要进行性能测试,所以本章我们来看下如何对RocketMQ进行性能测试。

一、性能指标监控

既然是性能测试,那么必然要看RocketMQ集群能承载的最高QPS是多少?同时在承载这个QPS的同时,各个机器的CPU、IO、磁盘、网络、内存的负载情况,以及JVM的GC情况等等。

我们如何去观察这些指标吗?通常来说,指标分为两部分:

  1. 机器本身指标:也就是上述说的CPU、内存、IO等等;
  2. RocketMQ指标:生产消息TPS、消费消息TPS、集群整体情况等等;

1.1 机器指标

对于机器指标,最简单的方式我们可以通过OS自身的一些命令进行观察,比如topfree等等,后面在压测环节我会具体讲。

一般大一点的公司都会有自己的监控平台,比如Zabbix、Open-Falcon等等,通过它们就可以对机器指标有一个较好的监控。

1.2 RocketMQ管理平台

RoeketMQ自身提供了可视化的管理界面,可以看到NameServer、Broker、Topic等基本信息。我们先来看下如何启动这个管理应用。

我们首先在任意一台部署了NameServer的机器上,执行以下命令:

git clone https://github.com/apache/rocketmq-externals.git

然后进入rocketmq-console目录,进行打包:

mvn package -DskipTests

最后启动管理平台:

java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=127.0.0.1:9876

然后就可以通过浏览器访问8080端口进行管理了:



二、系统参数调整

对于生产环境的RocketMQ集群,我们一般不能直接用默认参数启动,因为那样可能没法把中间件的性能发挥出来。所以,需要对部署MQ的机器的一些参数进行调整。

2.1 OS内核参数调整

由于OS的内核参数默认值未必适合生产环境下的MQ运行,所以一般需要根据机器配置进行调整。

vm.overcommit_memory

vm.overcommit_memory这个参数有三个值可以选择:0、1、2。
值0:中间件在申请内存时,OS内核会检查机器可用内存是否足够,如果足够就分配内存给MQ,如果剩余内存不足,则拒绝申请,此时会导致中间件出现异常;

因此一般需要将这个参数值调整为1,表示允许分配所有可用的物理内存,只要有内存就给MQ来用,这样可以避免申请内存失败的问题。通过如下命令修改:

echo 'vm.overcommit_memory=1' >> /etc/sysctl.conf

比如,Redis在save数据快照到磁盘文件的时候,需要申请大内存,如果vm.overcommit_memory这个参数值为0,就很可能被拒绝,进而导致异常报错。

vm.max_map_count

这个参数会影响中间件可以开启的线程数量,如果值过小,有时可能会导致有些中间件无法开启足够的线程,进而报错。

它的默认值是65536,在高并发场景下一般是不够的,因此建议把这个参数调大10倍,比如655360,保证中间件可以开启足够多的线程。 通过如下命令修改:

echo 'vm.max_map_count=655360' >> /etc/sysctl.conf

vm.swappiness

这个参数用来控制进程的swap行为。所谓进程swap,就是OS会把一部分磁盘空间作为swap区域,如果有的进程当前不是太活跃,就会被OS调整为睡眠状态,把进程中的数据放入磁盘上的swap区域,然后让这个进程把原来占用的内存腾出来,以节省内存资源的使用。

vm.swappiness的取值在0到100之间,默认为60: 如果值设置为0,表示尽量别把任何一个进程放到磁盘swap区域去,尽量用物理内存。如果值是100,表示尽量把进程放到磁盘swap区域去,以腾出内存。

因此,生产环境建议把这个参数调整小一些,比如设置为10,尽量用物理内存,减少swap行为。 可以用如下命令修改:

echo 'vm.swappiness=10' >> /etc/sysctl.conf

ulimit

这个参数用来控制Linux上的最大文件链接数,默认值1024。当程序在频繁的读写磁盘文件时,或者是进行网络通信时,都会跟这个参数有关系。

对于一个生产环境下的中间件,默认值是不够的,如果采用默认值,线上可能会出现如下错误:error: too many open files

因此通常建议用如下命令修改这个值:

echo 'ulimit -n 1000000' >> /etc/profile

其实大家思考一下这几个参数,会发现最后要调整的参数,无非就是跟IO、网络通信、内存、线程数量有关,因为中间件在运行的时候主要就是跟这些打交道。

2.2 JVM参数调整

RocketMQ本身是基于Java开发的,所以运行时是基于JVM的,那么就必然涉及对JVM参数的一些调整。



RocketMQ的JVM相关配置可以在启动脚本中设置,在rocketmq/distribution/target/apache-rocketmq/bin目录下,就有对应的启动脚本,比如mqbroker是用来启动Broker的,mqnamesvr是用来启动NameServer的。

我们以mqbroker为例来看下,它的文件内容最后有如下一行:

sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

上述runbroker.sh是其真正的启动脚本,内容如下:

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

上面就是一些JVM启动时的默认参数,包括堆内存大小、新生代大小、垃圾回收类型及配置,等等。可以看到,RocketMQ默认采用G1作为垃圾回收器,默认堆内存8G。

我们需要根据机器的实际配置来调整上面的参数,对于JVM调优不熟悉的童鞋,可以去看我的专栏《JVM从基础到实战系列》

2.3 MQ参数调整

RocketMQ自身也有些参数,我们需要对其进行调整。

比如,较为核心的是sendMessageThreadPoolNums,默认值是16,表示RocketMQ内部用来发送消息的线程池中的线程数量。 这个参数可以根据机器的CPU核数进行适当调整,比如机器CPU是24核,那就增加这个线程数量到24或者30。

sendMessageThreadPoolNums在目录rocketmq/distribution/target/apache-rocketmq/conf/dledger下, RocketMQ还有一些其它的核心参数,在后续章节中我们再一一分析。

三、压力测试

接着,我们就要利用生产者和消息者对已经部署好的RocketMQ进行压测了。回顾下系统的逻辑架构图:



我们需要新建Maven两个工程,一个是生产者,一个是消费者,两个工程都需要加入下面的Maven依赖,然后分别打包并部署到上述对应的四台机器上:

 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.5.1</version>
</dependency>

3.1 生产者

生产者程序就是开启多个线程,不断的往Broker里生产消息:

public class Producer {
    public static void main(String[] args) throws Exception {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("test_producer");

        // 设置NameServer地址
        producer.setNamesrvAddr("IP:PORT");

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 80; i++) {
            new Thread(){
                public void run(){
                    while(true){
                         Message msg = new Message("TopicTest", "TagA", 
                            ("test").getBytes(RemotingHelper.DEFAULT_CHARSET));
                         SendResult sendResult = producer.send(msg);
                         System.out.printf("%s%n", sendResult);
                    }
                }
            }.start();
        }

        while(true){
            Thread.sleep(1000);
        }
    }
}

3.2 消费者

生产者程序就是不断的从Broker里获取消息并处理:

public class Consumer {

    public static void main(String[] args) throws Exception {

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

        // 设置NameServer地址
        consumer.setNamesrvAddr("IP:PORT");


        // 订阅Topic
        consumer.subscribe("TopicTest", "*");


        /*
         *  回调接口,处理获取到的消息
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

3.3 压测

我们让两个Producer不停的往RocketMQ集群发送消息,每个Producer所在的机器启动了80个线程。RocketMQ集群是一主双从的dLedger模式的架构,只有Master Broker接受消息的写入,同时两个Consumer不停的从RocketMQ集群消费消息。

TPS和消息延时

TPS方面,我们根据RocketMQ的管理控制台页面监测,发现Master-Broker的TPS可以稳定达到7W左右,也就是每秒可以处理7w条消息。

消息延时方面,我们发现:一条消息从被Producer生产出来,到最后被Consumer消费完成,时间跨度不超过1s,这个性能是正常可接受的。

CPU负载

我们可以通过top命令检查Broker所在机器的CPU负载情况,执行top命令后,可以看到类似如下一行信息:

load average:12.03,12.05,12.08

上述信息分别代表CPU在1分钟、5分钟和15分钟内的cpu负载情况 。

上面的12表示有12个核在使用中,而我们的Broker机器是24核的,所以还有12个核其实还没使用,cpu还是有很大余力的,所以在7万TPS的压力下,CPU负载没什么问题。

内存使用率

我们可以通过free命令查看机器内存的使用率,我们的机器是48G内存,观察到在7万TPS的压力下,还剩下很大一部分内存都是空闲可用的。

GC频率

我们可以通过jstat命令查看RocketMQ的JVM的GC频率,观察到:新生代每隔几十秒GC一次,每次GC后存活的对象很少,几乎不进入老年代,Full GC在观察的几小时内从未出现。说明在7万TPS的压力下,GC频率正常。

IO负载

我们可以先用top命令查看,磁盘IO等待占CPU执行时间的百分比,执行top命令后会看到一行类似下面的内容:

Cpu(s):  0.3% us,  0.3% sy,  0.0% ni, 76.7% id, 13.2% wa,  0.0% hi,  0.0% si。

上面的13.2% wa,就是磁盘IO等待占CPU执行时间的百分比 , 如果这个比例太高,说明IO负载很高,导致大量的IO等待。一般来说,超过50%就很危险了。

网卡流量

通知执行命令sar -n DEV 1 2,可以查看服务器的网卡流量,也就是每秒钟网卡读写的数据量。对于千兆网卡,理论上每秒可以传输128MB的数据,一般100MB就是极限了。

我们压测时发现,对于每条500字节的消息,当TPS达到7万时,Master-Broker所在机器的网卡就几乎打满了。因为Master-Broker不但负责消息的写入,还要把数据同步给两个Slave-Broker,同时还有一些心跳之类的其它网络开销。

四、总结

通过上述压测,我们可以看到,整个RocketMQ集群的性能瓶颈其实在网卡上,当TPS达到7万时,cpu、内存、IO负载都在正常范围,GC频率也正常。所以,针对我们自己的机器配置,理论上RocketMQ集群的负载上限是7w。

为了支撑10万TPS,我们就需要采用6台机器部署Broker,最终的系统逻辑架构图如下:



正文到此结束
本文目录