SpringBoot集成RabbitMQ---三种交换机实现
一、什么是消息队列?
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
为什么要使用消息队列
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。
RabbitMQ特点
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗 具体特点包括:
1、可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
2、灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
3、 消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
4、 高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5、多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
6、多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
7、 管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
8、 跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
二、RabbitMQ安装
一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。
Erlang官方下载地址:https://www.erlang.org/downloads
RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html
Erlang 就相当于Java的jdk环境一样
2.1安装前的准备
依赖包安装
安装RabbitMQ之前必须要先安装所需要的依赖包可以使用下面的一次性安装命令
yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto -y
安装Erlang
1、将Erlang源代码包otp_src_19.3.tar.gz上传到Linux的/home目录下
2、解压erlang 源码包
tar -zxvf otp_src_19.3.tar.gz
3、手动创建erlang 的安装目录
mkdir /usr/local/erlang
4、进入erlang的解压目录
cd otp_src_19.3
5、配置erlang的安装信息
./configure --prefix=/usr/local/erlang --without-javac
6、编译并安装
make && make install
7、配置环境变量
vim /etc/profile
8、将这些配置填写到profile文件的最后
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
9、启动环境变量配置文件(这一步非常重要)
source /etc/profile
安装RabbitMQ
1、将RabbitMQ安装包
rabbitmq-server-3.7.2-1.el7.noarch.rpm
上传到/home目录
2、安装RabbitMQ
rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm
RabbitMQ常用命令
在这里我们知道RabbitMQ的启动和关闭就可以了,其他的操作我们在客户端就可以完成。
1、启动RabbitMQ
任意目录下执行:
rabbitmq-server start &
注意:这里可能会出现错误,错误原因是/var/lib/rabbitmq/.erlang.cookie文件权限不够。
解决方案对这个文件授权
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
chmod 400 /var/lib/rabbitmq/.erlang.cookie
2、停止服务
任意目录下执行:
rabbitmqctl stop
插件管理
为什么要进行插件管理呢?
在我们没有进行安装插件之前,我们启动RabbitMQ如下图;
然后我们使用浏览器访问管控台http://RabbitMQ服务器IP:15672
http://192.168.64.128:15672
出现无法连接,这是因为我们没有安装插件。
现在我们进行插件的安装命令:
rabbitmq-plugins enable rabbitmq_management
输入命令:
现在我们再次连接浏览器访问管控台http://RabbitMQ服务器IP:15672
http://192.168.64.128:15672
出现:
怎么登录呢?现在我们来添加用户和密码用于登录。
1、添加用户:rabbitmqctl add_user {username} {password}
rabbitmqctl add_user root root
2.删除用户:rabbitmqctl delete_user {username}
执行添加用户,此时用户名就是root 密码也是root
然后我们进行登录:
又出现了没有权限,登录不成功,现在我们来设置权限
3、设置用户角色:rabbitmqctl set_user_tags {username} {tag}
rabbitmqctl set_user_tags root administrator
然后我们再次登录:
登录成功。
三、三种交换机
1.direct交换机
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
如图:
Direct交换机:RabbitMQ的交换机会根据消息中的RoutingKey的内容精准的匹配将消息发送与RoutingKey完全一致的Queue中,而队列需要指定好BindingKey,BindingKey必须与消息中的RoutingKey完全一致。
消费者只需要监听某个队列以后就会获取队列中的信息中的消息。
所以在springboot集成direct交换机的时候,先运行发送的send再运行接收recive。
SpringBoot集成direct交换机
1.新建direct交换机消息发送者send工程
2.配置application.properties
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
3.工程发送者的结构如下
创建sendservice接口
package com.zy.service;
public interface sendservice {
void sendMessage(String message);
}
创建sendservice实现类
package com.zy.service.impl;
import com.zy.service.sendservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class sendserviceImpl implements sendservice {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String message) {
/*
参数1是交换机名称
参数2是RountingKey
参数3是我们具体发送的数据
*/
amqpTemplate.convertAndSend("bootDirectExchange","bootDirectRouting",message);
}
}
创建config包下的配置类rabbitmq注入交换机 队列Bean
package com.zy.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class rabbitmq {
//配置一个Direct类型的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("bootDirectExchange");
}
//配置一个队列
@Bean
public Queue directQueue(){
return new Queue("bootDirectQueue");
}
//配置一个队列和交换机的绑定
@Bean
public Binding directBinding(Queue directQueue,DirectExchange directExchange){
//完成绑定 参数1是需要绑定的队列 参数2是需要绑定的交换机 参数3是sengserviceImpl里面的交换机的routingKey
return BindingBuilder.bind(directQueue).to(directExchange).with("bootDirectRouting");
}
}
发送者send的application启动类
package com.zy;
import com.zy.service.sendservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(Application.class, args);
sendservice service =
(sendservice) ac.getBean("sendMessage");
service.sendMessage("Boot的测试数据");
}
}
4.新建direct交换机消息接收者recive工程
工程结构图如下:
配置application.properties
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
reciveservice接口
package com.zy.service;
public interface reciveservice {
// void reciveMessage();
void directRecive(String message);
}
reciveservice接口实现类
package com.zy.service.impl;
import com.zy.service.reciveservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class reciveserviceImpl implements reciveservice {
@Resource
private AmqpTemplate amqpTemplate;
@Override
//下面这个方法 每次只能接收到一个消息,
//也就是说 每次发送一条消息,这里运行一次,然后接收一个消息
//如果再次发送一条消息,就不会再接收,除非再次运行这个项目
//不能做到 只运行一次就可以接收源源不断的消息 所以我们采用下面的方法
//用来源源不断的接收每次发送的消息,而且只用运行一次接收消息的项目
//public void reciveMessage() {
// String message = (String) amqpTemplate.receiveAndConvert("bootDirectQueue");
//}
/*
RabbitListener 这个注解用来标记当前方法是Rabbitmq的消息监听方法,作用是持续性的接收消息
queues这个属性用来指定一个已经存在的队列名 用于进行队列的监听
*/
@RabbitListener(queues = {"bootDirectQueue"})
public void directRecive(String message){
System.out.println("监听器接收的消息"+message);
}
}
配置类和上面的配置类代码一样
application启动类
package com.zy;
import com.zy.service.reciveservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(Application.class, args);
reciveservice service =
(reciveservice) ac.getBean("sendMessage");
// service.reciveMessage();
}
}
现在开始测试
1.启动rabbit服务:
2.启动发送者send的application启动类
启动成功,现在我们查看rabbitmq的控制端
发现Exchange端多了我们刚才加入的客户机bootDirectExchange
我们也发现队列也多了,Ready就是我们发送的消息,变为1,证明我们发送了一条消息,现在这条消息在我们绑定的队列里面存放着,等着我们启动消息接收类来识别 获取消息,一旦识别成功,获取到消息,Ready就变为0。
3.启动接收者recive的application启动类
消息接收成功,此时我们看rabbitmq控制端
Ready已经变为0,此时证明消息获取成功。
至此:springboot集成direct交换机到此结束。
2.Fanout交换机(是一种广播模式 消息是一对多 没有两个key)
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
如图:
Fanout是一种广播模式,消息是一对多,在这种模式中,没有RoutingKey和BingdingKey,Bindings知识简单的将消息和交换机进行绑定,如果消息进入到了交换机中,那么消息就会转发到所有与当前交换机绑定的所有队列中。但是这种模式就和我们收看电视直播一样,必须要先在消费者中建立监听队列,就像我们观看直播前必须要打开电视的某个频道一样,不然就永远错过这场直播了。
这种模式的交换机适合用在群发一些不重要的消息,用户接收或接收不到消息都不重要,例如手机消息app推送。
SpringBoot集成fanout换机
1.新建direct交换机消息接收者recive工程
工程的结构如下:
配置application.properties
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
reciveservice接口(创建两个监听队列,实现群接收)
package com.zy.service;
public interface reciveservice {
void FanoutRecive01(String meaasge);
void FanoutRecive02(String meaasge);
}
reciveservice接口实现类
package com.zy.service.impl;
import com.zy.service.reciveservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class reciveserviceImpl implements reciveservice {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue(),//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "FanoutExchange",type = "fanout"))//创建一个交换机
})
public void FanoutRecive01(String meaasge){
System.out.println("监听器接收"+meaasge);
}
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue(),//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "FanoutExchange",type = "fanout"))//创建一个交换机
})
public void FanoutRecive02(String meaasge){
System.out.println("监听器接收"+meaasge);
}
}
application启动类
package com.zy.service.impl;
import com.zy.service.reciveservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class reciveserviceImpl implements reciveservice {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue(),//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "FanoutExchange",type = "fanout"))//创建一个交换机
})
public void FanoutRecive01(String meaasge){
System.out.println("监听器接收"+meaasge);
}
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue(),//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "FanoutExchange",type = "fanout"))//创建一个交换机
})
public void FanoutRecive02(String meaasge){
System.out.println("监听器接收"+meaasge);
}
}
现在新建发送者工程
工程结构图如下:
application.properties配置和上面的配置文件中的代码一样
sendservice接口类
package com.zy.service;
public interface sendservice {
void sendFanoutMessage(String message);
}
sendservice接口实现类
package com.zy.service.impl;
import com.zy.service.sendservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class sendserviceImpl implements sendservice {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public void sendFanoutMessage(String message) {
amqpTemplate.convertAndSend("FanoutExchange","",message);
}
}
send发送者的配置类config 目的是确定绑定好交换机
package com.zy.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class rabbitmq {
//配置一个Fount类型的交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("FanoutExchange");
}
}
send启动类applicaion
package com.zy;
import com.zy.service.sendservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(Application.class, args);
sendservice service =
(sendservice) ac.getBean("sendMessage");
service.sendFanoutMessage("Boot的测试数据");
}
}
现在开始测试
1.启动rabbit服务:
2.启动接收recive的application启动类
启动成功,现在我们查看rabbitmq的控制端
我们发现已经多了两个随机监听队列,用来获取消息。
3.启动发送者send的application启动类
然后我们看接收者的控制台发现消息接收成功:
至此:springboot集成fanout交换机到此结束。
2.topic交换机
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,“”匹配不多不少一个单词。
如图:
消息Routingkey键aa和队列1和2的BindKey键都可以匹配上,因此消费者1和2可以接收到消息。
李四的rountkey是aa.bb 下面与之匹配的bingingkey只有第二个和第三个 所以第二个和第三个可以让下面的消费者拿到消息。
SpringBoot集成topic交换机
topic交换机和fanout交换机一样,必须先创建消息监听队列,也就是先创建消息接收者recive
新建消息接收者recive工程
工程结构图如下:
配置application.properties
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
reciveservice接口(创建三个监听队列,实现匹配接收)
package com.zy.service;
public interface reciveservice {
void TopicRecive01(String message);
void TopicRecive02(String message);
void TopicRecive03(String message);
}
reciveservice接口实现类
package com.zy.service.impl;
import com.zy.service.reciveservice;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class reciveserviceImpl implements reciveservice {
@Resource
private AmqpTemplate amqpTemplate;
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue("topic01"),key = {"aa"},//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "topicExchange",type = "topic"))//创建一个交换机
})
public void TopicRecive01(String message){
System.out.println("topic01监听器接收"+message);
}
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue("topic02"),key = {"aa.*"},//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "topicExchange",type = "topic"))//创建一个交换机
})
public void TopicRecive02(String message){
System.out.println("topic02监听器接收"+message);
}
@RabbitListener(bindings = {
@QueueBinding(///完成队列和交换机的绑定
value = @Queue("topic03"),key = {"aa.#"},//@Queue() 创建一个队列 没有参数则表示创建一个随机队列
exchange = @Exchange(name = "topicExchange",type = "topic"))//创建一个交换机
})
public void TopicRecive03(String message){
System.out.println("topic03监听器接收"+message);
}
}
启动类application
package com.zy;
import com.zy.service.reciveservice;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(Application.class, args);
reciveservice service =
(reciveservice) ac.getBean("sendMessage");
}
}
新建消息发送者工程
工程结构图如下;
配置application.properties
spring.rabbitmq.host=192.168.64.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
sendservice接口
package com.zy.service;
public interface SendService {
void sendTopicMessage(String message);
}
sendservice接口实现类(在这里我们给交换机的是aa.bb)
package com.zy.service.impl;
import com.zy.service.SendService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service("sendMessage")
public class SendServiceImpl implements SendService {
//注入Amqp的模板类,利用这个对象来发送和接收消息
@Resource
private AmqpTemplate amqpTemplate;
public void sendTopicMessage(String message) {
amqpTemplate.convertAndSend("topicExchange","aa.bb",message);
}
}
配置类config 确保绑定一个topic类型的交换机
package com.zy.config;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// //配置一个 Topic 类型的交换
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
}
发送者启动类
package com.zy;
import com.zy.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
ApplicationContext ac = SpringApplication.run(Application.class, args);
SendService service =
(SendService) ac.getBean("sendMessage");
service.sendTopicMessage("Boot的测试数据");
}
}
现在开始测试
1.启动rabbit服务:
2.启动接收recive的application启动类
查看rabbitmq的控制端
发现多了交换机topicExchange且类型是topic的
发现也多了刚才创建的消息接收队列
2.启动发送send的application启动类
接下来就留给你们来做咯~ 和上面两个交换机的步骤一样,你们自己修改数据去测吧。
本文地址:https://blog.csdn.net/CSDg166/article/details/109641280
上一篇: 【Redis实现系列】初始化服务器
下一篇: Mysql大厂高频面试题
推荐阅读
-
springboot集成schedule实现定时任务
-
SpringBoot集成Swagger2实现Restful(类型转换错误解决办法)
-
浅谈SpringBoot集成Redis实现缓存处理(Spring AOP实现)
-
详解SpringBoot集成Redis来实现缓存技术方案
-
SpringBoot集成Swagger2实现Restful(类型转换错误解决办法)
-
SpringBoot集成JWT实现权限认证
-
SpringBoot2.0 基础案例(08):集成Redis数据库,实现缓存管理
-
SpringBoot框架集成ElasticSearch实现过程示例详解
-
详解springboot集成websocket的两种实现方式
-
SpringBoot2.0集成WebSocket实现后台向前端推送信息