spring-消息
1、异步消息
当一个消息发送时候,消息会被交给消息代理,消息代理可以确保消息被发送到指定的目的地,同时解放发送者,使其能够继续进行其它业务。消息代理通常有activemq、rabbitmq...,目的地通常有队列和主题,队列采用点对点的模型,主题采用发布订阅模型
- 点对点模型:消息队列可以有多个接受者,但每条消息只能被一个接收者取走
- 发布订阅模型:消息队列可以有多个订阅者,每条消息可以发送给多个主题订阅者
2、jms发送/接收消息
1)activemq配置,使用activemq,并使用了jmstemplate。
jms模板为开发者提供了与消息代理进行交互发送和接收消息的标准api,几乎每个消息代理都支持jms。jms模板和spring date提供的jdbc模板一样可以消除样板代码,让开发更集中在业务处理上。
<!-- activemq连接工厂 --> <bean id="activemqconnectionfactory" class="org.apache.activemq.activemqconnectionfactory"> <property name="brokerurl" value="tcp://localhost:61616"></property> </bean> <!-- 消息队列 --> <bean id="queue" class="org.apache.activemq.command.activemqqueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息主题 --> <bean id="topic" class="org.apache.activemq.command.activemqtopic"> <constructor-arg value="usertopic"></constructor-arg> </bean> <!-- 定义模板 --> <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> <property name="connectionfactory" ref="activemqconnectionfactory"></property> </bean>
2)发送消息
package com.cn.activemq; import com.cn.pojo.user; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.jms.core.jmstemplate; import org.springframework.jms.core.messagecreator; import org.springframework.stereotype.component; import javax.jms.destination; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.session; @component public class sendmessagemqutil { @autowired private jmstemplate jmstemplate; @autowired @qualifier("queue") private destination queue; @autowired @qualifier("topic") private destination topic; /** * 队列--发送消息 * @param user */ public void senduserqueue(final user user){ jmstemplate.send(queue, new messagecreator() { public message createmessage(session session) throws jmsexception { return session.createobjectmessage(user); } }); } /** * 主题--发送消息 * @param user */ public void sendusertopic(final user user){ jmstemplate.send(topic, new messagecreator() { public message createmessage(session session) throws jmsexception { return session.createobjectmessage(user); } }); } }
3)接收消息
package com.cn.activemq; import com.cn.pojo.user; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.jms.core.jmstemplate; import org.springframework.stereotype.component; import javax.jms.destination; import javax.jms.jmsexception; import javax.jms.objectmessage; @component public class receivemessagemqutil { @autowired private jmstemplate jmstemplate; @autowired @qualifier("queue") private destination queue; @autowired @qualifier("topic") private destination topic; /** * 队列--接受消息 * @return */ public user receiveuserqueue(){ try { objectmessage objectmessage=(objectmessage) jmstemplate.receive(queue); return (user)objectmessage.getobject(); } catch (jmsexception e) { e.printstacktrace(); } return null; } /** * 主题--接受消息 * @return */ public user receiveusertopic(){ try { objectmessage objectmessage=(objectmessage) jmstemplate.receive(topic); return (user)objectmessage.getobject(); } catch (jmsexception e) { e.printstacktrace(); } return null; } }
4)测试
分别新建测试类
@runwith(springjunit4classrunner.class) @contextconfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springmvc.xml"})//不可使用classpath:spring-*.xml,否则配置文件不起作用 public class sendmessagemqutiltest { @autowired private sendmessagemqutil sendmessagemqutil; @test public void senduserqueue() throws exception { user user=new user("computer1", "111111"); sendmessagemqutil.senduserqueue(user); } @test public void sendusertopic() throws exception { user user=new user("computer2", "222222"); sendmessagemqutil.sendusertopic(user); } }
@runwith(springjunit4classrunner.class) @contextconfiguration(locations = {"classpath:spring-activemq.xml", "classpath:spring-redis.xml","classpath:springmvc.xml"}) public class receivemessagemqutiltest { @autowired private receivemessagemqutil receivemessagemqutil; @test public void receiveuserqueue() throws exception { user user=receivemessagemqutil.receiveuserqueue(); system.out.println("activemq接收到的数据:"+user); } @test public void receiveusertopic() throws exception { user user=receivemessagemqutil.receiveusertopic(); system.out.println("activemq接收到的数据:"+user); } @test public void receiveusertopic2() throws exception { user user=receivemessagemqutil.receiveusertopic(); system.out.println("activemq接收到的数据:"+user); } }
分别运行测试方法,结合activemq的控制台可以看出队列、主题以及各自的消费者等情况
显示队列:
显示主题:
3、其它
1)设置默认的目的地
上述在发送消息和接收消息时,每次调用发送/接收消息的方法都传入了一个目的地参数。然而,可以在jmstemplate实例化的时候,指定默认的目的地,如下:
<!-- 定义队列 --> <bean id="queue" class="org.apache.activemq.command.activemqqueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 定义模板 --> <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> <property name="connectionfactory" ref="activemqconnectionfactory"></property> <property name="defaultdestination" ref="queue"></property>//此处注入队列,也可以注入主题 </bean>
采用指定默认目的地的方式,则发送/接收消息调用的方法不用传递目的地了
/** * 队列--发送消息 * @param user */ public void senduserqueue(final user user){ jmstemplate.send(new messagecreator() { public message createmessage(session session) throws jmsexception { return session.createobjectmessage(user); } }); /** * 队列--接受消息 * @return */ public user receiveuserqueue(){ try { objectmessage objectmessage=(objectmessage) jmstemplate.receive(); return (user)objectmessage.getobject(); } catch (jmsexception e) { e.printstacktrace(); } return null; } }
2)发送消息对消息进行转换
除了send(..)方法,jmstemplate还提供了convertandsend()方法,该方法不需要messagecreator参数,而使用内置的消息转换器创建消息并发送。在jmstemplate实例化时未指定消息转换器,在调用convertandsend()方法则使用默认的simplemessageconverter消息转换器;receiveandconvert()方法则在接收时候使用消息转换器
/** * 队列--发送消息 * @param user */ public void senduserqueue(final user user){ jmstemplate.convertandsend(user); } /** * 队列--接受消息 * @return */ public user receiveuserqueue(){ return (user)jmstemplate.receiveandconvert(); }
在jmstemplate实例化指定消息转换器,则会在使用convertandsend()/receiveandconvert()方法使用指定的消息转换器
<!-- 定义队列 --> <bean id="queue" class="org.apache.activemq.command.activemqqueue"> <constructor-arg value="userqueue"></constructor-arg> </bean> <!-- 消息转换器 --> <bean id="mappingjackson2messageconverter" class="org.springframework.jms.support.converter.mappingjackson2messageconverter"></bean> <!-- 定义模板 --> <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> <property name="connectionfactory" ref="activemqconnectionfactory"></property> <property name="defaultdestination" ref="queue"></property> <property name="messageconverter" ref="mappingjackson2messageconverter"></property> </bean>
综合1)2),使用convertandsend()/receiveandconvert()发送和接收消息更加简单;在某些情况下,统一配置目的地也简化的使用