欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【RabbitMQ】基本使用:Spring AMQP配置使用及SpringBoot整合

程序员文章站 2022-07-12 12:29:16
...

在上一篇 【RabbitMQ】Spring AMQP核心组件解析我们介绍了 Spring AMQP ,那它具体该如何使用呢?

1.xml配置方式

<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" 
                           password="guest" host="127.0.0.1" port="5672" />

<!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
<rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

1.1 Direct Exchange

<!--定义queue -->
<rabbit:queue name="MY_FIRST_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!--定义direct exchange,绑定MY_FIRST_QUEUE -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false" 
                        declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_FIRST_QUEUE" key="FirstKey">
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" 
                 exchange="MY_DIRECT_EXCHANGE" />

<!--消息接收者 -->
<bean id="messageReceiver" class="com.mymq.consumer.FirstConsumer"></bean>

<!--queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_FIRST_QUEUE" ref="messageReceiver" />
</rabbit:listener-container>

<!--定义queue -->
<rabbit:queue name="MY_SECOND_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 将已经定义的Exchange绑定到MY_SECOND_QUEUE,注意关键词是key -->
<rabbit:direct-exchange name="MY_DIRECT_EXCHANGE" durable="true" auto-delete="false"
                        declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_SECOND_QUEUE" key="SecondKey"></rabbit:binding>
    </rabbit:bindings>
</rabbit:direct-exchange>

<!-- 消息接收者 -->
<bean id="receiverSecond" class="com.mymq.consumer.SecondConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_SECOND_QUEUE" ref="receiverSecond" />
</rabbit:listener-container>

1.2 Topic Exchange

<!--定义queue -->
<rabbit:queue name="MY_THIRD_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 定义topic exchange,绑定MY_THIRD_QUEUE,注意关键词是pattern -->
<rabbit:topic-exchange name="MY_TOPIC_EXCHANGE" durable="true" auto-delete="false" 
                       declared-by="connectAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="MY_THIRD_QUEUE" pattern="#.Third.#"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--定义rabbit template用于数据的接收和发送 -->
<rabbit:template id="amqpTemplate2" connection-factory="connectionFactory" 
                 exchange="MY_TOPIC_EXCHANGE" />

<!-- 消息接收者 -->
<bean id="receiverThird" class="com.mymq.consumer.ThirdConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_THIRD_QUEUE" ref="receiverThird" />
</rabbit:listener-container>

1.3 Fanout Exchange

<!--定义queue -->
<rabbit:queue name="MY_FOURTH_QUEUE" durable="true" auto-delete="false" exclusive="false" 
              declared-by="connectAdmin" />

<!-- 定义fanout exchange,绑定MY_FIRST_QUEUE 和 MY_FOURTH_QUEUE -->
<rabbit:fanout-exchange name="MY_FANOUT_EXCHANGE" auto-delete="false" durable="true"
                        declared-by="connectAdmin" >
    <rabbit:bindings>
        <rabbit:binding queue="MY_FIRST_QUEUE"></rabbit:binding>
        <rabbit:binding queue="MY_FOURTH_QUEUE"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- 消息接收者 -->
<bean id="receiverFourth" class="com.mymq.consumer.FourthConsumer"></bean>

<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener queues="MY_FOURTH_QUEUE" ref="receiverFourth" />
</rabbit:listener-container>

2.SpringBoot整合

使用了SpringBoot后,我们不再需要些上面那么多的配置,因为在RabbitAutoConfiguration中已经对Bean做了自动配置。

 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.1 定义队列、交换机

@Configuration
public class RabbitConfig {
    // 声明Queue、Exchange、Binding.......
}

1.定义交换机

@Bean("directExchange") // 定义的name会在绑定时用到
public DirectExchange getDirectExchange() {
    return new DirectExchange("DIRECT_EXCHANGE")
}

@Bean("topicExchange")
public TopicExchange getTopicExchange(){
    return new TopicExchange("TOPIC_EXCHANGE");
}

@Bean("fanoutExchange")
public FanoutExchange getFanoutExchange(){
    return  new FanoutExchange("FANOUT_EXCHANGE");
}

2.定义队列

@Bean("firstQueue")
public Queue getFirstQueue(){
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl",6000); // 设置超时
    Queue queue = new Queue("FIRST_QUEUE", false, false, true, args);
    return queue;
}

@Bean("secondQueue")
public Queue getSecondQueue(){
    return new Queue("SECOND_QUEUE");
}

@Bean("thirdQueue")
public Queue getThirdQueue(){
    return new Queue("THIRD_QUEUE");
}

3.定义绑定

@Bean
public Binding bindFirst(@Qualifier("firstQueue") Queue queue, // 队列
                          @Qualifier("directExchange") TopicExchange exchange){ // 交换机
    return BindingBuilder.bind(queue).to(exchange).with("my"); // 路由键
}

@Bean
public Binding bindSecond(@Qualifier("secondQueue") Queue queue, 
                          @Qualifier("topicExchange") TopicExchange exchange){ 
    return BindingBuilder.bind(queue).to(exchange).with("#.my.#"); // 路由键有通配符
}

@Bean
public Binding bindThird(@Qualifier("thirdQueue") Queue queue,
                         @Qualifier("fanoutExchange") FanoutExchange exchange){
    return BindingBuilder.bind(queue).to(exchange); // fanout没有路由键
}

2.2 Producer

void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
@Component
public class MyProvider {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(){
        // 发送4条消息        
        amqpTemplate.convertAndSend("DIRCET_EXCHANGE","my","a direct msg"); 

        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "msg", "a topic msg1");
        amqpTemplate.convertAndSend("TOPIC_EXCHANGE", "a.my.b", "a topic msg2");

        amqpTemplate.convertAndSend("FANOUT_EXCHANGE","","a fanout msg");
    }
}

2.3 Consumer

@Component
@RabbitListener(queues = "FIRST_QUEUE") // 指定监听的队列,
public class FirstConsumer {

    @RabbitHandler // 有消息时,就会自动调用当前方法
    public void process(String msg){
        System.out.println(" first queue received msg : " + msg);
    }
}
@Component
@RabbitListener(queues = {"SECOND_QUEUE","THIRD_QUEUE"}) // 配置一个消费者监听多个队列
public class SecondConsumer {

    @RabbitHandler
    public void process(String msg){
        System.out.println(" second queue received msg : " + msg);
    }
}

2.4 配置文件

全部配置总体上分成三大类:连接类、消息发送类、消息消费类。

1.连接配置

# rabbitmq连接基本配置
spring.rabbitmq.addresses=43.105.136.120:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
属性值 说明 默认值
address 客户端连接的地址,有多个的时候用逗号分隔
改地址可以IP与port的结合
host RabbitMQ的主机地址 localhost
port RabbitMQ的端口号
virtual-host 连接到RabbitMQ的虚拟主机
username 登录到RabbitMQ的用户名
password 登录到RabbitMQ的密码
ssl.enabled 启用SSL支持 false
ssl.key-store 保存SSL证书的地址
ssl.key-store-password 访问SSL证书的地址使用的密码
ssl.trust-store SSL的可信地址
ssl.trust-store-password 访问SSL的可信地址的密码
ssl.algorithm SSL算法,默认使用Rabbit的客户端算法库
cache.channel.checkout-timeout 当缓存已满时,获取Channel的等待时间,单位为毫秒
cache.channel.size 缓存中保持的Channe丨数量
cache.connection.mode 连接缓存的模式 CHANNEL
cache.connection.size 缓存的连接数
connnection-timeout 连接超时参数单位为毫秒:设置为“0”代表无穷大
dynamic 默认创建—个AmqpAdmin的Beantrue

注:Producer与Consumer都需要先配置RabbitMQ连接信息。

2.消息发送配置(Producer)

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true
属性值 说明 默认值
publisher-confirms 开启 Publisher Confirm 机制
publisher-returns 开启 Publisher Return 机制
template.mandatory 启用强制信息 false
template.receive-timeout receive()方法的超时时间 0
template.reply-timeout sendAndReceive()方法的超时时间 5000
template.retry.enabled 设置为true的时候RabbitTemplate能够实现重试 false
template.retry.initial-interval 第一次与第二次发布消息的时间间隔 1000
template.retry.max-attempts 尝试发布消息的最大数量 3
template.retry.max-interval 尝试发布消息的最大时间间隔 10000
template.retry.multiplier 上一次尝试时间间隔的乘数 1.0

3.消息消费配置(Consumer)

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10
属性值 说明 默认值
listener.simple.acknowledge-mode 容器的acknowledge模式
listener.simple.auto-startup 肩动时自动启动容器 true
listener.simple.concurrency 消费者的最小数量
listener.simple.default-requeue-rejected 投递失败时是否重新排队 true
listener.simple.max-concurrency 消费者的最大数量
listener.simple.missing-queues-fatal 容器上声明的队列不可用时是否失敗
listener.simple.prefetch 在单个请求中处理的消息个数,他应该大于等于事务数量
listener.simple.retry.enabled 不论是不是重试的发布 false
listener.simple.retry.initial-interval 第一次与第二次投递尝试的时间间隔 1000ms
listener.simple.retry.max-attempts 尝试投递消息的最大数量 3
listener.simple.retry.max-interval 两次尝试的最大时间间隔 10000ms
listener.simple.retry.multiplier 上一次尝试时间间隔的乘数 1.0
listener.simple.retry.stateless 重试是有状态的还是无状态的 true
listener.simple.transaction-size 在一个事务中处理的消息数量 = 为了获得最佳效果,
该值应设罝为小于等于每个请求中处理的消息个数,即 listener.prefetch 的值