欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

程序员文章站 2022-07-15 09:22:26
...

参考资料

在Windows下搭建RocketMQ

windows下RocketMQ安装部署

十分钟入门RocketMQ

1.偏头痛杨的rocketmq4.x入门之基础概念扫盲篇

消息队列设计的精髓基本都藏在本文里了

https://www.jianshu.com/p/453c6e7ff81c

https://blog.csdn.net/songxinjianqwe/article/details/78923482

【MQ】Eclipse向RocketMQ中发送和接收消息

RocketMQ消费者示例程序

阿里RocketMQ消息队列与Spring整合+Xml配置


spring cloud实现 rocketmq可靠一致性

SpringBoot RocketMQ 整合使用和监控

RocketMQ最佳实践(三)开发spring-boot-starter-rocketmq实现与spring boot项目的整合

windows下配置rocketMQ

下载链接

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

解压缩

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

系统环境变量配置

变量名:ROCKETMQ_HOME

变量值:MQ解压路径\MQ文件夹名

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

启动NAMESERVER

Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。

D:
cd D:\software\rocketmq-all-4.2\bin
start mqnamesrv.cmd

结果:

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

暂时忽略警告。

启动BROKER

Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,启动BROKER。成功后会弹出提示框,此框勿关闭。

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

建立项目测试生产者消费者(测试用代码)

该代码只是单纯用于测试,要用于实际环境还需要有相当的规则和编写规范,下一章将补充这个

项目结构如下:
微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

各个文件内容:
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>net.funfunle</groupId>
    <artifactId>TestMQ</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--好了,是不是经常发现pom文件一加一点东西idea的编译环境就会自动变化1.5版本的jdk??加上这个强行指定编译版本就没问题了。-->

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>4.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.2.0</version>
    </dependency>
</dependencies>
</project>

mq.properties

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=localhost:9876

MQConfig.java

package net.funfunle.TestMQ.config;


import org.apache.commons.lang3.StringUtils;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;

/**
 * Created by DGDL-08 on 2017/3/24.
 */
public class MQConfig {

    private static Properties config;
    private static final String producerGroup;
    private static final String namesrvAddr;
    private static final String consumerGroup;

    static {


        //InputStream inputStream = MyClass.class.getClassLoader().getResourceAsStream("com/john/basis/conf.properties");
        config=new Properties();
        InputStream inputStream=null;
        try{

            InputStream in = ClassLoader.getSystemResourceAsStream("mq.properties");
            InputStreamReader is=new InputStreamReader(in,"utf-8");
            config.load(is);
            is.close();
            in.close();
//            config.load(inputStream);
        }
        catch (Exception ed){
            ed.printStackTrace();
        }


        consumerGroup=config.getProperty("apache.rocketmq.consumer.PushConsumer");
        producerGroup=config.getProperty("apache.rocketmq.producer.producerGroup");
        namesrvAddr=config.getProperty("apache.rocketmq.namesrvAddr");


    }

    public static String getProducerGroup() {
        return producerGroup;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    public static String getConsumerGroup() {
        return consumerGroup;
    }
}

RocketMQClient.java

package net.funfunle.TestMQ;

import net.funfunle.TestMQ.config.MQConfig;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQClient {
    /**
     * 生产者的组名
     */

    private String producerGroup= MQConfig.getProducerGroup();

    /**
     * NameServer 地址
     */

    private String namesrvAddr=MQConfig.getNamesrvAddr();
    public void defaultMQProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

            //创建一个消息实例,包含 topic、tag 和 消息体
            //如下:topic 为 "TopicTest",tag 为 "push"
            Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));

            StopWatch stop = new StopWatch();
            stop.start();

            for (int i = 0; i < 10000; i++) {
                SendResult result = producer.send(message);
                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
            }
            stop.stop();
            System.out.println("----------------发送一万条消息耗时:" + stop.getTime());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
    public static void main(String[] args){
        new RocketMQClient().defaultMQProducer();
    }
}

RocketMQServer.java

package net.funfunle.TestMQ;

import net.funfunle.TestMQ.config.MQConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created by zhisheng_tian on 2018/2/6
 */

public class RocketMQServer {
    /**
     * 消费者的组名
     */

    private String consumerGroup= MQConfig.getConsumerGroup();

    /**
     * NameServer 地址
     */

    private String namesrvAddr=MQConfig.getNamesrvAddr();


    public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("TopicTest", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
                try {
                    for (MessageExt messageExt : list) {

                        System.out.println("messageExt: " + messageExt);//输出消息内容

                        String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);

                        System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + messageBody);//输出消息内容
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        new RocketMQServer().defaultMQPushConsumer();
    }
}

分别执行生产者和消费者,可以得到结果:

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置

微服务架构【技术点4】windows下rocketMQ配置及java端生产者消费者配置