欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

SpringBoot整合阿里RocketMQ

程序员文章站 2022-07-15 08:06:41
...

什么是RocketMQ

阿里消息队列 RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,同时是收费的产品。

应用场景

削峰填谷

诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列 RocketMQ 版可提供削峰填谷的服务来解决该问题。

异步解耦

交易系统作为淘宝/天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列 RocketMQ 版可实现异步通信和应用解耦,确保主站业务的连续性。

顺序收发

细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出(First In First Out,缩写 FIFO)原理类似,消息队列 RocketMQ 版提供的顺序消息即保证消息 FIFO。

分布式事务一致性

交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。

大数据分析

数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列 RocketMQ 版与流式计算引擎相结合,可以很方便的实现将业务数据进行实时分析。

分布式缓存同步

天猫双 11 大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过消息队列 RocketMQ 版构建分布式缓存,实时通知商品数据的变化。

1、配置pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

<!--阿里RocketMQ-->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.0.Final</version>
</dependency>

2、配置application.properties

server.port=8888
#rocketmq配置
#鉴权用AccessKeyId在阿里云服务器管理控制台创建
rocketmq.accessKey=accessKey
#鉴权用AccessKeySecret在阿里云服务器管理控制台创建
rocketmq.secretKey=secretKey
#tcp长连接,设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
rocketmq.namesrvAddr=http://MQ_INST_15namesrvAddr7I.cn-hangzhou.mq-internal.aliyuncs.com:8080
#mq主题,,您在控制台创建的topic
rocketmq.topic=topic
#mq组名,您在控制台创建的 Group ID
rocketmq.groupId=groupId

以上参数均可在阿里控制台中找到

3、配置类

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Properties;

/**
 * RocketMQ配置
 * @author RickSun && iFillDream
 * @date 2020/01/10 15:58
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.accessKey}")
    public String accessKey;
    public static String ACCESS_KEY;

    @Value("${rocketmq.secretKey}")
    public String secretKey;
    public static String SECRET_KEY;

    @Value("${rocketmq.namesrvAddr}")
    public String namesrvAddr;
    public static String NAMESRV_ADDR;

    @Value("${rocketmq.groupId}")
    public String groupId;
    public static String GROUP_ID;

    @Value("${rocketmq.topic}")
    public String topic;
    public static String TOPIC;

    /**
     * 配置RocketMq参数
     * @return Properties
     */
    public Properties getProperties() {
        Properties properties = new Properties();
        //您在控制台创建的GroupID
        properties.put(PropertyKeyConst.GROUP_ID, groupId);
        // 鉴权用AccessKeyId在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        // 鉴权用AccessKeySecret在阿里云服务器管理控制台创建
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        //延时时间
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // 顺序消息消费失败进行重试前的等待时间单位(毫秒)
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // 消息消费失败时的最大重试次数
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
        // 设置TCP接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
        return properties;
    }

    /**
     * 初始化静态常量
     */
    @PostConstruct
    public void init(){
        ACCESS_KEY = this.accessKey;
        SECRET_KEY = this.secretKey;
        NAMESRV_ADDR = this.namesrvAddr;
        GROUP_ID = this.groupId;
        TOPIC = this.topic;
    }
}

4、RocketMQ工具

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;

/**
 * RocketMQ工具
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:07
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
@Slf4j
public class MQUtil {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    /**
     * 发送普通消息
     * @param content 内容
     * @param tag 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在消息队列
     */
    public void sendMessage(String content,String tag){
        Message message = new Message();
        message.setBody(content.getBytes());
        message.setTopic(RocketMQConfig.TOPIC);
        message.setTag(tag);
        this.sendCustomerMessage(message);
    }

    /**
     * 发送定时任务
     * @param content   内容
     * @param tag   标签
     * @param delayTime 定时时间
     */
    public void sendDelayMessage(String content,String tag,long delayTime){
        Message message = new Message();
        message.setBody(content.getBytes());
        message.setTopic(RocketMQConfig.TOPIC);
        message.setTag(tag);
        /**
         * 单位毫秒(ms)
         * 在指定时间戳(当前时间之后)进行投递
         * 例如 2016-03-07 16:21:00 投递
         * 如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者
         */
        message.setStartDeliverTime(System.currentTimeMillis() delayTime);
        this.sendCustomerMessage(message);
    }

    /**
     * 发送消息
     * @param message
     */
    private void sendCustomerMessage(Message message) {
        Properties properties=rocketMQConfig.getProperties();
        Producer producer = ONSFactory.createProducer(properties);
        //在发送消息前,必须调用start方法来启动Producer,只需调用一次即可
        producer.start();
        try {
            SendResult sendResult = producer.send(message);
            // 同步发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                log.info("消息发送成功:messageID:" sendResult.getMessageId());
            }
        } catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            e.printStackTrace();
        }
        //在应用退出前,销毁Producer对象
        producer.shutdown();
    }
}

5、标签

package com.ifilldream.rocketmq_lean.mq;

/**
 * RocketMQ Tag业务标签
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:32
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
public class MqTag {
    /**
     * 根据业务老创建标签
     */
    //测试1
    public final static String ROCKETMQTEST1 = "ROCKETMQ_TEST1";
    //测试2
    public final static String ROCKETMQTEST2 = "ROCKETMQ_TEST2";
}

6、消费者

package com.ifilldream.rocketmq_lean.mq;
import com.aliyun.openservices.ons.api.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;

/**
 * RocketMQ消费者
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:29
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
@Slf4j
public class RocketMQConsumer {

    @Autowired
    private RocketMQConfig rocketMQConfig;

    /**
     * 订阅消息,处理业务
     */
    public void normalSubscribe() {
        Properties properties = rocketMQConfig.getProperties();
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(RocketMQConfig.TOPIC, "", new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                try {
                    //接收到的消息内容
                    String msg = new String(message.getBody(), "UTF-8");
                    String tag = message.getTag();
                    switch (tag) {
                        case MqTag.ROCKETMQTEST1:
                            log.info("收到消息messageID:"   message.getMsgID()   " msg:"   msg);
                            //TODO do something
                            break;
                        case  MqTag.ROCKETMQTEST2:
                            log.info("收到消息messageID:"   message.getMsgID()   " msg:"   msg);
                            //TODO do something
                            break;
                    }
                    return Action.CommitMessage;
                } catch (Exception e) {
                    log.info("消费失败:messageID:"   message.getMsgID());
                    e.printStackTrace();
                    return Action.ReconsumeLater;
                }
            }
        });
        consumer.start();
    }
}

7、消费者启动监听

package com.ifilldream.rocketmq_lean.mq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * RocketMQ启动监听
 * @author RickSun && iFillDream
 * @date 2020/01/10 16:07
 * @Copyright "轻梦致新"即"iFillDream"微信公众号所有
 */
@Component
public class RocketConsumerListener implements CommandLineRunner {

    @Autowired
    private RocketMQConsumer rocketMQConsumer;

    @Override
    public void run(String... args) {
        System.out.println("========rocketMQ消费者启动==========");
        rocketMQConsumer.normalSubscribe();
    }
}

8、接口

package com.ifilldream.rocketmq_lean.controller;
import com.ifilldream.rocketmq_lean.mq.MQUtil;
import com.ifilldream.rocketmq_lean.mq.MqTag;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;

/**
 * @ClassName RocketController
 * @Author RickSun && iFillDream
 * @Date 2020/1/10 15:18
 * @Version 1.0
 */
@RestController
@RequestMapping("/ifilldream/rocketmq")
public class RocketController {

    @Resource
    private MQUtil mqUtil;

    @GetMapping("/test")
    public String test(String content) {
        return content;
    }

    @GetMapping("/test1")
    public String test1(String content) {
        mqUtil.sendMessage(content, MqTag.ROCKETMQTEST1);
        mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST1, 1000L);
        return "success";
    }

    @GetMapping("/test2")
    public String test2(String content) {
        mqUtil.sendMessage(content, MqTag.ROCKETMQTEST2);
        mqUtil.sendDelayMessage("测试", MqTag.ROCKETMQTEST2,3000L);
        return "success";
    }

}

此时代码完毕,在Linux服务器上运行项目Jar包,浏览器中输入:xx.xx.xx.xx:8888/ifilldream/rocketmq/test1?content=nihao即可看到效果;SpringBoot整合阿里RocketMQxx.xx.xx.xx为服务器的IP或域名,运行效果如下:SpringBoot整合阿里RocketMQ以上代码亲测可用,更多详情请关注阿里官方文档https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.1a4b7e805ygc75

统一首发平台为微信公众号"轻梦致新",搜索关注公众号,第一时间阅读最新内容。