springboot集成ActiveMQ
程序员文章站
2022-04-30 19:50:24
...
一、出现以下异常时,需要注意,receive获取消息时不可有返回值,否则循环报此异常
Execution of JMS message listener failed, and no ErrorHandler has been set.
二、如果使用pool,记得设置为false否则报错
#是否用Pooledconnectionfactory代替普通的ConnectionFactory
spring.activemq.pool.enabled=false
ActiveMQ页面列表说明:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量
定时调度功能:
直接在发送消息类名之上加上@EnableScheduling,
发送消息的方法之上加上注解@Scheduled(fixedDelay = 5000),fixedDelay单位为毫秒
说明:
需要建立两个springboot项目
广播模式:群发
订阅模式:点对点
<!--消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
发布方配置
server:
#Tomcat端口
port: 8099
spring:
activemq:
# MQ所在的服务器的地址
broker-url: tcp://localhost:61616
发布方配置代码
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BeanConfig {
//定义存放消息的队列
@Bean
public Queue queue() {
return new ActiveMQQueue("ActiveMQQueue");
}
//定义存放消息的队列
@Bean
public Topic topic() {
return new ActiveMQTopic("ActiveMQTopic");
}
}
发布方接口代码
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProviderController {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@RequestMapping("/sendqueue")
public void sendqueue(String msg) {
// 指定消息发送的目的地及内容
System.out.println("成功发送订阅消息:" + msg);
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
}
@RequestMapping("/sendtopic")
public void sendtopic(String msg) {
// 指定消息发送的目的地及内容
System.out.println("成功发送广播消息:" + msg);
this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
}
}
消费方配置
server:
#Tomcat端口
port: 8077
spring:
activemq:
# MQ所在的服务器的地址
broker-url: tcp://localhost:61616
#true开启广播模式,订阅模式无法获取消息
#false开启订阅模式,广播模式无法获取消息
jms:
pub-sub-domain: false
消费方代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.TextMessage;
@Component
public class ConsumerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@JmsListener(destination = "ActiveMQQueue")// 使用JmsListener配置消费者监听的队列,destination是队列名称
// @SendTo("SQueue")// SendTo 会将此方法返回的数据,发送到消息队列 ,否则不设置返回类型-void
public void receviceQueueMsg(String msg) {
System.out.println("成功接受订阅消息:" + msg);
}
@JmsListener(destination="ActiveMQTopic")
public void receviceTopicMsg(TextMessage msg) {//注:此处方法不可有返回值
String str = "";
try {
str = msg.getText();
System.out.println("成功接收广播消息:"+ msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}