RabbitMQ delay queue demo
程序员文章站
2022-07-02 17:37:28
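Project structure:
Define the versions of the jar dependencies. The versions matter: spring-rabbit depends on Spring, so the two versions must be compatible, otherwise errors are thrown:
<properties>
    <springframework.version>4.2.7.RELEASE</springframework.version>
    <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version>
    <junit.version>4.12</junit.version>
</properties>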
dependencies:
<dependencies>
    <!-- logging begin -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.5</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.0.13</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.0.13</version>
    </dependency>
    <!-- Direct calls to commons-logging are bridged to SLF4J -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.5</version>
    </dependency>
    <!-- logging end -->

    <!-- springframework -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${springframework.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${springframework.version}</version>
    </dependency>

    <!-- RabbitMQ Spring dependency -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-rabbit.version}</version>
    </dependency>

    <!-- common utils -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.3.2</version>
    </dependency>

    <!-- test begin -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springframework.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <!-- test end -->
</dependencies>
spring-applicationContext:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="fileEncoding" value="UTF-8"></property>
        <property name="locations">
            <list>
                <value>classpath:applicationcontext.properties</value>
            </list>
        </property>
    </bean>

    <context:annotation-config/>
    <bean class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor"/>

    <!-- Package to scan for components -->
    <context:component-scan base-package="demo"></context:component-scan>

    <!-- RabbitMQ server parameters -->
    <rabbit:connection-factory id="connectionfactory"
                               username="${paycenter.mq.user.username}"
                               password="${paycenter.mq.user.password}"
                               addresses="${paycenter.mq.user.host}"></rabbit:connection-factory>

    <import resource="classpath:mq-applicationContext-producer.xml"/>
    <import resource="classpath:mq-applicationContext-consumer.xml"/>
</beans>
mq-applicationContext-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!-- With the admin declared below, the exchanges and queues defined in this producer config are created automatically on the RabbitMQ server -->
    <rabbit:admin connection-factory="connectionfactory"/>

    <!-- SimpleMessageConverter is Spring AMQP's default converter. It handles String, byte[] and Serializable payloads;
         it does not produce JSON (Jackson2JsonMessageConverter would be needed for that) -->
    <bean id="mqmessageconverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter">
    </bean>
    <!--<bean id="publisherconfirmsreturns" class="com.emaxcard.mq.rabbit.publisherconfirmsreturns"></bean>-->

    <!--======================== delay queue configuration: begin =========================-->
    <!-- Queue 2 is the queue the consumer listens on; it receives messages dead-lettered from queue 1 -->
    <rabbit:queue id="agentpayqueryqueue2" durable="true" auto-delete="true" exclusive="false" name="agentpayqueryqueue2"/>
    <rabbit:direct-exchange id="agentpayqueryexchange2" durable="true" auto-delete="true" name="agentpayqueryexchange2">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryqueue2" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Queue 1 has no consumer. Each message expires after 10000 ms and is then dead-lettered to
         agentpayqueryexchange2, keeping its original routing key "delay" -->
    <rabbit:queue id="agentpayqueryqueue1" durable="true" auto-delete="true" exclusive="false" name="agentpayqueryqueue1">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayqueryexchange2"/>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange id="agentpayqueryexchange1" durable="true" auto-delete="true" name="agentpayqueryexchange1">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayqueryqueue1" key="delay"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Define the RabbitTemplate instance -->
    <!--confirm-callback="publisherconfirmsreturns" return-callback="publisherconfirmsreturns"-->
    <rabbit:template id="agentpayquerymsgtemplate"
                     exchange="agentpayqueryexchange1"
                     routing-key="delay"
                     connection-factory="connectionfactory"
                     message-converter="mqmessageconverter"
                     mandatory="true"/>
    <!--======================== delay queue configuration: end =========================-->
</beans>
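The delay in this configuration comes from two pieces working together: a queue with a message TTL and no consumer, and a dead-letter exchange that forwards expired messages to the queue the consumer actually listens on. The following is a minimal in-memory analogy of that flow, assuming nothing beyond the JDK; it does not talk to RabbitMQ, and the class and method names (`DelayQueueSketch`, `Expiring`, `publishAndConsume`) are hypothetical. It uses `java.util.concurrent.DelayQueue` to stand in for the TTL queue, and taking an expired element stands in for delivery from the dead-letter target.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// In-memory analogy only: no broker is involved and all names are hypothetical.
class DelayQueueSketch {

    private static final AtomicLong SEQ = new AtomicLong();

    // A message that becomes available only after its TTL has elapsed,
    // like a message waiting in the TTL queue before being dead-lettered.
    static final class Expiring implements Delayed {
        final String body;
        final long expiresAtNanos;
        final long seq = SEQ.getAndIncrement(); // tie-breaker to keep publish order

        Expiring(String body, long ttlMillis) {
            this.body = body;
            this.expiresAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expiresAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            Expiring o = (Expiring) other;
            int byExpiry = Long.compare(expiresAtNanos, o.expiresAtNanos);
            return byExpiry != 0 ? byExpiry : Long.compare(seq, o.seq);
        }
    }

    // Publishes `count` messages with the same TTL, then blocks until each has
    // expired and returns the bodies in the order they became available.
    static List<String> publishAndConsume(int count, long ttlMillis) {
        DelayQueue<Expiring> ttlQueue = new DelayQueue<>();
        for (int i = 0; i < count; i++) {
            ttlQueue.put(new Expiring(String.valueOf(i), ttlMillis));
        }
        List<String> delivered = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                delivered.add(ttlQueue.take().body); // blocks until the head has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for expiry", e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> delivered = publishAndConsume(3, 200);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(delivered + " delivered after about " + elapsedMillis + " ms");
    }
}
```

Because every message carries the same TTL, messages become available in publish order, which matches the queue-level `x-message-ttl` used in the XML above.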
mq-applicationContext-consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <bean id="agentpayqueryconsumer" class="demo.testmqconsumer"/>

    <!-- TODO remove later
         receive-timeout: how long to wait for a message; affects consumer creation and teardown
         concurrency: number of consumers
         max-concurrency: maximum number of consumers
         min-start-interval: start consumers gradually, to reduce the cost of many connections under load
                             (or during a sudden network delay in a third-party system)
         min-stop-interval: stop consumers gradually, so that a brief quiet period does not tear down many usable connections
         min-consecutive-active: after n consecutive receives without a timeout, another consumer is considered necessary
         min-consecutive-idle: after n consecutive receive timeouts, a consumer is considered ready to be stopped
         prefetch: number of messages prefetched per consumer. When the bottleneck is the network or an asynchronously
                   called third-party system, a value of 1 means the next message is received only after the previous
                   one is acked (this demo sets 10)
         transaction-size: affects the effective prefetch count
    -->

    <!-- Listener -->
    <!-- Queue listener (observer pattern): when a message arrives, the listener registered on that queue is notified -->
    <rabbit:listener-container connection-factory="connectionfactory" acknowledge="auto"
                               max-concurrency="20" concurrency="5" prefetch="10">
        <rabbit:listener ref="agentpayqueryconsumer" queue-names="agentpayqueryqueue2"/>
    </rabbit:listener-container>
</beans>
Producer class:
package demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationcontext.xml")
public class testmqproducer {
    private static Logger logger = LoggerFactory.getLogger(testmqproducer.class.getSimpleName());

    @Autowired
    private RabbitTemplate agentpayquerymsgtemplate;

    @Test
    public void test() throws Exception {
        // Sends 101 messages (0 through 100) to the template's default exchange and routing key
        for (int i = 0; i <= 100; i++) {
            Object data = String.valueOf(i);
            agentpayquerymsgtemplate.convertAndSend(data);
            logger.info("enqueued: {}", data);
        }
        // Keep the context alive past the 10 s TTL so the consumer can receive the dead-lettered messages
        Thread.sleep(12000);
    }
}
Consumer class:
package demo;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class testmqconsumer implements MessageListener {
    private static Logger logger = LoggerFactory.getLogger(testmqconsumer.class.getSimpleName());

    @Override
    public void onMessage(Message message) {
        String data = new String(message.getBody(), StandardCharsets.UTF_8);
        try {
            // Simulate slow processing
            Thread.sleep(1);
            logger.info("dequeued: {}", data);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the container can stop this thread
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
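That completes the code.
Note: the queues above are declared with auto-delete set to true, so once the consumer has finished and closed its connection, the queue is deleted automatically. The same goes for the exchange. (In the MQ console, agentpayqueryqueue2 and agentpayqueryexchange2 from this example disappeared right after the run, while agentpayqueryqueue1 and agentpayqueryexchange1 still existed. This matches RabbitMQ's semantics: an auto-delete queue is removed only after its last consumer unsubscribes, and agentpayqueryqueue1 never had a consumer; an auto-delete exchange is removed once its last queue is unbound, and agentpayqueryqueue1 stays bound to agentpayqueryexchange1.)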
The spring-rabbit-x.xsd schema documents the auto-delete attribute of queue and exchange as follows:
Flag indicating that an queue will be deleted when it is no longer in use, i.e. the connection that declared it is closed. Default is false. (rabbit:queue)
Flag indicating that an exchange will be deleted when no longer in use, i.e. the connection that declared it is closed. Default is false. (rabbit:exchange)
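On the broker side, RabbitMQ deletes an auto-delete queue when its last consumer cancels, and deletes an auto-delete exchange when its last queue is unbound. The sketch below is a simplified in-memory model of just those two rules, written to show why one queue and exchange pair in this demo disappears while the other remains. It is an illustration under that assumption, not broker code, and `AutoDeleteSketch`, `ModelExchange` and `ModelQueue` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of auto-delete behaviour; not RabbitMQ code, names are hypothetical.
class AutoDeleteSketch {

    static final class ModelExchange {
        final Set<ModelQueue> bound = new HashSet<>();
        boolean deleted;

        // An auto-delete exchange goes away once its last queue is unbound.
        void unbind(ModelQueue queue) {
            bound.remove(queue);
            if (bound.isEmpty()) {
                deleted = true;
            }
        }
    }

    static final class ModelQueue {
        final ModelExchange exchange;
        int consumers;
        boolean deleted;

        ModelQueue(ModelExchange exchange) {
            this.exchange = exchange;
            exchange.bound.add(this);
        }

        void subscribe() {
            consumers++;
        }

        // An auto-delete queue goes away when its last consumer cancels;
        // deleting the queue also removes its binding.
        void cancel() {
            if (consumers == 0) {
                throw new IllegalStateException("no consumer to cancel");
            }
            consumers--;
            if (consumers == 0) {
                deleted = true;
                exchange.unbind(this);
            }
        }
    }

    public static void main(String[] args) {
        ModelExchange exchange1 = new ModelExchange();
        ModelQueue queue1 = new ModelQueue(exchange1); // TTL queue, never consumed
        ModelExchange exchange2 = new ModelExchange();
        ModelQueue queue2 = new ModelQueue(exchange2); // consumed by 5 listeners

        for (int i = 0; i < 5; i++) queue2.subscribe();
        for (int i = 0; i < 5; i++) queue2.cancel();

        System.out.println("queue1 deleted: " + queue1.deleted + ", exchange1 deleted: " + exchange1.deleted);
        System.out.println("queue2 deleted: " + queue2.deleted + ", exchange2 deleted: " + exchange2.deleted);
    }
}
```

In this model a queue that never had a consumer is never deleted, which is the situation of agentpayqueryqueue1 in the demo.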
Notes on consumer-side concurrency:
Again, the schema documentation (spring-rabbit-x.xsd) explains it:
The number of concurrent consumers to start for each listener initially.
See also 'max-concurrency'.
I set the value to 5 above; the queue's consumers can be inspected in the MQ console.
The dequeue log shows that 5 threads in total consume these messages.
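The effect of a fixed number of concurrent consumers can be illustrated without a broker. The sketch below assumes only the JDK: a `LinkedBlockingQueue` stands in for the queue and a fixed thread pool stands in for the listener container's consumers. It is not how Spring AMQP is implemented, and `ConcurrentConsumersSketch`, `Result` and `drain` are hypothetical names. It records which threads handled messages so the thread count can be compared with the configured concurrency.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory illustration only; names are hypothetical and no broker is involved.
class ConcurrentConsumersSketch {

    static final class Result {
        final int processed;
        final Set<String> threads;

        Result(int processed, Set<String> threads) {
            this.processed = processed;
            this.threads = threads;
        }
    }

    // Fills a queue with `messageCount` messages and drains it with `concurrency` worker threads.
    static Result drain(int messageCount, int concurrency) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(String.valueOf(i));
        }

        Set<String> threads = ConcurrentHashMap.newKeySet();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (int c = 0; c < concurrency; c++) {
            pool.execute(() -> {
                // Each worker keeps polling until the queue is empty
                while (queue.poll() != null) {
                    threads.add(Thread.currentThread().getName());
                    processed.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return new Result(processed.get(), threads);
    }

    public static void main(String[] args) {
        Result result = drain(101, 5);
        System.out.println(result.processed + " messages handled by " + result.threads.size() + " thread(s)");
    }
}
```

Every message is handled exactly once, and the number of distinct threads never exceeds the configured concurrency; how evenly the work spreads depends on scheduling.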