【RabbitMQ】基本使用:Spring AMQP配置使用及SpringBoot整合
程序员文章站
2022-07-12 12:29:16
...
在上一篇 【RabbitMQ】Spring AMQP核心组件解析我们介绍了 Spring AMQP ,那它具体该如何使用呢?
1.xml配置方式
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest"
password="guest" host="127.0.0.1" port="5672" />
<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />
1.1 Direct Exchange
<!--定义queue -->
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false"
declared-by="connectAdmin" />
<!--定义direct exchange,绑定MY_FIRST_QUEUE -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false"
declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
</rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="MY_DIRECT_EXCHANGE" />
<!--消息接收者 -->
<bean id="messageReceiver" class="com.mymq.consumer.FirstConsumer"></bean>
<!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>
<!--定义queue -->
<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false"
declared-by="connectAdmin" />
<!-- 将已经定义的Exchange绑定到MY_SECOND_QUEUE,注意关键词是key -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false"
declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息接收者 -->
<bean id="receiverSecond" class="com.mymq.consumer.SecondConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
</rabbit:listener-container>
1.2 Topic Exchange
<!--定义queue -->
<rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false"
declared-by="connectAdmin" />
<!-- 定义topic exchange,绑定MY_THIRD_QUEUE,注意关键词是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false"
declared-by="connectAdmin">
<rabbit:bindings>
<rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory"
exchange="MY_TOPIC_EXCHANGE" />
<!-- 消息接收者 -->
<bean id="receiverThird" class="com.mymq.consumer.ThirdConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
</rabbit:listener-container>
1.3 Fanout Exchange
<!--定义queue -->
<rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false"
declared-by="connectAdmin" />
<!-- 定义fanout exchange,绑定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true"
declared-by="connectAdmin" >
<rabbit:bindings>
<rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
<rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 消息接收者 -->
<bean id="receiverFourth" class="com.mymq.consumer.FourthConsumer"></bean>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
</rabbit:listener-container>
2.SpringBoot整合
使用了SpringBoot后,我们不再需要些上面那么多的配置,因为在RabbitAutoConfiguration
中已经对Bean做了自动配置。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.1 定义队列、交换机
@Configuration
public class RabbitConfig {
// 声明Queue、Exchange、Binding.......
}
1.定义交换机
@Bean("directExchange") // 定义的name会在绑定时用到
public DirectExchange getDirectExchange() {
return new DirectExchange("DIRECT_EXCHANGE")
}
@Bean("topicExchange")
public TopicExchange getTopicExchange(){
return new TopicExchange("TOPIC_EXCHANGE");
}
@Bean("fanoutExchange")
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("FANOUT_EXCHANGE");
}
2.定义队列
@Bean("firstQueue")
public Queue getFirstQueue(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl",6000); // 设置超时
Queue queue = new Queue("FIRST_QUEUE", false, false, true, args);
return queue;
}
@Bean("secondQueue")
public Queue getSecondQueue(){
return new Queue("SECOND_QUEUE");
}
@Bean("thirdQueue")
public Queue getThirdQueue(){
return new Queue("THIRD_QUEUE");
}
3.定义绑定
@Bean
public Binding bindFirst(@Qualifier("firstQueue") Queue queue, // 队列
@Qualifier("directExchange") TopicExchange exchange){ // 交换机
return BindingBuilder.bind(queue).to(exchange).with("my"); // 路由键
}
@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("#.my.#"); // 路由键有通配符
}
@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
@Qualifier("fanoutExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange); // fanout没有路由键
}
2.2 Producer
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
@Component
public class MyProvider {
@Autowired
AmqpTemplate amqpTemplate;
public void send(){
// 发送4条消息
amqpTemplate.convertAndSend("DIRCET_EXCHANGE","my","a direct msg");
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "msg", "a topic msg1");
amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "a.my.b", "a topic msg2");
amqpTemplate.convertAndSend("FANOUT_EXCHANGE","","a fanout msg");
}
}
2.3 Consumer
@Component
@RabbitListener(queues = "FIRST_QUEUE") // 指定监听的队列,
public class FirstConsumer {
@RabbitHandler // 有消息时,就会自动调用当前方法
public void process(String msg){
System.out.println(" first queue received msg : " + msg);
}
}
@Component
@RabbitListener(queues = {"SECOND_QUEUE","THIRD_QUEUE"}) // 配置一个消费者监听多个队列
public class SecondConsumer {
@RabbitHandler
public void process(String msg){
System.out.println(" second queue received msg : " + msg);
}
}
2.4 配置文件
全部配置总体上分成三大类:连接类、消息发送类、消息消费类。
1.连接配置
# rabbitmq连接基本配置
spring.rabbitmq.addresses=43.105.136.120:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
属性值 | 说明 | 默认值 |
---|---|---|
address | 客户端连接的地址,有多个的时候用逗号分隔 改地址可以IP与port的结合 |
|
host | RabbitMQ的主机地址 | localhost |
port | RabbitMQ的端口号 | |
virtual-host | 连接到RabbitMQ的虚拟主机 | |
username | 登录到RabbitMQ的用户名 | |
password | 登录到RabbitMQ的密码 | |
ssl.enabled | 启用SSL支持 | false |
ssl.key-store | 保存SSL证书的地址 | |
ssl.key-store-password | 访问SSL证书的地址使用的密码 | |
ssl.trust-store | SSL的可信地址 | |
ssl.trust-store-password | 访问SSL的可信地址的密码 | |
ssl.algorithm | SSL算法,默认使用Rabbit的客户端算法库 | |
cache.channel.checkout-timeout | 当缓存已满时,获取Channel的等待时间,单位为毫秒 | |
cache.channel.size | 缓存中保持的Channe丨数量 | |
cache.connection.mode | 连接缓存的模式 | CHANNEL |
cache.connection.size | 缓存的连接数 | |
connnection-timeout | 连接超时参数单位为毫秒:设置为“0”代表无穷大 | |
dynamic | 默认创建—个AmqpAdmin的Beantrue |
注:Producer与Consumer都需要先配置RabbitMQ连接信息。
2.消息发送配置(Producer)
# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
属性值 | 说明 | 默认值 |
---|---|---|
publisher-confirms | 开启 Publisher Confirm 机制 | |
publisher-returns | 开启 Publisher Return 机制 | |
template.mandatory | 启用强制信息 | false |
template.receive-timeout | receive()方法的超时时间 | 0 |
template.reply-timeout | sendAndReceive()方法的超时时间 | 5000 |
template.retry.enabled | 设置为true的时候RabbitTemplate能够实现重试 | false |
template.retry.initial-interval | 第一次与第二次发布消息的时间间隔 | 1000 |
template.retry.max-attempts | 尝试发布消息的最大数量 | 3 |
template.retry.max-interval | 尝试发布消息的最大时间间隔 | 10000 |
template.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
3.消息消费配置(Consumer)
# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10
属性值 | 说明 | 默认值 |
---|---|---|
listener.simple.acknowledge-mode | 容器的acknowledge模式 | |
listener.simple.auto-startup | 肩动时自动启动容器 | true |
listener.simple.concurrency | 消费者的最小数量 | |
listener.simple.default-requeue-rejected | 投递失败时是否重新排队 | true |
listener.simple.max-concurrency | 消费者的最大数量 | |
listener.simple.missing-queues-fatal | 容器上声明的队列不可用时是否失敗 | |
listener.simple.prefetch | 在单个请求中处理的消息个数,他应该大于等于事务数量 | |
listener.simple.retry.enabled | 不论是不是重试的发布 | false |
listener.simple.retry.initial-interval | 第一次与第二次投递尝试的时间间隔 | 1000ms |
listener.simple.retry.max-attempts | 尝试投递消息的最大数量 | 3 |
listener.simple.retry.max-interval | 两次尝试的最大时间间隔 | 10000ms |
listener.simple.retry.multiplier | 上一次尝试时间间隔的乘数 | 1.0 |
listener.simple.retry.stateless | 重试是有状态的还是无状态的 | true |
listener.simple.transaction-size | 在一个事务中处理的消息数量 = 为了获得最佳效果, 该值应设罝为小于等于每个请求中处理的消息个数,即 listener.prefetch 的值 |