springmvc + amqp + batch + rabbitmq
最近在学习spring amqp + spring batch + rabbitmq 的一些东西,简单的就是实现向mq中写入日志,使用spring batch批量处理,当然了,在实际生产中这个例子不一定合理,基于学习需要,暂且不秋考虑!!!
因为日志是一直不断记录的,当然不一次就写完,这里使用的是quartz,定时去mq中取数据的方式,spring amqp可以使用listener的方式(SimpleMessageListenerContainer)来监听,尝试了下,没实现,大家可以一起探讨下!
闲话少说,进入正题!
环境:springmvc集成amqp+batch+rabbitmq
主要配置文件有:
- applicationContext.xml
- applicationContext-rabbitmq.xml
- applicationContext-batch-beans.xml
- applicationContext-batch.xml
- applicationContext-quartz.xml
applicationContext.xml就不在介绍,主要是加载properties文件,datasource,当然还有一些公共的配置;
applicationContext-rabbitmq.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns="http://www.springframework.org/schema/rabbit" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd"> <connection-factory id="rabbitConnectionFactory" host="${rabbit.mq.host}" username="${rabbit.mq.user}" password="${rabbit.mq.pwd}" port="${rabbit.mq.port}" connection-timeout="${rabbit.mq.connectionTimeout}" virtual-host="${rabbit.mq.virtualHost}" publisher-confirms="true" publisher-returns="true" /> <admin connection-factory="rabbitConnectionFactory" /> <queue id="logDirectQueue" name="log-direct-queue" durable="true" auto-delete="false" exclusive="false" /> <queue id="logReplyDirectQueue" name="log-reply-direct-queue" durable="true" auto-delete="false" exclusive="false" /> <direct-exchange id="logDirectExchange" name="log-direct-exchange" durable="true" auto-delete="false"> <bindings> <binding queue="log-direct-queue" key="log-direct-routing-key" /> </bindings> </direct-exchange> <template id="logDirectTemplate" queue="log-direct-queue" exchange="log-direct-exchange" routing-key="log-direct-routing-key" connection-factory="rabbitConnectionFactory" confirm-callback="sendConfirmCallBack" return-callback="sendReturnCallBack" recovery-callback="sendRecoveryCallback" reply-queue="log-reply-direct-queue"> <reply-listener acknowledge="auto" auto-startup="true" min-start-interval="3000" /> </template> </beans:beans>
配置队列+exchange+binding+template时,需要注意的点,上面已经用红色注明,要保持一致,即将队列(log-direct-queue)与exchange(log-direct-exchange)通过路由规则(log-direct-routing-key)绑定;
applicationContext-batch-beans.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"> </bean> <bean id="amqpItemReader" class="org.springframework.batch.item.amqp.AmqpItemReader"> <constructor-arg name="amqpTemplate" ref="logDirectTemplate" /> <property name="itemType" value="com.beauty.entity.BeautyHandlerLogs" /> </bean> </beans>
上面的不解释,主要说下标记的代码:使用batch自带的AmqpItemReader读取mq中的消息;
applicationContext-batch.xml内容如下:
<beans:beans xmlns="http://www.springframework.org/schema/batch" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd"> <beans:import resource="applicationContext-batch-beans.xml" /> <job id="logProcessJob"> <step id="logStep" allow-start-if-complete="true"> <tasklet> <chunk reader="amqpItemReader" writer="amqpItemWriterImpl" commit-interval="10" /> </tasklet> </step> </job> </beans:beans>
具体配置不介绍了,主要说明下标记代码:只有添加上 allow-start-if-complete="true",quartz中的定时任务才能不断从mq中获取消息,即 logProcessJob 能不断重复执行,否则只取一次就不再执行了!这个需要特别注意,我也是找了好久,又尝试了好久才搞定的!
注:tasklet 也有该属性,也有同样的功能,可以尝试下!
applicationContext-quartz.xml内容如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd"> <bean id="SpringJobSchedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> <property name="triggers"> <list> <ref bean="cronTriggerBean" /> </list> </property> </bean> <bean id="cronTriggerBean" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="amqpItemReaderJobMethod" /> <property name="cronExpression" value="0/10 * * * * ?" /> </bean> <bean id="amqpItemReaderJobMethod" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject"> <ref bean="amqpItemReaderQtz" /> </property> <property name="targetMethod"> <value>execute</value> </property> </bean> </beans>
使用MethodInvokingJobDetailFactoryBean绑定pojo的方式实现,类的方式报错,版本不一致,于是弃用;
amqpItemReaderQtz 内容如下:
import org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component("amqpItemReaderQtz") public class AmqpItemReaderQtz { @Autowired private JobLauncher jobLauncher; @Autowired private Job logProcessJob; public void execute() { try { JobExecution result = jobLauncher.run(logProcessJob, new JobParameters()); // 调用batch任务 System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
写到这,基本上已经完成了,至于mq环境搭建,发送消息等就不再介绍!
如有问题,可留言讨论,特别是不使用死循环和quartz的场景下,如何能使batch的reader一直能读取mq中的消息的方法,很想学习下!
共勉!!
推荐阅读
-
深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议
-
解决rabbitmq无限循环异常问题亲测 Caused by: org.springframework.amqp.AmqpException: No method found for class [B
-
Spring AMQP(集成了Rabbitmq)---代码版
-
Spring AMQP(集成了Rabbitmq)
-
RabbitMQ入门-AMQP协议
-
RabbitMQ与Spring AMQP
-
消息队列RabbitMQ之Spring-AMQP
-
rabbitMq-Spring AMQP
-
spring amqp rabbitmq 学习(二) 接收消息
-
spring amqp rabbitmq 学习(三) MessageConverter