RabbitMQ消息中间件
摘要
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
然后消息中间件实际上有很多种,RabbitMQ是各种收费的也好开源的也好中应用的最广泛,功能最为强大的一个,当然其他消息中间件对于某些功能所偏重的是不一样的。
本文主要前半部分主要围绕RabbitMQ展开,就中间件技术的作用、体系结构、原理和开发应用等方面进行概述。
消息中间件理解
RabbitMQ是一种消息中间件,主要解决进程间的通信问题,当消息比较少的时候两个进程之间,使用普通的TCP, udp等协议就可以了,但是当数据量比较大时候,自己写的东西很难保证稳定,可监控,不崩溃,这个时候就有必要把消息的接收与发送单独抽象成一个服务的形式或者另外一个程序单独去处理,而让我们的业务程序不至于崩溃。
关于消息中间件最主要的例子就是咱们的双十一,秒杀抢购这样的大型活动,如果没有消息中间件的技术去接收来自上亿次很短时间内的请求,这样很可能导致整个京东淘宝后台系统的完全崩溃,如果使用了消息中间件的技术,就可以把这上亿次的请求汇总到消息中间件里,这样就可以和前台的主要的系统进行分离,如果崩溃了,那也是消息中间件崩溃,而和我们直接面对客户的服务不至于终止,在界面上没有反应。而消息中间件可以以时间换空间的方式,上亿条消息过来,它一点点的处理,直到所有的都处理完了,再反馈给主要业务系统服务,这样就可以形成一个连续可持续的分布式系统架构了,所以呢消息中间件就是给进程(微服务)提供消息通信的桥梁,但他最主要的功能还是不是这个,提供消息通信桥梁功能的东西多了,tcp udp也能做到,最主要的作用是解决数据高峰的作用,而且有高可靠性的保证。
消息中间件的原理
假设我们国家没有快递公司和营业厅,当我们想买手机的时候,我们需要到厂家去直接购买,当购买的入数特别多的时候,厂家肯定是处理不过来的,那厂家肯定乱成一锅粥,必会有人从中干些坏事偷东西之类的,而且用户也会等的很久不耐烦,这样厂家还怎么做生意。消费者和厂家的这种直接对接的关系就是直连型。
一旦有某一方的消息特别大的时候,就有可能导致另外一方程序崩溃或者是处理不过来,这个时候快递公司就出现了,也就是消息中间件。这些消费者发出请求以后,厂家就会把货交给快递公司再发给厂家,这样的就不会出现厂家爆满的的状况了,这就是消息中间件起到的桥梁作用。
那所谓的高级特性在哪里呢,那就是可以接收不同类型的消息然后对货架进行一个再分类,各找各妈就提高了效率。
除此之外它还有更高级的特性,RabbitMQ中提出了“交换机”这样一种概念,交换机类似一种传送带,它可以非常快捷的指定交换机得到生产端的产品到底放在哪个货架上。而且当小米货架满了的时候,只需要改变下规则就可以把小米手机放到红米货架上了。
同时还可以进一步进行扩展,刚才说交换机可以灵活放到不同消息队列里,其实还可以定义不同的虚拟主机。比如说这个快递公司不仅接受了小米的产品,华为的产品也交给了它处理,那么小米产品和华为产品就不能随随便便混在一起了,这是就可以定义两个不同的虚拟机,下图中上面的虚拟机专门供给小米,下面的专门供给华为。
还有一种情况,消息在发往客户的过程可能会因为各种问题不了了之了,比如邮件队列长度已满,消费者拒绝消息,或者消息已经过期,这称之为死信,就是死亡的信息,这种情况也需要及时的处理,要么捕获,要么让消费者知道到底出了什么样的问题,总之不能不了了之,这就是死信队列为整个系统带来的特殊功能,这也是消息队列必须要有的一个功能。
体系结构
中间件平台必须兼容应用和所依赖环境带来的广泛需求,必须能够吸取这些需求在设计时和系统运行时的改变及对已有技术的集成。为此,有些研究者引入可重配置,但它们是典型的初步 Ad Hoc,通常只是在一系列可选项中进行选择,因此,需要一个更加系统和规范的解决方案。在中间件体系结构设计中,通常采用以下机制支持进一步的集成[1-5]:
(1)构件框架(CF)
CF 是管理一个构件集交互的规则和契约的集合。CF 将一个特定领域的问题处理为一个构件,比如将通信协议作为一个构件,于是一个构件系统中的多个 CF 需要集成。CF 的首要作用是通过限制扩展构件的设计空间提供内置的结构特性和常量。此外,CF 简化了构件的开发和装配,实现了轻量级构件,增加了系统的可理解性与可维护性。
CF 由基础构件和合成构件组成,每一个构件都由与之相联系的工厂对象支持(可以通过简单地引入一个新的工厂对象来扩展 CF)。它基本能体现上述抽象体系结构,其构件能用于反身映射体系结构中的所有层次中。
(2)构件框架元接口
CF 通过元接口相联系。通过元接口,基于 CF 的子系统的执行可以在一定控制和规则下敞开。元接口通常被 CFR 执行,它保留关于当前配置的信息并用该信息实现监测和调整。由于元接口被设计成 CF 的一个完整部分,可以容易地表达特定领域的知识,实现理想的一致性和完整性,因此元接口所提供的灵活性的等级要与有效性、一致性、集成性、可理解性相平衡。通过应用特定的元接口,可以为不同领域实现不同的交易。
(3)统一的构件模型
在上面的体系结构中,应用和中间件构件、CF 全部用相同的构件模型。这有利于增强互操作性、灵活性和节约开发成本,因为开发人员只需熟悉单一的编程模型。另外,它还消除了应用和中间件之间的差异,解决了应用开发和系统开发之间的传统障碍,必要时,应用开发可以再使用系统开发的开发人员。
消息中间件作用
解耦
冗余〈存储):有些情况下处理数据的过程会失败,造成数据丢失,可使用消息中间件进行数据持久化;
扩展性:消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数。
削峰: 在访问量剧增的情况下,程序不会因为突发的超负荷请求而崩溃。
可恢复性: 当系统一部分组件失效时,不会影响到整个系,消息中间件降低了进程间的耦合度,所以即使 个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理。
顺序保证: 在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持 定程度上的顺序性。
缓冲: 在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过 个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速 该缓冲层有助于控制和优化数据流经过系统的速度。
异步通信: 在很多时候应用不想也不需要立即处理消息 消息中间件提供了异步处理机制,允许应用把 些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理。
安装部署
Jdk1.8以上,Java环境的要求,至少要在8版本,或者更高版本,因为会使用lambda表达式
安装otp_win64_21.3.exe
安装rabbitmq-server-3.7.14.exe
安装并启动UI插件的命令
rabbitmq-plugins.bat enable rabbitmq_management
启动服务器的命令
rabbitmq-server.bat
访问地址
http://127.0.0.1:15672/
管理员帐号
guest/guest
IDEA添加RabbitMQ的jar包依赖
配置Maven,添加依赖。把下面的依赖包加入到dependency
com.rabbitmq
amqp-client
简单的消息接收发送
生产者productor.java
,发送一条Message内容是“一部手机”。
package com.zsj.testrabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Productor {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,用来修高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建高速公路
Connection connection = connectionFactory.newConnection();
//划定双向车道
Channel channel = connection.createChannel();
//声明一个消息队列
channel.queueDeclare("Queue小米货架",true,false,false,null);
//channel.queueDeclare("Queue货架-非持久化-重启丢失",false,false,false,null);
//channel.queueDeclare("Queue货架-独占",false,true,false,null);
//发送数据
String oneData = "一部手机";
channel.basicPublish("","Queue小米货架",null,oneData.getBytes());
channel.close();
connection.close();
}
}
消费者Consumer.java
,接收生产者的消息。
package com.zsj.testrabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,用来修高速公路
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//修建高速公路
Connection connection = connectionFactory.newConnection();
//划定双向车道
Channel channel = connection.createChannel();
//接收数据
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
String oneData = new String(body);
oneData = "我收到了" + oneData;
System.out.println(oneData);
}
};
channel.basicConsume("Queue小米货架",true,consumer);
}
}
运行productor.java,发现队列中新增了一条队列,同时有一个信息准备好了。
运行Consumer.java,接收到了生产者发的消息。
本文地址:https://blog.csdn.net/qq_45187522/article/details/109633492
上一篇: cartographer 代码思想解读-算法思想总结
下一篇: Zabbix实战--监控Nginx