欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springBoot+rabbitMq集成demo

程序员文章站 2022-07-12 12:28:58
...

一.为什么使用消息队列

1.解耦:多个系统同时需要某个系统提供数据,例如用户中心系统提供用户数据给多个系统,如果新增一个系统又需要用户中心系统提供数据,那么用户中心还得改代码重新打包,这样用户中心实际上和这一些相关的系统紧密耦合在一起了。如果用了MQ,发布订阅模式,那么用户中心系统只需要将用户数据往消息队列里面存放,其他的消费者自己去获取数据,这样就解耦了吧
2.异步:用户注册成功发送短信提示用户注册成功,首先用户注册得写注册信息到msql吧,然后再调用短信发送接口,短信发送成功响应,然后再告诉前台用户注册成功,万一短信注册有延迟,这下响应时间就比较长了,所以用户注册成功直接将用户手机号放在队列里面,然后直接给用户提示注册成功,至于短信,那就自己去队列里面获取手机号发送短信,这样是不是响应时间就短了。
3.削锋:大量得请求进来,服务无法响应,将请求得信息放队列里面慢慢响应,是不是很靠谱。至少不会保证服务器崩溃吧。

二.为什么使用rabbitMq

这市面上常见得MQ:activieMq rabbitMq rocketMq kafka 这四位大佬。actiiveMq 和rabbitMq得吞吐量不如rocketMq 和kafka 但是时效性呢,rabbitMq是最强得,毕竟是erlang写得,惹不起惹不起,kafka是大数据专用,惹不起惹不起,社区活跃度,activeMq 感觉社区活跃度真的不如剩下得三位大佬活跃,所以,activeMq出局(有时候真的感觉,社区活不活跃真的是很重要的),再说下剩下的三位大佬。rocketMq是阿里出品,别出精品。而且是java写的。所以在自己修改肯定是强于rabbitMq的,kafka大数据专用,感觉一般的项目没有必要用这么重的框架(有时候真的不是用最牛的技术就最牛逼还得结合场景,用的不好反而是自己给自己挖坑),这下就只剩rocketMq和rabbitMq了,这两个都挺不错,但是rabbitMq还专门提供了管理界面,还有windows版本,入手快,缺点,erlang开发,自己去改代码,那简直是找刺激,但是社区比较活跃,更新速度快,rocketMq说实在的真的比rabbitMq要强,但是最后为撒选rabbitMq呢,技术选型真的还是要看使用场景,如果中小型项目用rabbitMq足矣,并且还要丰富的管理界面便于上手,哎遍不下去了我菜

三.开始集成mq吧

1.pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.6.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.mq.cn</groupId>
	<artifactId>mq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>mq</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!--MQ-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
			<version>1.5.2.RELEASE</version>
		</dependency>

		<!--日志-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>

		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>



	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

2.配置消息队列的配置信息:application.properties


server.port=7010
server.servlet.context-path=/test-mq

#对于rabbitMQ的支持
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#生产者回调确认
spring.rabbitmq.publisher-confirm-type=simple
#生产者发送消息回调
spring.rabbitmq.publisher-returns=true

# 消费手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3.配置mq的配置信息. config

package com.mq.cn.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;


/**
 * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 * Queue:消息的载体,每个消息都会被投到一个或多个队列。
 * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 * Producer:消息生产者,就是投递消息的程序.
 * Consumer:消息消费者,就是接受消息的程序.
 * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 */

@Configuration
@Slf4j
public class RabbitConfig {

    public static final String FANOUT_EXCHANGE = "fanoutExchange";


    public static final String EXCHANGE_A = "my-mq-exchange_A";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    public static final String QUEUE_C = "QUEUE_C";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";


    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * HeadersExchange :通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /**
     * 获取队列A
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }


    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //队列持久
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B);
    }


    /**
     * 广播消息的配置
     */
    //把所有的队列都绑定到这个交换机上去


    //配置fanout_exchange
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }


}

4.配置生产者

package com.mq.cn.util;

import com.mq.cn.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Mq的消息生产者
 */
@Component
@Slf4j
public class MqProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Autowired
    public MqProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 发送消息
     * @param content
     */
    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }


    public void sendAll(String content) {
        rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", content);
    }


    /**
     * 生产者回调
     * @param correlationData
     * @param act
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean act, String s) {
       log.info("confirm:"+correlationData.getId()+"----"+act);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + SerializationUtils.deserialize(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }

}

5.配置消费者

package com.mq.cn.util;

import com.mq.cn.config.RabbitConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;

/**
 * 接收指定队列A的消费者
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
@Slf4j
public class MqCustomer {

    @RabbitHandler

    public void process(String hello,Channel channel, Message message) throws IOException {
       log.info("HelloReceiver收到  : " + hello +"收到时间"+new Date());
        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("receiver success");
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }

    }
}

5.最后测试一下
springBoot+rabbitMq集成demo
可以看见生产者发送信息,消费者也接受了信息

四.补充总结

其实rabbitmq和springBoot集成还是非常简单的
但是要注意一下几个点:

#生产者回调确认
spring.rabbitmq.publisher-confirm-type=simple
#生产者发送消息回调
spring.rabbitmq.publisher-returns=true

这地方这样配置,是为了保证服务器向MQ发送数据的时候,有个回调机制,可以保证服务器到MQ是数据的完整性。如果发送失败

/**
     * 生产者回调
     * @param correlationData
     * @param act
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean act, String s) {
       log.info("confirm:"+correlationData.getId()+"----"+act);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化对象输出
        System.out.println("消息主体: " + SerializationUtils.deserialize(message.getBody()));
        System.out.println("应答码: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交换器 exchange : " + exchange);
        System.out.println("消息使用的路由键 routing : " + routingKey);
    }

confim方法中的act会返回false,并且returnedMessage将会被执行,有了这个机制,我们是不是可以在这地方做一些补偿发送机制呢

另外再说下这个配置

# 消费手动确认
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

默认的是 AUTO 自动确认
这里改成手动确认模式
如果消费者拿到数据后就自动确认,如果没有入库或者中途出现异常,则会导致数据丢失,因此全局开启手动确认模式,可以保证消息队列到消费之间的数据是可靠的

相关标签: rabbitmq java