欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ整合Spring AMQP实战

程序员文章站 2022-03-05 18:27:25
...

一、RabbitAdmin应用

  • RabbitAdmin:使用RabbitTemplate的execute方法执行对应的声明、修改、删除等RabbitMQ基础功能操作。
  • 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等。
  • 注意:setAutoStartup必须设置为true,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明

maven 依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

新建配置类

package com.example.demo.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Configurable
@Component
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("10.39.30.42:5672");
        connectionFactory.setUsername("fnuser");
        connectionFactory.setPassword("fnuser");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

单元测试

    @Test
    public void testAdmin() throws Exception {
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

 //----------------------------第二种声明方式  建造者------------------------------------------------------

        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));

        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue", false))		//直接创建队列
                        .to(new TopicExchange("test.topic", false, false))	//直接创建交换机 建立关联关系
                        .with("user.#"));	//指定路由Key


        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));

        //清空队列数据
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }

二、RabbitMQ声明式配置

使用Spring @Bean注解声明式配置

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(new Queue("declare.queue"))
                .to(new TopicExchange("test.topic", false, false))
                .with("declare.routingKey");
    }

三、RabbitTemplate 消息模板

  • SpringAMQP发送消息的关键类
  • 该类提供可靠性投递方法回调监听消息接口ConfirmCallback返回值确认ReturnCallback等
  • 在与Spring整合时需要实例化,在SpringBoot整合中,只需要在配置文件添加配置即可。
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("10.39.30.42:5672");
        connectionFactory.setUsername("fnuser");
        connectionFactory.setPassword("fnuser");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

单元测试

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	
	@Test
	public void testSendMessage() throws Exception {
		//1 创建消息
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.getHeaders().put("desc", "信息描述..");
		messageProperties.getHeaders().put("type", "自定义消息类型..");
		Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
		
		rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				System.err.println("------添加额外的设置---------");
				message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
				message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
				return message;
			}
		});
	}
	
	@Test
	public void testSendMessage2() throws Exception {
		//1 创建消息
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setContentType("text/plain");
		Message message = new Message("mq 消息1234".getBytes(), messageProperties);
		
		rabbitTemplate.send("topic001", "spring.abc", message);
		
		rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
		rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
	}

四、SimpleMessageListenerContainer

  • 设置消息确认自动确认模式是否重回队列、异常捕获handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等
  • 注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的数量的大小、接受消息的模式等
  • SimpleMessageListenerContainer为什么可以感知配置变更
    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //监听多个消息队列
        container.setQueueNames("test_queue001","test_queue002","test_queue003");
        //setConcurrentConsumers设置并发消费者
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        //重回队列
        container.setDefaultRequeueRejected(false);
        //签收模式 自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);

        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("----------消费者: " + msg);
            }
        });
		return container;
    }
相关标签: RabbitMQ