JMS之ActiveMQ(二):简单学习
程序员文章站
2022-07-15 08:58:37
...
参考上一章JMS对象模型图
添加依赖(因为我是在springboot下测试的,springboot内部集成了activemq的依赖)
<!--ActiveMQ支持-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
测试Queue和Topic两种目的类型,代码如下
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.jms.*;
import java.io.IOException;
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestActiveMq {
//发送消息方法Queue
@Test
public void testMQProducerQueue() throws JMSException {
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(queue);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-queue");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
//接受消息方法Queue
@Test
public void testMQConsumerQueue() throws JMSException, IOException {
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Queue queue = session.createQueue("test-queue");
//6、使用会话对象创建消费者对象
MessageConsumer consumer = session.createConsumer(queue);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
//发送消息方法Topic
@Test
public void TestMQProducerTopic() throws JMSException{
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test-topic");
//6、使用会话对象创建生产者对象
MessageProducer producer = session.createProducer(topic);
//7、使用会话对象创建一个消息对象
TextMessage textMessage = session.createTextMessage("hello!test-topic");
//8、发送消息
producer.send(textMessage);
//9、关闭资源
producer.close();
session.close();
connection.close();
}
//接受消息方法Topic
@Test
public void TestMQConsumerTopic() throws JMSException, IOException {
//1、创建工厂连接对象,需要制定ip和端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2、使用连接工厂创建一个连接对象
Connection connection = connectionFactory.createConnection();
//3、开启连接
connection.start();
//4、使用连接对象创建会话(session)对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
Topic topic = session.createTopic("test-topic");
//6、使用会话对象创建消费者对象
MessageConsumer consumer = session.createConsumer(topic);
//7、向consumer对象中设置一个messageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
//8、程序等待接收用户消息
System.in.read();
//9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
Queue测试:首先运行testMQProducerQueue方法发送消息,消息待处理
然后运行testMQConsumerQueue方法接收消息,消息已出队。
控制台打印消息。
Topic测试:先运行testMQProducerTopic发送消息,消息已经添加到消息队列中
然后运行testMQConsumerTopic接收消息
可以看到有了一个消费者,但是消息还未出队,这是因为正常情况下我们的topic消息不会在服务器持久化,所以要先打开消费者,再打开生产者,这个时候再运行生产者发送一条消息就看到消息已经出列了