欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot 2.x (13):整合ActiveMQ

程序员文章站 2022-06-24 11:13:33
ActiveMQ5.x不多做介绍了,主要是SpringBoot的整合 特点: 1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议 2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标 3) 完全支持JMS 1.1和J2EE 1.4,支持瞬 ......

activemq5.x不多做介绍了,主要是springboot的整合

特点:
1)支持来自java,c,c ++,c#,ruby,perl,python,php的各种跨语言客户端和协议
2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
3) 完全支持jms 1.1和j2ee 1.4,支持瞬态,持久,事务和xa消息
4) spring支持,activemq可以轻松嵌入到spring应用程序中,并使用spring的xml配置机制进行配置
5) 支持在流行的j2ee服务器(如tomee,geronimo,jboss,glassfish和weblogic)中进行测试
6) 使用jdbc和高性能日志支持非常快速的持久化

 

下载:

实际开发推荐部署到linux系统,具体操作网上也有教程

我这里为了方便,直接安装在本地windows机器上

 

如果想了解更多,查看官方文档:

 

进入bin目录win64目录启动activemq.bat即可

访问localhost:8161进入首页

访问http://localhost:8161/admin/进入管理页面,默认用户名和密码都是admin

 

整合:

依赖

        <dependency>
            <groupid>org.springframework.boot</groupid>
            <artifactid>spring-boot-starter-activemq</artifactid>
        </dependency>

连接池

        <dependency>
            <groupid>org.apache.activemq</groupid>
            <artifactid>activemq-pool</artifactid>
        </dependency>

基本的配置

# activemq
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

使用activemq必须要在springboot启动类中开启jms,并进行配置

package org.dreamtech.avtivemq;

import javax.jms.connectionfactory;

import org.apache.activemq.activemqconnectionfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.annotation.bean;
import org.springframework.core.env.environment;
import org.springframework.jms.annotation.enablejms;
import org.springframework.jms.core.jmsmessagingtemplate;
import org.springframework.jms.core.jmstemplate;

@springbootapplication
@enablejms
public class avtivemqapplication {

    public static void main(string[] args) {
        springapplication.run(avtivemqapplication.class, args);
    }

    @autowired
    private environment env;

    @bean
    public connectionfactory connectionfactory() {
        activemqconnectionfactory connectionfactory = new activemqconnectionfactory();
        connectionfactory.setbrokerurl(env.getproperty("spring.activemq.broker-url"));
        connectionfactory.setusername(env.getproperty("spring.activemq.user"));
        connectionfactory.setpassword(env.getproperty("spring.activemq.password"));
        return connectionfactory;
    }

    @bean
    public jmstemplate genjmstemplate() {
        return new jmstemplate(connectionfactory());

    }

    @bean
    public jmsmessagingtemplate jmsmessagetemplate() {
        return new jmsmessagingtemplate(connectionfactory());
    }
}

点对点模型:

首先实现消息的发送

package org.dreamtech.avtivemq.service;

import javax.jms.destination;

/**
 * 消息生产
 * 
 * @author xu yiqing
 *
 */
public interface producerservice {
    /**
     * 使用指定消息队列发送
     * 
     * @param destination
     * @param message
     */
    void sendmsg(destination destination, final string message);
}
package org.dreamtech.avtivemq.service.impl;

import javax.jms.destination;

import org.dreamtech.avtivemq.service.producerservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.jms.core.jmsmessagingtemplate;
import org.springframework.stereotype.service;

@service
public class producerserviceimpl implements producerservice {
    @autowired
    private jmsmessagingtemplate jmstemplate;

    @override
    public void sendmsg(destination destination, string message) {
        jmstemplate.convertandsend(destination, message);
    }

}
package org.dreamtech.avtivemq.controller;

import javax.jms.destination;

import org.apache.activemq.command.activemqqueue;
import org.dreamtech.avtivemq.service.producerservice;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.restcontroller;

@restcontroller
public class ordercontroller {
    @autowired
    private producerservice producerservice;
    
    @getmapping("/order")
    private object order(string msg) {
        destination destination = new activemqqueue("order.queue");
        producerservice.sendmsg(destination,msg);
        return "order";
    }
}

访问:http://localhost:8080/order?msg=demo,然后查看activemq界面:

SpringBoot 2.x (13):整合ActiveMQ

有生产者就就有消费者:监听消息队列

package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.jmslistener;
import org.springframework.stereotype.component;

@component
public class orderconsumer {
    /**
     * 监听指定消息队列
     * 
     * @param text
     */
    @jmslistener(destination = "order.queue")
    public void receivequeue(string text) {
        system.out.println("[ orderconsumer收到的报文 : " + text + " ]");
    }
}

由于实时监听,一启动springboot就会打印:

[ orderconsumer收到的报文 : demo ]

 

发布订阅模型:比如抖音小视频,某网红发布新视频,多名粉丝收到消息

默认activemq只支持点对点模型,想要开启发布订阅模型,需要进行配置

spring.jms.pub-sub-domain=true

spring管理主题对象

    @bean
    public topic topic() {
        return new activemqtopic("demo.topic");
    }

发布者

    /**
     * 消息发布者
     * 
     * @param msg
     */
    void publish(string msg);
    @autowired
    private jmsmessagingtemplate jmstemplate;
    @autowired
    private topic topic;

    @override
    public void publish(string msg) {
        jmstemplate.convertandsend(topic, msg);
    }
    @autowired
    private producerservice producerservice;
    @getmapping("/topic")
    private object topic(string msg) {
        producerservice.publish(msg);
        return "success";
    }

订阅者(消费者):一人发布,多人订阅

package org.dreamtech.avtivemq.jms;

import org.springframework.jms.annotation.jmslistener;
import org.springframework.stereotype.component;

@component
public class topicconsumer {
    @jmslistener(destination = "demo.topic")
    public void receiver1(string text) {
        system.out.println("topicconsumer : receiver1 : " + text);
    }

    @jmslistener(destination = "demo.topic")
    public void receiver2(string text) {
        system.out.println("topicconsumer : receiver2 : " + text);
    }

    @jmslistener(destination = "demo.topic")
    public void receiver3(string text) {
        system.out.println("topicconsumer : receiver3 : " + text);
    }
}

启动项目,访问:

http://localhost:8080/topic?msg=666

打印如下

topicconsumer : receiver1 : 666
topicconsumer : receiver3 : 666
topicconsumer : receiver2 : 666

 

那么点对点和发布订阅模型可以一起使用吗?

不可以

如何配置?

1.注释掉 #spring.jms.pub-sub-domain=true

2.加入bean:给topic定义独立的jmslistenercontainer

    @bean
    public jmslistenercontainerfactory<?> jmslistenercontainertopic(connectionfactory activemqconnectionfactory) {
        defaultjmslistenercontainerfactory bean = new defaultjmslistenercontainerfactory();
        bean.setpubsubdomain(true);
        bean.setconnectionfactory(activemqconnectionfactory);
        return bean;
    }

3.@jmslistener如果不指定独立的containerfactory的话是只能消费queue消息

    @jmslistener(destination = "demo.topic", containerfactory = "jmslistenercontainertopic")
    public void receiver1(string text) {
        system.out.println("topicconsumer : receiver1 : " + text);
    }