欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ的5种模式,并使用java进行模拟操作

程序员文章站 2022-03-15 20:59:26
2. RabbitMQ的5种工作模式首先把官网介绍放在这里:https://www.rabbitmq.com/getstarted.html然后就开始记录一下学习到的吧!多多指教2.1 简单模式就是将一个生产者绑定到一个消息队列,然后消费者从这个消息队列中取消息进行消费在生产者与队列之间,使用的有默认的交换机流程:生产者:新建Connection工厂 ,需要设置一些属性(ConnectionFactory)使用Connection工厂获取Connection对象使用Connect...

2. RabbitMQ的5种工作模式

首先把官网介绍放在这里:https://www.rabbitmq.com/getstarted.html

然后就开始记录一下学习到的吧!多多指教

2.1 简单模式

RabbitMQ的5种模式,并使用java进行模拟操作

  1. 就是将一个生产者绑定到一个消息队列,然后消费者从这个消息队列中取消息进行消费
  2. 在生产者与队列之间,使用的有默认的交换机

流程

生产者:

  1. 新建Connection工厂 ,需要设置一些属性(ConnectionFactory)
  2. 使用Connection工厂获取Connection对象
  3. 使用Connection对象获取Channel(信道)对象
  4. 通过Channel声明一个消息队列
  5. 发送消息
  6. 关闭连接资源

代码实现

public class HelloProduct {
    public static void main(String[] args)  throws  Exception{
         //1.得到ConnectionFactory
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setHost("localhost");//默认就是localhost 可以不加该代码
        connectionFactory.setPort(5672);//默认就是5672
        // 这里说明一下,这个路径,还有账户密码,是自己提前建好的
        connectionFactory.setVirtualHost("/lh");//默认是 /
        connectionFactory.setUsername("ldh");//默认guest
        connectionFactory.setPassword("ldh"); //默认guest
        //2. 获取Connection
        Connection connection=connectionFactory.newConnection();
        //3.获取Channel
        Channel channel = connection.createChannel();
        //4.创建队列
        /**
         * queue(队列的名称),  
         * durable(是否持久化),  
         * exclusive (是否是独享的,表示被该信道独享),
         * autoDelete (是否自动删除 如果没有消费者哪个该队列就会自动删除),
         * Map<String, Object> arguments: 队列的参数设置
         */
        channel.queueDeclare("hello-queue",false,false,false,null);
        // 5.发送消息,这里使用四个参数的方法
        //exchange交换机的名称, 如果你使用的为Hello模式 那么交换机的名称为”“
        //routingKey:路由key    如果你使用的为Hello模式 那么路由key必须为队列的名称
        //props: 消息的属性信息  null
        //body: 表示消息内容
        String msg = "今天第一天学习RabbitMQ所以速度很慢:"+i;
        channel.basicPublish("", "hello-queue", null, msg.getBytes());
		// 关闭资源
        channel.close();
        connection.close();
    }
}

消费者:

  1. 新建Connection工厂 ,需要设置一些属性(ConnectionFactory)
  2. 使用Connection工厂获取Connection对象
  3. 使用Connection对象获取Channel(信道)对象
  4. 获取信道中的消息并通过回调函数进行处理

代码实现

public class HelloConsumer {
    public static void main(String[] args) throws Exception {
        //1.得到ConnectionFactory
        ConnectionFactory connectionFactory=new ConnectionFactory();
        connectionFactory.setVirtualHost("/lh");//默认 /
        connectionFactory.setUsername("ldh");//默认guest
        connectionFactory.setPassword("ldh"); //默认guest
        //2. 获取Connection
        Connection connection=connectionFactory.newConnection();
        //3.获取Channel
        Channel channel = connection.createChannel();
        //4. 消费消息
        //String queue, Consumer callback
        //queue:队列的名称
        //callback: 回调函数。
        //boolean autoAck:是否自动回复
        Consumer consumer=new DefaultConsumer(channel){
            //当获取到消息后触发该方法的执行
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // 接收消息
        channel.basicConsume("hello-queue",true,consumer);
    }
}

1.2 工作模式

RabbitMQ的5种模式,并使用java进行模拟操作
跟上边的模式相差不大,只是增加了一个消费者。
只需要将上边的消费者代码复制一份即可,这里不在多说了。

1.3 广播模式(fanout)

RabbitMQ的5种模式,并使用java进行模拟操作
这个模式和简单模式有一些不同的地方:

  1. 这个模式需指定使用 fanout 类型的交换机

除此之外其他大同小异,这里写一下流程和代码实现

流程

生产者:

  1. 新建ConnctionFactory
  2. 通过ConnectionFactory获取Connection
  3. 通过Connection对象获取Channel
  4. 通过Channel对象定义交换机,指定类型为fanout
  5. 通过Channel对象定义消息队列
  6. 通过Channel对象对交换机和队列进行绑定
  7. 发送消息
  8. 关闭资源

代码

public class PublishProduct {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUsername("ldh");
        factory.setPassword("ldh");
        factory.setVirtualHost("/lh");
        
        Connection connection=factory.newConnection();
        Channel channel = connection.createChannel();

        //创建交换机
        /**
         * String exchange: 交换机的名称
         * BuiltinExchangeType type: 交换机的类型
         * 		fanout:广播模式:只要是绑定到交换机上的队列都可以收到消息
         * 		direct:路由模式:  只要符合指定路由key的才可以收到消息
         *		topic:主题模式;  只要符合指定路由key的才可以收到消息
         * boolean durable, 是否持久化
         * boolean autoDelete, 是否自动删除
         * Map<String, Object> arguments 参数
         */
        channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT,false,true,null);

		// 定义消息队列
        channel.queueDeclare("fanout_queue1",false,false,false,null);
        channel.queueDeclare("fanout_queue2",false,false,false,null);

        //队列和交互机进行绑定
        channel.queueBind("fanout_queue1","fanout-exchange","");
        channel.queueBind("fanout_queue2","fanout-exchange","");
        for(int i=0;i<10;i++) {
            channel.basicPublish("fanout-exchange", "", null, (i+"hello pulish exchange").getBytes());
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}

消费者只需要改变一下绑定的队列即可,其他不需要改变。这里不在给出代码。

1.4 路由模式 (routing)

RabbitMQ的5种模式,并使用java进行模拟操作
通过图可以看出,这个模式相较于广播模式而言,在交换机和队列消息的发送上多了一些控制。广播模式中,交换机发送一条消息,所有的绑定的队列都可以接收到。而在这种模式下,交换机发送一条消息,只有满足条件的队列,交换机才会发送。

在代码层面,相较于广播模式,发生了一些改变:

  1. 需要指定交换机模式为 direct 模式
  2. 在交换机和队列绑定时需要指定 routingkey,不能在是 空字符串了
    同一个队列要绑定多个条件时,需要多次绑定
  3. 发送消息的时候需要带上 routingkey。

这里只列出不同的代码:

// 通过 Channel 对象声明交换机,类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,true,null);

// 声明队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);

// 队列和交换机进行绑定
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"debug");

// 发送消息
channel.basicPublish(EXCHANGE_NAME, "info", null, (i+"hello pulish exchange").getBytes());

1.5 通配符模式(topics)

RabbitMQ的5种模式,并使用java进行模拟操作
从图中可以看出,通配符模式与路由模式十分相似,不同的地方在于:
交换机与队列的 “路由key” 形式发生了变化。 使用方法与路由模式大同小异
* 和 # 的区别,我在这里没有感受到有什么不同, 都是表示匹配0个或多个字符

与路由模式的不同:

  1. 需要指定交换机模式为 topic 模式
  2. 在于队列进行绑定的时候需要指定 routingKey 的规则
  3. 在发送消息时,需要指定 routingKey

核心代码

// 4. 声明使用的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
// 5. 新建队列
channel.queueDeclare(queueName1, false, false, false, null);
channel.queueDeclare(queueName2, false, false, false, null);
// 6. 绑定交换机和队列
channel.queueBind(queueName1, exchangeName, "*.orange");
channel.queueBind(queueName2, exchangeName, "*.*.rabbite");
channel.queueBind(queueName2, exchangeName, "lazy.#");
// 7. 发送消息给队列
String msg = "这是通过topic模式发送的消息";
channel.basicPublish(exchangeName, "abc.orange", null, msg.getBytes());

记录结束!

本文地址:https://blog.csdn.net/weixin_43852058/article/details/110251496