欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ 01 | RabbitMQ简介

程序员文章站 2022-03-05 18:30:42
...

1. 什么是消息中间件?

    消息(Message)是指在应用间传送的数据,消息队列中间件(Message Queue Middleware,MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

    消息队列中间件(也可以称为消息队列或者消息中间件)一般有两种传递模式:点对点模式(P2P)和发布/订阅模式(Pub/Sub)。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列接收消息,队列的存在使得消息的异步传输成为可能。发布/订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

    目前开源的主流消息中间件包括:RabbitMQ、Kafka、ActiveMQ、RocketMQ等。面向消息的中间件(Message Oriented Middleware,MOM)提供了以松散耦合的灵活方式集成应用的一种机制,它们提供了基于存储和转发的应用程序之间的异步数据的发送,即应用之间彼此不直接通信,而是与作为中介的消息中间件通信,消息中间件提供了有保证的消息发送,应用程序开发人员无须了解远程过程调用(RPC)和网络通信协议的细节。

    消息中间件适用于需要可靠数据传输的分布式环境,采用消息中间件的系统中,不同对象之间通过传递消息来**对方的事件完成相应操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。消息中间件能在不同平台之间通信,常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同。其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这是它优于远程过程调用的原因。

2. 消息中间件的作用

    消息中间件凭借其独到的特性在不同场景下可以展现不同的作用,总体来讲消息中间件的作用可以概括如下:

① 解耦:消息中间件在处理过程中插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许开发者独立地扩展或修改两边的处理过程,只要确保遵循相同的接口约束即可。

② 冗余(存储):在某些情况下处理数据的过程会失败,消息中间件可以把数据进行持久化知道完全被处理。通过这一方式规避了数据丢失风险。在把一个消息从消息中间件删除之前,需要处理系统明确地支出该消息已经被处理完成,从而确保数据被安全的保存直到使用完毕。

③ 扩展性:消息中间件解耦了应用处理过程,提高消息入队和处理效率只需要增加额外的处理过程,不需要其他更改。

④ 削峰:消息中间件可以使得关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。

⑤ 可恢复性:当一部分失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,即使一个处理消息的进程挂掉,加入消息中间件的消息仍然可以在系统恢复后进行处理。

⑥ 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性。

⑦ 缓冲:在任何重要的系统中,都会存在不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。

⑧ 异步通信:消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但不立即处理它,再之后需要的时候再慢慢处理

3. RabbitMQ的安装运行

① 安装Erlang:yum install erlang

② 安装RabbitMQ:yum install rabbitmq-server

③ 启动RabbitMQ:rabbitmq-server

④ 验证启动:rabbitmqctl status

⑤ 验证集群:rabbitmqctl cluster_status

4. RabbitMQ的消息生产

// 默认情况下,RabbitMQ服务的用户名和密码都是 guest(仅限本地网络访问),在实现Java端的远程访问时需要添加新用户。
// rabbitmqctl add_user root root 添加用户
// rabbitmqctl set_permissions -p / root ".*" ".*" ".*" 设置所有权限
// rabbitmqctl set_user_tags root administrator 设置管理员角色

public static void main(String[] args) throws Exception{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(127.0.0.1);
    factory.setPort(5672);    // 默认端口
    factory.setUsername(root);
    factory.setPassword(root);
    Connection connection = factory.newConnection();    // 创建连接
    Channel channel = connection.createChannel();       // 创建信道
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);    // 创建一个类型为direct的持久化、非自动删除的交换器
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);             // 创建一个持久化、非排他、非自动删除的队列
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);              // 将交换器与队列通过路由键绑定
    String message = "Hello World!";
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());     // 发送一条持久化信息
    channel.close();
    connection.close();
}

5. RabbitMQ的消息消费

// 不推荐使用QueueingConsumer实现消费(即将废弃),这里采用DefaultConsumer

public static void main(String[] args) throws Exception{
    Address[] addresses = new Address[]{
        new Address("127.0.0.1", 5672)
    };
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(root);
    factory.setPassword(root);
    Connection connection = factory.newConnection(addresses); 
    final Channel channel = connection.createChannel(); 
    channel.basicQos(64);    // 设置客户端最多接收未被ack的消息个数
    Consumer consumer = new DefaultConsumer(channel){
        @override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
            System.out,println("recv message:" + new String(body));
            try {
                TimeUnit.SECONDS。sleep(1);
            } catch (InterruptedException e){
                e.printStackTrace();
            }
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    };
    channel.basicConsumer(QUEUE_NAME, consumer);
    TimeUnit.SECONDS.sleep(5);    // 等待回调函数执行完毕后关闭资源
    channel.close();
    connection.close();
}

 

相关标签: RabbitMQ