熟练运用RabbitMQ,整合Spring AMQP
程序员文章站
2022-07-12 12:29:22
...
1.RabbitAdmin介绍
依赖
<!-- RabbitMQ Java client, pinned to 3.6.5. -->
<!-- NOTE(review): the starter below presumably brings in amqp-client transitively; confirm this pinned version matches the Spring AMQP version in use. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<!-- Spring Boot starter for AMQP (version not declared here) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这里我直接使用springboot的依赖,因为springboot已经对spring做了一个封装,所以我用springboot的依赖,但是没使用springboot的相关特性。
配置类
package com.ep.rabbitmq.spring.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the RabbitMQ connection factory and the
 * {@link RabbitAdmin} built on top of it.
 *
 * @author zht
 * @version 1.0
 * @createDate 2020/06/25 14:00
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Registers the RabbitMQ connection factory.
     *
     * @return a caching connection factory for the broker at 127.0.0.1:5672,
     *         virtual host "/", authenticated as guest/guest
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        // setHost() takes a bare host name; the port has its own setter.
        // Passing "127.0.0.1:5672" as the host makes the client try to resolve
        // the whole string as a host name.
        cachingConnectionFactory.setHost("127.0.0.1");
        cachingConnectionFactory.setPort(5672);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        return cachingConnectionFactory;
    }

    /**
     * Registers the {@link RabbitAdmin} with auto-startup enabled.
     *
     * @param connectionFactory the connection factory the admin operates on
     * @return the configured admin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
注意:autoStartup必须设置为true,否则Spring容器启动时不会加载RabbitAdmin,容器中以@Bean声明的Exchange、Queue、Binding也就不会被自动声明到RabbitMQ。
简单测试
/**
 * Declares three non-durable exchanges (direct, topic, fanout), one
 * non-durable queue per exchange, binds each queue to its exchange and
 * finally purges the topic queue.
 */
@Test
public void testAdmin() throws Exception {
    // Exchanges: (name, durable = false, autoDelete = false)
    DirectExchange directExchange = new DirectExchange("test.direct", false, false);
    TopicExchange topicExchange = new TopicExchange("test.topic", false, false);
    FanoutExchange fanoutExchange = new FanoutExchange("test.fanout", false, false);
    rabbitAdmin.declareExchange(directExchange);
    rabbitAdmin.declareExchange(topicExchange);
    rabbitAdmin.declareExchange(fanoutExchange);

    // Queues: (name, durable = false)
    rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

    // Direct binding, built with the explicit Binding constructor and routing key "direct".
    Binding directBinding = new Binding(
            "test.direct.queue",
            Binding.DestinationType.QUEUE,
            "test.direct",
            "direct",
            new HashMap<>());
    rabbitAdmin.declareBinding(directBinding);

    // Topic binding, built fluently from fresh queue/exchange objects with routing key "user.#".
    Binding topicBinding = BindingBuilder
            .bind(new Queue("test.topic.queue", false))
            .to(new TopicExchange("test.topic", false, false))
            .with("user.#");
    rabbitAdmin.declareBinding(topicBinding);

    // Fanout binding: no routing key is supplied.
    Binding fanoutBinding = BindingBuilder
            .bind(new Queue("test.fanout.queue", false))
            .to(new FanoutExchange("test.fanout", false, false));
    rabbitAdmin.declareBinding(fanoutBinding);

    // Empty the topic queue.
    rabbitAdmin.purgeQueue("test.topic.queue", false);
}
注入Exchange,Binding,Queue
/**
 * Consumer-side configuration:
 * 1. choose the exchange type;
 * 2. bind the queues to the exchange.
 *
 * FanoutExchange:  delivers a message to every bound queue; there is no routing key concept.
 * HeadersExchange: matches on added key-value attributes.
 * DirectExchange:  delivers to the specified queue according to the routing key.
 * TopicExchange:   multi-keyword matching.
 */
@Bean
public TopicExchange exchange001() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("topic001", durable, autoDelete);
}
/** Registers the durable queue named "queue001". */
@Bean
public Queue queue001() {
    final boolean durable = true;
    return new Queue("queue001", durable);
}
/** Binds the queue from queue001() to the exchange from exchange001() with the pattern "spring.*". */
@Bean
public Binding binding001() {
    Queue queue = queue001();
    TopicExchange exchange = exchange001();
    return BindingBuilder.bind(queue).to(exchange).with("spring.*");
}
/** Registers the durable, non-auto-delete topic exchange named "topic002". */
@Bean
public TopicExchange exchange002() {
    final boolean durable = true;
    final boolean autoDelete = false;
    return new TopicExchange("topic002", durable, autoDelete);
}
/** Registers the durable queue named "queue002". */
@Bean
public Queue queue002() {
    final boolean durable = true;
    return new Queue("queue002", durable);
}
/** Binds the queue from queue002() to the exchange from exchange002() with the pattern "rabbit.*". */
@Bean
public Binding binding002() {
    Queue queue = queue002();
    TopicExchange exchange = exchange002();
    return BindingBuilder.bind(queue).to(exchange).with("rabbit.*");
}
/** Registers the durable queue named "queue003". */
@Bean
public Queue queue003() {
    final boolean durable = true;
    return new Queue("queue003", durable);
}
/** Binds the queue from queue003() to the exchange from exchange001() with the pattern "mq.*". */
@Bean
public Binding binding003() {
    Queue queue = queue003();
    TopicExchange exchange = exchange001();
    return BindingBuilder.bind(queue).to(exchange).with("mq.*");
}
/** Registers the durable queue named "image_queue". */
@Bean
public Queue queue_image() {
    final boolean durable = true;
    return new Queue("image_queue", durable);
}
/** Registers the durable queue named "pdf_queue". */
@Bean
public Queue queue_pdf() {
    final boolean durable = true;
    return new Queue("pdf_queue", durable);
}
}
总结
RabbitAdmin底层实现就是从Spring容器中获取Exchange,Binding,RoutingKey以及Queue的@Bean声明,然后使用RabbitTemplate的execute方法执行对应的声明,修改,删除等一系列RabbitMQ基础功能操作。
2.消息模板-RabbitTemplate
介绍
我们已经依赖注入了Exchange,Queue,Binding,那么Spring中如何进行消息发送呢?Spring为我们提供了RabbitTemplate即消息模板,它是我们在与SpringAMQP整合的时候进行发送消息的关键类。
该类提供了丰富的发送消息方法,包括可靠性消息投递方法,回调监听消息接口ConfirmCallback,返回值确认接口ReturnCallback等等。同样我们需要将其注入到Spring容器中,然后直接使用。
运用
- 配置类中注入RabbitTemplate
/**
 * Registers the {@link RabbitTemplate} used to send messages.
 *
 * @param connectionFactory the connection factory the template sends through
 * @return a template with no further customisation
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}
- 测试类中进行消息的发送测试
/** Message template used by the send tests below; injected via {@code @Autowired}. */
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Sends a raw message with two custom headers to "topic001" using routing key
 * "spring.amqp", and uses a post-processor to overwrite one header and add
 * another just before the message is sent.
 */
@Test
public void testSendMessage() throws Exception {
    // 1. Build the message: headers first, then the byte payload.
    MessageProperties properties = new MessageProperties();
    properties.getHeaders().put("desc", "信息描述..");
    properties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), properties);

    // 2. Post-processor that rewrites "desc" and adds "attr" on the outgoing message.
    MessagePostProcessor addExtraSettings = outgoing -> {
        System.err.println("------添加额外的设置---------");
        outgoing.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
        outgoing.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
        return outgoing;
    };

    rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, addExtraSettings);
}
/**
 * Sends one pre-built text message with {@code send} and two plain String
 * payloads with {@code convertAndSend}.
 */
@Test
public void testSendMessage2() throws Exception {
    // 1. Build the message.
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    // The payload contains non-ASCII characters, so the bytes must not depend on
    // the JVM's platform charset: encode as UTF-8 and declare it on the message.
    messageProperties.setContentEncoding("UTF-8");
    Message message = new Message(
            "mq 消息1234".getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

    rabbitTemplate.send("topic001", "spring.abc", message);

    // 2. Let the template's message converter build the messages from Strings.
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
    rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
/**
 * Sends the same "text/plain" message to "topic001" (routing key "spring.abc")
 * and to "topic002" (routing key "rabbit.abc").
 */
@Test
public void testSendMessage4Text() throws Exception {
    // 1. Build the message.
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("text/plain");
    // The payload contains non-ASCII characters, so the bytes must not depend on
    // the JVM's platform charset: encode as UTF-8 and declare it on the message.
    messageProperties.setContentEncoding("UTF-8");
    Message message = new Message(
            "mq 消息1234".getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

    // 2. Send it to both exchanges.
    rabbitTemplate.send("topic001", "spring.abc", message);
    rabbitTemplate.send("topic002", "rabbit.abc", message);
}
/**
 * Serialises an {@code Order} to JSON and sends it to "topic001" with routing
 * key "spring.order" and content type "application/json".
 */
@Test
public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);

    MessageProperties messageProperties = new MessageProperties();
    // The content type must be application/json so the JSON converter is selected.
    messageProperties.setContentType("application/json");
    // The JSON contains non-ASCII text. Encode it explicitly as UTF-8 (and say so)
    // instead of relying on the platform charset, which may differ from what the
    // consumer's JSON converter decodes with.
    messageProperties.setContentEncoding("UTF-8");
    Message message = new Message(
            json.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

    rabbitTemplate.send("topic001", "spring.order", message);
}
/**
 * Sends an {@code Order} as JSON together with a {@code __TypeId__} header
 * carrying the fully qualified class name.
 */
@Test
public void testSendJavaMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);

    MessageProperties messageProperties = new MessageProperties();
    // The content type must be application/json so the JSON converter is selected.
    messageProperties.setContentType("application/json");
    // Encode explicitly as UTF-8 (and declare it): the JSON contains non-ASCII
    // text and the platform charset is not guaranteed to be UTF-8.
    messageProperties.setContentEncoding("UTF-8");
    // Fully qualified class name carried in the type-id header.
    messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order");
    Message message = new Message(
            json.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);

    rabbitTemplate.send("topic001", "spring.order", message);
}
/**
 * Sends an {@code Order} and a {@code Packaged} as JSON, each tagged with a
 * short {@code __TypeId__} alias ("order" / "packaged") instead of a class name.
 */
@Test
public void testSendMappingMessage() throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // --- Order message, type id "order" ---
    Order order = new Order();
    order.setId("001");
    order.setName("订单消息");
    order.setContent("订单描述信息");
    String json1 = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json1);

    MessageProperties messageProperties1 = new MessageProperties();
    // The content type must be application/json so the JSON converter is selected.
    messageProperties1.setContentType("application/json");
    // Encode explicitly as UTF-8 (and declare it): the JSON contains non-ASCII
    // text and the platform charset is not guaranteed to be UTF-8.
    messageProperties1.setContentEncoding("UTF-8");
    messageProperties1.getHeaders().put("__TypeId__", "order");
    Message message1 = new Message(
            json1.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);

    // --- Packaged message, type id "packaged" ---
    Packaged pack = new Packaged();
    pack.setId("002");
    pack.setName("包裹消息");
    pack.setDescription("包裹描述信息");
    String json2 = mapper.writeValueAsString(pack);
    System.err.println("pack 4 json: " + json2);

    MessageProperties messageProperties2 = new MessageProperties();
    messageProperties2.setContentType("application/json");
    messageProperties2.setContentEncoding("UTF-8");
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    Message message2 = new Message(
            json2.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
}
/**
 * Reads a PDF from disk and sends its bytes through the default exchange ("")
 * with routing key "pdf_queue" and content type "application/pdf".
 */
@Test
public void testSendExtConverterMessage() throws Exception {
    // Alternative payload, kept for reference: send a PNG image with routing key "image_queue".
    // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
    // MessageProperties messageProperties = new MessageProperties();
    // messageProperties.setContentType("image/png");
    // messageProperties.getHeaders().put("extName", "png");
    // Message message = new Message(body, messageProperties);
    // rabbitTemplate.send("", "image_queue", message);

    byte[] pdfBytes = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));

    MessageProperties pdfProperties = new MessageProperties();
    pdfProperties.setContentType("application/pdf");

    rabbitTemplate.send("", "pdf_queue", new Message(pdfBytes, pdfProperties));
}
3.SimpleMessageListenerContainer
功能特性
简单消息监听容器
- 设置事务特性,事务管理器,事务属性,事务容量,是否开启事务,回滚消息等。
- 设置消费者数量,最大最小数量,批量消费
- 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。
- 设置消费者标签生成策略、是否独占模式、消费者属性等。
- 设置具体的监听器、消息转换器等等。
运用
/**
 * Registers the listener container that consumes from all five queues and
 * dispatches every message to {@code MessageDelegate#consumeMessage} through a
 * content-type aware converter.
 *
 * <p>Several alternative listener set-ups from the tutorial are kept as
 * comments inside the method for reference.
 *
 * @param connectionFactory the connection factory the container consumes through
 * @return the configured container
 */
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    // Queues to listen on.
    container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
    // Initial number of consumers.
    container.setConcurrentConsumers(1);
    // Maximum number of consumers.
    container.setMaxConcurrentConsumers(5);
    // Whether rejected messages are put back on the queue.
    container.setDefaultRequeueRejected(false);
    // Acknowledge mode: automatic here; manual acknowledgement is the one used
    // for rate limiting and requeueing.
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setExposeListenerChannel(true);
    // Consumer tag: "<queue>_<random UUID>".
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());

    /*
     * Alternative 0: a plain channel-aware listener.
     *
    container.setMessageListener(new ChannelAwareMessageListener() {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            String msg = new String(message.getBody());
            System.err.println("----------消费者: " + msg);
        }
    });
     */

    /*
     * Alternative 1: adapter. The default listener method name is handleMessage;
     * a custom name (consumeMessage) can be set, and a converter can be added
     * to turn the byte array into a String.
     *
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    adapter.setMessageConverter(new TextMessageConverter());
    container.setMessageListener(adapter);
     */

    /*
     * Alternative 2: adapter with a one-to-one mapping from queue name to method name.
     *
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setMessageConverter(new TextMessageConverter());
    Map<String, String> queueOrTagToMethodName = new HashMap<>();
    queueOrTagToMethodName.put("queue001", "method1");
    queueOrTagToMethodName.put("queue002", "method2");
    adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
    container.setMessageListener(adapter);
     */

    /*
     * Alternative 1.1: converter with JSON support.
     *
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
     */

    /*
     * Alternative 1.2: DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter,
     * conversion to Java objects.
     *
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
     */

    /*
     * Alternative 1.3: DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter,
     * conversion to Java objects through an id-to-class mapping.
     *
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    idClassMapping.put("order", com.bfxy.spring.entity.Order.class);
    idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
     */

    // 1.4 (active): adapter with one converter per content type.
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");

    // Global converter that delegates according to the message's content type.
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    // "html/text" and "xml/text" are transposed MIME types. They are kept so
    // existing producers that send them still match; the standard
    // "text/html" and "text/xml" are registered as well.
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/html", textConvert);
    convert.addDelegate("text/xml", textConvert);
    convert.addDelegate("text/plain", textConvert);

    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);

    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/png", imageConverter);
    convert.addDelegate("image", imageConverter);

    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);

    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
    return container;
}
总结
注意: SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态地修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自定制化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常强大。