欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息中间件之RabbitMQ

程序员文章站 2024-03-15 23:26:36
...

1.MQ引言

1.1 什么是MQ

MQ (Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,
消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻
辑的侵入,轻松的实现系统间解耦。别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交
流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的RocketMQ等。

1.3 不同MQ特点

# 1.ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间
件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
# 2.Kafka
Kafka是LinkedIn开源的分布式发布‐订阅消息系统,目前归属于Apache*项目。Kafka主要特点是基于
Pull的模式来处理消息消费,
追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重
复、丢失、错误没有严格要求,
适合产生大量数据的互联网服务的数据收集业务。
# 3.RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应
用的特点。RocketMQ思路起
源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应
用于交易、充值、流计算、消
息推送、日志流式处理、binglog分发等场景。
# 4.RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、
队列、路由(包括点对点和
发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场
景,对性能和吞吐量的要求还在
其次。

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢失数据)要求稍低的场景使用,比如ELK日志收集。

2.RabbitMQ的引言

2.1 RabbitMQ

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

消息中间件之RabbitMQ

官网:https://www.rabbitmq.com/

官方教程:https://www.rabbitmq.com/#getstarted

# AMQP 协议
AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的
消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire‐level protocol(链接协议)。这
是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider
天然性就是跨平台的。以下是AMQP协议模型:

消息中间件之RabbitMQ

2.2 RabbitMQ的安装

2.2.1 安装Erlang

下载地址:http://erlang.org/download/otp_win64_21.3.exe

步骤:

消息中间件之RabbitMQ

消息中间件之RabbitMQ

消息中间件之RabbitMQ

2.2.2 安装RabbitMQ

下载地址:https://dl.bintray.com/rabbitmq/all/rabbitmq-server/3.7.14/rabbitmq-server-3.7.14.exe

步骤:

消息中间件之RabbitMQ

消息中间件之RabbitMQ

2.2.3 进入RabbitMQ安装目录下的sbin目录

消息中间件之RabbitMQ

输入以下命令启动管理功能

rabbitmq-plugins enable rabbitmq_management

消息中间件之RabbitMQ

消息中间件之RabbitMQ

2.2.4 访问地址查看是否安装成功

http://localhost:15672/

输入账号密码并登录:guest guest

消息中间件之RabbitMQ

消息中间件之RabbitMQ

3. RabbitMQ配置

3.1 RabbitMQ管理命令行

# 1.服务启动相关
systemctl start|restart|stop|status rabbitmq‐server
# 2.管理命令行 用来在不使用web管理界面情况下命令操作RabbitMQ
rabbitmqctl help 可以查看更多命令
# 3.插件管理命令行
rabbitmq‐plugins enable|list|disable

3.2 web管理界面介绍

3.2.1 overview概览

消息中间件之RabbitMQ

  • connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况。
  • channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
  • Exchanges:交换机,用来实现消息的路由。
  • Queues:队列,及消息队列,消息存放在队列中,等待消费,消费后被移除队列。

3.2.2 Admin用户和虚拟主机管理

1. 添加用户

消息中间件之RabbitMQ

上面的Tags选项,其实是指定用户的角色,可选的有以下几个:

  • 超级管理员(administrator)
  • 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  • 监控者(monitoring)
  • 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  • 策略制定者(policymaker)
  • 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
  • 普通管理者(management)
  • 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 其他
  • 无法登陆管理控制台,通常就是普通的生产者和消费者。

2. 创建虚拟主机

为了让各个用户可以互补干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。

消息中间件之RabbitMQ

3. 绑定虚拟主机和用户

创建好虚拟主机,我们还要给用户添加访问权限:

点击添加好的虚拟主机:

消息中间件之RabbitMQ

进入虚拟机设置界面

消息中间件之RabbitMQ

4. RabbitMQ的第一个程序

4.1 AMQP协议回顾

消息中间件之RabbitMQ

Virtual Host虚拟主机:所谓虚拟主机,就类似于数据库中的库,用来为项目做一一的映射,日后不同的项目建立不同的虚拟主机。

4.2 RabbitMQ支持的消息模型

消息中间件之RabbitMQ

消息中间件之RabbitMQ

4.3 建Module引依赖

消息中间件之RabbitMQ

消息中间件之RabbitMQ

消息中间件之RabbitMQ

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.baizhi</groupId>
  <artifactId>rabbitmq_day1</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>war</packaging>

  <name>rabbitmq_day1 Maven Webapp</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter</artifactId>
      <version>RELEASE</version>
    </dependency>
  </dependencies>

  <build>
    <finalName>rabbitmq_day1</finalName>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-war-plugin</artifactId>
          <version>3.2.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

 4.4 第一种模型(直连)

消息中间件之RabbitMQ

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者,消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

 

消息中间件之RabbitMQ

package helloword;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author by Jak
 * @date 2020/7/6
 */
public class Provider {

    //生产消息
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
/*
        //创建连接mq的连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("10.67.167.183");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接哪个虚拟主机
        connectionFactory.setVirtualHost("/springCloud");
        //设置访问虚拟主机的用户名和密码
        connectionFactory.setUsername("jak");
        connectionFactory.setPassword("123456");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
*/

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();


        //通道绑定对应消息队列
        //参数1:队列名称,如果队列不存在自动创建
        //参数2:用来定义队列特性是否要持久化 true 持久化队列 false 不持久化,RabbitMQ重启,保证队列的持久化,但不能保证消息持久化
        //参数3:exclusive 是否独占队列 true 独占队列 false 不独占
        //参数4:autoDelete:是否在消费完成后自动删除队列 true 自动删除 false 不自动删除,消费者彻底断掉
        //参数5:额外附加参数
        channel.queueDeclare("hello", true, false, true, null);

        //发布消息
        //参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置 参数4:消息的具体内容
        //rabbitMQ重启,保证消息的持久化
        channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "hello rabbitmq".getBytes());

/*        channel.close();
        connection.close();*/
        //调用工具类
        RabbitMQUtils.closeConnectionAndChanel(channel, connection);

    }
}

启动程序,再次启动,再发一条消息,变为2

http://localhost:15672/#/queues

消息中间件之RabbitMQ

消息中间件之RabbitMQ

4.5 开发消费者

package helloword;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author by Jak
 * @date 2020/7/6
 */
public class Consumer {

    public static void test() throws IOException, TimeoutException {
/*        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("10.67.167.183");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/springCloud");
        connectionFactory.setUsername("jak");
        connectionFactory.setPassword("123456");

        //创建连接对象
        Connection connection = connectionFactory.newConnection();*/

        //通过工具类获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //创建通道
        Channel channel = connection.createChannel();

        //通道绑定对象, 生产者和消费者要对应,既持久化,都持久化
        channel.queueDeclare("hello", true, false, true, null);

        //消费消息
        //参数1:消费哪个队列的消息 队列名称
        //参数2:开启消息的自动确认机制
        //参数3:消费时的回调接口
        channel.basicConsume("hello", true, new DefaultConsumer(channel) {
            @Override //最后一个参数:消息队列中取出的消息
            public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                System.out.println("new String(body) = " + new String(body));
            }
        });
    }

    public static void main(String[] args) throws IOException, TimeoutException{
        test();
    }
}

工具类

package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


/**
 * @author by Jak
 * @date 2020/7/6
 */
public class RabbitMQUtils {

    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory = new ConnectionFactory();
        //重量级资源  静态代码块,类加载执行一次
        connectionFactory.setHost("10.67.167.183");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/springCloud");
        connectionFactory.setUsername("jak");
        connectionFactory.setPassword("123456");
    }
    //定义提供连接对象的方法
    public static Connection getConnection() {
        try {

            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道和关闭连接工具方法
    public static void closeConnectionAndChanel(Channel channel, Connection conn) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

消息中间件之RabbitMQ

消息中间件之RabbitMQ

 

 

 

 

相关标签: RabbitMQ