Spring Boot与RabbitMQ
Spring Boot与消息
一、概述
-
大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力
-
消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目 的地。
-
消息队列主要有两种形式的目的地
- 队列(queue):点对点消息通信(point-to-point)
- 主题(topic):发布(publish)/订阅(subscribe)消息通信
什么是异步处理和消息队列?
当没有使用异步处理时(图一),注册信息写入数据库–>发送注册邮件–>发送注册邮件,这三件事只能一件一件顺序执行;
加入了异步处理后(图二),当我们完成“注册信息写入数据库“”这个操作后,我们可以同时执行“发送注册邮件“和“发送注册短信这两个操作
加入了异步处理和消息队列后(图三),当我们完成“注册信息写入数据库“”这个操作后,把数据写入消息队列,“发送注册邮件“和“发送注册短信这两个操作直接在消息队列中读取消息
如图所示,执行完三个操作,图一耗时150ms,图二耗时100ms,图三耗时55ms
什么是应用解耦?
这里引用别人的一段话来解释:
在软件工程中, 耦合指的就是就是对象之间的关联性。对象之间的耦合越高,维护成本越高。因此对象的设计应使类和构件之间的耦合最小。 软件设计中通常用耦合度和内聚度作为衡量模块独立程度的标准。 划分模块的一个准则就是高内聚低耦合。
高内聚,低耦合:简单来说,就是类内部的关系越紧密越好,类与类之间的关系越少越好。
在下面图中所示:
在图一中,订单系统直接调用库存系统,耦合度高,当我们订单数一旦很大库存系统响应不过来时,订单系统会一直等待库存系统响应,这样导致两个系统都处于无法使用的状态
图二中,我们在订单系统和库存系统之间加入了消息队列,库存系统把某些数据缓存到消息队列中,订单系统想要这部分数据可以直接在消息队列取信息,当库存系统出现问题而无法响应时,短时间内也不会影响订单系统的运行
什么是流量削峰?
- 点对点式: – 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容, 消息读取后被移出队列 – 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
- 发布订阅式: – 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么 就会在消息到达时同时收到消息
- JMS(Java Message Service)JAVA消息服务: – 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
- AMQP(Advanced Message Queuing Protocol) – 高级消息队列协议,也是一个消息代理的规范,兼容JMS – RabbitMQ是AMQP的实现
JMS与AMQP对比:
-
Spring支持
– spring-jms提供了对JMS的支持
– spring-rabbit提供了对AMQP的支持
– 需要ConnectionFactory的实现来连接消息代理
– 提供JmsTemplate、RabbitTemplate来发送消息
– @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
– @EnableJms、@EnableRabbit开启支持
-
Spring Boot自动配置
– JmsAutoConfiguration
– RabbitAutoConfiguration
二、RabbitMQ
2.1 RabbitMQ简介:
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
2.2 核心概念介绍 :
概述 | |
---|---|
Message | 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组 成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出 该消息可能需要持久性存储)等。 |
Publisher | 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 |
Exchange | 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有 所区别 |
Queue | 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息 可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 |
Binding | 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连 接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定可以是多对多的关系。 |
Connection | 网络连接,比如一个TCP连接。 |
Channel | 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚 拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这 些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所 以引入了信道的概念,以复用一条 TCP 连接。 |
Consumer | 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 |
Virtual Host | 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加 密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有 自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定, RabbitMQ 默认的 vhost 是 / 。 |
Broker | 表示消息队列服务器实体 |
各大核心关系:
2.3 RabbitMQ运行机制
2.3.1 AMQP 中的消息路由
• AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。
生产者把消息发布到 Exchange 上,消息最终到达队列并被 消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
]
2.3.2 Exchange 的四种类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型: direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多, 目前几乎用不到了,所以直接看另外三种类型:
- Direct Exchange
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列(Queues)中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为 “dog”,则只转发 routing key 标记为“dog”的消息,不会转 发“dog.puppy”,也不会转发“dog.guard”等等。它是完全 匹配、单播的模式。(类似于jms里的队列通信)
- Fanout Exchange
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键, 只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的 主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
- Topic Exchange
topic 交换器通过模式匹配分配消息的路由键属 性,将路由键和某个模式进行匹配,此时队列 需要绑定到一个模式上。它将路由键和绑定键 的字符串切分成单词,这些单词之间用点隔开。 它同样也会识别两个通配符:符号“#”和符号 “***** ” 。 # 匹配 0 个或多个单词 , *****匹配一个单词。
2.4 springboot整合RabbitMQ
2.4.1 环境搭建
1、在centos7中使用dockers下载rabbitmq镜像
在centos7拉取rabbitmq镜像
docker pull rabbitmq:3-management
运行rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq (IMAGE ID)
访问rabbitmq的web管理页面,浏览器输入
http://192.168.0.160:15672 #192.168.0.160为主机ip
账号密码默认都为: guest
按照此图所示关系信息利用web管理后台在rabbitmq里面添加Exchanges和Queues,并绑定他们.(具体步骤这里就不详细写出来了)
添加完后,web管理后台显示:
2、创建java工程
a、创建工程(使用spring initalizr创建工程),选择导入web、RabbitMQ模块
在pom.xml文件中自动导入相关依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
b、在application.properties中配置rabbitMQ的相关配置
#配置rabbitmq
spring.rabbitmq.host=192.168.0.160
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
2.4.2 简单测试发送和获取消息
- (单播)发送消息给指定消息队列
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 1、单播(点对点),发送一条消息给指定消息队列
*/
@Test
public void directTest() {
//Message需要自己构造一个;定义消息体内容和消息头
//rabbitTemplate.send(exchage,routeKey,message);
//object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq;
//rabbitTemplate.convertAndSend(exchage,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg","这是第一个消息");
map.put("data", Arrays.asList("helloworld",123,true));
//对象被默认序列化以后发送出去
//发送的是自定义javabean要注意导入有参、无参构造器、get、set方法
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
}
执行该测试类后,在rabbitMQ的web管理后台的atguigu.news队列中能查到发送过去的信息
此时我们可以看到,在web管理后台上显示的数据是经过序列化的
我们查看RabbitTemplate的源码里有一条属性是来进行消息的转换
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, MessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
.....................
private MessageConverter messageConverter = new SimpleMessageConverter();
.....................
.....................
默认使用的是SimpleMessageConverter
要想在web管理后台看到传入消息的 json 格式的数据我们需要往容器中加入一个MessageConverter组件,我们使用MessageConverter的实现类Jackson2JsonMessageConverter来实现数据转换
@Configuration
public class MyRabbitmqConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
执行测试方法给“atguigu.news”队列发消息,查看后台 ,发现数据已经转换成json格式显示
-
(fanout多播)发送消息给所有队列
@Test public void fanoutTest(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("西游记","吴承恩")); }
web管理后台:所有序列都受到了消息
- 从指定消息队列获取到消息
@Test
public void recive(){
//从指定队列中获取消息
//该方法把获取到的消息自动转换成对相应的对象
Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
System.out.println(o.getClass());
System.out.println(o);
}
控制台打印结果
注意:消息队列里的数据被“customer”获取后便会删除
2.4.3 springboot里rabbitMQ的自动配置
RabbitAutoConfiguration源码探析
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
//1、RabbitProperties 封装了 RabbitMQ的配置
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
//2、自动配置了连接工厂ConnectionFactory
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
throws Exception {
...........
}
}
// 3、自动配置了RabbitTemplate :给RabbitMQ发送和接受消息;
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
rabbitTemplate.setMessageConverter(messageConverter);
}
rabbitTemplate.setMandatory(determineMandatoryFlag());
RabbitProperties.Template templateProperties = this.properties.getTemplate();
RabbitProperties.Retry retryProperties = templateProperties.getRetry();
if (retryProperties.isEnabled()) {
rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
}
if (templateProperties.getReceiveTimeout() != null) {
rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());
}
if (templateProperties.getReplyTimeout() != null) {
rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());
}
return rabbitTemplate;
}
// 4、 AmqpAdmin : RabbitMQ系统管理功能组件;
// AmqpAdmin:创建和删除 Queue,Exchange,Binding
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
// 5、@EnableRabbit + @RabbitListener 监听消息队列的内容
}
2.4.4 @EnableRabbit + @RabbitListener 监听消息队列的内容
@RabbitListener`注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。
可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过rabbit:annotation-driven来设置@RabbitListener
的执行,当然也可以通过@EnableRabbit注解来启用@RabbitListener
。
@EnableRabbit注解来启用@RabbitListener
- 使用@EnableRabbit 开启基于注解的RabbitMQ模式
@EnableRabbit //开启基于注解的RabbitMQ模式
@SpringBootApplication
public class Springboot02AmqpApplication {
public static void main(String[] args) {
SpringApplication.run(Springboot02AmqpApplication.class, args);
}
}
-
编写service层代码,使用 @RabbitListener注解监听指定队列
注意: @RabbitListener 监听到指定队列有消息进入,会立即消费掉这条信息,队列会被清空
@Service
public class BookService {
@RabbitListener(queues = "atguigu.news")
public void receive(Book book){//把监听到的消息封装成Book对象
System.out.println("收到消息:"+book);
}
@RabbitListener(queues = "atguigu")
public void receive02(Message message){//把监听到的消息封装成Message
System.out.println(message.getBody());
System.out.println(message.getMessageProperties(2.));
}
}
2.4.5 使用代码生成Exchange、Queues,指定Binding
AmqpAdmin : RabbitMQ系统管理功能组件;
使用AmqpAdmin 类 创建和删除 Queue,Exchange,Binding
- 创建Exchange
@SpringBootTest
class SpringBoot02AmqbApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void creatExchange(){
//这里使用Exchange的实现类去创建对应类型的Exchange
amqpAdmin.declareExchange(new FanoutExchange("excheang.amqpAdmin"));
}
代码执行后,在web管理后台可以查看到新建的Exchange
- 创建Queues
@Test
public void creatQueues(){
//传入队列名和durable参数
amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
}
代码执行后,在web管理后台可以查看到新建的Queues
- 绑定规则
@Test
public void bindRule(){
amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"excheang.amqpAdmin","amqpadmin.routingKey",null));
}
// Binding的构造器
public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
@Nullable Map<String, Object> arguments) {
代码执行后,在web管理后台在"excheang.amqpAdmin"可以查看到绑定规则
- AmqpAdmin的其他方法说明
上一篇: Mysql数据库乱码总结_MySQL
推荐阅读
-
spring boot静态变量注入配置文件详解
-
详解spring boot jpa整合QueryDSL来简化复杂操作
-
Spring Boot 配置文件详解(小结)
-
spring boot整合Cucumber(BDD)的方法
-
使用Spring Boot集成FastDFS的示例代码
-
spring boot aop 记录方法执行时间代码示例
-
Spring Boot与Spark、Cassandra系统集成开发示例
-
Spring MVC源码(一) ----- 启动过程与组件初始化
-
.NET CORE与Spring Boot编写控制台程序应有的优雅姿势
-
十、Spring boot 简单优雅的整合 Swagger2