
springmvc + amqp + batch + rabbitmq

程序员文章站 2022-05-29 17:05:16

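I have recently been learning Spring AMQP + Spring Batch + RabbitMQ. The idea is simple: write logs into MQ, then process them in batches with Spring Batch. This example is not necessarily a sensible design for real production use, but since the goal is learning, I will set that concern aside for now.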

 

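Because logs are written continuously, they cannot all be consumed in a single pass, so Quartz is used here to pull data from MQ on a schedule. Spring AMQP can also listen for messages through a listener (SimpleMessageListenerContainer). I tried that but did not get it working; feel free to discuss it with me.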
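For reference, the listener approach mentioned above is usually declared with the `listener-container` element of the `rabbit` namespace, which creates a SimpleMessageListenerContainer behind the scenes. The following is only a minimal sketch, not a configuration verified in this project. The bean `logMessageListener`, its class `com.beauty.listener.LogMessageListener`, and its method `handleLog` are hypothetical names; the connection factory id and the queue name are the ones used in applicationContext-rabbitmq.xml.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans
	xmlns="http://www.springframework.org/schema/rabbit"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

	<!-- Hypothetical POJO exposing: public void handleLog(BeautyHandlerLogs log) -->
	<beans:bean
		id="logMessageListener"
		class="com.beauty.listener.LogMessageListener" />

	<!-- The container hands each message from log-direct-queue to the listener as it arrives -->
	<listener-container
		connection-factory="rabbitConnectionFactory"
		acknowledge="auto">
		<listener
			ref="logMessageListener"
			method="handleLog"
			queue-names="log-direct-queue" />
	</listener-container>

</beans:beans>
```

A listener receives messages one at a time as they are pushed, so it would replace the scheduled polling rather than feed AmqpItemReader; any batching would have to happen inside the listener.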

 

Enough preamble, on to the main topic.

 

Environment: Spring MVC integrated with AMQP + Batch + RabbitMQ

The main configuration files are:

  • applicationContext.xml
  • applicationContext-rabbitmq.xml
  • applicationContext-batch-beans.xml
  • applicationContext-batch.xml
  • applicationContext-quartz.xml

 

applicationContext.xml is not covered here; it mainly loads the properties file and the datasource, plus some shared configuration.
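Since that file is not shown, here is a rough sketch of what it might contain. Everything in it is an assumption made for illustration: the properties file name `config.properties`, the `jdbc.*` keys, the choice of `DriverManagerDataSource`, and the use of `<import>` to pull in the other files. The package `com.beauty` is inferred from the entity class used later in the article.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- Assumed file name; it would hold the rabbit.mq.* keys referenced by the RabbitMQ configuration -->
	<context:property-placeholder location="classpath:config.properties" />

	<!-- Registers @Component classes (such as amqpItemReaderQtz) and enables @Autowired -->
	<context:component-scan base-package="com.beauty" />

	<!-- Assumed datasource; the jdbc.* keys are placeholders -->
	<bean
		id="dataSource"
		class="org.springframework.jdbc.datasource.DriverManagerDataSource">
		<property name="driverClassName" value="${jdbc.driver}" />
		<property name="url" value="${jdbc.url}" />
		<property name="username" value="${jdbc.user}" />
		<property name="password" value="${jdbc.pwd}" />
	</bean>

	<!-- Chunk-oriented steps look up a bean named transactionManager by default -->
	<bean
		id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<import resource="applicationContext-rabbitmq.xml" />
	<import resource="applicationContext-batch.xml" />
	<import resource="applicationContext-quartz.xml" />

</beans>
```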

  

 

The content of applicationContext-rabbitmq.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans
	xmlns="http://www.springframework.org/schema/rabbit"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/rabbit
	http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">

	<connection-factory
		id="rabbitConnectionFactory"
		host="${rabbit.mq.host}"
		username="${rabbit.mq.user}"
		password="${rabbit.mq.pwd}"
		port="${rabbit.mq.port}"
		connection-timeout="${rabbit.mq.connectionTimeout}"
		virtual-host="${rabbit.mq.virtualHost}"
		publisher-confirms="true"
		publisher-returns="true" />

	<admin connection-factory="rabbitConnectionFactory" />

	<queue
		id="logDirectQueue"
		name="log-direct-queue"
		durable="true"
		auto-delete="false"
		exclusive="false" />
	<queue
		id="logReplyDirectQueue"
		name="log-reply-direct-queue"
		durable="true"
		auto-delete="false"
		exclusive="false" />
	<direct-exchange
		id="logDirectExchange"
		name="log-direct-exchange"
		durable="true"
		auto-delete="false">
		<bindings>
			<binding
				queue="log-direct-queue"
				key="log-direct-routing-key" />
		</bindings>
	</direct-exchange>
	<template
		id="logDirectTemplate"
		queue="log-direct-queue"
		exchange="log-direct-exchange"
		routing-key="log-direct-routing-key"
		connection-factory="rabbitConnectionFactory"
		confirm-callback="sendConfirmCallBack"
		return-callback="sendReturnCallBack"
		recovery-callback="sendRecoveryCallback"
		reply-queue="log-reply-direct-queue">
		<reply-listener
			acknowledge="auto"
			auto-startup="true"
			min-start-interval="3000" />
	</template>

</beans:beans>

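When configuring the queue + exchange + binding + template, the values to watch (highlighted in red in the original post) are the queue name, the exchange name, and the routing key. They must be identical everywhere they appear, so that the queue (log-direct-queue) is bound to the exchange (log-direct-exchange) through the routing key (log-direct-routing-key).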

 

 

 

The content of applicationContext-batch-beans.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop.xsd">

	<bean
		id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property
			name="jobRepository"
			ref="jobRepository" />
	</bean>

	<bean
		id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
	</bean>


	<bean
		id="amqpItemReader"
		class="org.springframework.batch.item.amqp.AmqpItemReader">
		<constructor-arg
			name="amqpTemplate"
			ref="logDirectTemplate" />
		<property
			name="itemType"
			value="com.beauty.entity.BeautyHandlerLogs" />
	</bean>

</beans>

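Most of the above needs no explanation. The part worth pointing out is the amqpItemReader bean: it uses Spring Batch's built-in AmqpItemReader to read messages from MQ. The reader calls the template's receive method without naming a queue, so it relies on the default queue="log-direct-queue" set on logDirectTemplate.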

 

 

The content of applicationContext-batch.xml is as follows:

<beans:beans
	xmlns="http://www.springframework.org/schema/batch"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch.xsd">

	<beans:import resource="applicationContext-batch-beans.xml" />

	<job id="logProcessJob">
		<step id="logStep" allow-start-if-complete="true">
			<tasklet>
				<chunk
					reader="amqpItemReader"
					writer="amqpItemWriterImpl"
					commit-interval="10" />
			</tasklet>
		</step>
	</job>


</beans:beans>

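I will not go through the configuration in detail. The key part is the allow-start-if-complete attribute on the step: only with allow-start-if-complete="true" can the Quartz scheduled task keep fetching messages from MQ, that is, logProcessJob can run repeatedly. Without it, the job reads once and never does any work again. The likely reason is that the job is launched with the same empty JobParameters every time, so each launch maps to the same job instance, and by default Spring Batch skips a step that has already completed in that instance. This needs special attention; it took me a long time of searching and experimenting to work it out.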

Note: the tasklet element also has this attribute, with the same effect. Feel free to try it.
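The tasklet variant mentioned in the note could look like the following sketch. It reuses the job, step, reader, and writer names from the listing above and only moves the attribute from the step element to the tasklet element; it is an untested illustration, not a configuration taken from the project.

```xml
<beans:beans
	xmlns="http://www.springframework.org/schema/batch"
	xmlns:beans="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/batch
	http://www.springframework.org/schema/batch/spring-batch.xsd">

	<beans:import resource="applicationContext-batch-beans.xml" />

	<job id="logProcessJob">
		<step id="logStep">
			<!-- Same flag as before, declared on the tasklet instead of the step -->
			<tasklet allow-start-if-complete="true">
				<chunk
					reader="amqpItemReader"
					writer="amqpItemWriterImpl"
					commit-interval="10" />
			</tasklet>
		</step>
	</job>

</beans:beans>
```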

 

 

The content of applicationContext-quartz.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop 
        http://www.springframework.org/schema/aop/spring-aop.xsd">

	<bean
		id="SpringJobSchedulerFactoryBean"
		class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<list>
				<ref bean="cronTriggerBean" />
			</list>
		</property>
	</bean>

	<bean
		id="cronTriggerBean"
		class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
		<property
			name="jobDetail"
			ref="amqpItemReaderJobMethod" />
		<property
			name="cronExpression"
			value="0/10 * * * * ?" />
	</bean>

	<bean
		id="amqpItemReaderJobMethod"
		class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject">
			<ref bean="amqpItemReaderQtz" />
		</property>
		<property name="targetMethod">
			<value>execute</value>
		</property>
	</bean>


</beans>

The job is wired by binding a POJO through MethodInvokingJobDetailFactoryBean. The class-based approach threw errors because of a version mismatch, so I dropped it.
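One thing that is easy to overlook with this binding: the trigger fires every 10 seconds, so a batch run that takes longer than that could overlap with the next one. MethodInvokingJobDetailFactoryBean has a `concurrent` property for this case. The sketch below shows the same bean with overlapping runs disabled; it is a suggested variation under that assumption, not part of the original setup.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
	xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

	<bean
		id="amqpItemReaderJobMethod"
		class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject" ref="amqpItemReaderQtz" />
		<property name="targetMethod" value="execute" />
		<!-- Assumption: wait for the previous run to finish instead of starting a second one in parallel -->
		<property name="concurrent" value="false" />
	</bean>

</beans>
```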

 

The content of amqpItemReaderQtz is as follows:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component("amqpItemReaderQtz")
public class AmqpItemReaderQtz {

	@Autowired
	private JobLauncher jobLauncher;

	@Autowired
	private Job logProcessJob;

	public void execute() {
		try {
			// Launch the batch job; the empty JobParameters is the same on every run
			JobExecution result = jobLauncher.run(logProcessJob, new JobParameters());
			System.out.println(result);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

That is basically everything. Setting up the MQ environment, sending messages, and so on are not covered here.

 

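If you have questions, leave a comment and we can discuss. In particular, I would love to learn how to keep the Batch reader continuously reading messages from MQ without using an infinite loop or Quartz.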

 

Let's keep learning together!