Spring AMQP整合RabbitMQ
RabbitAdmin
-
RabbitAdmin类可以很好的操作RabbitMQ, 在Spring中直接进行注入即可
@Bean public RabbitAdmin rabbitAdmin(ConnectionFactory ConnectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }
-
注意 : autoStartup必须要设置为true, 否则Spring容器就不会加载RabbitAdmin类
-
RabbitAdmin底层实现就是从Spring容器中获取Exchange, Binding, RoutingKey已及Queue的@Bean声明
-
然后RabbitAdmin底层使用RabbitTemplate的excute方法执行对应的声明, 修改, 删除等一系列RabbitMQ基础功能操作, 如添加一个交换机, 删除一个绑定, 清空一个队列等
添加Maven依赖 :
<properties>
<spring.version>4.3.20.RELEASE</spring.version>
<junit.version>4.12</junit.version>
<spring-rabbit.version>1.7.5.RELEASE</spring-rabbit.version>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-rabbit.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
XML方式配置
方式一 : 使用标签模式配置
<!--配置ConnectionFactory, 指定连接rabbitMQ的参数-->
<rabbit:connection-factory id="connectionFactory" host="192.168.72.138" port="5672" virtual-host="/" username="guest" password="guest"/>
<!--指定RabbitAdmin, auto-startup默认为true-->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
方式二: 使用Spring Bean的方式配置
<!--配置ConnectionFactory, 指定连接rabbitMQ的参数-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="192.168.72.138"/>
<property name="port" value="5672"/>
<property name="virtualHost" value="/"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
<!--配置rabbitAdmin, auto-startup默认为true-->
<bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
<constructor-arg ref="connectionFactory"/>
</bean>
测试代码:
使用Spring集成Junit进行测试
// 用于Spring集成Junit, 添加到测试类上面
// @RunWith(SpringJUnit4ClassRunner.class)
// @ContextConfiguration("classpath:application.xml")
@Test
public void testRabbitAdmin() {
// 创建Direct模式Exchange
rabbitAdmin.declareExchange(new DirectExchange("test_spring_declare_exchange", false, false));
// 创建Topic模式Exchange
rabbitAdmin.declareExchange(new TopicExchange("test_spring_topic_exchange", false, false));
// 创建Fanout
rabbitAdmin.declareExchange(new FanoutExchange("test_spring_fanout_exchange", false, false));
// 创建队列
rabbitAdmin.declareQueue(new Queue("test_spring_direct_queue", false));
rabbitAdmin.declareQueue(new Queue("test_spring_topic_queue", false));
rabbitAdmin.declareQueue(new Queue("test_spring_fanout_queue", false));
// 可以直接创建绑定, new的Queue和Exchange必须在上面声明过
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test_spring_topic_queue", false))
.to(new TopicExchange("test_spring_topic_exchange", false, false))
.with("sping.*"));
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test_spring_fanout_queue", false))
.to(new FanoutExchange("test_spring_fanout_exchange", false, false)));
}
Java类配置方式
配置类:
package com.qiyexue.annotation;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* 基于Java类的方式配置
*
* @author 七夜雪
* @date 2018-12-19 20:28
*/
@Configuration
@ComponentScan("com.qiyexue.*")
public class RabbitConfig {
/**
* 配置ConnectionFactory
* @return
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setAddresses("192.168.72.138:5672");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
return factory;
}
/**
* 声明RabbitAdmin
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 需要将AutoStartup设置为true
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
测试类:
测试类和基于xml配置的测试类是一样的, 只是在Spring启动的注解上修改一下, 改成加载类配置, 而不是加载配置文件配置,测试类声明如下:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={RabbitConfig.class})
public class RabbitAdminTest {
@Autowired
private RabbitAdmin rabbitAdmin;
}
SpringAMQP声明
- 使用SpringAMQP去声明, 可以基于Spring的xml配置去声明, 也可以使用基于Java类配置去声明
- 只Spring容器启动之后, 下面的声明就会自动被声明到RabbitMQ中
XML格式声明
<!--声明topic模式exchange-->
<rabbit:topic-exchange name="topic002" durable="true"
auto-delete="false">
<!--声明exchange和queue之间的绑定-->
<rabbit:bindings>
<rabbit:binding pattern="huangquan.*" queue="queue002"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue name="queue002" durable="true" exclusive="false"
auto-delete="false">
</rabbit:queue>
<!--声明direct模式exchange-->
<rabbit:direct-exchange name="direct003" durable="true" auto-delete="false">
<!--声明exchange和queue之间的绑定-->
<rabbit:bindings>
<rabbit:binding queue="queue003" key="hongchen"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="queue003" durable="true" exclusive="false"
auto-delete="false">
</rabbit:queue>
Java类配置方式声明
/**
* 针对消费者配置
* 1. 设置交换机类型
* 2. 将队列绑定到交换机
* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
* HeadersExchange :通过添加属性key-value匹配
* DirectExchange:按照routingkey分发到指定队列
* TopicExchange:多关键字匹配
*/
@Bean
public TopicExchange exchange001(){
TopicExchange exchange = new TopicExchange("topic001", false, false);
return exchange;
}
/**
* 声明队列
* @return
*/
@Bean
public Queue queue001(){
return new Queue("queue001", false);
}
/**
* 声明绑定
* @return
*/
@Bean
public Binding binding001(){
return BindingBuilder.bind(queue001()).to(exchange001()).with("biluo.*");
}
RabbitTemplate
- 消息模板, 在与SpringAMQP整合的时候, 进行发送消息的关键类
- 该类提供了丰富的消息发送方法, 包括可靠性消息投递方法, 回调监听消息接口ConfirmCallback, 返回值确认接口ReturnCallback等。
- 将此类注入到Spring容器中, 就可以直接使用了
- 在与Spring整合时, 需要进行实例化, 但是在与SpringBoot整合时, 直接在配置文件里添加配置即可
XML配置
<!--方式一 : 使用rabbit:template标签配置-->
<!--<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>-->
<!--方式二 : 使用普通Spring Bean方式配置-->
<bean id="rabbitTemlate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
</bean>
Java类配置方式
/**
* RabbitTemplate配置
* RabbitTemplate主要用于消息发送
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
RabbitTemplate常用API
发送普通消息:
// 设置Message属性
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("name", "七夜雪");
messageProperties.getHeaders().put("age", 18);
Message message = new Message("听雪楼中听雪落".getBytes(), messageProperties);
// 使用Send方法必须传入message类型
rabbitTemplate.send("topic001", "biluo.test", message);
// 使用convertAndSend方法, 可以使用String类型, 或者Object类型消息, 会自动转换
rabbitTemplate.convertAndSend("topic002", "huangquan.test", "上穷碧落下黄泉", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.out.println("----------添加额外设置------------");
message.getMessageProperties().getHeaders().put("name", "黄泉");
message.getMessageProperties().getHeaders().put("company", "听雪楼");
return message;
}
});
SimpleMessageListenerContainer
- 简单消息监听容器
- 这个类功能很强大, 对于消费者的配置项, 这份都可以实现
- 监听队列(支持多个队列), 自动启动, 自动声明等
- 设置事务特性, 事务管理器, 事务属性, 事务容量(并发), 是否开启事务, 回滚消息等
- 可以设置消费者数量, 最小最大数量, 批量消费等
- 设置消息确认和自动确认模式, 是否重回队列, 异常捕获Handler函数
- 设置消费者标签生成策略, 是否独占模式, 消费者属性等
- 设置具体的消息监听器, 消息转换器等
- SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小, 接收消息的模式等, 很多基于RabbitMQ的定制的后端管控台进行动态设置的时候, 也是基于这一特性去实现的
XML配置
自定义MessageListener:
package com.qiyexue.xml;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
/**
* 自定义MessageListener
*
* @author 七夜雪
* @date 2018-12-23 8:31
*/
public class MyMessageListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println(msg);
}
}
<bean id="myMessageListener" class="com.qiyexue.xml.MyMessageListener"/>
<!--方法一 : 使用rabbit:listener-container标签设置listener-container-->
<rabbit:listener-container connection-factory="connectionFactory"
auto-startup="true"
acknowledge="auto"
max-concurrency="5"
concurrency="1"
requeue-rejected="false">
<!--使用queue-names="queue001"也可以, 但是queues和queue-names不能同时使用-->
<rabbit:listener ref="myMessageListener" queues="queue002,queue003"/>
</rabbit:listener-container>
<!--方法二 : 使用普通Spring Bean方式设置MessageListenerContainer-->
<bean id="simpleMessageListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="autoStartup" value="true"/>
<property name="acknowledgeMode" value="AUTO"/>
<property name="concurrentConsumers" value="1"/>
<property name="defaultRequeueRejected" value="false"/>
<property name="consumerTagStrategy" ref="myConsumerTag"/>
<property name="messageListener" ref="myMessageListener"/>
<property name="queueNames" value="queue001,queue002,queue003"/>
</bean>
Java类配置方式
配置类中添加如下配置:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
// 添加要监听的队列
// container.setQueues(queue001());
// 添加要监听的队列, 传入队列名, 和setQueues使用其中一个即可
container.setQueueNames("queue001","queue002", "queue003");
// 设置当前消费者格式
container.setConcurrentConsumers(1);
// 设置最大消费者个数
container.setMaxConcurrentConsumers(5);
// 是否重回队列
container.setDefaultRequeueRejected(false);
// 设置签收模式
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setExposeListenerChannel(true);
// 设置消费者标签
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 设置监听MessageListener
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println(msg);
}
});
return container;
}
测试直接使用RabbitTemplate发送消息即可
MessageListenerAdapter
-
适配器模式监听消息
-
defaultListenerMethod默认监听方法名称, 用于设置监听方法名称, 默认handleMessage
-
Delegate委托对象, 委派设计模式, 实际真实的委托对象, 用于处理消息
-
queueOrTagToMethodName队列标识与方法名称组成的集合
- 可以一一进行队列与方法名称的匹配
- 队列与方法名称绑定, 即指定队列里的消息会被绑定的方法所接受处理
XML方式配置
<!--使用MessageListenerAdapter-->
<bean id="messageListenerAdapter"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<ref bean="messageDelegate"/>
</constructor-arg>
</bean>
<!--委托对象-->
<bean id="messageDelegate" class="com.qiyexue.adapter.MessageDelegate"/>
Java类方式配置
/**
* 适配器方式 : 默认的方法名字的:handleMessage
* 可以自己指定方法名
* 也可以添加一个转换器, 将字节数组转换为String, 默认简单消息也是会转换成String的
*/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
/**
* 适配器模式, 队列与方法名绑定
*/
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Map<String, String> queueToMethodMap = new HashMap<>();
queueToMethodMap.put("queue001", "queue1Method");
queueToMethodMap.put("queue002", "queue2Method");
adapter.setQueueOrTagToMethodName(queueToMethodMap);
container.setMessageListener(adapter);
测试方法 :
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("听雪楼中听雪落...".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "biluo.test", message);
rabbitTemplate.send("topic002", "huangquan.test", message);
}
MessageConverter
-
在发送消息的时候, 正常情况下消息体是以二进制的数据方式进行传输, 如果希望内部进行转换, 或者指定自定义的转换器, 就需要用到MessageConverter
-
自定义常用转换器 : MessageConverter, 一般来讲,都需要实现这个接口
实现下面两个方法:
- toMessage : Java对象转换为Message
- fromMessage : Message对象转换成java对象
-
转换器类型
- Json转换器 : Jackson2JsonMessageConverter, 可以进行java对象的转换功能
- DefaultJackson2JavaTypeMapper映射器 : 可以进行java对象的映射
- 自定义二进制转换器 : 如图片,PDF, PPT等, 可以将多个转换器放到一个全局转换器ContentTypeDelegatingMessageConverter中
推荐阅读
-
十、Spring boot 简单优雅的整合 Swagger2
-
Mybatis整合spring(适合小白)
-
spring整合struts2过程详解
-
Spring-Data-JPA整合MySQL和配置的方法
-
Spring Boot整合WebSocket
-
微服务架构下使用Spring Cloud Zuul作为网关将多个微服务整合到一个Swagger服务上
-
spring MVC扩展和SSM整合
-
SSM(Spring+SpringMVC+Mybatis)框架整合
-
spring boot整合hessian的示例
-
spring cloud 入门系列八:使用spring cloud sleuth整合zipkin进行服务链路追踪