欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitmq延迟队列demo

程序员文章站 2022-07-02 17:37:28
工程结构: 定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错: dependencies: spring-applicationContext: mq-applicationContext-producer.xml: mq-applicationContext-c ......

 

工程结构:

rabbitmq延迟队列demo

 

定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错:

<properties>
    <springframework.version>4.2.7.release</springframework.version>
    <spring-rabbit.version>1.6.1.release</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>

dependencies:

<dependencies>

    <!-- logging begin -->
    <dependency>
        <groupid>org.slf4j</groupid>
        <artifactid>slf4j-api</artifactid>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupid>ch.qos.logback</groupid>
        <artifactid>logback-core</artifactid>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupid>ch.qos.logback</groupid>
        <artifactid>logback-classic</artifactid>
        <version>1.0.13</version>
    </dependency>
    <!-- 代码直接调用common-logging会被桥接到slf4j -->
    <dependency>
        <groupid>org.slf4j</groupid>
        <artifactid>jcl-over-slf4j</artifactid>
        <version>1.7.5</version>
    </dependency>
    <!-- logging end -->

    <!--springframework-->
    <dependency>
        <groupid>org.springframework</groupid>
        <artifactid>spring-core</artifactid>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupid>org.springframework</groupid>
        <artifactid>spring-context</artifactid>
        <version>${springframework.version}</version>
    </dependency>

    <!-- rabbitmq spring依赖 -->
    <dependency>
        <groupid>org.springframework.amqp</groupid>
        <artifactid>spring-rabbit</artifactid>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!--common utils-->
    <dependency>
        <groupid>com.alibaba</groupid>
        <artifactid>fastjson</artifactid>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupid>org.apache.commons</groupid>
        <artifactid>commons-lang3</artifactid>
        <version>3.3.2</version>
    </dependency>

    <!--test begin-->
    <dependency>
        <groupid>junit</groupid>
        <artifactid>junit</artifactid>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupid>org.springframework</groupid>
        <artifactid>spring-test</artifactid>
        <version>${springframework.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!--test end-->
</dependencies>

 

spring-applicationcontext:

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"

       xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd 
         http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <bean class="org.springframework.beans.factory.config.propertyplaceholderconfigurer">
        <property name="fileencoding" value="utf-8"></property>
        <property name="locations">
            <list>
                <value>classpath:applicationcontext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>

    <bean class="org.springframework.beans.factory.annotation.autowiredannotationbeanpostprocessor"/>
    <!-- 配置扫描路径 -->
    <context:component-scan base-package="demo"></context:component-scan>

    <!--rabbit server参数 -->
    <rabbit:connection-factory id="connectionfactory"
                               username="${paycenter.mq.user.username}"
                               password="${paycenter.mq.user.password}"
                               addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>

    <import resource="classpath:mq-applicationcontext-producer.xml"/>
    <import resource="classpath:mq-applicationcontext-consumer.xml"/>
</beans>

 

mq-applicationcontext-producer.xml:

 

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemalocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionfactory"/>

    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列 -->
    <bean id="mqmessageconverter"
          class="org.springframework.amqp.support.converter.simplemessageconverter">
    </bean>

    <!--<bean id="publisherconfirmsreturns" class="com.emaxcard.mq.rabbit.publisherconfirmsreturns"></bean>-->


    <!--========================延迟队列配置 begin =========================-->
    <rabbit:queue id="agentpayqueryqueue2" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryqueue2"/>
    <rabbit:direct-exchange id="agentpayqueryexchange2" durable="true" auto-delete="true" name="agentpayqueryexchange2">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryqueue2" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <rabbit:queue id="agentpayqueryqueue1" durable="true" auto-delete="true" exclusive="false"
                  name="agentpayqueryqueue1">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayqueryexchange2"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="agentpayqueryexchange1" durable="true" auto-delete="true" name="agentpayqueryexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryqueue1" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!--定义rabbittemplate实例-->
    <!--confirm-callback="publisherconfirmsreturns" return-callback="publisherconfirmsreturns"-->
    <rabbit:template id="agentpayquerymsgtemplate"
                     exchange="agentpayqueryexchange1" routing-key="delay"
                     connection-factory="connectionfactory" message-converter="mqmessageconverter"
                     mandatory="true"
    />
    <!--========================延迟队列配置 end =========================-->

</beans>

 

mq-applicationcontext-consumer.xml:

<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemalocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">


    <bean id="agentpayqueryconsumer" class="demo.testmqconsumer" />

    <!-- todo 后续删除
    receive-timeout:等待接收超时时长 影响连接创建和销毁

    concurrency:消费者个数
    max-concurrency:最大消费者个数
    min-start-interval:陆续启动  减少并发环境(或是三方系统突然的网络延迟) 大量连接导致的性能耗损
    min-stop-interval:陆续销毁   减少突然的安静 导致大量可用连接被销毁
    min-consecutive-active: 连续n次没有接收发生超时  则认定为需要创建 消费者
    min-consecutive-idle: 连续n次发生了接收超时   则认定消费者需要销毁

    prefetch:每个消费者预读条数 因为异步调用三方 性能瓶颈在网络与三方系统所以预读取条数设置为1(默认为5) 只有一条消息被ack才会接收下一条消息
    transaction-size:会影响prefetch的数量
    -->
    <!--  监听器 -->
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionfactory" acknowledge="auto"
                               max-concurrency="20"
                               concurrency="5"
                               prefetch="10">
        <rabbit:listener ref="agentpayqueryconsumer" queue-names="agentpayqueryqueue2" />
    </rabbit:listener-container>
</beans>

 

 

producer类:
package demo;

import org.junit.test;
import org.junit.runner.runwith;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.test.context.contextconfiguration;
import org.springframework.test.context.junit4.springjunit4classrunner;

@runwith(springjunit4classrunner.class)
@contextconfiguration(locations = "classpath:applicationcontext.xml")
public class testmqproducer {

    private static logger logger = loggerfactory.getlogger(testmqproducer.class.getsimplename());

    @autowired
    private rabbittemplate agentpayquerymsgtemplate;

    @test
    public void test() throws exception {
        for (int i = 0; i <= 100; i++) {
            object data = string.valueof(i);
            agentpayquerymsgtemplate.convertandsend(data);
            logger.info("入队:{}", data);
        }
        thread.sleep(12000);
    }
}

 

 

consumer类:
package demo;

import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagelistener;

public class testmqconsumer implements messagelistener {

    private static logger logger = loggerfactory.getlogger(testmqconsumer.class.getsimplename());

    public void onmessage(message message) {
        string data = new string(message.getbody());

        try {
            //模拟处理慢
            thread.sleep(1);

            logger.info("出队:{}", data);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }

    }

}

 

 

 至此代码就完毕了。

 

说明:上面定义队列时我把auto-delete属性设置为true, 所以,当消费者消费完并关闭连接后,队列会自动删除。exchange也如是。(通过mq控制台看,栗子中的agentpayqueryqueue2和agentpayqueryexchange2在执行完就自动消失了,agentpayqueryqueue1和agentpayqueryexchange1还存在。)

spring-rabbit-x.xml里对queue和exchange的auto-delete属性的解释:

flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. default is false.(rabbit:queue)

flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. default is false.(rabbit:exchange)

 

消费端的concurrency说明:

同样,看spring-rabbit-x.xml的解释:

the number of concurrent consumers to start for each listener initially.
see also 'max-concurrency'.

 

上面我设置的值是5,从mq控制台里看queue的consumer见下图:

rabbitmq延迟队列demo

从出队日志,可以看出来,共有5个线程在消费这些消息。

rabbitmq延迟队列demo

 

 

 rabbitmq延迟队列demo