欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

springboot集成ActiveMQ

程序员文章站 2022-04-30 19:50:24
...

一、出现以下异常时,需要注意,receive获取消息时不可有返回值,否则循环报此异常

Execution of JMS message listener failed, and no ErrorHandler has been set.

二、如果使用pool,记得设置为false否则报错

#是否用Pooledconnectionfactory代替普通的ConnectionFactory
spring.activemq.pool.enabled=false

ActiveMQ页面列表说明:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量

定时调度功能:
直接在发送消息类名之上加上@EnableScheduling,
发送消息的方法之上加上注解@Scheduled(fixedDelay = 5000),fixedDelay单位为毫秒

说明:
需要建立两个springboot项目
广播模式:群发
订阅模式:点对点

		<!--消息队列-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!--消息队列连接池-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.0</version>
        </dependency>

发布方配置

server:
  #Tomcat端口
  port: 8099
spring:
  activemq:
    # MQ所在的服务器的地址
    broker-url: tcp://localhost:61616

发布方配置代码

import javax.jms.Queue;
import javax.jms.Topic;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BeanConfig {

    //定义存放消息的队列
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("ActiveMQQueue");
    }

    //定义存放消息的队列
    @Bean
    public Topic topic() {
        return new ActiveMQTopic("ActiveMQTopic");
    }
}

发布方接口代码

import javax.jms.Queue;
import javax.jms.Topic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class ProviderController {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;


    @RequestMapping("/sendqueue")
    public void sendqueue(String msg) {
        // 指定消息发送的目的地及内容
        System.out.println("成功发送订阅消息:" + msg);
        this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
    }

    @RequestMapping("/sendtopic")
    public void sendtopic(String msg) {
        // 指定消息发送的目的地及内容
        System.out.println("成功发送广播消息:" + msg);
        this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
    }
}

消费方配置

server:
  #Tomcat端口
  port: 8077 
spring:
  activemq:
    # MQ所在的服务器的地址
    broker-url: tcp://localhost:61616
  #true开启广播模式,订阅模式无法获取消息
  #false开启订阅模式,广播模式无法获取消息
  jms:
    pub-sub-domain: false

消费方代码

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.TextMessage;


@Component
public class ConsumerService {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    @JmsListener(destination = "ActiveMQQueue")// 使用JmsListener配置消费者监听的队列,destination是队列名称
//    @SendTo("SQueue")// SendTo 会将此方法返回的数据,发送到消息队列 ,否则不设置返回类型-void
    public void receviceQueueMsg(String msg) {
        System.out.println("成功接受订阅消息:" + msg);
    }

    @JmsListener(destination="ActiveMQTopic")
    public void receviceTopicMsg(TextMessage msg) {//注:此处方法不可有返回值
        String str = "";
        try {
            str = msg.getText();
            System.out.println("成功接收广播消息:"+ msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
相关标签: mq