欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...

程序员文章站 2022-07-15 09:22:56
...

1.1.2 Eclipse调试RocketMQ源码

本节将展示在Eclipse中启动NameServer、Broker,并运行消息发送与消息消费示例程序。
1.启动NameServer
Step1:展开namesrv模块,右键NamesrvStartup.java,移动到Debug As,选中Debug Configurations,弹出Debug Configurations对话框,如图1-14所示。
Step2:选中Java Application条目并单击右键,选择New弹出Debug Configurations对话框,如图1-15所示。
Step3:设置RocketMQ运行主目录。选择Environment选项卡,添加环境变量ROCKET_HOME。
Step4:在RocketMQ运行主目录中创建conf、logs、store三个文件夹,如图1-16所示。
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
Step5:从RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中,logback_namesrv.xml文件则只需修改日志文件的目录,broker.conf文件内容如下所示。
代码清单1-3 broker.conf文件
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

nameServer地址,分号分割

namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

存储路径

storePathRootDir=D:\rocketmq\store

commitLog 存储路径

storePathCommitLog=D:\rocketmq\store\commitlog

消费队列存储路径

storePathConsumeQueue=D:\rocketmq\store\consumequeue

消息索引存储路径

storePathIndex=D:\rocketmq\store\index

checkpoint 文件存储路径

storeCheckpoint=D:\rocketmq\store\checkpoint

abort 文件存储路径

abortFile=D:\rocketmq\store\abort
Step6:在Eclipse Debug中运行NamesrvStartup,并输出“The Name Server boot success. Serializetype=JSON”。
2.启动Broker
Step1:展开broker模块,右键BrokerStartup.java,移动到Debug As,选中Debug Configurations,弹出如图1-17所示的对话框,选择arguments选项卡,配置-c属性指定broker配置文件路径。

《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
Step2:切换选项卡Environment,配置RocketMQ主目录,如图1-18所示。

《RocketMQ技术内幕:RocketMQ架构设计与实现原理》—1.1.2 Eclipse调试RocketMQ源码...
Step3:以Debug模式运行BrokerStartup.java,查看${ROCKET_HOME}/logs/broker.log文件,未报错则表示启动成功。
代码清单1-4 broker启动日志截图
2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
3.使用RocketMQ提供的实例验证消息发送与消息消费
Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程序,设置消息生产者NameServer地址。
代码清单1-5 消息发送示例程序
public class Producer {

public static void main(String[] args) throws MQClientException, 
                InterruptedException {
    DefaultMQProducer producer = new 
                DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */,
                ("Hello RocketMQ " + i).getBytes
                    (RemotingHelper.DEFAULT_CHARSET)/* Message body */
                );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

}
Step2:运行该示例程序,查看运行结果,如果输出代码清单1-6所示结果则表示消息发送成功。
代码清单1-6 消息发送结果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程序,设置消息消费者NameServer地址。
代码清单1-7 消息消费示例程序
public class Consumer {

public static void main(String[] args) throws InterruptedException, 
        MQClientException {
    DefaultMQPushConsumer consumer = new 
        DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");
}

}
Step4:运行消息消费者程序,如果输出如下所示则表示消息消费成功。
代码清单1-8 消息消费结果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,
bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,
storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,
commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,
UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]
消息发送与消息消费都成功,则说明RocketMQ调试环境已经成功搭建了,可以直接Debug源码,探知RocketMQ的实现奥秘了。