Spring Boot 配置多源的 RabbitMQ
简介
mq
是开发中很平常的中间件,本文讲述的是怎么在一个spring boot
项目中配置多源的rabbitmq
,这里不过多的讲解rabbitmq
的相关知识点。如果你也有遇到需要往多个rabbitmq
中发送消息的需求,希望本文可以帮助到你。
环境
- rabbitmq 3.7.12
- spring boot 2.1.6.release
当然软件的版本不是硬性要求,只是我使用的环境而已,唯一的要求是需要启动两个rabbitmq
,我这边是在kubernetes
集群中使用helm
官方提供的charts
包快速启动的两个rabbitmq-ha
高可用rabbitmq
集群。
想要了解 kubernetes
或者helm
,可以参看以下 github仓库:
- kubernetes :
- helm:
- charts:
springboot中配置两个rabbitmq源
在springboot 中配置单个rabbitmq是极其简单的,我们只需要使用springboot为我们自动装配的rabbitmq相关的配置就可以了。但是需要配置多个源时,第二个及其以上的就需要单独配置了,这里我使用的都是单独配置的。
代码:
/** * @author innerpeacez * @since 2019/3/11 */ @data public abstract class abstractrabbitconfiguration { protected string host; protected int port; protected string username; protected string password; protected connectionfactory connectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(); connectionfactory.sethost(host); connectionfactory.setport(port); connectionfactory.setusername(username); connectionfactory.setpassword(password); return connectionfactory; } }
第一个源的配置代码
package com.zhw.study.springbootmultirabbitmq.config; import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbitadmin; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.autoconfigure.amqp.simplerabbitlistenercontainerfactoryconfigurer; import org.springframework.boot.context.properties.configurationproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.context.annotation.primary; /** * @author innerpeacez * @since 2019/3/8 */ @configuration @configurationproperties("spring.rabbitmq.first") public class firstrabbitconfiguration extends abstractrabbitconfiguration { @bean(name = "firstconnectionfactory") @primary public connectionfactory firstconnectionfactory() { return super.connectionfactory(); } @bean(name = "firstrabbittemplate") @primary public rabbittemplate firstrabbittemplate(@qualifier("firstconnectionfactory") connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } @bean(name = "firstfactory") public simplerabbitlistenercontainerfactory firstfactory(simplerabbitlistenercontainerfactoryconfigurer configurer, @qualifier("firstconnectionfactory") connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); configurer.configure(factory, connectionfactory); return factory; } @bean(value = "firstrabbitadmin") public rabbitadmin firstrabbitadmin(@qualifier("firstconnectionfactory") connectionfactory connectionfactory) { return new rabbitadmin(connectionfactory); } }
第二个源的配置代码
package com.zhw.study.springbootmultirabbitmq.config; import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbitadmin; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.autoconfigure.amqp.simplerabbitlistenercontainerfactoryconfigurer; import org.springframework.boot.context.properties.configurationproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; /** * @author innerpeacez * @since 2019/3/8 */ @configuration @configurationproperties("spring.rabbitmq.second") public class secondrabbitconfiguration extends abstractrabbitconfiguration { @bean(name = "secondconnectionfactory") public connectionfactory secondconnectionfactory() { return super.connectionfactory(); } @bean(name = "secondrabbittemplate") public rabbittemplate secondrabbittemplate(@qualifier("secondconnectionfactory") connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } @bean(name = "secondfactory") public simplerabbitlistenercontainerfactory secondfactory(simplerabbitlistenercontainerfactoryconfigurer configurer, @qualifier("secondconnectionfactory") connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); configurer.configure(factory, connectionfactory); return factory; } @bean(value = "secondrabbitadmin") public rabbitadmin secondrabbitadmin(@qualifier("secondconnectionfactory") connectionfactory connectionfactory) { return new rabbitadmin(connectionfactory); } }
配置信息
spring: application: name: multi-rabbitmq rabbitmq: first: host: 192.168.10.76 port: 30509 username: admin password: 123456 second: host: 192.168.10.76 port: 31938 username: admin password: 123456
测试
这样我们的两个rabbitmq源就配置好了,接下来我们进行测试使用,为了方便使用,我写了一个multirabbittemplate.class 方便我们使用不同的源。
/** * @author innerpeacez * @since 2019/3/8 */ @component public abstract class multirabbittemplate { @autowired @qualifier(value = "firstrabbittemplate") public amqptemplate firstrabbittemplate; @autowired @qualifier(value = "secondrabbittemplate") public amqptemplate secondrabbittemplate; }
第一个消息发送者类 testfirstsender.class
/** * @author innerpeacez * @since 2019/3/11 */ @component @slf4j public class testfirstsender extends multirabbittemplate implements messagesender { @override public void send(object msg) { log.info("rabbitmq1 , msg: {}", msg); firstrabbittemplate.convertandsend("rabbitmq1", msg); } public void rabbitmq1sender() { this.send("innerpeacez1"); } }
第二个消息发送者类 testsecondsender.class
/** * @author innerpeacez * @since 2019/3/11 */ @component @slf4j public class testsecondsender extends multirabbittemplate implements messagesender { @override public void send(object msg) { log.info("rabbitmq2 , msg: {}", msg); secondrabbittemplate.convertandsend("rabbitmq2", msg); } public void rabbitmq2sender() { this.send("innerpeacez2"); } }
动态创建queue的消费者
/** * @author innerpeacez * @since 2019/3/11 */ @slf4j @component public class testfirstconsumer implements messageconsumer { @override @rabbitlistener(bindings = @queuebinding(value = @queue("rabbitmq1") , exchange = @exchange("rabbitmq1") , key = "rabbitmq1") , containerfactory = "firstfactory") public void receive(object obj) { log.info("rabbitmq1 , {}", obj); } }
/** * @author innerpeacez * @since 2019/3/11 */ @slf4j @component public class testsecondconsumer implements messageconsumer { @override @rabbitlistener(bindings = @queuebinding(value = @queue("rabbitmq2") , exchange = @exchange("rabbitmq2") , key = "rabbitmq2") , containerfactory = "secondfactory") public void receive(object obj) { log.info("rabbitmq2 , {}", obj); } }
测试类
@runwith(springrunner.class) @springboottest @slf4j public class springbootmultirabbitmqapplicationtests extends multirabbittemplate { @autowired private testfirstsender firstsender; @autowired private testsecondsender secondsender; /** * 一百个线程向 first rabbitmq 的 rabbitmq1 queue中发送一百条消息 */ @test public void testfirstsender() { for (int i = 0; i < 100; i++) { new thread(() -> firstsender.rabbitmq1sender() ).start(); } try { thread.sleep(1000 * 10); } catch (interruptedexception e) { e.printstacktrace(); } } /** * 一百个线程向 second rabbitmq 的 rabbitmq2 queue中发送一百条消息 */ @test public void testsecondsender() { for (int i = 0; i < 100; i++) { new thread(() -> secondsender.rabbitmq2sender() ).start(); } try { thread.sleep(1000 * 10); } catch (interruptedexception e) { e.printstacktrace(); } } }
测试结果:
总结
这样配置好之后我们就可向两个rabbitmq中发送消息啦。这里只配置了两个源,当然如果你需要更多的源,仅仅只需要配置*rabbitconfiguration.class
就可以啦。本文没有多说关于rabbitmq的相关知识,如果未使用过需要自己了解一下相关知识。
源码:
- github:
- 个人blog:
日拱一卒,不期速成
上一篇: MySQL之information_schema数据库详细讲解
下一篇: 等我有钱了就立马娶你
推荐阅读
-
详解配置spring-boot-actuator时候遇到的一些小问题
-
spring boot的maven配置依赖详解
-
Spring Boot的properties配置文件读取
-
浅谈spring-boot-rabbitmq动态管理的方法
-
spring boot使用sharding jdbc的配置方式
-
Spring Boot自定义配置属性源(PropertySource)
-
Spring Boot 配置和使用多线程池的实现
-
spring boot 集成shiro的配置方法
-
Spring Boot中自动化配置的利弊以及解决方法
-
Spring boot中@Conditional和spring boot的自动配置实例详解