欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

windows搭建RocketMQ服务

程序员文章站 2022-03-23 12:55:07
...

一.系统及环境

windows7,JDK1.8,IntelliJ IDEA

二.RocketMQ的安装

1.下载RocketMQ的版本:

下载地址http://rocketmq.apache.org/release_notes/release-notes-4.2.0/ 

windows搭建RocketMQ服务

我下载的事这个

windows搭建RocketMQ服务

下载好的文件: rocketmq-all-4.2.0-bin-release.zip

2.然后将下载好的文件解压

解压后的文件目录如下:

windows搭建RocketMQ服务

3.修改配置

3.1修改rocketmq配置:网上有人建议修改(理由是有可能会因为内存设置过大导致一系列的问题),具体细节可以下去了解和学习,这里我也修改了

需要执行的命令都在程序目录下的bin文件夹下,主要是mqnamesrv.cmdmqbroker.cmd这两个命令,但他们又都各自调用了runserver.cmdrunbroker.cmd命令,分别修改这两个文件里的JAVA_OPT设置,如下:

set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

改为

set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

3.2修改环境变量(很重要)

步骤如下:

windows搭建RocketMQ服务

添加环境变量值ROCKETMQ_HOME,变量值为mq的解压的路径

windows搭建RocketMQ服务

环境变量添加后,一般需要重启计算机(我的没有重启,mq一直启动不了,重启之后成功启动)

三.启动mq

1.启动 NameServer:

在cmd下进去mq的解压的bin路径,执行mqnamesrv.cmd -n localhost:9876命令

会弹出另外一个命令窗口,内容如下,说明NameServer启动成功(该窗口不要关闭)

windows搭建RocketMQ服务

如果执行命令行时,提示你设置环境变量,那么启动失败,原因是环境变量没有配好,请参照上文配置,重新配置

2.启动 Broker

windows搭建RocketMQ服务

在bin路径下执行命令start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true876 autoCreateTopicEnable=true

说明:官方文档,还有网上其他人,使用mqbroker.cmd -n localhost:9876命令启动Broker,但我试过,启动不了,在网上参照了一些大牛,使用了上面的方式(参考https://blog.csdn.net/yangding_/article/details/62419383

执行成功后会弹出一个空白的cmd命令窗口,如下

windows搭建RocketMQ服务

目前我的状态时启动的(不要关闭这个窗口)

四.使用demo程序,要调试一下,消息的消费(参考https://www.cnblogs.com/lsx1993/p/4656351.html

下面是我自己按照上面写的demo和调试的

1.maven工程目录结构

windows搭建RocketMQ服务

1.1pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.joe</groupId>
    <artifactId>rocketMQtest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>rocketmqTest Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.2.6</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>rocketmqTest</finalName>
    </build>

</project>

1.2.消费者

package com.joe.example;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
     * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
     */
    public static void main(String[] args) throws InterruptedException,
            MQClientException{
        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "ConsumerGroupName");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("Consumber");

        /**
         * 订阅指定topic下tags分别等于TagA或TagC或TagD
         */
        consumer.subscribe("TopicTest1","TagA || TagC || TagD");
        /**
         * 订阅指定topic下所有消息<br>
         * 注意:一个consumer对象可以订阅多个topic
         */
        consumer.subscribe("TopicTest2","*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt>msgs, ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName()
                        +" Receive New Messages: " + msgs.size());

                MessageExt msg = msgs.get(0);
                if(msg.getTopic().equals("TopicTest1")) {
                    //执行TopicTest1的消费逻辑
                    if(msg.getTags() != null && msg.getTags().equals("TagA")) {
                        //执行TagA的消费
                        System.out.println(new String(msg.getBody()));
                    }else if (msg.getTags() != null
                            &&msg.getTags().equals("TagC")) {
                        //执行TagC的消费
                        System.out.println(new String(msg.getBody()));
                    }else if (msg.getTags() != null
                            &&msg.getTags().equals("TagD")) {
                        //执行TagD的消费
                        System.out.println(new String(msg.getBody()));
                    }
                }else if (msg.getTopic().equals("TopicTest2")) {
                    System.out.println(new String(msg.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

            }
        });

        /**
         * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         */
        consumer.start();

        System.out.println("ConsumerStarted.");
    }
}

1.3生产者

package com.joe.example;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

public class Producer {
    public static void main(String[] args) throws MQClientException,
            InterruptedException{
        /**
         * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ProducerGroupName需要由应用来保证唯一<br>
         * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
         * 因为服务器会回查这个Group下的任意一个Producer
         */
        final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("Producer");

        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        producer.start();

        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        for (int i = 0; i < 10; i++){
            try {
                {
                    Message msg = new Message("TopicTest1",// topic
                            "TagA",// tag
                            "OrderID001",// key
                            ("Hello MetaQA").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest2",// topic
                            "TagB",// tag
                            "OrderID0034",// key
                            ("Hello MetaQB").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest3",// topic
                            "TagC",// tag
                            "OrderID061",// key
                            ("Hello MetaQC").getBytes());// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            }catch(Exception e) {
                e.printStackTrace();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
//producer.shutdown();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));
        System.exit(0);
    }
}

1.4,执行

先执行消费者,在执行生产者

控制台日志如下

windows搭建RocketMQ服务

相关标签: RocketMQ