欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring AMQP

程序员文章站 2022-07-15 08:14:11
...

简介

Sprin有很多不同的项目,其中就有对AMQP的支持:
Spring AMQP
Spring AMQP的页面 : https://spring.io/projects/spring-amqp
Spring AMQP
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

创建SpringBoot项目

  1. 添加Rabbitmq依赖和配置
    Spring AMQP
  2. 创建后的依赖:pom.xml 文件中引入 AMQP场景启动器
<!-- AMQP场景启动器 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. application.yml中添加RabbitMQ地址:
    Spring AMQP
spring:
  rabbitmq:
    host: 192.168.230.205
    username: admin
    password: admin
    virtual-host: /host_admin

监听者

在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

package com.springboot.amqp.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 在 SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,
 * 只要通过简单的注解,就可以成为一个消费者。
 * Created by YongXin Xue on 2020/06/12 0:08
 */
@Component
public class Listener {
    /**
     * @RabbitListener:标注在方法上,可以让方法变为消费者的方法
     * bindings:指定绑定参数
     * @QueueBinding:将消费者的消息队列绑定到指定交换机上
     * @Queue:声明一个队列
     * @Exchange:声明一个交换机
     * 消费者接收消息的方法
     * @param msg   消息
     */
    @RabbitListener(
    bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", declare = "true"),
            exchange = @Exchange(
                    value = "springboot.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"hello.*", "*.*.test"}
    )
    )
    public void listener(String msg, Channel channel, Message message) {
        System.out.println("[  消费者 ] receive:  ' " + msg + " ' ");
        try {
            //手动回执
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

  • @Componet:类上的注解,注册到Spring容器
  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
      • value:这个消费者关联的队列。值是@Queue,代表一个队列
      • exchange:队列所绑定的交换机,值是@Exchange类型
      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

测试

@SpringBootTest
class SpringbootAmqpApplicationTests {

    //spring 将 rabbitmq的生产者封装成了一个模板对象,通过该对象可以发送消息到交换机中
    @Autowired
    AmqpTemplate amqpTemplate;

    @Test
    void contextLoads() {
        String msg = "天王盖地虎";
        //测试发送消息到指定交换机中
        //参数1:交换机名称, 参数2:routing key  参数3:信息
        amqpTemplate.convertAndSend("springboot.test.exchange","aaa.hello.test" ,msg);
    }

}