ActiveMQ笔记
概念理解
JMS(java消息服务)
是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。JMS就是一个java平台的技术规范。
ActiveMQ
是Apache出品,最流行的,能力强劲的开源消息总线。是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
ActiveMQ安装
1,下载压缩包解压
下载地址:http://activemq.apache.org/
2,解压
tar -zxvf apache-activemq-5.6.0-bin.tar.gz
3,启动ActiveMQ
/liud/ActiveMQ/apache-activemq-5.6.0/bin 目录下运行
./activemq start
查看监听的端口61616
netstat -tnlp
web控制台地址
http://120.79.25.197:8161/admin
admin/admin
消息传输数据类型
JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种
消息传输模式
1,点对点消息模式
特性
每个消息只有一个接收者。
消息的发送者和消息的接收者没有时间的依赖性。
当消息发送者发送消息的时候,无论接收者程序在不在运行,都能获取到消息。
代码实现
生产消息
spring配置activemq 连接工厂
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://120.79.25.197:61616" />
</bean>
配置缓存连接工厂
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory" />
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="10" />
</bean>
设置发送消息的目的地队列
<bean id="queue1" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 设置消息队列的名字 -->
<constructor-arg value="queueMessage1" />
</bean>
消息的生产者
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="cachingConnectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
创建一个服务类,使用jmsQueueTemplate发送文本消息
package com.service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@SuppressWarnings("restriction")
@Service("QueueService")
public class QueueService {
@Resource(name = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
public void send(Destination destination, final Object message) {
jmsQueueTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.toString());
}
});
}
}
创建测试类
package com.test;
import javax.annotation.Resource;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.service.QueueService;
@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jms.xml" })
public class TestQueue {
@Autowired
private QueueService queueService;
@Resource(name = "queue1")
private ActiveMQQueue queue1;
@Test
public void TestQueue1() {
System.out.println("begin");
queueService.send(queue1, "nihao_detail2");
System.out.println("end");
}
}
运行测试类TestQueue1方法,activemq控制台中可以看到队列消息
Number Of Consumers 消费者数量
Messages Enqueued 进入队列的消息数量,这个数据只增不减
Messages Dequeued 出队列的消息数量
接收消息
使用监听spring配置
<!-- 消息监听器 -->
<bean id="QueueMessageListener" class="com.listener.QueueMessageListener"/>
<!-- 消息监听容器 -->
<bean id="QueueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="destination" ref="queue1" />
<property name="messageListener" ref="QueueMessageListener" />
</bean>
监听器类重写onMeassage方法
package com.listener;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public class QueueMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
// 如果是文本消息
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
System.out.println("get textMessage:\t" + tm.getText());
}
// 如果是Map消息
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
System.out.println("get mapMessage:\t" + mm.getString("msgId"));
}
// 如果是Object消息
if (message instanceof ObjectMessage) {
ObjectMessage om = (ObjectMessage) message;
String exampleUser = (String) om.getObject();
System.out.println("get ObjectMessage:\t" + exampleUser.toString());
}
// 如果是bytes消息
if (message instanceof BytesMessage) {
byte[] b = new byte[1024];
int len = -1;
BytesMessage bm = (BytesMessage) message;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}
// 如果是Stream消息
if (message instanceof StreamMessage) {
StreamMessage sm = (StreamMessage) message;
System.out.println(sm.readString());
System.out.println(sm.readInt());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
启动中间件后,消息队列中增加新消息后onMessage方法就会监听到。
也可以主动接收消息,代码如下:
@Resource(name = "jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
@Test
public void TestConsumerMessage() throws Exception {
Message message = jmsQueueTemplate.receive(queue1);
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
System.out.println("get1 textMessage:\t" + tm.getText());
}
}
2,发布订阅消息模式
特点
一个消息可传递多个订阅者;
发布和订阅者有时间依赖性,只有当客户端创建订阅后才能接收消息,并且订阅者需一直保持活动状态;
代码实现
定义消息的目的地
<!-- 发送消息的目的地(一个队列) -->
<bean id="Topic1" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 设置消息队列的名字 -->
<constructor-arg value="TopicMessage1" />
</bean>
使用spring生产消息模板
<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="cachingConnectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
</bean>
生产消息服务类
package com.service;
import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
@SuppressWarnings("restriction")
@Service("TopicService")
public class TopicService {
@Resource(name = "jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
public void send(Destination destination, final Object message) {
jmsTopicTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.toString());
}
});
}
}
测试类
package com.test;
import javax.annotation.Resource;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.service.TopicService;
@SuppressWarnings("restriction")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "classpath:applicationContext.xml", "classpath:applicationContext-jmsProducer.xml" })
public class TestTopic {
@Autowired
private TopicService topicService;
@Resource(name = "Topic1")
private ActiveMQTopic TopicMessage;
@Test
public void TestTopic1() {
System.out.println("begin");
topicService.send(TopicMessage, "nihao_topicmessage2");
System.out.println("end");
}
}
接收消息
监听器类1
package com.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TopicMessageListener1 implements MessageListener {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("---------消息消费1---------");
System.out.println("消息内容:\t" + tm.getText());
System.out.println("消息ID:\t" + tm.getJMSMessageID());
System.out.println("消息Destination:\t" + tm.getJMSDestination());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
监听器类2
package com.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TopicMessageListener2 implements MessageListener {
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("---------消息消费2---------");
System.out.println("消息内容:\t" + tm.getText());
System.out.println("消息ID:\t" + tm.getJMSMessageID());
System.out.println("消息Destination:\t" + tm.getJMSDestination());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
sping配置文件中添加监听器
<!-- 消息监听器 -->
<bean id="TopicMessageListener1" class="com.listener.TopicMessageListener1"/>
<bean id="TopicContainer1"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="destination" ref="Topic1" />
<property name="messageListener" ref="TopicMessageListener1" />
</bean>
<bean id="TopicMessageListener2" class="com.listener.TopicMessageListener2"/>
<bean id="TopicContainer2"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory" />
<property name="destination" ref="Topic1" />
<property name="messageListener" ref="TopicMessageListener2" />
</bean>
下一篇: 多线程的实现方法