欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java初识RabbitMQ一交换机(topic exchange)

程序员文章站 2022-05-18 14:58:26
...

Java初识RabbitMQ一交换机(topic exchange)

首先看看AMQP协议,对RabbitMQ的架构会更了解。

深入理解AMQP协议

创建一个Maven项目,根据自己服务器RabbitMQ的版本导入相应的包。
Java初识RabbitMQ一交换机(topic exchange)

		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>

主题交换机

前面提到的 direct exchange的路由规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue。

而topic exchange的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

它的约定是:

1)binding key 中可以存在两种特殊字符 “*”“#”,用于做模糊匹配,其中 “*” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。

2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”,binding key 与 routing key 一样也是句点号 “.” 分隔的字符串。

Java初识RabbitMQ一交换机(topic exchange)
当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue1 中,如果 Routing Key=A.C.E 这时候会被同时路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。

主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

生产端

package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // topic exchange ,RabbitMQ提供的topic exchange
    private static String exchangeName = "amq.topic";
    // exchange type
    private static String exchangeType= "topic";

    // 交换机路由的routingKey
    private static String[] routingKey = {"test.kaven.wyy" , "test" , "test.topic"};

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 发送消息
        for (int i = 0; i < routingKey.length; i++) {
            String msg = "RabbitMQ:Topic Exchange 发送数据 , routingKey:"+routingKey[i];
            channel.basicPublish(exchangeName ,routingKey[i] ,null, msg.getBytes());
        }

        // 5 关闭连接
        channel.close();
        connection.close();
    }
}

消费端

这里创建两个消费者,为每个消费者创建一个线程,以便监听生产端发送过来的消息。

package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // topic exchange ,RabbitMQ提供的topic exchange
    private static String exchangeName = "amq.topic";
    // exchange type
    private static String exchangeType= "topic";
    // 队列名
    private static String[] queueName = {"queue_#" , "queue_*"};
    // 队列与交换机绑定的routingKey
    private static String[] routingKey = {"test.#" , "test.*"};

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        for (int i = 0; i < queueName.length; i++) {
            // 4 定义Queue ,将Queue绑定到direct exchange
            channel.queueDeclare(queueName[i],true , false , false , null);
            channel.queueBind(queueName[i] , exchangeName , routingKey[i]);
        }


        // 5 创建消费者
        QueueingConsumer consumer0 = new QueueingConsumer(channel);
        QueueingConsumer consumer1 = new QueueingConsumer(channel);

        // 6 设置
        channel.basicConsume(queueName[0] , true , consumer0);
        channel.basicConsume(queueName[1] , true , consumer1);

        // 7 接收消息
        Thread thread0 = new Thread(new MyRunnable(consumer0 , routingKey[0]));
        Thread thread1 = new Thread(new MyRunnable(consumer1 , routingKey[1]));

        thread0.start();
        thread1.start();

        thread0.join();
        thread1.join();
    }
}
package com.kaven.rabbitmq.exchange.topicExchange;

import com.rabbitmq.client.QueueingConsumer;

public class MyRunnable implements Runnable {

    private QueueingConsumer consumer;
    private String routingKey;

    public MyRunnable(QueueingConsumer consumer , String routingKey) {
        this.consumer = consumer;
        this.routingKey = routingKey;
    }

    @Override
    public void run() {
        while(true){
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String msg = new String(delivery.getBody());
                System.out.println(routingKey+"[收到]-"+msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

测试

因为这里使用的是RabbitMQ提供给我们的topic exchange,所以我们无需自己定义。
因为交换机已经定义好了,所以无论先启动生产端还是消费端,消费端都可以成功收到消息。
消费端输出如下:

test.#[收到]-RabbitMQ:Topic Exchange 发送数据 , routingKey:test.kaven.wyy
test.*[收到]-RabbitMQ:Topic Exchange 发送数据 , routingKey:test.topic
test.#[收到]-RabbitMQ:Topic Exchange 发送数据 , routingKey:test
test.#[收到]-RabbitMQ:Topic Exchange 发送数据 , routingKey:test.topic

很明显输出结果符合上面对topic exchange路由规则的解释。

看看RabbitMQ Management
Java初识RabbitMQ一交换机(topic exchange)
Java初识RabbitMQ一交换机(topic exchange)
Java初识RabbitMQ一交换机(topic exchange)

相关标签: 消息中间件