springmvc+activeMQ
什么要使用ActiveMQ
- 多个项目之间集成
(1) 跨平台
(2) 多语言
(3) 多项目 - 降低系统间模块的耦合度,解耦
(1) 软件扩展性 - 系统前后端隔离
(1) 前后端隔离,屏蔽高安全区
什么是JMS:
JMS 即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
ActiveMQ介绍
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)
安装ActiveMQ
不想在本机安装,所以自己开了个虚拟机,在linux centos7下面安装ActiveMQ。
1、下载ActiveMQ
官网下载http://activemq.apache.org/download-archives.htmlapache-activemq-5.15.0-bin.tar.gz
自己选择对应的版本,例如我自己下载的是apache-activemq-5.15.0-bin.tar.gz的5.15.0的linux的版本,上传到linux中,解压,然后进入 {解压文件}/bin/linux-x86-{64位/32位} 下面输入 ./activemq start启动,如果要关闭,输入 ./activemq stop就行了。启动成功后,在浏览器输入 http://{ip}:8161/admin , 默认账号:admin,密码:admin, 会出现以下界面:
ActiveMQ+SpringMVC
首先创建一个简单的springmvc项目,这里就不展示了,接着添加activeMQ的配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>
<context:component-scan base-package="com.unionpay.demo.activemq" />
<mvc:annotation-driven />
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.10.12:61616" <!--正确的链接地址-->
userName="admin"
password="admin" />
<!-- 配置JMS连接工长,这种为不适用pool链接形式 -->
<!-- <bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean> -->
<!-- 使用pool进行链接 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<!-- 定义消息队列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg>
<value>unionpay.demo</value>
</constructor-arg>
</bean>
<!-- 定义消息监听(Queue) -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="client">
<jms:listener destination="unionpay.demo" ref="mqListen" />
<jms:listener destination="unionpay.demo" ref="mqListen2" />
</jms:listener-container>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoQueueDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,默认是false,此处显示写出false -->
<property name="pubSubDomain" value="false" />
</bean>
<bean id="producerService" class="com.unionpay.demo.activemq.producer.ProducerService">
</bean>
<bean id="mqListen" class="com.unionpay.demo.activemq.consumer.MqListen">
</bean>
<bean id="mqListen2" class="com.unionpay.demo.activemq.consumer.MqListen2">
</bean>
</beans>
添加依赖:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.0</version>
</dependency>
我们来构建一个消息的发送者:
@Service
public class ProducerService {
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
public void sendMessage(Destination destination,final String msg){
System.out.println("Send " + msg + " to Destination " + destination.toString());
MessageCreator messageCreator = new MessageCreator(){
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
};
jmsTemplate.send(destination, messageCreator);
}
}
我们可以自己构建一个简单的controller,页面这里就不展示了:
@Controller
public class DemoController {
@Resource(name = "demoQueueDestination")
private Destination demoQueueDestination;
@Resource(name = "producerService")
private ProducerService producer;
@RequestMapping(value = "/onsend", method = RequestMethod.POST)
public ModelAndView producer(@RequestParam("message") String message) {
System.out.println("---------send to jms");
ModelAndView mv = new ModelAndView();
Random ran = new Random();
int num = ran.nextInt(100);
producer.sendMessage(demoQueueDestination, "消息序号:"+num);
System.out.println("---------成功发送消息----消息序号:"+num);
return mv;
}
创建我们在配置文件中定义的两个监听类:
public class MqListen implements MessageListener {
public void onMessage(Message message) {
System.out.println("开始监听============");
TextMessage tm = (TextMessage) message;
try {
System.out.println("成功监听消息="+tm.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class MqListen2 implements MessageListener {
public void onMessage(Message message) {
System.out.println("开始监听2============");
TextMessage tm = (TextMessage) message;
try {
System.out.println("成功监听消息2="+tm.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我们启动项目,运行发送消息,我们会发现,只有一个监听器监听到了,另一个没有反应。这里就是我们前面提到过的queue的点对点模式,虽然我们配置了两个监听器,但是当消息被第一个监听器消费了之后,就不会再往下传了,就终止了。
接下来,我们来尝试下一对多的模式(topic):我们只需将配置文件中的
org.apache.activemq.command.ActiveMQQueue修改成org.apache.activemq.command.ActiveMQTopic,然后将jmsTemplate中的pubSubDomain改为true,就行了,同样我们启动项目,发送消息,我们会发现,两个监听器都同时接收到了消息。
在这里,我们用的是mq的监听模式,如果不想使用监听,我们可以配置一个接收器:
@Service
public class ConsumerService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
public TextMessage receive(Destination destination) {
TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
if (tm != null) {
try {
System.out.println("Get Message: " + tm.getText() + " from Destination " + destination.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
return tm;
}
}
上一篇: 口腔溃疡反复发作须警惕
推荐阅读