欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Springboot整合Active消息队列

程序员文章站 2022-06-15 13:56:19
简单理解: active是apache公司旗下的一个消息总线,activemq是一个开源兼容java message service(jms) 面向消息的中件间. 是一个提供松耦合的应用程序架构....

       简单理解:

       active是apache公司旗下的一个消息总线,activemq是一个开源兼容java message service(jms) 面向消息的中件间. 是一个提供松耦合的应用程序架构.

       主要用来在服务与服务之间进行异步通信的。

一、搭建步骤
    1、相应jar包

<!-- 整合消息队列activemq -->
  <dependency>
   <groupid>org.springframework.boot</groupid>
   <artifactid>spring-boot-starter-activemq</artifactid>
  </dependency>
  
 <!-- 如果配置线程池则加入 -->
  <dependency> 
   <groupid>org.apache.activemq</groupid> 
   <artifactid>activemq-pool</artifactid> 
  </dependency>

    2、application.properties文件

#整合jms测试,安装在别的机器,防火墙和端口号记得开放
spring.activemq.broker-url=tcp://47.96.44.110:61616

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

#集群配置(后续需要在配上)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
# spring.jms.pub-sub-domain=true

   3、springboot主类

<!-- 主类需要多加一个@enablejms注解,不过貌似我没有加的时候,也能运行,为安全起见姑且加上 -->
@springbootapplication
@enablejms

4.5.......根据不同消息模式来写了。

二、点对点案例
   我在这里案例中创建了两个点对点队列,所以他会有两个queue对象,同样对应每个queue对象,都会有单一对应的消费者。

      1、springboot主类

@springbootapplication
@enablejms
public class main {

 public static void main(string[] args) {
  springapplication.run(main.class, args);
 }
 
 //新建一个的queue对象,交给sringboot管理,这个queue的名称叫"first.queue".
 @bean
 public queue queue(){
  return new activemqqueue("first.queue");
 }
}

      2.1、first.queue对应消费者

@component
public class firstconsumer {

 //名为"first.queue"消息队列的消费者,通过jmslistener进行监听有没有消息,有消息会立刻读取过来
 @jmslistener(destination="first.queue")
 public void receivequeue(string text){
  system.out.println("firstconsumer收到的报文为:"+text);
 }
}

       2.2、two.queue对应消费者(后面会创建)

@component
public class twoconsumer {

 //名为"two.queue"消息队列的消费者
 @jmslistener(destination="two.queue")
 public void receivequeue(string text){
  system.out.println("twoconsumer收到的报文为:"+text);
 }
}

      3、service类

/**
 * 功能描述:消息生产
 */
public interface producerservice {

 // 功能描述:指定消息队列,还有消息 
 public void sendmessage(destination destination, final string message);
 

 // 功能描述:使用默认消息队列, 发送消息
 public void sendmessage( final string message);

}

      4、serviceimpl实现类

/**
 * 功能描述:消息生产者实现类
 */
@service
public class producerserviceimpl implements producerservice{

 //这个队列就是springboot主类中bean的对象
 @autowired
 private queue queue;
 
 //用来发送消息到broker的对象,可以理解连接数据库的jdbc
 @autowired
 private jmsmessagingtemplate jmstemplate; 
 
 //发送消息,destination是发送到的队列,message是待发送的消息
 @override
 public void sendmessage(destination destination, string message) {  
  jmstemplate.convertandsend(destination, message); 
 }
 
 //发送消息,queue是发送到的队列,message是待发送的消息
 @override
 public void sendmessage(final string message) { 
  jmstemplate.convertandsend(this.queue, message); 
 }  
}

     5.queuecontroller类

/**
 * 功能描述:点对点消息队列控制层
 */
@restcontroller
@requestmapping("/api/v1")
public class queuecontroller {
 
 @autowired
 private producerservice producerservice;  

 // 这里后面调用的是springboot主类的quene队列
 @getmapping("first")
 public object common(string msg){
  producerservice.sendmessage(msg); 
  return "success";
 }  
 
 // 这个队列是新建的一个名为two.queue的点对点消息队列
 @getmapping("two")
 public object order(string msg){
  
  destination destination = new activemqqueue("two.queue");
  producerservice.sendmessage(destination, msg);
  
  return "success";
 }  
}

      6、案例演示:

Springboot整合Active消息队列

从演示效果可以得出以下结论:

     1:当springboot启动时候,就生成了这两个队列,而且他们都会有一个消费者

     2:当我通过页面访问的时候,就相当于生产者把消息放到队列中,一旦放进去就会被消费者监听到,就可以获取生产者放进去的值并在后台打印出

顺便对页面中四个单词进行解释:

   number of pending messages :待处理消息的数量。我们每次都会被监听处理掉,所以不存在待处理,如果存在就说这里面哪里出故障了,需要排查

   number of consumers : 消费者数量

   messages enqueued:    消息排列,这个只增不见,代表已经处理多少消息

   messages dequeued:    消息出队。

 三、发布/订阅者模式

 在上面点对点代码的基础上,添加发布/订阅相关代码

     1.appliaction.properties文件

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
spring.jms.pub-sub-domain=true

      2.springboot主类添加

//新建一个topic队列
 @bean
 public topic topic(){
  return new activemqtopic("video.topic");
 }

      3.添加多个消费者类

//这里定义了三个消费者
@component
public class topicsub {
 
 @jmslistener(destination="video.topic")
 public void receive1(string text){
  system.out.println("video.topic 消费者:receive1="+text);
 }
  
 @jmslistener(destination="video.topic")
 public void receive2(string text){
  system.out.println("video.topic 消费者:receive2="+text);
 }
  
 @jmslistener(destination="video.topic")
 public void receive3(string text){
  system.out.println("video.topic 消费者:receive3="+text);
 } 
}

      4.service类

 //功能描述:消息发布者
 public void publish(string msg);

     5.serviceimpl实现类

//=======发布订阅相关代码=========
 
  @autowired
  private topic topic;
    
   @override
  public void publish(string msg) {
   this.jmstemplate.convertandsend(this.topic, msg);
   
  }

       6.controller类

// 这个队列是新建的一个名为two.queue的点对点消息队列
  @getmapping("topic")
  public object topic(string msg){

   producerservice.publish(msg);
   
   return "success";
  }

      7.演示效果:

Springboot整合Active消息队列

    从演示效果总结如下:

     1:springboot启动的时候,在topics目录下,一共出现了5个消费者。first.queue一个消费者、two.queue一个消费者、video.topic三个消费者

     2:当我在控制台输入信息后,video.topic的三个消费者都会监听video.topic发布的消息,并在控制台打印。

四、如何让点对点和发布订阅同时有效

为什么这么说呢,因为当我向上面一样同时开启,会发现点对点模式已经失效了。

 效果演示

Springboot整合Active消息队列

从演示效果,可以得出如下结论:

     1:我们发现我们在页面输入..../two?msg=555消息后,后台并没有成功打印消息。再看active界面发现,这个queue对象,确实有一条待处理的消息,但是我们发现,它对应的消费者数量是为0.

     2:然而我们在打开topic页面发现,这里却存在一个消费者。

所以我个人理解是,当同时启动的时候,所产生的消费者默认都是topic消费者,没有queue消费者,所以它监听不到queue所待处理的消息。

当配置文件不加:spring.jms.pub-sub-domain=true  那么系统会默认支持quene(点对点模式),但一旦加上这段配置,系统又变成只支持发布订阅模式。

那如何同时都可以成功呢?

 思路如下:

第一步:还是需要去掉配置文件中的:

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
#spring.jms.pub-sub-domain=true

第二步:在发布订阅者的中消费者中指定独立的containerfactory

因为你去掉上面的配置,那么系统就默认是queue,所以@jmslistener如果不指定独立的containerfactory的话是只能消费queue消息

@jmslistener(destination="video.topic", containerfactory="jmslistenercontainertopic")
 public void receive1(string text){
  system.out.println("video.topic 消费者:receive1="+text);
 }
 
 
 @jmslistener(destination="video.topic", containerfactory="jmslistenercontainertopic")
 public void receive2(string text){
  system.out.println("video.topic 消费者:receive2="+text);
 }
 
 //第三步我不添加containerfactory="jmslistenercontainertopic"看等下是否会打印出
 @jmslistener(destination="video.topic")
 public void receive3(string text){
  system.out.println("video.topic 消费者:receive3="+text);
 }

第三步:定义独立的topic定义独立的jmslistenercontainer

在springboot主类中添加:

@bean
  public jmslistenercontainerfactory<?> jmslistenercontainertopic(connectionfactory activemqconnectionfactory) {
   defaultjmslistenercontainerfactory bean = new defaultjmslistenercontainerfactory();
   bean.setpubsubdomain(true);
   bean.setconnectionfactory(activemqconnectionfactory);
   return bean;
  }

效果:

Springboot整合Active消息队列

得出结论:

    1:点对点,和发布订阅都有用

    2:receive3没有指定独立的containerfactory一样没有打印出来。

源码
github地址:https://github.com/yudiandemingzi/springbootacitvemq