欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ActiveMQ笔记

程序员文章站 2024-02-16 09:35:10
...

概念理解

JMS(java消息服务)

是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS就是一个java平台的技术规范。

ActiveMQ

是Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

ActiveMQ安装

1,下载压缩包解压

下载地址:http://activemq.apache.org/

2,解压

tar -zxvf apache-activemq-5.6.0-bin.tar.gz

3,启动ActiveMQ

/liud/ActiveMQ/apache-activemq-5.6.0/bin 目录下运行

./activemq start

ActiveMQ笔记

查看监听的端口61616

netstat -tnlp

ActiveMQ笔记

web控制台地址
http://120.79.25.197:8161/admin

admin/admin

消息传输数据类型

JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种

消息传输模式

1,点对点消息模式

特性

每个消息只有一个接收者。

消息的发送者和消息的接收者没有时间的依赖性。

当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息。

代码实现 

生产消息

spring配置activemq 连接工厂

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://120.79.25.197:61616" />
	</bean>

配置缓存连接工厂

<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="connectionFactory" />
		<!-- Session缓存数量 -->
		<property name="sessionCacheSize" value="10" />
	</bean>

设置发送消息的目的地队列

<bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
		<!-- 设置消息队列的名字 -->
		<constructor-arg  value="queueMessage1" />
	</bean>

消息的生产者

<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<constructor-arg ref="cachingConnectionFactory" />
		<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
		<property name="pubSubDomain" value="false" />
	</bean>

创建一个服务类,使用jmsQueueTemplate发送文本消息

package com.service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@SuppressWarnings("restriction")
@Service("QueueService")
public class QueueService {
	@Resource(name = "jmsQueueTemplate")
	private JmsTemplate jmsQueueTemplate;
	public void send(Destination destination, final Object message) {
		jmsQueueTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message.toString());
			}
		});
	}
}

创建测试类

package com.test;

import javax.annotation.Resource;

import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.service.QueueService;

@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jms.xml" })
public class TestQueue {

	@Autowired
	private QueueService queueService;

	@Resource(name = "queue1")
	private ActiveMQQueue queue1;

	@Test
	public void TestQueue1() {
		System.out.println("begin");
		queueService.send(queue1, "nihao_detail2");
		System.out.println("end");
	}

}

运行测试类TestQueue1方法,activemq控制台中可以看到队列消息

ActiveMQ笔记

Number Of Consumers   消费者数量

Messages Enqueued  进入队列的消息数量,这个数据只增不减

Messages Dequeued 出队列的消息数量

接收消息

使用监听spring配置

<!-- 消息监听器 -->  
	<bean id="QueueMessageListener" class="com.listener.QueueMessageListener"/>
	
	<!-- 消息监听容器 -->  
	<bean id="QueueContainer"  
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
		<property name="connectionFactory" ref="cachingConnectionFactory" />  
		<property name="destination" ref="queue1" />  
		<property name="messageListener" ref="QueueMessageListener" />  
	</bean>

监听器类重写onMeassage方法

package com.listener;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public class QueueMessageListener implements MessageListener {
	public void onMessage(Message message) {
		try {
			// 如果是文本消息
			if (message instanceof TextMessage) {
				TextMessage tm = (TextMessage) message;
				System.out.println("get textMessage:\t" + tm.getText());
			}

			// 如果是Map消息
			if (message instanceof MapMessage) {
				MapMessage mm = (MapMessage) message;
				System.out.println("get mapMessage:\t" + mm.getString("msgId"));
			}

			// 如果是Object消息
			if (message instanceof ObjectMessage) {
				ObjectMessage om = (ObjectMessage) message;
				String exampleUser = (String) om.getObject();
				System.out.println("get ObjectMessage:\t" + exampleUser.toString());
			}

			// 如果是bytes消息
			if (message instanceof BytesMessage) {
				byte[] b = new byte[1024];
				int len = -1;
				BytesMessage bm = (BytesMessage) message;
				while ((len = bm.readBytes(b)) != -1) {
					System.out.println(new String(b, 0, len));
				}
			}

			// 如果是Stream消息
			if (message instanceof StreamMessage) {
				StreamMessage sm = (StreamMessage) message;
				System.out.println(sm.readString());
				System.out.println(sm.readInt());
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

启动中间件后,消息队列中增加新消息后onMessage方法就会监听到。

也可以主动接收消息,代码如下:

@Resource(name = "jmsQueueTemplate")
	private JmsTemplate jmsQueueTemplate;
	@Test
	public void TestConsumerMessage() throws Exception {
		Message message = jmsQueueTemplate.receive(queue1);
		if (message instanceof TextMessage) {
			TextMessage tm = (TextMessage) message;
			System.out.println("get1 textMessage:\t" + tm.getText());
		}
	}

2,发布订阅消息模式

特点

一个消息可传递多个订阅者;

发布和订阅者有时间依赖性,只有当客户端创建订阅后才能接收消息,并且订阅者需一直保持活动状态;

代码实现

生产消息

定义消息的目的地

<!-- 发送消息的目的地(一个队列) -->
	<bean id="Topic1" class="org.apache.activemq.command.ActiveMQTopic">
		<!-- 设置消息队列的名字 -->
		<constructor-arg  value="TopicMessage1" />
	</bean>

使用spring生产消息模板

<!-- 定义JmsTemplate的Topic类型 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
		<constructor-arg ref="cachingConnectionFactory" />
		<!-- pub/sub模型(发布/订阅) -->
		<property name="pubSubDomain" value="true" />
	</bean>

生产消息服务类

package com.service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@SuppressWarnings("restriction")
@Service("TopicService")
public class TopicService {
	@Resource(name = "jmsTopicTemplate")
	private JmsTemplate jmsTopicTemplate;

	public void send(Destination destination, final Object message) {
		jmsTopicTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message.toString());
			}
		});
	}

}

测试类

package com.test;

import javax.annotation.Resource;

import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.service.TopicService;

@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jmsProducer.xml" })
public class TestTopic {

	@Autowired
	private TopicService topicService;

	@Resource(name = "Topic1")
	private ActiveMQTopic TopicMessage;

	@Test
	public void TestTopic1() {
		System.out.println("begin");
		topicService.send(TopicMessage, "nihao_topicmessage2");
		System.out.println("end");

	}
	

}

接收消息

监听器类1

package com.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageListener1 implements MessageListener {

	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		try {
			System.out.println("---------消息消费1---------");
			System.out.println("消息内容:\t" + tm.getText());
			System.out.println("消息ID:\t" + tm.getJMSMessageID());
			System.out.println("消息Destination:\t" + tm.getJMSDestination());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

监听器类2

package com.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TopicMessageListener2 implements MessageListener {

	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		try {
			System.out.println("---------消息消费2---------");
			System.out.println("消息内容:\t" + tm.getText());
			System.out.println("消息ID:\t" + tm.getJMSMessageID());
			System.out.println("消息Destination:\t" + tm.getJMSDestination());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}


sping配置文件中添加监听器

<!-- 消息监听器 -->
	<bean id="TopicMessageListener1" class="com.listener.TopicMessageListener1"/> 
	
	<bean id="TopicContainer1"  
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
		<property name="connectionFactory" ref="cachingConnectionFactory" />  
		<property name="destination" ref="Topic1" />  
		<property name="messageListener" ref="TopicMessageListener1" />  
	</bean>  
	
	<bean id="TopicMessageListener2" class="com.listener.TopicMessageListener2"/>
	
	<bean id="TopicContainer2" 
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
		<property name="connectionFactory" ref="cachingConnectionFactory" />  
		<property name="destination" ref="Topic1" />  
		<property name="messageListener" ref="TopicMessageListener2" />  
	</bean>