欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringMVC和rabbitmq集成的使用案例

程序员文章站 2022-06-25 14:48:20
1.添加maven依赖 com.rabbitmq amqp-cli...

1.添加maven依赖

<dependency>
 <groupid>com.rabbitmq</groupid>
 <artifactid>amqp-client</artifactid>
 <version>3.5.1</version>
</dependency>
<dependency>
 <groupid>org.springframework.amqp</groupid>
 <artifactid>spring-rabbit</artifactid>
 <version>1.4.5.release</version>
</dependency>

2.spring主配置文件中加入rabbitmq xml文件的配置

<!-- rabbitmq 配置 -->
 <import resource="/application-mq.xml"/>

3.jdbc配置文件中加入 rabbitmq的链接配置

#rabbitmq配置
mq.host=localhost
mq.username=donghao
mq.password=donghao
mq.port=5672
mq.vhost=testmq

4.新建application-mq.xml文件,添加配置信息

 <beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
 xsi:schemalocation="http://www.springframework.org/schema/beans 
 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
 http://www.springframework.org/schema/rabbit
 http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
 <description>rabbitmq 连接服务配置</description>
 <!-- 连接配置 -->
 <rabbit:connection-factory id="connectionfactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/>
 <rabbit:admin connection-factory="connectionfactory"/>
 <!-- spring template声明-->
 <rabbit:template exchange="koms" id="amqptemplate" connection-factory="connectionfactory" message-converter="jsonmessageconverter" />
 <!-- 消息对象json转换类 -->
 <bean id="jsonmessageconverter" class="org.springframework.amqp.support.converter.jackson2jsonmessageconverter" /> 
 <!-- 
  durable:是否持久化
  exclusive: 仅创建者可以使用的私有队列,断开后自动删除
  auto_delete: 当所有消费客户端连接断开后,是否自动删除队列
  -->
  <!-- 申明一个消息队列queue -->
 <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="activity" name="activity" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="stock" name="stock" durable="true" auto-delete="false" exclusive="false" />
  <rabbit:queue id="autoprint" name="autoprint" durable="true" auto-delete="false" exclusive="false" />
 <!--
  rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 
 rabbit:binding:设置消息queue匹配的key
  -->
 <!-- 交换机定义 -->
 <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms">
 <rabbit:bindings>
  <rabbit:binding queue="order" key="order"/>
   <rabbit:binding queue="activity" key="activity"/>
   <rabbit:binding queue="mail" key="mail"/>
   <rabbit:binding queue="stock" key="stock"/>
   <rabbit:binding queue="autoprint" key="autoprint"/>
 </rabbit:bindings>
</rabbit:direct-exchange>
 <!--
   queues:监听的队列,多个的话用逗号(,)分隔 
  ref:监听器
  -->
 <!-- 配置监听 acknowledeg = "manual" 设置手动应答 当消息处理失败时:会一直重发 直到消息处理成功 -->
 <rabbit:listener-container connection-factory="connectionfactory" acknowledge="manual">
 <!-- 配置监听器 -->
  <rabbit:listener queues="activity" ref="activitylistener"/>
   <rabbit:listener queues="order" ref="orderlistener"/>
  <rabbit:listener queues="mail" ref="maillistener"/>
  <rabbit:listener queues="stock" ref="stocklistener"/>
  <rabbit:listener queues="autoprint" ref="autoprintlistener"/>
 </rabbit:listener-container>
</beans>

5.新增公共入队类

@service
public class mqproducerimpl{
@resource
 private amqptemplate amqptemplate;
 private final static logger logger = loggerfactory.getlogger(mqproducerimpl.class);
 //公共入队方法
 public void senddatatoqueue(string queuekey, object object) {
  try {
   amqptemplate.convertandsend(queuekey, object);
  } catch (exception e) {
   logger.error(e.tostring());
  }
 }
}

6.创建监听类

SpringMVC和rabbitmq集成的使用案例

import java.io.ioexception;
import java.util.list;
import javax.annotation.resource;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.amqp.core.message;
import org.springframework.amqp.rabbit.core.channelawaremessagelistener;
import org.springframework.amqp.utils.serializationutils;
import org.springframework.stereotype.component;
import org.springframework.transaction.annotation.transactional;
import com.cn.framework.domain.basedto;
import com.cn.framework.util.constantutils;
import com.cn.framework.util.rabbitmq.producer.mqproducer;
import com.kxs.service.activityservice.iactivityservice;
import com.kxs.service.messageservice.imessageservice;
import com.rabbitmq.client.channel;
/**
 * 活动处理listener
* @author
* @date 2017年6月30日
**/
@component
public class activitylistener implements channelawaremessagelistener {
 private static final logger log = loggerfactory.getlogger(activitylistener.class);
 @override
 @transactional
 public void onmessage(message message,channel channel) {
 }
}

SpringMVC和rabbitmq集成的使用案例

项目启动后 控制台会打印出监听的日志信息 这里写图片描述

结尾:仅供参考,自己用作学习记录,不喜勿喷,共勉!

补充:rabbitmq与springmvc集成并实现发送消息和接收消息(持久化)方案

rabbitmq本篇不介绍了,直接描述rabbitmq与springmvc集成并实现发送消息和接收消息(持久化)。

使用了spring-rabbit 发送消息和接收消息,我们使用的maven来管理jar包,在maven的pom.xml文件中引入jar包

<span style="font-size:18px;"> <dependency>
  <groupid>org.springframework.amqp</groupid>
  <artifactid>spring-rabbit</artifactid>
   <version>1.3.6.release</version>
 </dependency></span>

1.实现生产者

第一步:是要设置调用安装rabbitmq的ip、端口等

配置一个global.properties文件

SpringMVC和rabbitmq集成的使用案例

第二步:通过springmvc把global.properties文件读进来

<span style="font-size:18px;"><!-- 注入属性文件 --> 
 <bean id="propertyconfigurer" class="org.springframework.beans.factory.config.propertyplaceholderconfigurer"> 
  <property name="locations"> 
   <list> 
    <value>classpath:global.properties</value> 
   </list> 
  </property> 
 </bean> </span>

第三步:配置 rabbitmq服务器连接、创建rabbittemplate 消息模板类等,在springmvc的配置文件加入下面这些

<bean id="rmqproducer2" class="cn.test.spring.rabbitmq.rmqproducer"></bean>
<span style="font-size:18px;"> <!-- 创建连接类 --> 
 <bean id="connectionfactory" class="org.springframework.amqp.rabbit.connection.cachingconnectionfactory"> 
  <constructor-arg value="localhost" /> 
  <property name="username" value="${rmq.manager.user}" /> 
  <property name="password" value="${rmq.manager.password}" /> 
  <property name="host" value="${rmq.ip}" /> 
  <property name="port" value="${rmq.port}" /> 
 </bean> 
  
 <bean id="rabbitadmin" class="org.springframework.amqp.rabbit.core.rabbitadmin"> 
  <constructor-arg ref="connectionfactory" /> 
 </bean> 
  <!-- 创建rabbittemplate 消息模板类 --> 
 <bean id="rabbittemplate" class="org.springframework.amqp.rabbit.core.rabbittemplate"> 
  <constructor-arg ref="connectionfactory"></constructor-arg> 
 </bean> </span>

第四步:实现消息类实体和发送消息

类实体

<span style="font-size:18px;">/**
 * 消息
 *
 */
public class rabbitmessage implements serializable
{
	private static final long serialversionuid = -6487839157908352120l;	
	private class<?>[] paramtypes;//参数类型
	private string exchange;//交换器	
	private object[] params;	
	private string routekey;//路由key	
	public rabbitmessage(){} 
	public rabbitmessage(string exchange,string routekey,object...params)
	{
		this.params=params;		
		this.exchange=exchange;
		this.routekey=routekey;
	}
	
	@suppresswarnings("rawtypes")
	public rabbitmessage(string exchange,string routekey,string methodname,object...params)
	{
		this.params=params;		
		this.exchange=exchange;
		this.routekey=routekey;
		int len=params.length;
		class[] clazzarray=new class[len];
		for(int i=0;i<len;i++)
			clazzarray[i]=params[i].getclass();
		this.paramtypes=clazzarray;
	}
	
	public byte[] getserialbytes()
	{
		byte[] res=new byte[0];
		bytearrayoutputstream baos=new bytearrayoutputstream();
		objectoutputstream oos;
		try {
			oos = new objectoutputstream(baos);
			oos.writeobject(this);
			oos.close();
			res=baos.tobytearray();
		} catch (ioexception e) {
			e.printstacktrace();
		}		
		return res;
	}	
	
	public string getroutekey() {
		return routekey;
	} 
 
	public string getexchange() {
		return exchange;
	}
 
	public void setexchange(string exchange) {
		this.exchange = exchange;
	}
 
	public void setroutekey(string routekey) {
		this.routekey = routekey;
	} 
 
	public class<?>[] getparamtypes() {
		return paramtypes;
	} 

	public object[] getparams() {
		return params;
	}	
}
</span>

发送消息

<span style="font-size:18px;">/**
 * 生产着
 *
 */ 
public class rmqproducer
{ 
 @resource
 private rabbittemplate rabbittemplate; 
 /**
 * 发送信息
 * @param msg
 */
 public void sendmessage(rabbitmessage msg)
 {
 try {
 system.out.println(rabbittemplate.getconnectionfactory().gethost());
 system.out.println(rabbittemplate.getconnectionfactory().getport());
 //发送信息
  rabbittemplate.convertandsend(msg.getexchange(), msg.getroutekey(), msg); 
 } catch (exception e) {
 } 
 } 
}</span>

说明:

1. rabbittemplate.convertandsend(msg.getexchange(), msg.getroutekey(), msg);

源代码中的send调用的方法,一些发送消息帮我们实现好了。

SpringMVC和rabbitmq集成的使用案例

2.上面的代码实现没申明交换器和队列,rabbitmq不知交换器和队列他们的绑定关系,如果rabbitmq管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。

SpringMVC和rabbitmq集成的使用案例

我们也可以用代码申明:

rabbitadmin要申明:eclareexchange方法 参数是交换器

bindingbuilder.bind(queue).to(directexchange).with(queuename);//将queue绑定到exchange 
rabbitadmin.declarebinding(binding);//声明绑定关系

源代码有这些方法:

SpringMVC和rabbitmq集成的使用案例

这样就可以实现交换器和队列的绑定关系

交换器我们可以申明为持久化,还有使用完不会自动删除

topicexchange 参数的说明:name是交换器名称,durable:true 是持久化 autodelete:false使用完不删除

源代码:

SpringMVC和rabbitmq集成的使用案例

队列也可以申明为持久化

SpringMVC和rabbitmq集成的使用案例

第五步:实现测试类

<span style="font-size:18px;">@resource
 private rmqproducer rmqproducer2; 
 @test
 public void test() throws ioexception
 { 
 string exchange="testexchange";交换器
 string routekey="testqueue";//队列
 string methodname="test";//调用的方法
 //参数
 map<string,object> param=new hashmap<string, object>();
 param.put("data","hello");
 
 rabbitmessage msg=new rabbitmessage(exchange,routekey, methodname, param);
 //发送消息
 rmqproducer2.sendmessage(msg);
 
 }</span>

结果:rabbitmq有一条消息

SpringMVC和rabbitmq集成的使用案例

2.消费者

第一步:rabbitmq服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 rabbitmq服务器连接、创建rabbittemplate 消息模板类、消息转换器、消息转换器监听器等,在springmvc的配置文件加入下面这些

<span style="font-size:18px;"> <!-- 创建连接类 --> 
 <bean id="connectionfactory" class="org.springframework.amqp.rabbit.connection.cachingconnectionfactory"> 
  <constructor-arg value="localhost" /> 
  <property name="username" value="${rmq.manager.user}" /> 
  <property name="password" value="${rmq.manager.password}" /> 
  <property name="host" value="${rmq.ip}" /> 
  <property name="port" value="${rmq.port}" /> 
 </bean> 
  
 <bean id="rabbitadmin" class="org.springframework.amqp.rabbit.core.rabbitadmin"> 
  <constructor-arg ref="connectionfactory" /> 
 </bean> 
  <!-- 创建rabbittemplate 消息模板类 --> 
 <bean id="rabbittemplate" class="org.springframework.amqp.rabbit.core.rabbittemplate"> 
  <constructor-arg ref="connectionfactory"></constructor-arg> 
 </bean>  
 
  <!-- 创建消息转换器为simplemessageconverter --> 
 <bean id="serializermessageconverter" class="org.springframework.amqp.support.converter.simplemessageconverter"></bean>  
 
 <!-- 设置持久化的队列 --> 
 <bean id="queue" class="org.springframework.amqp.core.queue"> 
  <constructor-arg index="0" value="testqueue"></constructor-arg> 
  <constructor-arg index="1" value="true"></constructor-arg> 
  <constructor-arg index="2" value="false"></constructor-arg> 
  <constructor-arg index="3" value="false"></constructor-arg> 
 </bean>  
 
  <!--创建交换器的类型 并持久化--> 
 <bean id="topicexchange" class="org.springframework.amqp.core.topicexchange">
  <constructor-arg index="0" value="testexchange"></constructor-arg> 
  <constructor-arg index="1" value="true"></constructor-arg> 
  <constructor-arg index="2" value="false"></constructor-arg> 
 </bean>
 
 <util:map id="arguments">  
 </util:map> 
 
 <!-- 绑定交换器、队列 --> 
 <bean id="binding" class="org.springframework.amqp.core.binding"> 
  <constructor-arg index="0" value="testqueue"></constructor-arg> 
  <constructor-arg index="1" value="queue"></constructor-arg> 
  <constructor-arg index="2" value="testexchange"></constructor-arg>
  <constructor-arg index="3" value="testqueue"></constructor-arg> 
  <constructor-arg index="4" value="#{arguments}"></constructor-arg> 
 </bean> 
 
 
 <!-- 用于接收消息的处理类 --> 
 <bean id="rmqconsumer" class="cn.test.spring.rabbitmq.rmqconsumer"></bean> 
 
 <bean id="messagelisteneradapter" class="org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter"> 
  <constructor-arg ref="rmqconsumer" /> 
  <property name="defaultlistenermethod" value="rmqproducermessage"></property> 
  <property name="messageconverter" ref="serializermessageconverter"></property> 
 </bean> 
 
 <!-- 用于消息的监听的容器类simplemessagelistenercontainer,监听队列 queues可以传多个--> 
 <bean id="listenercontainer" class="org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer"> 
  <property name="queues" ref="queue"></property> 
  <property name="connectionfactory" ref="connectionfactory"></property> 
  <property name="messagelistener" ref="messagelisteneradapter"></property> 
 </bean> 
 </span>

说明:

1.org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer中的queues可以传入多个队列

SpringMVC和rabbitmq集成的使用案例

2.org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter

有哪个消费者适配器来处理 ,参数defaultlistenermethod是默认调用方法来处理消息。

3.交换器和队列的持久化在生产者有介绍过了。

4.org.springframework.amqp.core.binding这个类的绑定,在springmvc配置文件中配置时,

destinationtype这个参数要注意点

源代码:

SpringMVC和rabbitmq集成的使用案例

第二步:处理消息

<span style="font-size:18px;">/**
 * 消费者
 *
 */
public class rmqconsumer 
{
 public void rmqproducermessage(object object){ 
 rabbitmessage rabbitmessage=(rabbitmessage) object; 
 system.out.println(rabbitmessage.getexchange());
 system.out.println(rabbitmessage.getroutekey());
 system.out.println(rabbitmessage.getparams().tostring()); 
 } 
}</span>

在启动过程中会报这样的错误,可能是你的交换器和队列没配置好

SpringMVC和rabbitmq集成的使用案例

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。