欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMQ阅读源码前准备

程序员文章站 2022-07-14 23:42:07
...

IDEA获取RocketMQ源码

通过git clone下RocketMQ开源项目源码

克隆到你的目录,下一步导入的目录就是你克隆到本地的rocketmq目录路径

git clone aaa@qq.com:apache/rocketmq.git

IEDA项目导入

导入rocketmq项目
RocketMQ阅读源码前准备

执行maven命令编译和下载依赖

clean install -U DskipTests

RocketMQ阅读源码前准备

IDEA调试RocketMQ源码

1.启动Nameserver
step1Add Configurations
RocketMQ阅读源码前准备
step2 Add New Configurations
RocketMQ阅读源码前准备
step3 编辑application信息
RocketMQ阅读源码前准备

  1. Main Class 选择项目的NamesrvStartUp
  2. Environment Variables 设置环境变量 ROCKETMQ_HOME=“你设置的目录”
  3. Use classpath of module 模块类加载路径

step4 创建存储路径
在RocketMQ运行的主目录下创建conf、logs、store三个文件夹 ,并将RocketMQ distribution部署目录中将broker.conf、logback_broker.xml文件复制到conf目录中 logback_broker.xml只修改日志文件的目录,broker.conf文件修改内容代码清单:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable=true
#存储路径
storePathRootDir=Z:\\rocketmq\\store
#commitlog存储路径
storePathCommitLog=Z:\\rocketmq\\store\\commitlog
#消费队列存储路径
storePathConsumeQueue=Z:\\rocketmq\\store\\consumequeue
#消息索引存储路径
storePathIndex=Z:\\rocketmq\\store\\index
#checkpoint文件存储路径
storeCheckpoint=Z:\\rocketmq\\store\\checkpoint
#abort文件存储路径
storeFile=Z:\\rocketmq\\store\\abort

启动成功标志
RocketMQ阅读源码前准备
2.启动broker
RocketMQ阅读源码前准备
3.启动producer跟consumer
RocketMQ阅读源码前准备
依次启动:
附上我自动修改过的produce跟consumer代码

public class Producer { 
 public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("PRODUCETTEST");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
 }
public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONSUMERTEST");
        consumer.setNamesrvAddr("127.0.0.1:9876");
 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消费发送与消费消息都成功,则说明RocketMQ调试环境已成功搭建,可以通过Debug调试源码。
源码结构模块结构图:
RocketMQ阅读源码前准备

  • broker:broker模块(broker启动进程)。
  • client:消息客户端,包括消息生成者、消息消费者相关的类。
  • commom:公共包
  • dev:开发者信息(非源代码)
  • distribution:部署实例文件夹(非源代码)
  • example:RockerMQ示例代码
  • filter:消息过滤相关基础类
  • flitersrv:消息过滤服务器实现相关类(Filter启动进程)
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessaging:消息开发标准,正在制定中
  • remoting:远程通信模块,基于Netty
  • srvutil:服务器工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类
相关标签: RocketMQ 中间件