SpringBoot整合ActiveMQ
一、前言
相信看到这篇文章,大家也应该知道ActiveMQ是一个消息中间件。主要特点就是异步处理,用来减少响应时间和解耦。主要的使用场景就是将比较耗时而且不需要即时同步返回结果的操作作为消息放入消息队列。由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦合。
二、理解ActiveMQ
怎样理解ActiveMQ呢?ActiveMQ有两种消息传递模型,一种是点对点模型,一种是发布-订阅模型。我个人比较喜欢拿微信作为对象来代入进行理解。点对点模型,就好比我给你发送微信信息,一对一,我发送给你信息之后,你在忙的时候,就肯定没有理我(额,除非~这种关系应该不存在),等你闲下来的时候,再看我给你的信息。发布-订阅模型,就好比微信公众号,一对多,只要有人订阅了我的公众号,如果我发送了一条推文,那么凡是订阅了我公众号的人,就都会收到我发的这条推文。
好了,废话不多说,下面直接用SpringBoot来整合ActiveMQ。
三、队列整合
1、新建一个maven工程,引入SpringBoot和ActiveMQ依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.chen</groupId>
<artifactId>chen</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>Springboot_mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、application.yml 核心配置
server:
port: 8086
spring:
activemq:
broker-url: tcp://192.168.203.129:61616
user: admin
password: admin
jms:
pub-sub-domain: false # false代表队列,true代表主题
queue: queue01 # 自定义命名队列
3、配置队列bean,相当于Spring容器的bean标签,注意@EnableJms注解,开启JMS的适配规则。
package com.chen.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
@Component
@EnableJms // 开启JMS适配
public class ConfigBean {
@Value("${queue}")
private String myQueue; // 注入配置文件中的queue
@Bean
public ActiveMQQueue queue() {
return new ActiveMQQueue(myQueue);
}
}
4、生产者发送消息
package com.chen.produce;
import java.util.UUID;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 调用一次一个信息发出
public void produceMessage() {
jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
}
}
5、消费者接收消息,我这里直接使用@JmsListener注解进行监听队列,底层是由MQ的监听器实现的,还有另外一种接收消息的方法,就是MQ的receive()方法。
package com.chen.consummer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Queue_consummer {
@JmsListener(destination = "${queue}") // 注解监听
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消费者收到消息:"+textMessage.getText());
}
}
6、接下来利用Junit进行一个单元测试,直接运行单元方法,调用生产者进行发送消息
package com.chen;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import com.chen.produce.Queue_Produce;
@SpringBootTest(classes = SpringbootMqApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class TestActiveMQ {
@Resource
private Queue_Produce queue_produce;
@Test
public void testSend() throws Exception {
queue_produce.produceMessage();
}
}
可以在ActiveMQ图形化界面看到,生产者生产了一条消息,被消费者消费了一条消息
7、增加定时投递功能,利用Spring自带的@Scheduled注解实现定时任务,每隔一段时间生产一个消息,注意使用这个注解,需要在主启动类开启这个功能,不然无效启用
package com.chen.produce;
import java.util.UUID;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
// 调用一次一个信息发出
public void produceMessage() {
jmsMessagingTemplate.convertAndSend(queue, "这是一条消息");
}
// 带定时投递的业务方法
@Scheduled(fixedDelay = 3000) // 每3秒自动调用
public void produceMessageScheduled() {
jmsMessagingTemplate.convertAndSend(queue, "这是一条定时投递的消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
System.out.println("投递完成");
}
}
package com.chen;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 开启对定时任务的支持
public class SpringbootMqApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootMqApplication.class, args);
}
}
四、整合主题
1、重新新建一个maven项目,依赖和第三整合队列一样,配置application.yml,注意需要将false改为true,代表使用的是主题
server:
port: 8088
spring:
activemq:
broker-url: tcp://192.168.203.129:61616
user: admin
password: admin
jms:
pub-sub-domain: true
topic: topic01
2、配置主题bean
package com.chen.config;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConfigBean {
@Value("${topic}")
private String topicName;
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName);
}
}
3、同样是生产者
package com.chen.produce;
import java.util.UUID;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produceTopic() {
jmsMessagingTemplate.convertAndSend(topic, "这是一条主题消息,标记:" + UUID.randomUUID().toString().substring(0, 6));
}
}
4、消费者
package com.chen.consummer;
import javax.jms.TextMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Topic_Consummer {
@JmsListener(destination = "${topic}")
public void receive(TextMessage textMessage) throws Exception{
System.out.println("消费者接收订阅的主题消息:"+textMessage.getText());
}
}
5、因为我这里使用了定时任务,所以直接启用主启动类就行,会自动调用生产者每隔三秒生产一条消息
package com.chen;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
public class SpringbootMqTopicApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootMqTopicApplication.class, args);
}
}
本文地址:https://blog.csdn.net/qq_42109746/article/details/111938571