Spring Boot 入门之消息中间件的使用
一、前言
在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。
我们常用的消息代理有 jms 和 amqp 规范。对应地,它们常见的实现分别是 activemq 和 rabbitmq。
二、整合 activemq
2.1 添加依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-activemq</artifactid> </dependency> <!-- 如果需要配置连接池,添加如下依赖 --> <dependency> <groupid>org.apache.activemq</groupid> <artifactid>activemq-pool</artifactid> </dependency>
2.2 添加配置
# activemq 配置 spring.activemq.broker-url=tcp://192.168.2.12:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false spring.activemq.pool.max-connections=50 # 使用发布/订阅模式时,下边配置需要设置成 true spring.jms.pub-sub-domain=false
此处 spring.activemq.pool.enabled=false,表示关闭连接池。
2.3 编码
配置类:
@configuration public class jmsconfirguration { public static final string queue_name = "activemq_queue"; public static final string topic_name = "activemq_topic"; @bean public queue queue() { return new activemqqueue(queue_name); } @bean public topic topic() { return new activemqtopic(topic_name); } }
负责创建队列和主题。
消息生产者:
@component public class jmssender { @autowired private queue queue; @autowired private topic topic; @autowired private jmsmessagingtemplate jmstemplate; public void sendbyqueue(string message) { this.jmstemplate.convertandsend(queue, message); } public void sendbytopic(string message) { this.jmstemplate.convertandsend(topic, message); } }
消息消费者:
@component public class jmsreceiver { @jmslistener(destination = jmsconfirguration.queue_name) public void receivebyqueue(string message) { system.out.println("接收队列消息:" + message); } @jmslistener(destination = jmsconfirguration.topic_name) public void receivebytopic(string message) { system.out.println("接收主题消息:" + message); } }
消息消费者使用 @jmslistener 注解监听消息。
2.4 测试
@runwith(springrunner.class) @springboottest public class jmstest { @autowired private jmssender sender; @test public void testsendbyqueue() { for (int i = 1; i < 6; i++) { this.sender.sendbyqueue("hello activemq queue " + i); } } @test public void testsendbytopic() { for (int i = 1; i < 6; i++) { this.sender.sendbytopic("hello activemq topic " + i); } } }
打印结果:
接收队列消息:hello activemq queue 1
接收队列消息:hello activemq queue 2
接收队列消息:hello activemq queue 3
接收队列消息:hello activemq queue 4
接收队列消息:hello activemq queue 5
测试发布/订阅模式时,设置 spring.jms.pub-sub-domain=true
接收主题消息:hello activemq topic 1
接收主题消息:hello activemq topic 2
接收主题消息:hello activemq topic 3
接收主题消息:hello activemq topic 4
接收主题消息:hello activemq topic 5
三、整合 rabbitmq
3.1 添加依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
3.2 添加配置
spring.rabbitmq.host=192.168.2.30 spring.rabbitmq.port=5672 spring.rabbitmq.username=light spring.rabbitmq.password=light spring.rabbitmq.virtual-host=/test
3.3 编码
配置类:
@configuration public class amqpconfirguration { //=============简单、工作队列模式=============== public static final string simple_queue = "simple_queue"; @bean public queue queue() { return new queue(simple_queue, true); } //===============发布/订阅模式============ public static final string ps_queue_1 = "ps_queue_1"; public static final string ps_queue_2 = "ps_queue_2"; public static final string fanout_exchange = "fanout_exchange"; @bean public queue psqueue1() { return new queue(ps_queue_1, true); } @bean public queue psqueue2() { return new queue(ps_queue_2, true); } @bean public fanoutexchange fanoutexchange() { return new fanoutexchange(fanout_exchange); } @bean public binding fanoutbinding1() { return bindingbuilder.bind(psqueue1()).to(fanoutexchange()); } @bean public binding fanoutbinding2() { return bindingbuilder.bind(psqueue2()).to(fanoutexchange()); } //===============路由模式============ public static final string routing_queue_1 = "routing_queue_1"; public static final string routing_queue_2 = "routing_queue_2"; public static final string direct_exchange = "direct_exchange"; @bean public queue routingqueue1() { return new queue(routing_queue_1, true); } @bean public queue routingqueue2() { return new queue(routing_queue_2, true); } @bean public directexchange directexchange() { return new directexchange(direct_exchange); } @bean public binding directbinding1() { return bindingbuilder.bind(routingqueue1()).to(directexchange()).with("user"); } @bean public binding directbinding2() { return bindingbuilder.bind(routingqueue2()).to(directexchange()).with("order"); } //===============主题模式============ public static final string topic_queue_1 = "topic_queue_1"; public static final string topic_queue_2 = "topic_queue_2"; public static final string topic_exchange = "topic_exchange"; @bean public queue topicqueue1() { return new queue(topic_queue_1, true); } @bean public queue topicqueue2() { return new queue(topic_queue_2, true); } @bean public topicexchange topicexchange() { return new topicexchange(topic_exchange); } @bean public binding topicbinding1() { return bindingbuilder.bind(topicqueue1()).to(topicexchange()).with("user.add"); } @bean public binding topicbinding2() { return bindingbuilder.bind(topicqueue2()).to(topicexchange()).with("user.#"); } }
rabbitmq 有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看《rabbitmq 工作模式介绍》或者自行百度相关资料。
消息生产者:
@component public class amqpsender { @autowired private amqptemplate amqptemplate; /** * 简单模式发送 * * @param message */ public void simplesend(string message) { this.amqptemplate.convertandsend(amqpconfirguration.simple_queue, message); } /** * 发布/订阅模式发送 * * @param message */ public void pssend(string message) { this.amqptemplate.convertandsend(amqpconfirguration.fanout_exchange, "", message); } /** * 路由模式发送 * * @param message */ public void routingsend(string routingkey, string message) { this.amqptemplate.convertandsend(amqpconfirguration.direct_exchange, routingkey, message); } /** * 主题模式发送 * * @param routingkey * @param message */ public void topicsend(string routingkey, string message) { this.amqptemplate.convertandsend(amqpconfirguration.topic_exchange, routingkey, message); } }
消息消费者:
@component public class amqpreceiver { /** * 简单模式接收 * * @param message */ @rabbitlistener(queues = amqpconfirguration.simple_queue) public void simplereceive(string message) { system.out.println("接收消息:" + message); } /** * 发布/订阅模式接收 * * @param message */ @rabbitlistener(queues = amqpconfirguration.ps_queue_1) public void psreceive1(string message) { system.out.println(amqpconfirguration.ps_queue_1 + "接收消息:" + message); } @rabbitlistener(queues = amqpconfirguration.ps_queue_2) public void psreceive2(string message) { system.out.println(amqpconfirguration.ps_queue_2 + "接收消息:" + message); } /** * 路由模式接收 * * @param message */ @rabbitlistener(queues = amqpconfirguration.routing_queue_1) public void routingreceive1(string message) { system.out.println(amqpconfirguration.routing_queue_1 + "接收消息:" + message); } @rabbitlistener(queues = amqpconfirguration.routing_queue_2) public void routingreceive2(string message) { system.out.println(amqpconfirguration.routing_queue_2 + "接收消息:" + message); } /** * 主题模式接收 * * @param message */ @rabbitlistener(queues = amqpconfirguration.topic_queue_1) public void topicreceive1(string message) { system.out.println(amqpconfirguration.topic_queue_1 + "接收消息:" + message); } @rabbitlistener(queues = amqpconfirguration.topic_queue_2) public void topicreceive2(string message) { system.out.println(amqpconfirguration.topic_queue_2 + "接收消息:" + message); } }
消息消费者使用 @rabbitlistener 注解监听消息。
3.4 测试
@runwith(springrunner.class) @springboottest public class amqptest { @autowired private amqpsender sender; @test public void testsimplesend() { for (int i = 1; i < 6; i++) { this.sender.simplesend("test simplesend " + i); } } @test public void testpssend() { for (int i = 1; i < 6; i++) { this.sender.pssend("test pssend " + i); } } @test public void testroutingsend() { for (int i = 1; i < 6; i++) { this.sender.routingsend("order", "test routingsend " + i); } } @test public void testtopicsend() { for (int i = 1; i < 6; i++) { this.sender.topicsend("user.add", "test topicsend " + i); } } }
测试结果略过。。。
踩坑提醒1:access_refused – login was refused using authentication mechanism plain
解决方案:
1) 请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。
2) 如果测试账户使用的是 guest,需要修改 rabbitmq.conf 文件。在该文件中添加 “loopback_users = none” 配置。
踩坑提醒2:cannot prepare queue for listener. either the queue doesn't exist or the broker will not allow us to use it
解决方案:
我们可以登陆 rabbitmq 的管理界面,在 queue 选项中手动添加对应的队列。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。