详解Java消息队列-Spring整合ActiveMq
1、概述
首先和大家一起回顾一下java 消息服务,在我之前的博客《java消息队列-jms概述》中,我为大家分析了:
1.消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由java 实现。
2.优势:异步、可靠
3.消息模型:点对点,发布/订阅
4.jms中的对象
然后在另一篇博客《java消息队列-activemq实战》中,和大家一起从0到1的开启了一个activemq 的项目,在项目开发的过程中,我们对activemq有了一定的了解:
1.多种语言和协议编写客户端。语言: java, c, c++, c#, ruby, perl, python, php。应用协议: openwire,stomp rest,ws notification,xmpp,amqp
2.完全支持jms1.1和j2ee 1.4规范 (持久化,xa消息,事务)
3.对spring的支持,activemq可以很容易内嵌到使用spring的系统里面去,而且也支持spring2.0的特性
4.通过了常见j2ee服务器(如 geronimo,jboss 4, glassfish,weblogic)的测试,其中通过jca 1.5 resource adaptors的配置,可以让activemq可以自动的部署到任何兼容j2ee 1.4 商业服务器上
5.支持多种传送协议:in-vm,tcp,ssl,nio,udp,jgroups,jxta
6.支持通过jdbc和journal提供高速的消息持久化
7.从设计上保证了高性能的集群,客户端-服务器,点对点
8.支持ajax
9.支持与axis的整合
10.可以很容易得调用内嵌jms provider,进行测试
在接下来的这篇博客中,我会和大家一起来整合spring 和activemq,这篇博文,我们基于spring+jms+activemq+tomcat,实现了point-to-point的异步队列消息和pub/sub(发布/订阅)模型,简单实例,不包含任何业务。
2、目录结构
2.1 项目目录
ide选择了idea(建议大家使用),为了避免下载jar 的各种麻烦,底层使用maven搭建了一个项目,整合了spring 和activemq
2.2 pom.xml
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelversion>4.0.0</modelversion> <groupid>crawl-page</groupid> <artifactid>crawl-page</artifactid> <packaging>war</packaging> <version>1.0-snapshot</version> <name>crawl-page maven webapp</name> <url>http://maven.apache.org</url> <!-- 版本管理 --> <properties> <springframework>4.1.8.release</springframework> </properties> <dependencies> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>4.10</version> <scope>test</scope> </dependency> <!-- jsp相关 --> <dependency> <groupid>jstl</groupid> <artifactid>jstl</artifactid> <version>1.2</version> </dependency> <dependency> <groupid>javax.servlet</groupid> <artifactid>servlet-api</artifactid> <scope>provided</scope> <version>2.5</version> </dependency> <!-- spring --> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-core</artifactid> <version>${springframework}</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-context</artifactid> <version>${springframework}</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-tx</artifactid> <version>${springframework}</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-webmvc</artifactid> <version>${springframework}</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-jms</artifactid> <version>${springframework}</version> </dependency> <!-- xbean 如<amq:connectionfactory /> --> <dependency> <groupid>org.apache.xbean</groupid> <artifactid>xbean-spring</artifactid> <version>3.16</version> </dependency> <!-- activemq --> <dependency> <groupid>org.apache.activemq</groupid> <artifactid>activemq-core</artifactid> <version>5.7.0</version> </dependency> <dependency> <groupid>org.apache.activemq</groupid> <artifactid>activemq-pool</artifactid> <version>5.12.1</version> </dependency> <!-- 自用jar包,可以忽略--> <dependency> <groupid>commons-httpclient</groupid> <artifactid>commons-httpclient</artifactid> <version>3.1</version> </dependency> </dependencies> <build> <finalname>crawl-page</finalname> <plugins> <plugin> <groupid>org.apache.tomcat.maven</groupid> <artifactid>tomcat7-maven-plugin</artifactid> <configuration> <port>8080</port> <path>/</path> </configuration> </plugin> </plugins> </build> </project>
因为这里pom.xml 文件有点长,就不展开了。
我们可以看到其实依赖也就几个,1、spring 核心依赖 2、activemq core和pool(这里如果同学们选择导入jar,可以直接导入我们上一篇博客中说道的那个activemq-all 这个jar包)3、java servlet 相关依赖
这里面我们选择的activemq pool 的依赖版本会和之后的dtd 有关系,需要版本对应,所以同学们等下配置activemq 文件的时候,需要注意dtd 版本选择
2.3 web.xml
web.xml 也大同小异,指定spring 配置文件,springmvc 命名,编码格式
<?xml version="1.0" encoding="utf-8"?> <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd" version="3.0"> <display-name>archetype created web application</display-name> <!-- 加载spring的配置文件,例如hibernate、jms等集成 --> <context-param> <param-name>contextconfiglocation</param-name> <param-value> classpath:applicationcontext*.xml; </param-value> </context-param> <listener> <listener-class>org.springframework.web.context.contextloaderlistener</listener-class> </listener> <servlet> <servlet-name>springmvc</servlet-name> <servlet-class>org.springframework.web.servlet.dispatcherservlet</servlet-class> <init-param> <param-name>contextconfiglocation</param-name> <param-value>classpath:spring-mvc.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>springmvc</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> <!-- 处理编码格式 --> <filter> <filter-name>characterencodingfilter</filter-name> <filter-class>org.springframework.web.filter.characterencodingfilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>utf-8</param-value> </init-param> <init-param> <param-name>forceencoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterencodingfilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> </web-app>
2.4 springmvc 和applicationcontext.xml
这里面的springmvc没什么特别,有需要的同学可以参考一下:
<?xml version="1.0" encoding="utf-8"?> <!-- 查找最新的schemalocation 访问 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- 启用mvc注解 --> <mvc:annotation-driven /> <!-- 指定sping组件扫描的基本包路径 --> <context:component-scan base-package="com.jayce" > <!-- 这里只扫描controller,不可重复加载service --> <context:include-filter type="annotation" expression="org.springframework.stereotype.controller"/> </context:component-scan> <!-- jsp视图解析器--> <bean class="org.springframework.web.servlet.view.internalresourceviewresolver"> <property name="prefix" value="/web-inf/views/" /> <property name="suffix" value=".jsp" /> <!-- 定义其解析视图的order顺序为1 --> <property name="order" value="1" /> </bean> </beans>
applicationcontext.xml 主要使用来装载bean,我们项目中并没有什么特别的java bean,因此只用来指出包扫描路径:
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemalocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.3.xsd"> <bean class="org.springframework.beans.factory.annotation.autowiredannotationbeanpostprocessor"/> <!-- 配置扫描路径 --> <context:component-scan base-package="com.jayce"> <!-- 只扫描service,也可以添加repostory,但是要把controller排除在外,controller由spring-mvc.xml去加载 --> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.controller"/> </context:component-scan> </beans>
2.5 applicationcontext-activemq.xml
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemalocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd" > <context:component-scan base-package="com.jayce" /> <mvc:annotation-driven /> <amq:connectionfactory id="amqconnectionfactory" brokerurl="tcp://192.168.148.128:61616" username="admin" password="admin" /> <!-- 配置jms连接工长 --> <bean id="connectionfactory" class="org.springframework.jms.connection.cachingconnectionfactory"> <constructor-arg ref="amqconnectionfactory" /> <property name="sessioncachesize" value="100" /> </bean> <!-- 定义消息队列(queue) --> <bean id="demoqueuedestination" class="org.apache.activemq.command.activemqqueue"> <!-- 设置消息队列的名字 --> <constructor-arg> <value>jaycekon</value> </constructor-arg> </bean> <!-- 配置jms模板(queue),spring提供的jms工具类,它发送、接收消息。 --> <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> <property name="connectionfactory" ref="connectionfactory" /> <property name="defaultdestination" ref="demoqueuedestination" /> <property name="receivetimeout" value="10000" /> <!-- true是topic,false是queue,默认是false,此处显示写出false --> <property name="pubsubdomain" value="false" /> </bean> <!-- 配置消息队列监听者(queue) --> <bean id="queuemessagelistener" class="com.jayce.filter.queuemessagelistener" /> <!-- 显示注入消息监听容器(queue),配置连接工厂,监听的目标是demoqueuedestination,监听器是上面定义的监听器 --> <bean id="queuelistenercontainer" class="org.springframework.jms.listener.defaultmessagelistenercontainer"> <property name="connectionfactory" ref="connectionfactory" /> <property name="destination" ref="demoqueuedestination" /> <property name="messagelistener" ref="queuemessagelistener" /> </bean> </beans>
这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在activemq官网中的查看。
1、activemq 中的dtd,我们在声明相关配置之前,我们需要先导入activemq 中的dtd,不然spring 并不理解我们的标签是什么意思。
我们在pom.xml 文件中有配置了activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的dtd
2、amq:connectionfactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择tcp连接而不是http连接
3、jmstemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式
3、项目结构
3.1 producerservice
package com.jayce.service; import org.springframework.jms.core.jmstemplate; import org.springframework.jms.core.messagecreator; import org.springframework.stereotype.service; import javax.annotation.resource; import javax.jms.destination; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.session; /** * created by administrator on 2017/1/5. */ @service public class producerservice { @resource(name="jmstemplate") private jmstemplate jmstemplate; public void sendmessage(destination destination,final string msg){ system.out.println(thread.currentthread().getname()+" 向队列"+destination.tostring()+"发送消息---------------------->"+msg); jmstemplate.send(destination, new messagecreator() { public message createmessage(session session) throws jmsexception { return session.createtextmessage(msg); } }); } public void sendmessage(final string msg){ string destination = jmstemplate.getdefaultdestinationname(); system.out.println(thread.currentthread().getname()+" 向队列"+destination+"发送消息---------------------->"+msg); jmstemplate.send(new messagecreator() { public message createmessage(session session) throws jmsexception { return session.createtextmessage(msg); } }); } }
将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用producerservice实例中的sendmessage 方法就可以向默认目的发送一个消息。
这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。
3.2 consumerservice
package com.jayce.service; import org.springframework.jms.core.jmstemplate; import org.springframework.stereotype.service; import javax.annotation.resource; import javax.jms.destination; import javax.jms.jmsexception; import javax.jms.textmessage; /** * created by administrator on 2017/1/5. */ @service public class consumerservice { @resource(name="jmstemplate") private jmstemplate jmstemplate; public textmessage receive(destination destination){ textmessage textmessage = (textmessage) jmstemplate.receive(destination); try{ system.out.println("从队列" + destination.tostring() + "收到了消息:\t" + textmessage.gettext()); } catch (jmsexception e) { e.printstacktrace(); } return textmessage; } }
因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmstemplate中的 receive 方法,就可以从里面获取到一条消息。
再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样activemq中才会确定这条消息已经被正确读取了。而整合了spring之后,事务将由spring 来管理。
3.3 messagecontroller
package com.jayce.controller; import com.jayce.service.consumerservice; import com.jayce.service.producerservice; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.stereotype.controller; import org.springframework.web.bind.annotation.requestmapping; import org.springframework.web.bind.annotation.requestmethod; import org.springframework.web.bind.annotation.responsebody; import javax.annotation.resource; import javax.jms.destination; import javax.jms.textmessage; /** * created by administrator on 2017/1/5. */ @controller public class messagecontroller { private logger logger = loggerfactory.getlogger(messagecontroller.class); @resource(name = "demoqueuedestination") private destination destination; //队列消息生产者 @resource(name = "producerservice") private producerservice producer; //队列消息消费者 @resource(name = "consumerservice") private consumerservice consumer; @requestmapping(value = "/sendmessage", method = requestmethod.post) @responsebody public void send(string msg) { logger.info(thread.currentthread().getname()+"------------send to jms start"); producer.sendmessage(msg); logger.info(thread.currentthread().getname()+"------------send to jms end"); } @requestmapping(value= "/receivemessage",method = requestmethod.get) @responsebody public object receive(){ logger.info(thread.currentthread().getname()+"------------receive from jms start"); textmessage tm = consumer.receive(destination); logger.info(thread.currentthread().getname()+"------------receive from jms end"); return tm; } }
控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。
现在服务层和控制层都好了,接下来我们就进行一个简单的测试
4、项目测试
4.1 启动activemq
先确定你的activemq服务已经开启。
4.2 启动项目
项目使用了tomcat 插件,避免了本地再下载tomcat的麻烦,有需要的同学可以使用一下。
<plugins> <plugin> <groupid>org.apache.tomcat.maven</groupid> <artifactid>tomcat7-maven-plugin</artifactid> <configuration> <port>8080</port> <path>/</path> </configuration> </plugin> </plugins>
4.3 发送消息
这里用了chrome 的一个插件postman 有兴趣的同学可以了解一下,在chrome 拓展程序中可以找到,避免了后端的同学去弄页面!
我们发送了一个post 请求之后,看一下服务器的效果:
我们可以看到,已经向队列发送了一条消息。我们看一下activemq现在的状态:
我们可以看到,一条消息已经成功发送到了activemq中。
4.4 接收消息
使用get请求访问服务器后台:
服务的输出:
activemq服务器状态:
我们可以看到,消费者已经消费了一条信息,并且没有断开与activemq之间的链接。
4.5 监听器
在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到activemq了,可以用一个redis 就足够了。
不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。
4.5.1 applicationcontext-activemq.xml 配置
在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。
<!-- 配置消息队列监听者(queue) --> <bean id="queuemessagelistener" class="com.jayce.filter.queuemessagelistener" /> <!-- 显示注入消息监听容器(queue),配置连接工厂,监听的目标是demoqueuedestination,监听器是上面定义的监听器 --> <bean id="queuelistenercontainer" class="org.springframework.jms.listener.defaultmessagelistenercontainer"> <property name="connectionfactory" ref="connectionfactory" /> <property name="destination" ref="demoqueuedestination" /> <property name="messagelistener" ref="queuemessagelistener" /> </bean>
4.5.2 messagelistener
我们需要创建一个类实现messagelistener 接口:
package com.jayce.filter; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.messagelistener; import javax.jms.textmessage; /** * created by administrator on 2017/1/5. */ public class queuemessagelistener implements messagelistener { public void onmessage(message message) { textmessage tm = (textmessage) message; try { system.out.println("queuemessagelistener监听到了文本消息:\t" + tm.gettext()); //do something ... } catch (jmsexception e) { e.printstacktrace(); } } }
实现接口的onmessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。
4.5.3 测试
和上面一样,使用postman 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:
再看看activemq服务器的状态:
我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。
这样子一整个项目下来,我们已经成功的整合了spring和activemq。
4.6 压力测试
这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpclient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:
package com.jaycekon.test; import org.apache.commons.httpclient.httpclient; import org.apache.commons.httpclient.methods.postmethod; import org.junit.test; import java.io.ioexception; import java.util.concurrent.atomic.atomicinteger; /** * created by administrator on 2017/1/5. */ public class client { @test public void test() { httpclient httpclient = new httpclient(); new thread(new sender(httpclient)).start(); } } class sender implements runnable { public static atomicinteger count = new atomicinteger(0); httpclient httpclient; public sender(httpclient client) { httpclient = client; } public void run() { try { system.out.println(thread.currentthread().getname()+"---send message-"+count.getandincrement()); postmethod post = new postmethod("http://127.0.0.1:8080/sendmessage"); post.addparameter("msg", "hello world!"); httpclient.executemethod(post); system.out.println(thread.currentthread().getname()+"---send message success-"+count.getandincrement()); } catch (ioexception e) { e.printstacktrace(); } } }
这里面用了httpclient 来向服务器发送post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。
5、项目源码:crawl-page_jb51.rar
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: Java多线程实例
推荐阅读
-
消息队列 RabbitMQ 与 Spring 整合使用的实例代码
-
浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
-
spring整合JMS实现同步收发消息(基于ActiveMQ的实现)
-
详解spring boot整合JMS(ActiveMQ实现)
-
消息队列 RabbitMQ 与 Spring 整合使用的实例代码
-
Java中消息队列任务的平滑关闭详解
-
浅谈Java消息队列总结篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
-
PHP使用ActiveMQ实现消息队列的方法详解
-
SpringBoot2.x系列教程63--SpringBoot整合消息队列之RabbitMQ详解
-
Java Spring Boot消息服务万字详解分析