欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springmvc+activeMQ

程序员文章站 2022-03-23 16:18:15
...

什么要使用ActiveMQ

  1. 多个项目之间集成 
    (1) 跨平台 
    (2) 多语言 
    (3) 多项目
  2. 降低系统间模块的耦合度,解耦 
    (1) 软件扩展性
  3. 系统前后端隔离 
    (1) 前后端隔离,屏蔽高安全区

什么是JMS:

    JMS 即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

ActiveMQ介绍

     MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
特点:
1、支持多种语言编写客户端
2、对spring的支持,很容易和spring整合
3、支持多种传输协议:TCP,SSL,NIO,UDP等
4、支持AJAX
消息形式:
1、点对点(queue)
2、一对多(topic)

安装ActiveMQ

    不想在本机安装,所以自己开了个虚拟机,在linux   centos7下面安装ActiveMQ。

1、下载ActiveMQ

    官网下载http://activemq.apache.org/download-archives.htmlapache-activemq-5.15.0-bin.tar.gz

    自己选择对应的版本,例如我自己下载的是apache-activemq-5.15.0-bin.tar.gz的5.15.0的linux的版本,上传到linux中,解压,然后进入  {解压文件}/bin/linux-x86-{64位/32位}    下面输入 ./activemq start启动,如果要关闭,输入  ./activemq stop就行了。启动成功后,在浏览器输入   http://{ip}:8161/admin  , 默认账号:admin,密码:admin,  会出现以下界面:

springmvc+activeMQ

ActiveMQ+SpringMVC

首先创建一个简单的springmvc项目,这里就不展示了,接着添加activeMQ的配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
        >
    
    <context:component-scan base-package="com.unionpay.demo.activemq" />
    <mvc:annotation-driven />

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->   
    <amq:connectionFactory id="amqConnectionFactory" 
        brokerURL="tcp://192.168.10.12:61616"  <!--正确的链接地址-->
        userName="admin" 
        password="admin" />

    <!-- 配置JMS连接工长,这种为不适用pool链接形式 -->
    <!-- <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean> -->
  
        
    <!-- 使用pool进行链接 -->  
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    	<property name="connectionFactory" ref="amqConnectionFactory"/>
    	<property name="maxConnections" value="10"/>
    </bean>
    
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    	<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>
         
    
    <!-- 定义消息队列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>unionpay.demo</value>
        </constructor-arg>
    </bean>
    
    <!-- 定义消息监听(Queue) -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="client">
    	<jms:listener destination="unionpay.demo" ref="mqListen" />
    	<jms:listener destination="unionpay.demo" ref="mqListen2" />
    </jms:listener-container>
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <bean id="producerService" class="com.unionpay.demo.activemq.producer.ProducerService">
    </bean>
    
    <bean id="mqListen" class="com.unionpay.demo.activemq.consumer.MqListen">
    </bean>
    
    <bean id="mqListen2" class="com.unionpay.demo.activemq.consumer.MqListen2">
    </bean>
	
 </beans>

添加依赖:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${springframework}</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.0</version>
</dependency>

我们来构建一个消息的发送者:

@Service
public class ProducerService {

	@Resource(name="jmsTemplate")
	private JmsTemplate jmsTemplate;
	
	
	public void sendMessage(Destination destination,final String msg){
		System.out.println("Send " + msg + " to Destination " + destination.toString());
		
		MessageCreator messageCreator = new MessageCreator(){

			public Message createMessage(Session session) throws JMSException {

				return session.createTextMessage(msg);
			}
			
		};
		
		jmsTemplate.send(destination, messageCreator);
	}
}

我们可以自己构建一个简单的controller,页面这里就不展示了:

@Controller
public class DemoController {

	@Resource(name = "demoQueueDestination")
	private Destination demoQueueDestination;

	@Resource(name = "producerService")
	private ProducerService producer;


	@RequestMapping(value = "/onsend", method = RequestMethod.POST)
	public ModelAndView producer(@RequestParam("message") String message) {
		System.out.println("---------send to jms");
		ModelAndView mv = new ModelAndView();
		Random ran = new Random();
		int num = ran.nextInt(100);
                producer.sendMessage(demoQueueDestination, "消息序号:"+num);
                System.out.println("---------成功发送消息----消息序号:"+num); 
		return mv;
	}

创建我们在配置文件中定义的两个监听类:

public class MqListen implements MessageListener {

	public void onMessage(Message message) {
		System.out.println("开始监听============");
		TextMessage tm = (TextMessage) message;
		try {
			System.out.println("成功监听消息="+tm.getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

}

public class MqListen2 implements MessageListener {

	public void onMessage(Message message) {
		System.out.println("开始监听2============");
		TextMessage tm = (TextMessage) message;
		try {
			System.out.println("成功监听消息2="+tm.getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}

}

我们启动项目,运行发送消息,我们会发现,只有一个监听器监听到了,另一个没有反应。这里就是我们前面提到过的queue的点对点模式,虽然我们配置了两个监听器,但是当消息被第一个监听器消费了之后,就不会再往下传了,就终止了。

接下来,我们来尝试下一对多的模式(topic):我们只需将配置文件中的

org.apache.activemq.command.ActiveMQQueue修改成org.apache.activemq.command.ActiveMQTopic,然后将jmsTemplate中的pubSubDomain改为true,就行了,同样我们启动项目,发送消息,我们会发现,两个监听器都同时接收到了消息。

在这里,我们用的是mq的监听模式,如果不想使用监听,我们可以配置一个接收器:

@Service
public class ConsumerService {

	@Resource(name = "jmsTemplate")
	private JmsTemplate jmsTemplate;

      
	public TextMessage receive(Destination destination) {
		TextMessage tm = (TextMessage) jmsTemplate.receive(destination);
		if (tm != null) {
			try {
				System.out.println("Get Message: " + tm.getText() + " from Destination " + destination.toString());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		return tm;
	}

}

 

相关标签: MQ