欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot如何优雅的使用RocketMQ

程序员文章站 2022-05-20 20:32:16
[TOC] SpringBoot如何优雅的使用RocketMQ MQ,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用MQ来对上下游来做解耦合。 举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统 ......

springboot如何优雅的使用rocketmq

mq,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用mq来对上下游来做解耦合。

举例:当a系统对b系统进行消息通讯,如a系统发布一条系统公告,b系统可以订阅该频道进行系统公告同步,整个过程中a系统并不关系b系统会不会同步,由订阅该频道的系统自行处理。

什么是rocketmq?

官方说明:

随着使用越来越多的队列和虚拟主题,activemq io模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案kafka。不幸的是,kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

看到这里可以很清楚的知道rcoketmq 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。

具有以下特性:

  • 支持发布/订阅(pub/sub)和点对点(p2p)消息模型
  • 能够保证严格的消息顺序,在一个队列中可靠的先进先出(fifo)和严格的顺序传递
  • 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力,亿级消息堆积能力
  • 支持多种消息协议,如 jms、mqtt 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义

rocketmq环境安装

下载地址:https://rocketmq.apache.org/dowloading/releases/

从官方下载二进制或者源码来进行使用。源码编译需要maven3.2x,jdk8

在根目录进行打包:

mvn -prelease-all -dskiptests clean packager -u

distribution/target/apache-rocketmq文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。

使用rocketmq-4.6.0.zip:

  1. 启动名称服务 mqnamesrv.cmd
  2. 启动数据中心 mqbroker.cmd -n localhost:9876

springboot环境中使用rocketmq

springboot 入门:https://www.cnblogs.com/simplewu/p/10027237.html
springboot 常用start:https://www.cnblogs.com/simplewu/p/9798146.html
当前环境版本为:

  • springboot 2.0.6.release
  • springcloud finchley.release
  • springcldod alibaba 0.2.1.release
  • rocketmq 4.3.0
    在项目工程中导入:
<!-- mq begin -->
<dependency>
    <groupid>org.apache.rocketmq</groupid>
    <artifactid>rocketmq-client</artifactid>
    <version>${rocketmq.version}</version>
</dependency>
<!-- mq end -->

由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用rocketmq。
创建rocketmqproperties配置属性类,类中内容如下:

@configurationproperties(prefix = "rocketmq")
public class rocketmqproperties {
    private boolean isenable = false;
    private string namesrvaddr = "localhost:9876";
    private string groupname = "default";
    private int producermaxmessagesize = 1024;
    private int producersendmsgtimeout = 2000;
    private int producerretrytimeswhensendfailed = 2;
    private int consumerconsumethreadmin = 5;
    private int consumerconsumethreadmax = 30;
    private int consumerconsumemessagebatchmaxsize = 1;
    //省略get set
}

现在我们所有子系统中的生产者,消费者对应:
isenable 是否开启mq
namesrvaddr 集群地址
groupname 分组名称
设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下:

#发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
rocketmq.groupname=please_rename_unique_group_name
#是否开启自动配置
rocketmq.isenable=true
#mq的nameserver地址
rocketmq.namesrvaddr=127.0.0.1:9876
#消息最大长度 默认1024*4(4m)
rocketmq.producer.maxmessagesize=4096
#发送消息超时时间,默认3000
rocketmq.producer.sendmsgtimeout=3000
#发送消息失败重试次数,默认2
rocketmq.producer.retrytimeswhensendfailed=2
#消费者线程数量
rocketmq.consumer.consumethreadmin=5
rocketmq.consumer.consumethreadmax=32
#设置一次消费消息的条数,默认为1条
rocketmq.consumer.consumemessagebatchmaxsize=1

创建消费者接口 rocketconsumer.java 该接口用户约束消费者需要的核心步骤:

/**
 * 消费者接口
 * 
 * @author simplewu
 *
 */
public interface rocketconsumer {

/**
     * 初始化消费者
     */
    public abstract void init();

    /**
     * 注册监听
     * 
     * @param messagelistener
     */
    public void registermessagelistener(messagelistener messagelistener);

}

创建抽象消费者 abstractrocketconsumer.java:

/**
 * 消费者基本信息
 * 
 * @author simpelwu
 */
public abstract class abstractrocketconsumer implements rocketconsumer {

    protected string topics;
    protected string tags;
    protected messagelistener messagelistener;
    protected string consumertitel;
    protected mqpushconsumer mqpushconsumer;

    /**
     * 必要的信息
     * 
     * @param topics
     * @param tags
     * @param consumertitel
     */
    public void necessary(string topics, string tags, string consumertitel) {
        this.topics = topics;
        this.tags = tags;
        this.consumertitel = consumertitel;
    }

    public abstract void init();

    @override
    public void registermessagelistener(messagelistener messagelistener) {
        this.messagelistener = messagelistener;
    }
    
}

在类中我们必须指定这个topics,tags与消息监听逻辑
public abstract void init();该方法是用于初始化消费者,由子类实现。
接下来我们编写自动配置类rocketmqconfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。
@enableconfigurationproperties({ rocketmqproperties.class }) 使用该配置文件
@configuration 标注为配置类
@conditionalonproperty(prefix = "rocketmq", value = "isenable", havingvalue = "true") 只有当配置中指定rocketmq.isenable = true的时候才会生效
核心内容如下:

/**
 * mq配置
 * 
 * @author simplewu
 */
@configuration
@enableconfigurationproperties({ rocketmqproperties.class })
@conditionalonproperty(prefix = "rocketmq", value = "isenable", havingvalue = "true")
public class rocketmqconfiguation {

    private rocketmqproperties properties;

    private applicationcontext applicationcontext;

    private logger log = loggerfactory.getlogger(rocketmqconfiguation.class);

    public rocketmqconfiguation(rocketmqproperties properties, applicationcontext applicationcontext) {
        this.properties = properties;
        this.applicationcontext = applicationcontext;
    }

    /**
     * 注入一个默认的消费者
     * @return
     * @throws mqclientexception
     */
    @bean
    public defaultmqproducer getrocketmqproducer() throws mqclientexception {
        if (stringutils.isempty(properties.getgroupname())) {
            throw new mqclientexception(-1, "groupname is blank");
        }

        if (stringutils.isempty(properties.getnamesrvaddr())) {
            throw new mqclientexception(-1, "nameserveraddr is blank");
        }
        defaultmqproducer producer;
        producer = new defaultmqproducer(properties.getgroupname());

        producer.setnamesrvaddr(properties.getnamesrvaddr());
        // producer.setcreatetopickey("auto_create_topic_key");

        // 如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instancename
        // producer.setinstancename(instancename);
        producer.setmaxmessagesize(properties.getproducermaxmessagesize());
        producer.setsendmsgtimeout(properties.getproducersendmsgtimeout());
        // 如果发送消息失败,设置重试次数,默认为2次
        producer.setretrytimeswhensendfailed(properties.getproducerretrytimeswhensendfailed());

        try {
            producer.start();
            log.info("producer is start ! groupname:{},namesrvaddr:{}", properties.getgroupname(),
                    properties.getnamesrvaddr());
        } catch (mqclientexception e) {
            log.error(string.format("producer is error {}", e.getmessage(), e));
            throw e;
        }
        return producer;

    }

    /**
     * springboot启动时加载所有消费者
     */
    @postconstruct
    public void initconsumer() {
        map<string, abstractrocketconsumer> consumers = applicationcontext.getbeansoftype(abstractrocketconsumer.class);
        if (consumers == null || consumers.size() == 0) {
            log.info("init rocket consumer 0");
        }
        iterator<string> beans = consumers.keyset().iterator();
        while (beans.hasnext()) {
            string beanname = (string) beans.next();
            abstractrocketconsumer consumer = consumers.get(beanname);
            consumer.init();
            createconsumer(consumer);
            log.info("init success consumer title {} , toips {} , tags {}", consumer.consumertitel, consumer.tags,
                    consumer.topics);
        }
    }

    /**
     * 通过消费者信心创建消费者
     * 
     * @param consumerpojo
     */
    public void createconsumer(abstractrocketconsumer arc) {
        defaultmqpushconsumer consumer = new defaultmqpushconsumer(this.properties.getgroupname());
        consumer.setnamesrvaddr(this.properties.getnamesrvaddr());
        consumer.setconsumethreadmin(this.properties.getconsumerconsumethreadmin());
        consumer.setconsumethreadmax(this.properties.getconsumerconsumethreadmax());
        consumer.registermessagelistener(arc.messagelistenerconcurrently);
        /**
         * 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        // consumer.setconsumefromwhere(consumefromwhere.consume_from_first_offset);
        /**
         * 设置消费模型,集群还是广播,默认为集群
         */
        // consumer.setmessagemodel(messagemodel.clustering);

        /**
         * 设置一次消费消息的条数,默认为1条
         */
        consumer.setconsumemessagebatchmaxsize(this.properties.getconsumerconsumemessagebatchmaxsize());
        try {
            consumer.subscribe(arc.topics, arc.tags);
            consumer.start();
            arc.mqpushconsumer=consumer;
        } catch (mqclientexception e) {
            log.error("info consumer title {}", arc.consumertitel, e);
        }

    }

}

然后在src/main/resources文件夹中创建目录与文件meta-inf/spring.factories里面添加自动配置类即可开启启动配置,我们只需要导入依赖即可:

org.springframework.boot.autoconfigure.enableautoconfiguration=\
com.xcloud.config.rocketmq.rocketmqconfiguation

接下来在服务中导入依赖,然后通过我们的抽象类获取所有必要信息对消费者进行创建,该步骤会在所有消费者初始化完成后进行,且只会管理是spring bean的消费者。
下面我们看看如何创建一个消费者,创建消费者的步骤非常简单,只需要继承abstractrocketconsumer然后再加上spring的@component就能够完成消费者的创建,我们可以在类中自定义消费的主题与标签。
在项目可以根据需求当消费者创建失败的时候是否继续启动工程。
创建一个默认的消费者 defaultconsumermq.java

@component
public class defaultconsumermq extends abstractrocketconsumer {
    /**
     * 初始化消费者
     */
    @override
    public void init() {
        // 设置主题,标签与消费者标题
        super.necessary("topictest", "*", "这是标题");
        //消费者具体执行逻辑
        registermessagelistener(new messagelistenerconcurrently() {
            @override
            public consumeconcurrentlystatus consumemessage(list<messageext> msgs, consumeconcurrentlycontext context) {
                msgs.foreach(msg -> {
                    system.out.printf("consumer message boyd %s %n", new string(msg.getbody()));
                });
                // 标记该消息已经被成功消费
                return consumeconcurrentlystatus.consume_success;
            }
        });
    }
}

super.necessary("topictest", "*", "这是标题"); 是必须要设置的,代表该消费者监听topictest主题下所有tags,标题那个字段是我自己定义的,所以对于该配置来说没什么意义。
我们可以在这里注入spring的bean来进行任意逻辑处理。
创建一个消息发送类进行测试

@override
public string qmtest(@pathvariable("name")string name) throws mqclientexception, remotingexception, mqbrokerexception, interruptedexception, unsupportedencodingexception {
    message msg = new message("topictest", "tags1", name.getbytes(remotinghelper.default_charset));
    // 发送消息到一个broker
    sendresult sendresult = defaultmqproducer.send(msg);
    // 通过sendresult返回消息是否成功送达
    system.out.printf("%s%n", sendresult);
    return null;
}

我们来通过http请求测试:

http://localhost:10001/demo/base/mq/hello  consumer message boyd hello 
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿  consumer message boyd 嘿嘿嘿嘿嘿 

好了到这里简单的start算是设计完成了,后面还有一些:顺序消息生产,顺序消费消息,异步消息生产等一系列功能,官人可参照官方去自行处理。

  • activemq 没经过大规模吞吐量场景的验证,社区不高不活跃。
  • rabbitmq 集群动态扩展麻烦,且与当前程序语言不至于难以定制化。
  • kafka 支持主要的mq功能,功能无法达到程序需求的要求,所以不使用,且与当前程序语言不至于难以定制化。
  • rocketmq 经过全世界的女人的洗礼,已经很强大;mq功能较为完善,还是分布式的,扩展性好;支持复杂mq业务场景。(业务复杂可做首选)