SpringBoot 2.x (13):整合ActiveMQ
activemq5.x不多做介绍了,主要是springboot的整合
特点:
1)支持来自java,c,c ++,c#,ruby,perl,python,php的各种跨语言客户端和协议
2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
3) 完全支持jms 1.1和j2ee 1.4,支持瞬态,持久,事务和xa消息
4) spring支持,activemq可以轻松嵌入到spring应用程序中,并使用spring的xml配置机制进行配置
5) 支持在流行的j2ee服务器(如tomee,geronimo,jboss,glassfish和weblogic)中进行测试
6) 使用jdbc和高性能日志支持非常快速的持久化
下载:
实际开发推荐部署到linux系统,具体操作网上也有教程
我这里为了方便,直接安装在本地windows机器上
如果想了解更多,查看官方文档:
进入bin目录win64目录启动activemq.bat即可
访问localhost:8161进入首页
访问http://localhost:8161/admin/进入管理页面,默认用户名和密码都是admin
整合:
依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-activemq</artifactid> </dependency>
连接池
<dependency> <groupid>org.apache.activemq</groupid> <artifactid>activemq-pool</artifactid> </dependency>
基本的配置
# activemq spring.activemq.broker-url=tcp://127.0.0.1:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=true spring.activemq.pool.max-connections=100
使用activemq必须要在springboot启动类中开启jms,并进行配置
package org.dreamtech.avtivemq; import javax.jms.connectionfactory; import org.apache.activemq.activemqconnectionfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.annotation.bean; import org.springframework.core.env.environment; import org.springframework.jms.annotation.enablejms; import org.springframework.jms.core.jmsmessagingtemplate; import org.springframework.jms.core.jmstemplate; @springbootapplication @enablejms public class avtivemqapplication { public static void main(string[] args) { springapplication.run(avtivemqapplication.class, args); } @autowired private environment env; @bean public connectionfactory connectionfactory() { activemqconnectionfactory connectionfactory = new activemqconnectionfactory(); connectionfactory.setbrokerurl(env.getproperty("spring.activemq.broker-url")); connectionfactory.setusername(env.getproperty("spring.activemq.user")); connectionfactory.setpassword(env.getproperty("spring.activemq.password")); return connectionfactory; } @bean public jmstemplate genjmstemplate() { return new jmstemplate(connectionfactory()); } @bean public jmsmessagingtemplate jmsmessagetemplate() { return new jmsmessagingtemplate(connectionfactory()); } }
点对点模型:
首先实现消息的发送
package org.dreamtech.avtivemq.service; import javax.jms.destination; /** * 消息生产 * * @author xu yiqing * */ public interface producerservice { /** * 使用指定消息队列发送 * * @param destination * @param message */ void sendmsg(destination destination, final string message); }
package org.dreamtech.avtivemq.service.impl; import javax.jms.destination; import org.dreamtech.avtivemq.service.producerservice; import org.springframework.beans.factory.annotation.autowired; import org.springframework.jms.core.jmsmessagingtemplate; import org.springframework.stereotype.service; @service public class producerserviceimpl implements producerservice { @autowired private jmsmessagingtemplate jmstemplate; @override public void sendmsg(destination destination, string message) { jmstemplate.convertandsend(destination, message); } }
package org.dreamtech.avtivemq.controller; import javax.jms.destination; import org.apache.activemq.command.activemqqueue; import org.dreamtech.avtivemq.service.producerservice; import org.springframework.beans.factory.annotation.autowired; import org.springframework.web.bind.annotation.getmapping; import org.springframework.web.bind.annotation.restcontroller; @restcontroller public class ordercontroller { @autowired private producerservice producerservice; @getmapping("/order") private object order(string msg) { destination destination = new activemqqueue("order.queue"); producerservice.sendmsg(destination,msg); return "order"; } }
访问:http://localhost:8080/order?msg=demo,然后查看activemq界面:
有生产者就就有消费者:监听消息队列
package org.dreamtech.avtivemq.jms; import org.springframework.jms.annotation.jmslistener; import org.springframework.stereotype.component; @component public class orderconsumer { /** * 监听指定消息队列 * * @param text */ @jmslistener(destination = "order.queue") public void receivequeue(string text) { system.out.println("[ orderconsumer收到的报文 : " + text + " ]"); } }
由于实时监听,一启动springboot就会打印:
[ orderconsumer收到的报文 : demo ]
发布订阅模型:比如抖音小视频,某网红发布新视频,多名粉丝收到消息
默认activemq只支持点对点模型,想要开启发布订阅模型,需要进行配置
spring.jms.pub-sub-domain=true
spring管理主题对象
@bean public topic topic() { return new activemqtopic("demo.topic"); }
发布者
/** * 消息发布者 * * @param msg */ void publish(string msg);
@autowired private jmsmessagingtemplate jmstemplate; @autowired private topic topic; @override public void publish(string msg) { jmstemplate.convertandsend(topic, msg); }
@autowired private producerservice producerservice; @getmapping("/topic") private object topic(string msg) { producerservice.publish(msg); return "success"; }
订阅者(消费者):一人发布,多人订阅
package org.dreamtech.avtivemq.jms; import org.springframework.jms.annotation.jmslistener; import org.springframework.stereotype.component; @component public class topicconsumer { @jmslistener(destination = "demo.topic") public void receiver1(string text) { system.out.println("topicconsumer : receiver1 : " + text); } @jmslistener(destination = "demo.topic") public void receiver2(string text) { system.out.println("topicconsumer : receiver2 : " + text); } @jmslistener(destination = "demo.topic") public void receiver3(string text) { system.out.println("topicconsumer : receiver3 : " + text); } }
启动项目,访问:
http://localhost:8080/topic?msg=666
打印如下
topicconsumer : receiver1 : 666 topicconsumer : receiver3 : 666 topicconsumer : receiver2 : 666
那么点对点和发布订阅模型可以一起使用吗?
不可以
如何配置?
1.注释掉 #spring.jms.pub-sub-domain=true
2.加入bean:给topic定义独立的jmslistenercontainer
@bean public jmslistenercontainerfactory<?> jmslistenercontainertopic(connectionfactory activemqconnectionfactory) { defaultjmslistenercontainerfactory bean = new defaultjmslistenercontainerfactory(); bean.setpubsubdomain(true); bean.setconnectionfactory(activemqconnectionfactory); return bean; }
3.@jmslistener如果不指定独立的containerfactory的话是只能消费queue消息
@jmslistener(destination = "demo.topic", containerfactory = "jmslistenercontainertopic") public void receiver1(string text) { system.out.println("topicconsumer : receiver1 : " + text); }
上一篇: 2019 WebRtc AudioMixer混音流程
下一篇: MySQL 几种调式分析利器
推荐阅读
-
SpringBoot 2.x 开发案例之 Shiro 整合 Redis
-
SpringBoot整合ActiveMQ,看这篇就够了
-
SpringBoot 2.x 整合Mybatis三:tk.mybatis
-
SpringBoot整合ActiveMQ
-
SpringBoot2.x系列二:整合第三方组件Mybatis、JPA、Redis、Elasticsearch、ActiveMQ、Kafka、Logback
-
SpringBoot2.0源码分析(二):整合ActiveMQ分析
-
SpringBoot 2.x (12):整合Elasticsearch
-
activemq整合springboot使用(个人微信小程序用)
-
SpringCloud 教程 | 第七篇: 服务消费者整合(Feign+Ribbon)设置超时时间和重试机制进行服务熔断降级(SpringBoot)(2.X版本)
-
SpringBoot 2.x (13):整合ActiveMQ