欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ学习记录(三)

程序员文章站 2022-07-15 08:08:59
...

    继续学习ActiveMQ,这节就讲讲ActiveMQ和spring的结合了,直接开始吧。

    第一步:创建Maven工程,然后在pom中引入相关的配置(主要是dependencies):

<dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>4.1.6.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
</dependencies>

    第二步:创建一些文件,整个工程结构如图所示

            ActiveMQ学习记录(三)

    第三步:编写activemq.properties、spring-activemq.xml和spring-context.xml内容,内容注释代码里都有写

    activemq.properties:

#ActiveMQ Config
activemq.brolerURL=tcp://127.0.0.1:61616
activemq.username=admin
activemq.password=admin  
activemq.pool.maxConnections=10
activemq.queueName=myspringqueue  

    spring-context.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-2.5.xsd"
       default-autowire="byName">

    <!--读入配置文件activemq.properties-->
    <context:property-placeholder location="classpath:activemq.properties" />
    <!--开启注解配置-->
    <context:annotation-config />
    <!--扫描-->
    <context:component-scan base-package="com.activemq.*" />

    <import resource="classpath:spring-activemq.xml" />
</beans>

    spring-activemq.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-2.5.xsd"
       default-autowire="byName">

    <!--创建ActiveMQConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.brolerURL}" />
        <property name="userName" value="${activemq.username}" />
        <property name="password" value="${activemq.password}" />
    </bean>

    <!--ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
        可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们资源消耗
    -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
        <property name="maxConnections" value="${activemq.pool.maxConnections}" />
    </bean>

    <!--spring用于管理真正的ConnectionFactory-->
    <bean id="myconnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
    </bean>

    <!--目的地-->
    <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>${activemq.queueName}</value>
        </constructor-arg>
    </bean>

    <!--spring提供jms工具类,他可以进行消息的发送、接收等-->
    <!--queue模式-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--这个connectionFactory对应的是真正的ConnectionFactory,即我们自己定义的spring提供的那个ConnectionFactory对象-->
        <property name="connectionFactory" ref="myconnectionFactory"/>
        <property name="defaultDestinationName" value="${activemq.queueName}" />
    </bean>

    <!--配置自定义监听:MessageListener-->
    <bean id="msgQueueMessageListener" class="com.activemq.Consumer"></bean>

    <!--将连接工厂、目标队列、自定义监听注入jms模板,其实就是配置消费者通过messageListener接受消息-->
    <bean id="sessionAeareLinstenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myconnectionFactory"/>
        <property name="destination" ref="msgQueue"/>
        <property name="messageListener" ref="msgQueueMessageListener"/>
    </bean>
</beans>

    从ActiveMQConectionFactory到PooledConnectionFactory,到Spring提供的SingleConnectionFactory,就是一个适配的过程。

    配置connectionFactory是Spring用于创建到JMS服务器链接的,Spring提供了多种connectionFactory.具体常见如下:

    1.PooledConnectionFactory: JmsTemlate每次发送消息时只会缓存connection,session和productor,不会缓存consumer。因此只适合于生产者发送消息,这个只是在要求消息处理的及时性不是特别高的情况下。

    2.SingleConnectionFactory: 对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。
    3.CachingConnectionFactory: 继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。

    注意Spring的套路经常是这样的,提供XxxTemplate,比如HibernateTemplate,对于JMS,提供了JmsTemplate。
    生产者应该持有JmsTemplate进行发送消息。

    消费者,提供监听器、监听的目的地、连接工厂即可。

    第四步:编写Producer和Consumer

    Producer:

public class Producer {

    public void sendMessage(JmsTemplate jmsTemplate,final String info){
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(info);
            }
        });
    }
}

    Consumer:

public class Consumer implements SessionAwareMessageListener{

    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        if(message instanceof TextMessage){
            System.out.println("consumer get msg :" + ((TextMessage) message).getText() );
        }
    }
}

    第五步:编写Test测试类进行测试。

    Test:

public class Test{
    public static void main(String[] args){
        ApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
        JmsTemplate jmsTemplate = (JmsTemplate)context.getBean("jmsTemplate");
        Producer producer = new Producer();
        producer.sendMessage(jmsTemplate,"producer send a message");
    }
}

    运行结果:

    ActiveMQ学习记录(三)

    这样就是一个简单的spring+ActiveMQ的结合了。

    下面在看一下JmsTemplate属性和其父类JmsDestinationAccessor

ActiveMQ学习记录(三)

ActiveMQ学习记录(三)

    可以看到这个参数pubSubDomain=false,说明默认是使用点对点(P2P)模式,如果将这个设置成true就变成了主题模式

    那接下来来尝试下发布订阅怎么实现的,其中知识已在注释在添加。

    第一步,添加一个ConsumerTwo,内容是:

public class Consumer implements SessionAwareMessageListener{

    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        if(message instanceof TextMessage){
            System.out.println("consumer get msg :" + ((TextMessage) message).getText() );
        }
    }
}

    第二步:修改activemq.xml(修改内容较多,直接附上全部代码):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-2.5.xsd"
       default-autowire="byName">

    <!--创建ActiveMQConnectionFactory-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activemq.brolerURL}" />
        <property name="userName" value="${activemq.username}" />
        <property name="password" value="${activemq.password}" />
        <!--异步发送-->
        <property name="useAsyncSend" value="true" />
    </bean>

    <!--ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
        可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们资源消耗
    -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
        <property name="maxConnections" value="${activemq.pool.maxConnections}" />
    </bean>

    <!--spring用于管理真正的ConnectionFactory的ConnectionFactory
        1.SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。
        2.CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,
                                    同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。
                                    我们使用CachingConnectionFactory来作为示例。
    -->
    <!--producerConnectionFactory和consumerOneConnectionFactory区别是多设置了clientId,理由就不解释了,上一篇有讲-->
    <bean id="producerConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
        <!--Session缓存数量
            <property name="sessionCacheSize" value="10" />
        -->
    </bean>

    <!--目的地 topic模式-->
    <bean id="msgTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg>
            <!--订阅多个topic,则以英文逗号’,’隔开实现订阅多个topic。-->
            <value>myTestSpringTopic</value>
        </constructor-arg>
    </bean>

    <!--下面是生产者的配置-->

    <!--消息发送者客户端,spring提供jms工具类,他可以进行消息的发送、接收等-->
    <!--topic模式-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--这个connectionFactory对应的是我们自己定义的spring提供的那个ConnectionFactory对象-->
        <property name="connectionFactory" ref="producerConnectionFactory"/>
        <!--注意,和之前的P2P模式配置不一样,这里是defaultDestination,不是defaultDestinationName-->
        <property name="defaultDestination" ref="msgTopic" />
        <!--知识点一介绍了,默认是false,表示的是P2P模式,所以这里设置JmsTemplate这个属性为true,即Topic模式-->
        <property name="pubSubDomain" value="true"/>
        <!--其他属性配置-->
        <property name="receiveTimeout" value="10000" />

        <property name="explicitQosEnabled" value="true"/>

        <property name="deliveryMode" value="2" />


        <!--其他属性介绍:
                deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false
            <property name="explicitQosEnabled" value="true"/>

                DeliveryMode.NON_PERSISTENT=1:非持久化 ;
                DeliveryMode.PERSISTENT=2:持久化
            <property name="deliveryMode" value="2" />

                带事务的,那么配置(如果session带有事务,并且事务成功提交,则消息被自动签收。
                如果事务回滚,则消息会被再次传送。):
                javax.jms.Session.SESSION_TRANSACTED = 0
                不带事务,几种配置:(不带事务的session的签收方式,取决于session的配置)
                javax.jms.Session.AUTO_ACKNOWLEDGE = 1  消息自动签收
                javax.jms.Session.CLIENT_ACKNOWLEDGE = 2  客户端调用acknowledge方法手动签收
                javax.jms.Session.DUPS_OK_ACKNOWLEDGE = 3  不必必须签收
            <property name="sessionAcknowledgeMode" value="1" />
        -->
    </bean>


    <!--下面是消费者的配置-->

    <bean id="consumerOneConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
        <property name="clientId" value="consumer-one" />
        <!--Session缓存数量
            <property name="sessionCacheSize" value="10" />
        -->
    </bean>

    <bean id="consumerTwoConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--目标ConnectionbFactory对应真实的可以产生JMS connection的ConnectionFactory-->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
        <property name="clientId" value="consumer-two" />
        <!--Session缓存数量
            <property name="sessionCacheSize" value="10" />
        -->
    </bean>


    <!--配置自定义监听:MessageListener-->
    <bean id="msgTopicMessageListenerOne" class="com.activemq.Consumer"></bean>
    <bean id="msgTopicMessageListenerTwo" class="com.activemq.ConsumerTwo"></bean>

    <!--Consumer定义消息配置-->
    <bean id="sessionAeareLinstenerContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerOneConnectionFactory"/>
        <!--知识点一介绍了,默认是false,表示的是P2P模式,这里设置成true来开启订阅模式-->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="msgTopic"/>
        <!-- 消息持久化 -->
        <property name="subscriptionDurable" value="true"/>
        <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
        <property name="clientId" value="consumer-one"/>
        <property name="messageListener" ref="msgTopicMessageListenerOne"/>

        <!-- 消息应答方式
            AUTO_ACKNOWLEDGE = 1 :自动确认
            CLIENT_ACKNOWLEDGE = 2:客户端手动确认
            DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
            SESSION_TRANSACTED = 0:事务提交并确认
            INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认
        -->
        <property name="sessionAcknowledgeMode" value="1"/>
        <!--
         如果我们要进行本地的事务管理,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true
         <property name="sessionTransacted" value="true"/>
        -->
    </bean>

    <!--ConsumerTwo定义消息配置-->
    <bean id="sessionAeareLinstenerContainerTwo" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerTwoConnectionFactory"/>
        <!--知识点一介绍了,默认是false,表示的是P2P模式,这里设置成true来开启订阅模式-->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="msgTopic"/>
        <property name="clientId" value="consumer-two"/>
        <!-- 消息持久化 -->
        <property name="subscriptionDurable" value="true"/>
        <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
        <property name="messageListener" ref="msgTopicMessageListenerTwo"/>

        <!-- 消息应答方式
            AUTO_ACKNOWLEDGE = 1 :自动确认
            CLIENT_ACKNOWLEDGE = 2:客户端手动确认
            DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
            SESSION_TRANSACTED = 0:事务提交并确认
            INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认
        -->
        <property name="sessionAcknowledgeMode" value="1"/>
        <!--
         如果我们要进行本地的事务管理,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true
         <property name="sessionTransacted" value="true"/>
        -->
    </bean>
</beans>

    这里面提几个注意的点:

    ①producerConnectionFactory中不能添加consumerOneConnectionFactory、consumerTwoConnectionFactory一样的属性clientId,理由在第二节提到了,因为clientId是唯一不能重复的。

    ②将之前queue改成Topic,即将ActiveMQQueue改成ActiveMQTopic,并对其构造方法设置属性。

    ③JmsTemplate其实相当于生产者的工具,因此connectionFactory是producerConnectionFactory。

   ④defaultDestination对应的是topic,在上面是defaultDestinationName,如果是defaultDestinationName,那就是myTestSpringTopic。

    ⑤Topic模式,必须要将pubSubDomain这个设置成true,上面有讲理由。

    ⑥如果不需要持久化的话,需要删掉上面consumerOneConnectionFactory、consumerTwoConnectionFactory里面的clientId属性以外,还需要删掉对应监听器sessionAeareLinstenerContainerTwo、sessionAeareLinstenerContainerOne里的clientId,并修改其subscriptionDurable值为false,不然会报错。

    第三步,运行Test,结果如下:

    ActiveMQ学习记录(三)

    OK,ActiveMQ+spring结合就到这里了。谢谢!


相关标签: MQ