欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RocketMq 本地(Windows)安装配置,客户端启动,简单代码实现

程序员文章站 2022-07-14 23:37:19
...

                                                    RocketMq

1、描述

本程序是结合网上一些资料进行整合的,进行对RocketMq的简单的demo,其中包括RocketMq安装,RocketMq客户端查看,以及简单代码实现。

提供网站地址:
SpringBoot(17)---SpringBoot整合RocketMQ:https://www.cnblogs.com/qdhxhz/p/11109696.html
windows下RocketMQ安装部署:https://www.jianshu.com/p/4a275e779afa
RocketMQ可视化管理控制台rocketmq-console-ng:https://www.jianshu.com/p/4a275e779afa
启动本地MQ命令:(1)start mqnamesrv.cmd   (2)start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

提供demo代码git: https://github.com/kinber123/rocketMqDemo.git

2、安装流程

本人是通过网络搜索查看到的,个人觉不错,提供网站:

windows下RocketMQ安装部署:https://www.jianshu.com/p/4a275e779afa

3、RocketMQ可视化管理控制台

提供网站:https://www.jianshu.com/p/4a275e779afa

4、实现代码demo

1、添加rocketmq包

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.1</version>
        </dependency>

2、JmsConfig(配置类)

连接RocketMQ服务器配置类,这里为了方便直接写成常量。

/**
 * \* @author wcy
 * \* @date: 2020-05-15 17:42
 * \* Description:  类,安装实际开发这里的信息 都是应该写在配置里,来读取,这里为了方便所以写成常量
 * \
 */
public class JmsConfig {

    /**
     * Name Server 地址,因为是集群部署 所以有多个用 分号 隔开
     */
    public static final String NAME_SERVER = "127.0.0.1:9876";
    /**
     * 主题名称 主题一般是服务器设置好 而不能在代码里去新建topic( 如果没有创建好,生产者往该主题发送消息 会报找不到topic错误)
     */
    public static final String TOPIC = "topic_family";

}

 

3、Producer (生产者)

package com.rocketmq.demo.service.simpleness;

import com.rocketmq.demo.config.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;


/**
 * \* @author wcy
 * \* @date: 2020-05-15 17:36
 * \* Description:  一般情況下需要程序启动就开始初始化
 * \
 */
@Slf4j
@Component
public class SimplenessConsumer {

    /**
     * 消费者实体对象
     */
    private DefaultMQPushConsumer consumer;
    /**
     * 消费者组
     */
    public static final String CONSUMER_GROUP = "test_consumer";

    /**
     * 通过构造函数 实例化对象
     */
    public SimplenessConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //订阅主题和 标签( * 代表所有标签)下信息
        consumer.subscribe(JmsConfig.TOPIC, "*");
        // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // msgs中只收集同一个topic,同一个tag,并且key相同的message
            // 会把不同的消息分别放置到不同的队列中
            try {
                for (Message msg : msgs) {
                    //消费者获取消息 这里只输出 不做后面逻辑处理
                    String body = new String(msg.getBody(), "utf-8");
                    log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
                }
            } catch (UnsupportedEncodingException e) {
                log.error("rocketMq error:{}", ExceptionUtils.getStackTrace(e));
                // 异常重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        log.info("消费者 启动成功=======");
    }
}

4、Consumer (消费者)

package com.rocketmq.demo.service.simpleness;

import com.rocketmq.demo.config.JmsConfig;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.common.message.Message;

/**
 * \* @author wcy
 * \* @date: 2020-05-15 17:34
 * \* Description:  类
 * \
 */
@Slf4j
@Component
public class SimplenessProducer {


    private String producerGroup = "test_producer";

    private DefaultMQProducer producer;

    public SimplenessProducer() {
        start();
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        //示例生产者
        producer = new DefaultMQProducer(producerGroup);
        //不开启vip通道 开通口端口会减2
        producer.setVipChannelEnabled(false);
        //绑定name server
        producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        // 设置超过多大进行compress压缩
        producer.setCompressMsgBodyOverHowmuch(1024 * 10);
        // 设置发送失败的尝试次数。
        producer.setRetryTimesWhenSendFailed(3);
        // 设置如果返回值不是send_ok,是否要重新发送
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
        // 设置限制最大的文件大小
        producer.setMaxMessageSize(1024*50);
        // 设置默认主题对应的队列数
        producer.setDefaultTopicQueueNums(4);
        // 设置发送超时时间 ms
        producer.setSendMsgTimeout(1000);
        try {
            this.producer.start();
        } catch (MQClientException e) {
            log.info(ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * 生产者生产方法
     * @param topic 主题
     * @param tags 标签,用来给消费者进行过滤的
     * @param keys 作为key
     * @param body 发送的内容
     */
    @SneakyThrows
    public SendResult producerSendMes(String topic, String tags, String keys, String body) {
        Message message = new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
        //发送
        SendResult sendResult = this.producer.send(message);
        return sendResult;
    }

    /**
     * 生产者生产方法
     * @param topic 主题
     * @param tags 标签,用来给消费者进行过滤的
     * @param body 发送的内容
     */
    @SneakyThrows
    public SendResult producerSendMes(String topic, String tags, String body) {
        Message message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
        //发送
        SendResult sendResult = this.producer.send(message);
        return sendResult;
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

5、测试

package com.rocketmq.demo.controller;

import com.rocketmq.demo.config.JmsConfig;
import com.rocketmq.demo.service.simpleness.SimplenessProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * \* @author wcy
 * \* @date: 2020-05-15 17:30
 * \* Description:  类
 * \
 */
@Slf4j
@RestController
@RequestMapping(value = "/text")
public class RocketMqController {

    @Autowired
    private SimplenessProducer producer;

    private List<String> mesList;

    /**
     * 初始化消息
     */
    public RocketMqController() {
        mesList = new ArrayList<>();
        mesList.add("小小");
        mesList.add("爸爸");
        mesList.add("妈妈");
        mesList.add("爷爷");
        mesList.add("奶奶");
        mesList.add("外公");
        mesList.add("外婆");

    }

    @RequestMapping("/rocketmq")
    public Object callback() throws Exception {
        //总共发送五次消息
        for (String s : mesList) {
            //创建生产信息
            SendResult sendResult = producer.producerSendMes(JmsConfig.TOPIC, "testtag", ("小小一家人的称谓:" + s));
            log.info("输出生产者信息={}",sendResult);
        }
        return "成功";
    }

    /**
     * 测试是否能访问
     * @param test
     * @return
     * @throws Exception
     */
    @RequestMapping("/test")
    public Object test(String test) {
        return "success "+test;
    }
}

6、结果显示

控制台

RocketMq 本地(Windows)安装配置,客户端启动,简单代码实现

rocketMq控制台

RocketMq 本地(Windows)安装配置,客户端启动,简单代码实现

注:再次感谢网络作者的贡献

相关标签: Java java