RabbitMQ整合Spring AMQP实战
程序员文章站
2022-03-05 18:27:25
...
RabbitMQ整合Spring AMQP实战
一、RabbitAdmin应用
- RabbitAdmin:使用RabbitTemplate的execute方法执行对应的声明、修改、删除等RabbitMQ基础功能操作。
- 例如:添加一个交换机、删除一个绑定、清空一个队列里的消息等等。
- 注意:autoStartup必须为true(默认值即为true),否则RabbitAdmin不会在连接建立时自动声明Spring容器中的Exchange、Queue、Binding
- RabbitAdmin底层实现就是从Spring容器中获取Exchange、Binding、RoutingKey以及Queue的@Bean声明
maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.4.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
新建配置类
package com.example.demo.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
/**
 * Spring configuration that exposes the RabbitMQ {@link ConnectionFactory}
 * and a {@link RabbitAdmin} built on top of it.
 *
 * <p>Annotated with {@code @Configuration} (not {@code @Configurable}, which
 * is the unrelated AspectJ injection marker) so the {@code @Bean} methods are
 * processed as a regular configuration class. {@code @Configuration} is itself
 * a component stereotype, so a separate {@code @Component} is not needed.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Builds the caching connection factory used by the other beans.
     *
     * <p>NOTE(review): broker address and credentials are hard-coded here;
     * consider moving them to external configuration.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("10.39.30.42:5672");
        connectionFactory.setUsername("fnuser");
        connectionFactory.setPassword("fnuser");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    /**
     * Creates the {@link RabbitAdmin} used to declare exchanges, queues and bindings.
     *
     * @param connectionFactory the connection factory the admin operates on
     * @return the admin with auto-startup explicitly enabled
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
单元测试
/**
 * Exercises RabbitAdmin: declares three exchanges and three queues, binds each
 * queue to its exchange (once via the Binding constructor, twice via
 * BindingBuilder), then purges one of the queues.
 */
@Test
public void testAdmin() throws Exception {
    // Build each exchange/queue definition once and reuse it for the bindings below.
    DirectExchange directExchange = new DirectExchange("test.direct", false, false);
    TopicExchange topicExchange = new TopicExchange("test.topic", false, false);
    FanoutExchange fanoutExchange = new FanoutExchange("test.fanout", false, false);
    Queue directQueue = new Queue("test.direct.queue", false);
    Queue topicQueue = new Queue("test.topic.queue", false);
    Queue fanoutQueue = new Queue("test.fanout.queue", false);

    rabbitAdmin.declareExchange(directExchange);
    rabbitAdmin.declareExchange(topicExchange);
    rabbitAdmin.declareExchange(fanoutExchange);

    rabbitAdmin.declareQueue(directQueue);
    rabbitAdmin.declareQueue(topicQueue);
    rabbitAdmin.declareQueue(fanoutQueue);

    // First declaration style: explicit Binding constructor.
    Binding directBinding = new Binding(
            "test.direct.queue",
            Binding.DestinationType.QUEUE,
            "test.direct",
            "direct",
            new HashMap<>());
    rabbitAdmin.declareBinding(directBinding);

    // Second declaration style: BindingBuilder.
    Binding topicBinding = BindingBuilder
            .bind(topicQueue)
            .to(topicExchange)
            .with("user.#"); // routing key
    rabbitAdmin.declareBinding(topicBinding);

    Binding fanoutBinding = BindingBuilder
            .bind(fanoutQueue)
            .to(fanoutExchange);
    rabbitAdmin.declareBinding(fanoutBinding);

    // Clear the messages in the queue.
    rabbitAdmin.purgeQueue("test.topic.queue", false);
}
二、RabbitMQ声明式配置
使用Spring @Bean注解声明式配置
/**
 * Declares a binding between queue "declare.queue" and topic exchange
 * "test.topic" using routing key "declare.routingKey".
 *
 * <p>NOTE(review): the queue and exchange are constructed inline and are not
 * exposed as beans by this method; confirm they are declared elsewhere.
 *
 * @return the binding definition
 */
@Bean
public Binding binding() {
    Queue declaredQueue = new Queue("declare.queue");
    TopicExchange topicExchange = new TopicExchange("test.topic", false, false);
    return BindingBuilder
            .bind(declaredQueue)
            .to(topicExchange)
            .with("declare.routingKey");
}
三、RabbitTemplate 消息模板
- SpringAMQP发送消息的关键类
- 该类提供可靠性投递方法、回调监听消息接口ConfirmCallback、返回值确认ReturnCallback等
- 在与Spring整合时需要实例化,在SpringBoot整合中,只需要在配置文件添加配置即可。
/**
 * Builds the caching connection factory pointing at the RabbitMQ broker.
 *
 * @return the configured connection factory
 */
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory factory = new CachingConnectionFactory();
    // Broker location.
    factory.setAddresses("10.39.30.42:5672");
    factory.setVirtualHost("/");
    // Credentials.
    factory.setUsername("fnuser");
    factory.setPassword("fnuser");
    return factory;
}
/**
 * Creates the message template used to send messages through the given connection factory.
 *
 * @param connectionFactory the connection factory the template sends through
 * @return a new template with no further customization
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}
单元测试
// Template injected by Spring; used by the send tests below.
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Sends a message with custom headers to exchange "topic001" and lets a
 * post-processor overwrite one header and add another before it is sent.
 */
@Test
public void testSendMessage() throws Exception {
    // 1. Build the message with its initial headers.
    MessageProperties properties = new MessageProperties();
    properties.getHeaders().put("desc", "信息描述..");
    properties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), properties);

    // 2. Post-processor: overwrites "desc" and adds "attr".
    MessagePostProcessor addExtraHeaders = outgoing -> {
        System.err.println("------添加额外的设置---------");
        outgoing.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
        outgoing.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
        return outgoing;
    };

    rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, addExtraHeaders);
}
/**
 * Sends one pre-built text message with {@code send} and two plain strings
 * with {@code convertAndSend}.
 *
 * <p>The hand-built body contains non-ASCII characters, so it is encoded
 * explicitly as UTF-8 instead of relying on the platform default charset,
 * which would produce different bytes on differently configured machines.
 */
@Test
public void testSendMessage2() throws Exception {
    // 1. Build the message.
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    byte[] body = "mq 消息1234".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    Message message = new Message(body, messageProperties);

    // 2. Send the raw message, then two payloads that go through the template's converter.
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
    rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
四、SimpleMessageListenerContainer
- 设置消息确认与自动确认模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等
- 注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的数量的大小、接受消息的模式等
- 思考:SimpleMessageListenerContainer为什么可以感知配置变更?
/**
 * Builds the listener container that consumes from three test queues and
 * prints every received message body to stderr.
 *
 * <p>The body is decoded explicitly as UTF-8 rather than with the platform
 * default charset, so non-ASCII text is printed consistently regardless of
 * the JVM's locale settings.
 *
 * @param connectionFactory the connection factory the container consumes through
 * @return the configured listener container
 */
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    // Listen on several queues at once.
    container.setQueueNames("test_queue001", "test_queue002", "test_queue003");
    // Concurrent consumers: start with 1, allow up to 5.
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    // Rejected messages are NOT put back on the queue.
    container.setDefaultRequeueRejected(false);
    // Acknowledge mode: AUTO.
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    // Consumer tag: queue name plus a random UUID.
    container.setConsumerTagStrategy(new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            // Decode with an explicit charset instead of the platform default.
            String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.err.println("----------消费者: " + msg);
        }
    });
    return container;
}
上一篇: 怎样在H5页面实现数据交互
下一篇: 关于移动端H5页面中1px边框的解决方法
推荐阅读