欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java初识RabbitMQ一交换机(fanout exchange)

程序员文章站 2022-05-18 14:59:32
...

Java初识RabbitMQ一交换机(fanout exchange)

首先看看AMQP协议,对RabbitMQ的架构会更了解。

深入理解AMQP协议

创建一个Maven项目,根据自己服务器RabbitMQ的版本导入相应的包。
Java初识RabbitMQ一交换机(fanout exchange)

		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>

扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。

因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以它的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件。
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端。
  • 分发系统使用它来广播各种状态和配置更新。
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)。

扇型交换机图例:
Java初识RabbitMQ一交换机(fanout exchange)
上图所示,生产者(P)生产消息 1 ,将消息 1 推送到 Exchange,由于 Exchange Type=fanout ,这时候会遵循 fanout exchange的路由规则,将消息推送到所有与它绑定的 Queue,也就是图上的两个 Queue, 最后由监听对应Queue的消费者消费 。

生产端

routingKey = "",因为fanout exchange的路由规则不关心routingKey的值(但是不能为空)。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";

    // 交换机路由的routingKey
    private static String routingKey = "";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 发送消息
        String msg = "RabbitMQ:Fanout Exchange 发送数据";
        channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());

        // 5 关闭连接
        channel.close();
        connection.close();
    }
}

消费端

routingKey = "test",和生产端的routingKey不一样,主要为了看看fanout exchange的路由规则是否如上面解释的一样,这里只创建一个消费者,大家可以试一试多个消费者的情况,其实是一样的。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";
    // 队列名
    private static String queueName = "queue";
    // 队列与交换机绑定的routingKey
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 定义Queue ,将Queue绑定到direct exchange
        channel.queueDeclare(queueName,true , false , false , null);
        channel.queueBind(queueName , exchangeName , routingKey);

        // 5 创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 6 设置
        channel.basicConsume(queueName , true , consumer);

        // 7 接收消息
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}

测试

因为这里使用的是RabbitMQ提供给我们的fanout exchange,所以我们无需自己定义。
因为交换机已经定义好了,所以无论先启动生产端还是消费端,消费端都可以成功收到消息。
消费端输出如下:

RabbitMQ:Fanout Exchange 发送数据

看看RabbitMQ Management
Java初识RabbitMQ一交换机(fanout exchange)

相关标签: 消息中间件