hornetq 集成 spring
程序员文章站
2022-03-02 08:56:41
...
一、简介
HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。
• HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
• HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
• HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
• HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
• HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中
二、软件下载
hornet 2.4.0.Final
下载地址:http://downloads.jboss.org/hornetq/hornetq-2.4.0.Final-bin.zip
三、目录结构
解压后,目录结构如下图:
bin:启动目录包括run.bat、run.sh、stop.bat和stop.sh四个文件,分别为windows和linux下的启动 和 停止 命令。
config:配置文件,直接启动时,使用的是config\stand-alone\non-clustered中的配置文件
examples:示例,很多例子可以用来学习
lib:相关的jar包
四、集成spring(不考虑集群)
我使用的spring为3.2.9
1、使用maven构建,在pom.xml文件中增加如下代码:
2、spring配置文件
3、修改hornetq配置
修改安装目录下config/stand-alone/non-clustered/hornetq-configuration.xml的配置文件:
代码中host改为本机可以访问到的ip,如果设置成127.0.0.1,其它机器将不能访问该服务器的消息队列
修改hornetq-jms.xml配置,在该配置文件中增加名称为testQueue的队列,增加如下代码,其它配置可以使用默认配置
4、启动hornetq
windows下直接运行run.bat,linux下运行run.sh
5、发送消息
6、接收消息
先写监听器,监听器需要配置在 spring配置文件的 jmsContainer中
五、异常处理
收到消息进行处理时,可能会出现异常,比如说数据存取失败等,需要对异常消息进行处理。
在这里我们实现spring-jms包中的SessionAwareMessageListener接口来进行消息监听,操作session的commit和rollback方法来确认和回滚消息,代码如下:
回滚后后的消息会进行几次重试(好像是5次),5次之后如果还是没有消费成功,就会进入死消息的队列,hornetq默认的死消息队列在hornetq-jms.xml配置文件中有默认配置,如下
之后可以在该队列中取出消息,进行分析处理。
使用java代码连接hornetq时,会创建session,创建时可以指定消息确认机制,非事务时为AUTO_ACKNOWLEDGE和CLIENT_ACKNOWLEDGE,前者为自动通知mq,后者为需要手动调用message.acknowledge()进行通知,事务时为SESSION_TRANSACTED,但是spring封装了该操作,默认是AUTO_ACKNOWLEDGE,我们需要改成事务的,所以在spring配置监听的时候加上一个属性,如下:
加入sessionTransacted为true,就可以设置事务的消息,从而解决处理时异常。
六、安全认证
上述虽然可以进行消息的发送和处理,但是使用的都是默认用户,存在一定的安全隐患,hornetq支持用户的定义和权限的分配,具体的安全配置可以参考hornetq的用户手册(http://hornetq.sourceforge.net/docs/hornetq-2.1.2.Final/user-manual/zh/html/index.html),这里只讲spring如何进行hornetq的安全认证。我们都知道jms的connectionfactory提供两个创建连接的方法,createConnection()和createConnection(username,password),前者是使用默认用户访问,后者是采用了安全策略,根据前面的配置可以发现,spring接收消息的时候使用的是spring-jms包的监听类DefaultMessageListenerContainer,翻看源码发现DefaultMessageListenerContainer类中的connectionFactory是调用createConnection()来创建连接的,不能加入用户名和密码,这样连接时就会报错。解决这个问题需要借助于spring-jms包中的另一个类UserCredentialsConnectionFactoryAdapter,看类名就知道是connectionFactory的用户认证代理类,在这里面可以设置用户名和密码。UserCredentialsConnectionFactoryAdapter继承了javax.jms.ConnectionFactory,所以我们可以使用这个代理类来包装原先的connectionFactory,主要代码如下:
改为这样配置后,就可以通过用户认证了。
七、其他说明
1、hornetq支持消息持久化,可以将消息持久化到硬盘,防止hornetq服务器down机后的消息丢失
2、如果消息已发送,但此时接收端出错,重启接收端服务,如果消息未被消费,依然可以继续处理消息
HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。
• HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
• HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的以来第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
• HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
• HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
• HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中
二、软件下载
hornet 2.4.0.Final
下载地址:http://downloads.jboss.org/hornetq/hornetq-2.4.0.Final-bin.zip
三、目录结构
解压后,目录结构如下图:
bin:启动目录包括run.bat、run.sh、stop.bat和stop.sh四个文件,分别为windows和linux下的启动 和 停止 命令。
config:配置文件,直接启动时,使用的是config\stand-alone\non-clustered中的配置文件
examples:示例,很多例子可以用来学习
lib:相关的jar包
四、集成spring(不考虑集群)
我使用的spring为3.2.9
1、使用maven构建,在pom.xml文件中增加如下代码:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.36.Final</version> </dependency> <dependency> <groupId>org.hornetq</groupId> <artifactId>hornetq-commons</artifactId> <version>2.4.7.Final</version> </dependency> <dependency> <groupId>org.hornetq</groupId> <artifactId>hornetq-core-client</artifactId> <version>2.4.7.Final</version> </dependency> <dependency> <groupId>org.hornetq</groupId> <artifactId>hornetq-jms-client</artifactId> <version>2.4.1.Final</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.2.9.RELEASE</version> </dependency>
2、spring配置文件
<!--配置jms模板,程序里使用 jmsTemplate来发送消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="pubSubDomain" value="true" /> </bean> <!—创建队列, testQueue为队列名称,在config\stand-alone\non-clustered\hornetq-jms.xml的文件中配置 --> <bean id="messageQueue" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createQueue"> <constructor-arg value="testQueue" /> </bean> <!—hornetq连接配置 host为队列所在服务器的ip,post为连接端口--> <bean id="transportConfiguration" class="org.hornetq.api.core.TransportConfiguration"> <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" /> <constructor-arg> <map key-type="java.lang.String" value-type="java.lang.Object"> <entry key="host" value="172.18.8.35"></entry> <entry key="port" value="5445"></entry> </map> </constructor-arg> </bean> <!—建立连接工厂 --> <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithoutHA"> <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF" /> <constructor-arg ref="transportConfiguration" /> </bean> <!—设置监听,用来接收消息。messageListener为消息监听,采用注解方式声明 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="messageQueue" /> <property name="messageListener" ref="messageTestListener" /> </bean>
3、修改hornetq配置
修改安装目录下config/stand-alone/non-clustered/hornetq-configuration.xml的配置文件:
<connectors> <connector name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFa ctory</factory-class> <param key="host" value="172.18.8.35"/> <param key="port" value="5445"/> </connector> </connectors> <acceptors> <acceptor name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> <param key="host" value="172.18.8.35"/> <param key="port" value="5445"/> </acceptor> </acceptors>
代码中host改为本机可以访问到的ip,如果设置成127.0.0.1,其它机器将不能访问该服务器的消息队列
修改hornetq-jms.xml配置,在该配置文件中增加名称为testQueue的队列,增加如下代码,其它配置可以使用默认配置
<queue name="testQueue"> <entry name="/queue/testQueue"/> </queue>
4、启动hornetq
windows下直接运行run.bat,linux下运行run.sh
5、发送消息
import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; @Controller public class TestMQController { @Resource(name = "jmsTemplate") private JmsTemplate jmsTemplate; @Resource(name = "messageQueue") private Queue queue; @RequestMapping(value = "test") public String test() { jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage om = session.createTextMessage("测试消息"); return om; } }); return ""; } }
6、接收消息
先写监听器,监听器需要配置在 spring配置文件的 jmsContainer中
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; @Component public class MessageTestListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println("MessageTestListener = " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
五、异常处理
收到消息进行处理时,可能会出现异常,比如说数据存取失败等,需要对异常消息进行处理。
在这里我们实现spring-jms包中的SessionAwareMessageListener接口来进行消息监听,操作session的commit和rollback方法来确认和回滚消息,代码如下:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; @Component public class MessageTestListener implements SessionAwareMessageListener<Message>{ public void onMessage(Message message, Session session) throws JMSException { TextMessage textMessage = (TextMessage)message; try { System.out.println("MessageTestListener = " + textMessage.getText()); //提交消息,确认消息处理成功 session.commit(); } catch (JMSException e) { //消息回滚 session.rollback(); e.printStackTrace(); } } }
回滚后后的消息会进行几次重试(好像是5次),5次之后如果还是没有消费成功,就会进入死消息的队列,hornetq默认的死消息队列在hornetq-jms.xml配置文件中有默认配置,如下
<queue name="DLQ"> <entry name="/queue/DLQ"/> </queue>
之后可以在该队列中取出消息,进行分析处理。
使用java代码连接hornetq时,会创建session,创建时可以指定消息确认机制,非事务时为AUTO_ACKNOWLEDGE和CLIENT_ACKNOWLEDGE,前者为自动通知mq,后者为需要手动调用message.acknowledge()进行通知,事务时为SESSION_TRANSACTED,但是spring封装了该操作,默认是AUTO_ACKNOWLEDGE,我们需要改成事务的,所以在spring配置监听的时候加上一个属性,如下:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="messageQueue" /> <property name="messageListener" ref="messageTestListener" /> <property name="sessionTransacted" value="true" /> </bean>
加入sessionTransacted为true,就可以设置事务的消息,从而解决处理时异常。
六、安全认证
上述虽然可以进行消息的发送和处理,但是使用的都是默认用户,存在一定的安全隐患,hornetq支持用户的定义和权限的分配,具体的安全配置可以参考hornetq的用户手册(http://hornetq.sourceforge.net/docs/hornetq-2.1.2.Final/user-manual/zh/html/index.html),这里只讲spring如何进行hornetq的安全认证。我们都知道jms的connectionfactory提供两个创建连接的方法,createConnection()和createConnection(username,password),前者是使用默认用户访问,后者是采用了安全策略,根据前面的配置可以发现,spring接收消息的时候使用的是spring-jms包的监听类DefaultMessageListenerContainer,翻看源码发现DefaultMessageListenerContainer类中的connectionFactory是调用createConnection()来创建连接的,不能加入用户名和密码,这样连接时就会报错。解决这个问题需要借助于spring-jms包中的另一个类UserCredentialsConnectionFactoryAdapter,看类名就知道是connectionFactory的用户认证代理类,在这里面可以设置用户名和密码。UserCredentialsConnectionFactoryAdapter继承了javax.jms.ConnectionFactory,所以我们可以使用这个代理类来包装原先的connectionFactory,主要代码如下:
<!-- 建立连接工厂 (原先的工厂类)--> <bean id="connectionFactory" class="org.hornetq.api.jms.HornetQJMSClient" factory-method="createConnectionFactoryWithoutHA"> <constructor-arg type="org.hornetq.api.jms.JMSFactoryType" value="CF" /> <constructor-arg ref="transportConfiguration" /> </bean> <!-- 创建用户认证工厂类 --> <bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter"> <!-- 这里的目标工厂,就是上面定义的工厂 --> <property name="targetConnectionFactory" ref="connectionFactory"/> <property name="username" value="mytest"/> <property name="password" value="mytest"/> </bean> <!-- 创建jms模板时,工厂类改为 myConnectionFactory --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="pubSubDomain" value="true" /> </bean> <!-- 创建监听时,也该用新的工厂类 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="destination" ref="messageQueue" /> <property name="messageListener" ref="messageTestListener" /> </bean>
改为这样配置后,就可以通过用户认证了。
七、其他说明
1、hornetq支持消息持久化,可以将消息持久化到硬盘,防止hornetq服务器down机后的消息丢失
2、如果消息已发送,但此时接收端出错,重启接收端服务,如果消息未被消费,依然可以继续处理消息
上一篇: 面向程序员的数据库访问性能优化
下一篇: 1024节快乐,我们永远不寂寞