RocketMQ阅读源码前准备
程序员文章站
2022-07-14 23:42:07
...
IDEA获取RocketMQ源码
通过git clone下RocketMQ开源项目源码
克隆到你的目录,下一步导入的目录就是你克隆到本地的rocketmq目录路径
git clone aaa@qq.com:apache/rocketmq.git
IEDA项目导入
导入rocketmq项目
执行maven命令编译和下载依赖
clean install -U DskipTests
IDEA调试RocketMQ源码
1.启动Nameserver
step1Add Configurations
step2 Add New Configurations
step3 编辑application信息
- Main Class 选择项目的NamesrvStartUp
- Environment Variables 设置环境变量 ROCKETMQ_HOME=“你设置的目录”
- Use classpath of module 模块类加载路径
step4 创建存储路径
在RocketMQ运行的主目录下创建conf、logs、store三个文件夹 ,并将RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中 logback_broker.xml只修改日志文件的目录,broker.conf文件修改内容代码清单:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
#存储路径
storePathRootDir=Z:\\rocketmq\\store
#commitlog存储路径
storePathCommitLog=Z:\\rocketmq\\store\\commitlog
#消费队列存储路径
storePathConsumeQueue=Z:\\rocketmq\\store\\consumequeue
#消息索引存储路径
storePathIndex=Z:\\rocketmq\\store\\index
#checkpoint文件存储路径
storeCheckpoint=Z:\\rocketmq\\store\\checkpoint
#abort文件存储路径
storeFile=Z:\\rocketmq\\store\\abort
启动成功标志
2.启动broker
3.启动producer跟consumer
依次启动:
附上我自动修改过的produce跟consumer代码
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("PRODUCETTEST");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 100; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMERTEST");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费发送与消费消息都成功,则说明RocketMQ调试环境已成功搭建,可以通过Debug调试源码。
源码结构模块结构图:
- broker:broker模块(broker启动进程)。
- client:消息客户端,包括消息生成者、消息消费者相关的类。
- commom:公共包
- dev:开发者信息(非源代码)
- distribution:部署实例文件夹(非源代码)
- example:RockerMQ示例代码
- filter:消息过滤相关基础类
- flitersrv:消息过滤服务器实现相关类(Filter启动进程)
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessaging:消息开发标准,正在制定中
- remoting:远程通信模块,基于Netty
- srvutil:服务器工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
上一篇: Ubuntu18.04下将Python2环境下的代码转化为Python3
下一篇: 二进制中1的个数