SpringMVC和rabbitmq集成的使用案例
1.添加maven依赖
<dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp-client</artifactid> <version>3.5.1</version> </dependency> <dependency> <groupid>org.springframework.amqp</groupid> <artifactid>spring-rabbit</artifactid> <version>1.4.5.release</version> </dependency>
2.spring主配置文件中加入rabbitmq xml文件的配置
<!-- rabbitmq 配置 --> <import resource="/application-mq.xml"/>
3.jdbc配置文件中加入 rabbitmq的链接配置
#rabbitmq配置 mq.host=localhost mq.username=donghao mq.password=donghao mq.port=5672 mq.vhost=testmq
4.新建application-mq.xml文件,添加配置信息
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 连接服务配置</description> <!-- 连接配置 --> <rabbit:connection-factory id="connectionfactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/> <rabbit:admin connection-factory="connectionfactory"/> <!-- spring template声明--> <rabbit:template exchange="koms" id="amqptemplate" connection-factory="connectionfactory" message-converter="jsonmessageconverter" /> <!-- 消息对象json转换类 --> <bean id="jsonmessageconverter" class="org.springframework.amqp.support.converter.jackson2jsonmessageconverter" /> <!-- durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 --> <!-- 申明一个消息队列queue --> <rabbit:queue id="order" name="order" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="activity" name="activity" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="mail" name="mail" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="stock" name="stock" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue id="autoprint" name="autoprint" durable="true" auto-delete="false" exclusive="false" /> <!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 rabbit:binding:设置消息queue匹配的key --> <!-- 交换机定义 --> <rabbit:direct-exchange name="koms" durable="true" auto-delete="false" id="koms"> <rabbit:bindings> <rabbit:binding queue="order" key="order"/> <rabbit:binding queue="activity" key="activity"/> <rabbit:binding queue="mail" key="mail"/> <rabbit:binding queue="stock" key="stock"/> <rabbit:binding queue="autoprint" key="autoprint"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 --> <!-- 配置监听 acknowledeg = "manual" 设置手动应答 当消息处理失败时:会一直重发 直到消息处理成功 --> <rabbit:listener-container connection-factory="connectionfactory" acknowledge="manual"> <!-- 配置监听器 --> <rabbit:listener queues="activity" ref="activitylistener"/> <rabbit:listener queues="order" ref="orderlistener"/> <rabbit:listener queues="mail" ref="maillistener"/> <rabbit:listener queues="stock" ref="stocklistener"/> <rabbit:listener queues="autoprint" ref="autoprintlistener"/> </rabbit:listener-container> </beans>
5.新增公共入队类
@service public class mqproducerimpl{ @resource private amqptemplate amqptemplate; private final static logger logger = loggerfactory.getlogger(mqproducerimpl.class); //公共入队方法 public void senddatatoqueue(string queuekey, object object) { try { amqptemplate.convertandsend(queuekey, object); } catch (exception e) { logger.error(e.tostring()); } } }
6.创建监听类
import java.io.ioexception; import java.util.list; import javax.annotation.resource; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.amqp.core.message; import org.springframework.amqp.rabbit.core.channelawaremessagelistener; import org.springframework.amqp.utils.serializationutils; import org.springframework.stereotype.component; import org.springframework.transaction.annotation.transactional; import com.cn.framework.domain.basedto; import com.cn.framework.util.constantutils; import com.cn.framework.util.rabbitmq.producer.mqproducer; import com.kxs.service.activityservice.iactivityservice; import com.kxs.service.messageservice.imessageservice; import com.rabbitmq.client.channel; /** * 活动处理listener * @author * @date 2017年6月30日 **/ @component public class activitylistener implements channelawaremessagelistener { private static final logger log = loggerfactory.getlogger(activitylistener.class); @override @transactional public void onmessage(message message,channel channel) { } }
项目启动后 控制台会打印出监听的日志信息 这里写图片描述
结尾:仅供参考,自己用作学习记录,不喜勿喷,共勉!
补充:rabbitmq与springmvc集成并实现发送消息和接收消息(持久化)方案
rabbitmq本篇不介绍了,直接描述rabbitmq与springmvc集成并实现发送消息和接收消息(持久化)。
使用了spring-rabbit 发送消息和接收消息,我们使用的maven来管理jar包,在maven的pom.xml文件中引入jar包
<span style="font-size:18px;"> <dependency> <groupid>org.springframework.amqp</groupid> <artifactid>spring-rabbit</artifactid> <version>1.3.6.release</version> </dependency></span>
1.实现生产者
第一步:是要设置调用安装rabbitmq的ip、端口等
配置一个global.properties文件
第二步:通过springmvc把global.properties文件读进来
<span style="font-size:18px;"><!-- 注入属性文件 --> <bean id="propertyconfigurer" class="org.springframework.beans.factory.config.propertyplaceholderconfigurer"> <property name="locations"> <list> <value>classpath:global.properties</value> </list> </property> </bean> </span>
第三步:配置 rabbitmq服务器连接、创建rabbittemplate 消息模板类等,在springmvc的配置文件加入下面这些
<bean id="rmqproducer2" class="cn.test.spring.rabbitmq.rmqproducer"></bean> <span style="font-size:18px;"> <!-- 创建连接类 --> <bean id="connectionfactory" class="org.springframework.amqp.rabbit.connection.cachingconnectionfactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean> <bean id="rabbitadmin" class="org.springframework.amqp.rabbit.core.rabbitadmin"> <constructor-arg ref="connectionfactory" /> </bean> <!-- 创建rabbittemplate 消息模板类 --> <bean id="rabbittemplate" class="org.springframework.amqp.rabbit.core.rabbittemplate"> <constructor-arg ref="connectionfactory"></constructor-arg> </bean> </span>
第四步:实现消息类实体和发送消息
类实体
<span style="font-size:18px;">/** * 消息 * */ public class rabbitmessage implements serializable { private static final long serialversionuid = -6487839157908352120l; private class<?>[] paramtypes;//参数类型 private string exchange;//交换器 private object[] params; private string routekey;//路由key public rabbitmessage(){} public rabbitmessage(string exchange,string routekey,object...params) { this.params=params; this.exchange=exchange; this.routekey=routekey; } @suppresswarnings("rawtypes") public rabbitmessage(string exchange,string routekey,string methodname,object...params) { this.params=params; this.exchange=exchange; this.routekey=routekey; int len=params.length; class[] clazzarray=new class[len]; for(int i=0;i<len;i++) clazzarray[i]=params[i].getclass(); this.paramtypes=clazzarray; } public byte[] getserialbytes() { byte[] res=new byte[0]; bytearrayoutputstream baos=new bytearrayoutputstream(); objectoutputstream oos; try { oos = new objectoutputstream(baos); oos.writeobject(this); oos.close(); res=baos.tobytearray(); } catch (ioexception e) { e.printstacktrace(); } return res; } public string getroutekey() { return routekey; } public string getexchange() { return exchange; } public void setexchange(string exchange) { this.exchange = exchange; } public void setroutekey(string routekey) { this.routekey = routekey; } public class<?>[] getparamtypes() { return paramtypes; } public object[] getparams() { return params; } } </span>
发送消息
<span style="font-size:18px;">/** * 生产着 * */ public class rmqproducer { @resource private rabbittemplate rabbittemplate; /** * 发送信息 * @param msg */ public void sendmessage(rabbitmessage msg) { try { system.out.println(rabbittemplate.getconnectionfactory().gethost()); system.out.println(rabbittemplate.getconnectionfactory().getport()); //发送信息 rabbittemplate.convertandsend(msg.getexchange(), msg.getroutekey(), msg); } catch (exception e) { } } }</span>
说明:
1. rabbittemplate.convertandsend(msg.getexchange(), msg.getroutekey(), msg);
源代码中的send调用的方法,一些发送消息帮我们实现好了。
2.上面的代码实现没申明交换器和队列,rabbitmq不知交换器和队列他们的绑定关系,如果rabbitmq管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。
我们也可以用代码申明:
rabbitadmin要申明:eclareexchange方法 参数是交换器
bindingbuilder.bind(queue).to(directexchange).with(queuename);//将queue绑定到exchange rabbitadmin.declarebinding(binding);//声明绑定关系
源代码有这些方法:
这样就可以实现交换器和队列的绑定关系
交换器我们可以申明为持久化,还有使用完不会自动删除
topicexchange 参数的说明:name是交换器名称,durable:true 是持久化 autodelete:false使用完不删除
源代码:
队列也可以申明为持久化
第五步:实现测试类
<span style="font-size:18px;">@resource private rmqproducer rmqproducer2; @test public void test() throws ioexception { string exchange="testexchange";交换器 string routekey="testqueue";//队列 string methodname="test";//调用的方法 //参数 map<string,object> param=new hashmap<string, object>(); param.put("data","hello"); rabbitmessage msg=new rabbitmessage(exchange,routekey, methodname, param); //发送消息 rmqproducer2.sendmessage(msg); }</span>
结果:rabbitmq有一条消息
2.消费者
第一步:rabbitmq服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 rabbitmq服务器连接、创建rabbittemplate 消息模板类、消息转换器、消息转换器监听器等,在springmvc的配置文件加入下面这些
<span style="font-size:18px;"> <!-- 创建连接类 --> <bean id="connectionfactory" class="org.springframework.amqp.rabbit.connection.cachingconnectionfactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean> <bean id="rabbitadmin" class="org.springframework.amqp.rabbit.core.rabbitadmin"> <constructor-arg ref="connectionfactory" /> </bean> <!-- 创建rabbittemplate 消息模板类 --> <bean id="rabbittemplate" class="org.springframework.amqp.rabbit.core.rabbittemplate"> <constructor-arg ref="connectionfactory"></constructor-arg> </bean> <!-- 创建消息转换器为simplemessageconverter --> <bean id="serializermessageconverter" class="org.springframework.amqp.support.converter.simplemessageconverter"></bean> <!-- 设置持久化的队列 --> <bean id="queue" class="org.springframework.amqp.core.queue"> <constructor-arg index="0" value="testqueue"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> <constructor-arg index="3" value="false"></constructor-arg> </bean> <!--创建交换器的类型 并持久化--> <bean id="topicexchange" class="org.springframework.amqp.core.topicexchange"> <constructor-arg index="0" value="testexchange"></constructor-arg> <constructor-arg index="1" value="true"></constructor-arg> <constructor-arg index="2" value="false"></constructor-arg> </bean> <util:map id="arguments"> </util:map> <!-- 绑定交换器、队列 --> <bean id="binding" class="org.springframework.amqp.core.binding"> <constructor-arg index="0" value="testqueue"></constructor-arg> <constructor-arg index="1" value="queue"></constructor-arg> <constructor-arg index="2" value="testexchange"></constructor-arg> <constructor-arg index="3" value="testqueue"></constructor-arg> <constructor-arg index="4" value="#{arguments}"></constructor-arg> </bean> <!-- 用于接收消息的处理类 --> <bean id="rmqconsumer" class="cn.test.spring.rabbitmq.rmqconsumer"></bean> <bean id="messagelisteneradapter" class="org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter"> <constructor-arg ref="rmqconsumer" /> <property name="defaultlistenermethod" value="rmqproducermessage"></property> <property name="messageconverter" ref="serializermessageconverter"></property> </bean> <!-- 用于消息的监听的容器类simplemessagelistenercontainer,监听队列 queues可以传多个--> <bean id="listenercontainer" class="org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer"> <property name="queues" ref="queue"></property> <property name="connectionfactory" ref="connectionfactory"></property> <property name="messagelistener" ref="messagelisteneradapter"></property> </bean> </span>
说明:
1.org.springframework.amqp.rabbit.listener.simplemessagelistenercontainer中的queues可以传入多个队列
2.org.springframework.amqp.rabbit.listener.adapter.messagelisteneradapter
有哪个消费者适配器来处理 ,参数defaultlistenermethod是默认调用方法来处理消息。
3.交换器和队列的持久化在生产者有介绍过了。
4.org.springframework.amqp.core.binding这个类的绑定,在springmvc配置文件中配置时,
destinationtype这个参数要注意点
源代码:
第二步:处理消息
<span style="font-size:18px;">/** * 消费者 * */ public class rmqconsumer { public void rmqproducermessage(object object){ rabbitmessage rabbitmessage=(rabbitmessage) object; system.out.println(rabbitmessage.getexchange()); system.out.println(rabbitmessage.getroutekey()); system.out.println(rabbitmessage.getparams().tostring()); } }</span>
在启动过程中会报这样的错误,可能是你的交换器和队列没配置好
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
上一篇: 劝酒顺口溜
下一篇: cdr怎么给图片添加朦胧效果?
推荐阅读
-
php curl抓取网页的介绍和推广及使用CURL抓取淘宝页面集成方法
-
SpringMVC使用ModelAndView的相对路径和绝对路径的问题
-
Oracle使用触发器和mysql中使用触发器的案例比较
-
Java中异常上抛和异常捕捉的具体使用案例
-
Java中stream处理中map与flatMap的比较和使用案例
-
oracle子查询中使用some、any和all的案例
-
使用Python和AWK两种方式实现文本处理的长拼接案例
-
C++ 实验二 NO.1_(3) 1:熟悉DEV环境,练习自己的第一个程序使用DEV集成环境来编辑,运行简单的数据输入和运算实验。(3)编写一个程序,要求:提示输入3个数;显示这3个数,求他们的平均值
-
IDEA集成SVN的配置和使用
-
在集成测试中使用Mock和Stub的几种方法 jmockstubbeanpostprocessorintegration testspring test