欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

程序员文章站 2022-07-02 17:03:07
一、RocketMQ 1、架构图片 2、角色分类 (1)、Broker RocketMQ 的核心,接收 Producer 发过来的消息、处理 Consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。 (2)、NameServer 消息队列中的状态服务器,集群的各个组件通过它来了解全 ......

一、rocketmq

1、架构图片

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

2、角色分类

(1)、broker

rocketmq 的核心,接收 producer 发过来的消息、处理 consumer 的消费消息请求、消息的持 久化存储、服务端过滤功能等 。

(2)、nameserver

消息队列中的状态服务器,集群的各个组件通过它来了解全局的信息 。类似微服务中注册中心的服务注册,发现,下线,上线的概念。

热备份:
namserver可以部署多个,相互之间独立,其他角色同时向多个nameserver 机器上报状态信息。

心跳机制:
nameserver 中的 broker、 topic等状态信息不会持久存储,都是由各个角色定时上报并存储到内存中,超时不上报的话, nameserver会认为某个机器出故障不可用。

(3)、producer

消息的生成者,最常用的producer类就是defaultmqproducer。

(4)、consumer

消息的消费者,常用consumer类
defaultmqpushconsumer
收到消息后自动调用传入的处理方法来处理,实时性高
defaultmqpullconsumer
用户自主控制 ,灵活性更高。

3、通信机制

(1)、broker启动后需要完成一次将自己注册至nameserver的操作;随后每隔30s时间定时向nameserver更新topic路由信息。

(2)、producer发送消息时候,需要根据消息的topic从本地缓存的获取路由信息。如果没有则更新路由信息会从nameserver重新拉取,同时producer会默认每隔30s向nameserver拉取一次路由信息。

(3)、consumer消费消息时候,从nameserver获取的路由信息,并再完成客户端的负载均衡后,监听指定消息队列获取消息并进行消费。

二、代码实现案例

1、项目结构图

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理

版本描述

<spring-boot.version>2.1.3.release</spring-boot.version>
<rocketmq.version>4.3.0</rocketmq.version>

2、配置文件

rocketmq:
  # 生产者配置
  producer:
    isonoff: on
    # 发送同一类消息的设置为同一个group,保证唯一
    groupname: feeplatgroup
    # 服务地址
    namesrvaddr: 10.1.1.207:9876
    # 消息最大长度 默认1024*4(4m)
    maxmessagesize: 4096
    # 发送消息超时时间,默认3000
    sendmsgtimeout: 3000
    # 发送消息失败重试次数,默认2
    retrytimeswhensendfailed: 2
  # 消费者配置
  consumer:
    isonoff: on
    # 官方建议:确保同一组中的每个消费者订阅相同的主题。
    groupname: feeplatgroup
    # 服务地址
    namesrvaddr: 10.1.1.207:9876
    # 接收该 topic 下所有 tag
    topics: feeplattopic~*;
    consumethreadmin: 20
    consumethreadmax: 64
    # 设置一次消费消息的条数,默认为1条
    consumemessagebatchmaxsize: 1

# 配置 group  topic  tag
fee-plat:
  fee-plat-group: feeplatgroup
  fee-plat-topic: feeplattopic
  fee-account-tag: feeaccounttag

3、生产者配置

import org.apache.rocketmq.client.exception.mqclientexception;
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
/**
 * rocketmq 生产者配置
 */
@configuration
public class producerconfig {
    private static final logger log = loggerfactory.getlogger(producerconfig.class) ;
    @value("${rocketmq.producer.groupname}")
    private string groupname;
    @value("${rocketmq.producer.namesrvaddr}")
    private string namesrvaddr;
    @value("${rocketmq.producer.maxmessagesize}")
    private integer maxmessagesize ;
    @value("${rocketmq.producer.sendmsgtimeout}")
    private integer sendmsgtimeout;
    @value("${rocketmq.producer.retrytimeswhensendfailed}")
    private integer retrytimeswhensendfailed;
    @bean
    public defaultmqproducer getrocketmqproducer() {
        defaultmqproducer producer;
        producer = new defaultmqproducer(this.groupname);
        producer.setnamesrvaddr(this.namesrvaddr);
        //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instancename
        if(this.maxmessagesize!=null){
            producer.setmaxmessagesize(this.maxmessagesize);
        }
        if(this.sendmsgtimeout!=null){
            producer.setsendmsgtimeout(this.sendmsgtimeout);
        }
        //如果发送消息失败,设置重试次数,默认为2次
        if(this.retrytimeswhensendfailed!=null){
            producer.setretrytimeswhensendfailed(this.retrytimeswhensendfailed);
        }
        try {
            producer.start();
        } catch (mqclientexception e) {
            e.printstacktrace();
        }
        return producer;
    }
}

4、消费者配置

import org.apache.rocketmq.client.consumer.defaultmqpushconsumer;
import org.apache.rocketmq.client.exception.mqclientexception;
import org.apache.rocketmq.common.consumer.consumefromwhere;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import javax.annotation.resource;
/**
 * rocketmq 消费者配置
 */
@configuration
public class consumerconfig {
    private static final logger log = loggerfactory.getlogger(consumerconfig.class) ;
    @value("${rocketmq.consumer.namesrvaddr}")
    private string namesrvaddr;
    @value("${rocketmq.consumer.groupname}")
    private string groupname;
    @value("${rocketmq.consumer.consumethreadmin}")
    private int consumethreadmin;
    @value("${rocketmq.consumer.consumethreadmax}")
    private int consumethreadmax;
    @value("${rocketmq.consumer.topics}")
    private string topics;
    @value("${rocketmq.consumer.consumemessagebatchmaxsize}")
    private int consumemessagebatchmaxsize;
    @resource
    private rocketmsglistener msglistener;
    @bean
    public defaultmqpushconsumer getrocketmqconsumer(){
        defaultmqpushconsumer consumer = new defaultmqpushconsumer(groupname);
        consumer.setnamesrvaddr(namesrvaddr);
        consumer.setconsumethreadmin(consumethreadmin);
        consumer.setconsumethreadmax(consumethreadmax);
        consumer.registermessagelistener(msglistener);
        consumer.setconsumefromwhere(consumefromwhere.consume_from_last_offset);
        consumer.setconsumemessagebatchmaxsize(consumemessagebatchmaxsize);
        try {
            string[] topictagsarr = topics.split(";");
            for (string topictags : topictagsarr) {
                string[] topictag = topictags.split("~");
                consumer.subscribe(topictag[0],topictag[1]);
            }
            consumer.start();
        }catch (mqclientexception e){
            e.printstacktrace();
        }
        return consumer;
    }
}

5、消息监听配置

import com.rocket.queue.service.impl.paramconfigservice;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlycontext;
import org.apache.rocketmq.client.consumer.listener.consumeconcurrentlystatus;
import org.apache.rocketmq.client.consumer.listener.messagelistenerconcurrently;
import org.apache.rocketmq.common.message.messageext;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.stereotype.component;
import org.springframework.util.collectionutils;
import javax.annotation.resource;
import java.util.list;
/**
 * 消息消费监听
 */
@component
public class rocketmsglistener implements messagelistenerconcurrently {
    private static final logger log = loggerfactory.getlogger(rocketmsglistener.class) ;
    @resource
    private paramconfigservice paramconfigservice ;
    @override
    public consumeconcurrentlystatus consumemessage(list<messageext> list, consumeconcurrentlycontext context) {
        if (collectionutils.isempty(list)){
            return consumeconcurrentlystatus.consume_success;
        }
        messageext messageext = list.get(0);
        log.info("接受到的消息为:"+new string(messageext.getbody()));
        int reconsume = messageext.getreconsumetimes();
        // 消息已经重试了3次,如果不需要再次消费,则返回成功
        if(reconsume ==3){
            return consumeconcurrentlystatus.consume_success;
        }
        if(messageext.gettopic().equals(paramconfigservice.feeplattopic)){
            string tags = messageext.gettags() ;
            switch (tags){
                case "feeaccounttag":
                    log.info("开户 tag == >>"+tags);
                    break ;
                default:
                    log.info("未匹配到tag == >>"+tags);
                    break;
            }
        }
        // 消息消费成功
        return consumeconcurrentlystatus.consume_success;
    }
}

6、配置参数绑定

import org.springframework.beans.factory.annotation.value;
import org.springframework.stereotype.service;
@service
public class paramconfigservice {
    @value("${fee-plat.fee-plat-group}")
    public string feeplatgroup ;
    @value("${fee-plat.fee-plat-topic}")
    public string feeplattopic ;
    @value("${fee-plat.fee-account-tag}")
    public string feeaccounttag ;
}

7、消息发送测试

import com.rocket.queue.service.feeplatmqservice;
import org.apache.rocketmq.client.producer.defaultmqproducer;
import org.apache.rocketmq.client.producer.sendresult;
import org.apache.rocketmq.common.message.message;
import org.springframework.stereotype.service;
import javax.annotation.resource;
@service
public class feeplatmqserviceimpl implements feeplatmqservice {
    @resource
    private defaultmqproducer defaultmqproducer;
    @resource
    private paramconfigservice paramconfigservice ;
    @override
    public sendresult openaccountmsg(string msginfo) {
        // 可以不使用config中的group
        defaultmqproducer.setproducergroup(paramconfigservice.feeplatgroup);
        sendresult sendresult = null;
        try {
            message sendmsg = new message(paramconfigservice.feeplattopic,
                                          paramconfigservice.feeaccounttag,
                                         "fee_open_account_key", msginfo.getbytes());
            sendresult = defaultmqproducer.send(sendmsg);
        } catch (exception e) {
            e.printstacktrace();
        }
        return sendresult ;
    }
}

三、项目源码

github:知了一笑
https://github.com/cicadasmile/middle-ware-parent

SpringBoot2.0 整合 RocketMQ ,实现请求异步处理