欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ原生API

程序员文章站 2024-01-12 13:07:34
...

使用RabbitMQ原生的API来操作消息的生产和消费。

首先先介绍一个简单的一个消息推送到接收的流程,提供一个简单的图:
RabbitMQ原生API

黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是RabbitMQ的服务器,然后经过服务器里面的交换机、队列等各种关系将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:

Direct Exchange

直连型交换机,根据消息携带的路由值将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键routing key
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键是有规则的。
简单地介绍下规则:
*(星号) 用来表示一个单词 (必须出现的)
#(井号) 用来表示任意数量(零个或多个)单词
通配的路由键是跟队列进行绑定的,举个小例子:
队列Q1路由键为*.TT.*,队列Q2路由键为TT.#
如果一条消息携带的路由值为A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由值为TT.AA.BB,那么队列Q2将会收到;

当一个队列的绑定键为#(井号)的时候,这个队列将会无视消息的路由键,接收所有的消息。
*(星号)和#(井号)这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机和直连交换机的功能。

常用的就是以上3种交换机,另外还有Header Exchange头交换机,Default Exchange默认交换机,Dead Letter Exchange死信交换机。

1、Maven依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.4.RELEASE</version>
</parent>
<dependencies>
    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2、生产消息

public class Producer {
    public static final String HOST = "148.70.153.63";
    public static final String USER_NAME = "libai";

    public static void main(String[] args) throws IOException, TimeoutException {
        Producer producer = new Producer();
        // 获取connectionFactory
        ConnectionFactory connectionFactory = producer.getConnectionFactory();

        // 创建连接和通道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "amq.direct", queueName = "apiQueue1", routingKey = "RabbitMQ";
        // 声明交换机(如果不存在才创建),交换机名称、类型、交换机是否持久化
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
        // 声明队列(如果不存在才创建),队列名称、队列是否持久化、是否排他(连接可见性)、是否自动删除(所有消费者断开连接后删除队列)、参数
        channel.queueDeclare(queueName, true, false, false, null);
        // 将队列和交换机绑定并指定路由键
        channel.queueBind(queueName, exchangeName, routingKey);

        // 发送消息
        String msg = "Hello RabbitMQ!";
        // 把消息发送到指定的交换机,交换机根据路由键推送到绑定的队列中;交换机名称、路由键、属性、消息字节
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());

        // 释放连接
        channel.close();
        connection.close();
    }

    /**
     * 获取连接工厂
     *
     * @return
     */
    public ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setPassword(System.getProperty("password"));
        connectionFactory.setVirtualHost(ConnectionFactory.DEFAULT_VHOST);
        return connectionFactory;
    }
}
  1. 创建连接Connection和通道Channel
  2. 声明交换机Exchang,如果不存在就会创建,可以指定交换机类型以及是否持久化。
  3. 声明队列Queue,如果不存在也会创建。
  4. 将队列Queue和交换机Exchang绑定,并指定绑定的路由键RoutingKey
  5. 发送消息到指定的交换机Exchang,交换机会根据路由键RoutingKey找到绑定的队列Queue,并把消息发送到队列中。
  6. 关闭连接Connection和通道Channel

3、消费消息

3.1、消费者主动拉取模式

@Slf4j
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Producer producer = new Producer();
        // 获取connectionFactory
        ConnectionFactory connectionFactory = producer.getConnectionFactory();

        // 创建连接和通道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 消费消息-客户端主动拉取模式、自动ACK确认
        // 性能较差,每次都要创建、关闭连接和通道。
        GetResponse response = channel.basicGet("apiQueue1", true);
        String msg = Optional.ofNullable(response).map(GetResponse::getBody).map(String::new).orElse(null);
        log.info("消费消息:[{}]", msg);
        channel.close();
        connection.close();
    }
}

这种模式在创建连接和通道后,主动从server中拉取消息,效率、性能比较低下。

3.2、server推送模式

@Slf4j
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Producer producer = new Producer();
        // 获取connectionFactory
        ConnectionFactory connectionFactory = producer.getConnectionFactory();

        // 创建连接和通道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 消费消息-server推送模式,创建连接和通道后,等待队列推送消息然后进行消费
        // 不能关闭连接和通道
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                long now = System.currentTimeMillis();
                if (now % 3 == 0) {
                    log.info("手动确认消费消息:[{}]", new String(body));
                    // 消息唯一标记、是否确认多条(true则批量确认小于当前标记号的所有消息)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else if (now % 3 == 1) {
                    log.info("basicNack:[手动拒绝消息,重回队列]");
                    // 消息唯一标记、是否拒绝多条(true则批量拒绝小于当前标记号的所有消息)、是否重回队列
                    // 重回队列后仍旧会被当前消费者再次消费
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else if (now % 3 == 2) {
                    log.info("basicReject:[手动拒绝消息,重回队列]");
                    // 只能拒绝单条消息,消息唯一标记、是否重回队列
                    // 重回队列后仍旧会被当前消费者再次消费
                    channel.basicReject(envelope.getDeliveryTag(), true);
                }
            }
        };
        // 进入消费状态、手动ACK
        channel.basicConsume("apiQueue1", false, consumer);
    }
}

这种模式在创建连接和通道后就进入等待消费的状态,等待队列推送消息然后进行消费。
手动ACK模式,在成功消费消息后需要手动ACK确认,也可以拒绝当前消息并指定是否重回队列中被再次消费。

RabbitMQ原生API

参考资料

代码地址

  • github:https://github.com/senlinmu1008/spring-boot/tree/master/rabbitmq-native
  • gitee:https://gitee.com/ppbin/spring-boot/tree/master/rabbitmq-native

个人网站

相关标签: RabbitMQ rabbitmq