欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spring AMQP整合RabbitMQ

程序员文章站 2022-07-12 12:29:58
...

RabbitAdmin

  • RabbitAdmin类可以很好的操作RabbitMQ, 在Spring中直接进行注入即可

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory ConnectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
    
  • 注意 : autoStartup必须要设置为true, 否则Spring容器就不会加载RabbitAdmin类

  • RabbitAdmin底层实现就是从Spring容器中获取Exchange, Binding, RoutingKey已及Queue的@Bean声明

  • 然后RabbitAdmin底层使用RabbitTemplate的excute方法执行对应的声明, 修改, 删除等一系列RabbitMQ基础功能操作, 如添加一个交换机, 删除一个绑定, 清空一个队列等

添加Maven依赖 :

    <properties>
        <spring.version>4.3.20.RELEASE</spring.version>
        <junit.version>4.12</junit.version>
        <spring-rabbit.version>1.7.5.RELEASE</spring-rabbit.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>${spring-rabbit.version}</version>
        </dependency>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
    </dependencies>

XML方式配置

方式一 : 使用标签模式配置

<!--配置ConnectionFactory, 指定连接rabbitMQ的参数-->
<rabbit:connection-factory id="connectionFactory" host="192.168.72.138" port="5672"               virtual-host="/" username="guest" password="guest"/>
<!--指定RabbitAdmin, auto-startup默认为true-->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

方式二: 使用Spring Bean的方式配置

    <!--配置ConnectionFactory, 指定连接rabbitMQ的参数-->
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="host" value="192.168.72.138"/>
        <property name="port" value="5672"/>
        <property name="virtualHost" value="/"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
    </bean>

    <!--配置rabbitAdmin, auto-startup默认为true-->
    <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="connectionFactory"/>
    </bean>

测试代码:

使用Spring集成Junit进行测试

// 用于Spring集成Junit, 添加到测试类上面
// @RunWith(SpringJUnit4ClassRunner.class)
// @ContextConfiguration("classpath:application.xml")  

	@Test
    public void testRabbitAdmin() {
        // 创建Direct模式Exchange
        rabbitAdmin.declareExchange(new DirectExchange("test_spring_declare_exchange", false, false));
        // 创建Topic模式Exchange
        rabbitAdmin.declareExchange(new TopicExchange("test_spring_topic_exchange", false, false));
        // 创建Fanout
        rabbitAdmin.declareExchange(new FanoutExchange("test_spring_fanout_exchange", false, false));
        // 创建队列
        rabbitAdmin.declareQueue(new Queue("test_spring_direct_queue", false));
        rabbitAdmin.declareQueue(new Queue("test_spring_topic_queue", false));
        rabbitAdmin.declareQueue(new Queue("test_spring_fanout_queue", false));

        // 可以直接创建绑定, new的Queue和Exchange必须在上面声明过
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test_spring_topic_queue", false))
                        .to(new TopicExchange("test_spring_topic_exchange", false, false))
                        .with("sping.*"));
        rabbitAdmin.declareBinding(
                BindingBuilder.bind(new Queue("test_spring_fanout_queue", false))
                        .to(new FanoutExchange("test_spring_fanout_exchange", false, false)));

    }

Java类配置方式

配置类:

package com.qiyexue.annotation;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

/**
 * 基于Java类的方式配置
 *
 * @author 七夜雪
 * @date 2018-12-19 20:28
 */
@Configuration
@ComponentScan("com.qiyexue.*")
public class RabbitConfig {

    /**
     * 配置ConnectionFactory
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses("192.168.72.138:5672");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    /**
     * 声明RabbitAdmin
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 需要将AutoStartup设置为true
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

}

测试类:

测试类和基于xml配置的测试类是一样的, 只是在Spring启动的注解上修改一下, 改成加载类配置, 而不是加载配置文件配置,测试类声明如下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={RabbitConfig.class})
public class RabbitAdminTest {
    @Autowired
    private RabbitAdmin rabbitAdmin;
}    

SpringAMQP声明

  • 使用SpringAMQP去声明, 可以基于Spring的xml配置去声明, 也可以使用基于Java类配置去声明
  • 只Spring容器启动之后, 下面的声明就会自动被声明到RabbitMQ中

XML格式声明

    <!--声明topic模式exchange-->
    <rabbit:topic-exchange name="topic002" durable="true"
                           auto-delete="false">
        <!--声明exchange和queue之间的绑定-->
        <rabbit:bindings>
            <rabbit:binding pattern="huangquan.*" queue="queue002"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:queue name="queue002" durable="true" exclusive="false"
                  auto-delete="false">
    </rabbit:queue>

    <!--声明direct模式exchange-->
    <rabbit:direct-exchange name="direct003" durable="true" auto-delete="false">
        <!--声明exchange和queue之间的绑定-->
        <rabbit:bindings>
            <rabbit:binding queue="queue003" key="hongchen"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <rabbit:queue name="queue003" durable="true" exclusive="false"
                  auto-delete="false">
    </rabbit:queue>

Java类配置方式声明

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * HeadersExchange :通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     */
    @Bean
    public TopicExchange exchange001(){
        TopicExchange exchange = new TopicExchange("topic001", false, false);
        return exchange;
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue queue001(){
       return new Queue("queue001", false);
    }

    /**
     * 声明绑定
     * @return
     */
    @Bean
    public Binding binding001(){
        return BindingBuilder.bind(queue001()).to(exchange001()).with("biluo.*");
    }

RabbitTemplate

  • 消息模板, 在与SpringAMQP整合的时候, 进行发送消息的关键类
  • 该类提供了丰富的消息发送方法, 包括可靠性消息投递方法, 回调监听消息接口ConfirmCallback, 返回值确认接口ReturnCallback等。
  • 将此类注入到Spring容器中, 就可以直接使用了
  • 在与Spring整合时, 需要进行实例化, 但是在与SpringBoot整合时, 直接在配置文件里添加配置即可

XML配置

    <!--方式一 : 使用rabbit:template标签配置-->
    <!--<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>-->
    <!--方式二 : 使用普通Spring Bean方式配置-->
    <bean id="rabbitTemlate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
    </bean>

Java类配置方式

    /**
     * RabbitTemplate配置
     * RabbitTemplate主要用于消息发送
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

RabbitTemplate常用API

发送普通消息:

    // 设置Message属性
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("name", "七夜雪");
        messageProperties.getHeaders().put("age", 18);
        Message message = new Message("听雪楼中听雪落".getBytes(), messageProperties);
        // 使用Send方法必须传入message类型
        rabbitTemplate.send("topic001", "biluo.test", message);

        // 使用convertAndSend方法, 可以使用String类型, 或者Object类型消息, 会自动转换
        rabbitTemplate.convertAndSend("topic002", "huangquan.test", "上穷碧落下黄泉", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("----------添加额外设置------------");
                message.getMessageProperties().getHeaders().put("name", "黄泉");
                message.getMessageProperties().getHeaders().put("company", "听雪楼");
                return message;
            }
        });

SimpleMessageListenerContainer

  • 简单消息监听容器
  • 这个类功能很强大, 对于消费者的配置项, 这份都可以实现
  • 监听队列(支持多个队列), 自动启动, 自动声明等
  • 设置事务特性, 事务管理器, 事务属性, 事务容量(并发), 是否开启事务, 回滚消息等
  • 可以设置消费者数量, 最小最大数量, 批量消费等
  • 设置消息确认和自动确认模式, 是否重回队列, 异常捕获Handler函数
  • 设置消费者标签生成策略, 是否独占模式, 消费者属性等
  • 设置具体的消息监听器, 消息转换器等
  • SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小, 接收消息的模式等, 很多基于RabbitMQ的定制的后端管控台进行动态设置的时候, 也是基于这一特性去实现的

XML配置

自定义MessageListener:

package com.qiyexue.xml;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/**
 * 自定义MessageListener
 *
 * @author 七夜雪
 * @date 2018-12-23 8:31
 */
public class MyMessageListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println(msg);
    }
}

    <bean id="myMessageListener" class="com.qiyexue.xml.MyMessageListener"/>

	<!--方法一 : 使用rabbit:listener-container标签设置listener-container-->
    <rabbit:listener-container connection-factory="connectionFactory"
                               auto-startup="true"
                               acknowledge="auto"
                               max-concurrency="5"
                               concurrency="1"
                               requeue-rejected="false">

        <!--使用queue-names="queue001"也可以, 但是queues和queue-names不能同时使用-->
        <rabbit:listener ref="myMessageListener" queues="queue002,queue003"/>
    </rabbit:listener-container>

    <!--方法二 : 使用普通Spring Bean方式设置MessageListenerContainer-->
    <bean id="simpleMessageListenerContainer"
          class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="autoStartup" value="true"/>
        <property name="acknowledgeMode" value="AUTO"/>
        <property name="concurrentConsumers" value="1"/>
        <property name="defaultRequeueRejected" value="false"/>
        <property name="consumerTagStrategy" ref="myConsumerTag"/>
        <property name="messageListener" ref="myMessageListener"/>
        <property name="queueNames" value="queue001,queue002,queue003"/>
    </bean>

Java类配置方式

配置类中添加如下配置:

    @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 添加要监听的队列
//        container.setQueues(queue001());
        // 添加要监听的队列, 传入队列名, 和setQueues使用其中一个即可
        container.setQueueNames("queue001","queue002", "queue003");
        // 设置当前消费者格式
        container.setConcurrentConsumers(1);
        // 设置最大消费者个数
        container.setMaxConcurrentConsumers(5);
        // 是否重回队列
        container.setDefaultRequeueRejected(false);
        // 设置签收模式
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setExposeListenerChannel(true);
        // 设置消费者标签
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 设置监听MessageListener
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println(msg);
            }
        });

        return container;
    }

测试直接使用RabbitTemplate发送消息即可

MessageListenerAdapter

  • 适配器模式监听消息

  • defaultListenerMethod默认监听方法名称, 用于设置监听方法名称, 默认handleMessage

  • Delegate委托对象, 委派设计模式, 实际真实的委托对象, 用于处理消息

  • queueOrTagToMethodName队列标识与方法名称组成的集合

    • 可以一一进行队列与方法名称的匹配
    • 队列与方法名称绑定, 即指定队列里的消息会被绑定的方法所接受处理

XML方式配置

    <!--使用MessageListenerAdapter-->
    <bean id="messageListenerAdapter"
          class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <ref bean="messageDelegate"/>
        </constructor-arg>
    </bean>
	<!--委托对象-->
    <bean id="messageDelegate" class="com.qiyexue.adapter.MessageDelegate"/>

Java类方式配置

        /**
         * 适配器方式 : 默认的方法名字的:handleMessage
         * 可以自己指定方法名
         * 也可以添加一个转换器, 将字节数组转换为String, 默认简单消息也是会转换成String的
         */
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        adapter.setMessageConverter(new TextMessageConverter());
        container.setMessageListener(adapter);

        /**
         * 适配器模式, 队列与方法名绑定
         */
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        Map<String, String> queueToMethodMap = new HashMap<>();
        queueToMethodMap.put("queue001", "queue1Method");
        queueToMethodMap.put("queue002", "queue2Method");
        adapter.setQueueOrTagToMethodName(queueToMethodMap);
        container.setMessageListener(adapter);

测试方法 :

    @Test
    public void testSendMessage4Text() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("听雪楼中听雪落...".getBytes(), messageProperties);
        rabbitTemplate.send("topic001", "biluo.test", message);
        rabbitTemplate.send("topic002", "huangquan.test", message);
    }

MessageConverter

  • 在发送消息的时候, 正常情况下消息体是以二进制的数据方式进行传输, 如果希望内部进行转换, 或者指定自定义的转换器, 就需要用到MessageConverter

  • 自定义常用转换器 : MessageConverter, 一般来讲,都需要实现这个接口

    实现下面两个方法:

    • toMessage : Java对象转换为Message
    • fromMessage : Message对象转换成java对象
  • 转换器类型

    • Json转换器 : Jackson2JsonMessageConverter, 可以进行java对象的转换功能
    • DefaultJackson2JavaTypeMapper映射器 : 可以进行java对象的映射
    • 自定义二进制转换器 : 如图片,PDF, PPT等, 可以将多个转换器放到一个全局转换器ContentTypeDelegatingMessageConverter中