欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息中间件(2)-ActiveMq & Spring 技术集成

程序员文章站 2022-07-13 15:44:07
...
  • 一、为什么需要spring + activeMq 集成?

1、原生的JMS使用起来,太繁琐,需要封装很多层才能在正式代码中使用,

2、activemq一套开源的JMS实现方案,实现了服务端和客户端,开箱即用

3、spring一整套组件,直接拿来主义

 

  • 二、实现步骤

1、开启activemq服务端

直接从官网下载 http://activemq.apache.org/,开箱即用的东西,略过。

 

2、客户端调用所需要的关键jar包,使用的maven管理,其他包管理类似

 

<dependency>
	<groupId>org.springframework</groupId>
	<artifactId>spring-jms</artifactId>
	<version>4.0.6.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-core</artifactId>
	<version>5.7.0</version>
</dependency>
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-pool</artifactId>
	<version>5.7.0</version>
</dependency>

 

 

实际使用的时候,可以适当调整版本号

 

3、在JMS规范 中,使用JMS几个步骤,

   (1)、用JNDI 得到ConnectionFactory对象;

    (2)、用JNDI 得到目标队列或主题对象,即Destination对象;

    (3)、用ConnectionFactory创建Connection 对象;

    (4)、用Connection对象创建一个或多个JMS Session;

    (5)、用Session 和Destination 创建MessageProducer和MessageConsumer;

    (6)、通知Connection 开始传递消息。

而activemq使用的步骤也是大同小异

 

   3.1、建立ConnectionFactory以及spring高度封装的JmsTemplate

     

@Bean
public PooledConnectionFactory PooledConnectionFactory(){
	ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
	connectionFactory.setBrokerURL(MQ_URL);//配置MQ_URL,例如tcp://localhost:61616
	
	PooledConnectionFactory PooledConnectionFactory = new PooledConnectionFactory();
	PooledConnectionFactory.setConnectionFactory(connectionFactory);
	
	return PooledConnectionFactory;
}

@Bean
public JmsTemplate JmsTemplate(){
	JmsTemplate JmsTemplate = new JmsTemplate();
	JmsTemplate.setConnectionFactory(PooledConnectionFactory());
	
	return JmsTemplate;
}

  3.2 建立Queue或者Topic

 

 

@Bean(name="queue1")
public Destination queue1Destination(){
	Destination destination = new ActiveMQQueue(queue1);//队列 定义queue1名称即可
       //Destination destination = new ActiveMQTopic(queue1);//主题
	
	return destination;
}

   

 

   第3,4步直接省略,JmsTemplate已经做好了

   3.3 发送消息

   

@Component
public class MyMessageSender {
	@Autowired JmsTemplate jmsTemplate;
	@Resource(name="queue1") Destination destination;
	
	public void send(String msg){
		jmsTemplate.send(destination, new MessageCreator() {
			@Override
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage();
				message.setText(msg);
				return message;
			}
		});
	}
}

  发送消息是如此的简单,制定队列名称,就可以直接发送消息到队列或者主题

 

  3.4消息监听,有了消息发送就得有Consumer处理消息

  

@Component
public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		System.out.println("==========Thread:"+Thread.currentThread().getId());
		System.out.println("==========MyMessageListener"+message.toString());
	}

}

   以及支持该消息监听的消息容器

    

@Autowired MyMessageListener messageListener;

@Bean(name="queue1ListenerContainer")
	public DefaultMessageListenerContainer DefaultMessageListenerContainer(){
		DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
		listenerContainer.setConnectionFactory(PooledConnectionFactory());
		listenerContainer.setDestination(queue1Destination());
		listenerContainer.setMessageListener(messageListener);
		listenerContainer.setConcurrentConsumers(2);//可以理解为起两个线程去消费消息
		return listenerContainer;
	}

 

至此主流程开发结束。