欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【RabbitMQ】订阅模式(publish/subscribe)

程序员文章站 2022-06-04 10:33:55
...

订阅模式(publish/subscribe)

【RabbitMQ】订阅模式(publish/subscribe)

模型

X:交换机
解读
1.还是一个生产者,多个消费者
2.但是每一个消费者都有自己的队列
3.生产者没有直接把消息发送到队列,而是发到了交换机或者转发器(exchange)
4.每个队列都要绑定到交换机上
5.生产者发送的消息,经过交换机到达队列,就能实现,一个消息被多个消息者消费

P生产者

Send.class

package com.springrabbitmq.ps;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.springrabbitmq.util.RabbitMQConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Title:订阅模式-生产者
 * @author: fly
 * @date: 2020-3-11
 */
public class Send {

    private static final String EXCHANGE_NAME="test_exchange_fanout";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection= RabbitMQConnectionUtil.getConnection();

        Channel channel=connection.createChannel();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分发

        //发送消息
        String msg="hello ps!";

        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());

        System.out.println("Fanout Send"+msg);

        channel.close();
        connection.close();
    }
}

启动后,到RabbitMQ管理后台就能看见该exchang
【RabbitMQ】订阅模式(publish/subscribe)
消息哪去了??——>丢失了!!
因为交换机没有存储的能力,在RabbitMQ里面只有队列有存储能力。现在就去写两个消费者的Queue来绑定交换机,接收存储消息:

C1消费者

Rev1.class

package com.springrabbitmq.ps;

import com.rabbitmq.client.*;
import com.springrabbitmq.util.RabbitMQConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Title:订阅模式-消费者
 * @author: fly
 * @date: 2020-3-11
 */
public class Rev1 {

    private static final String QUEUE_NAME_REV="test_queue_bind_exchange_email";
    private static final String EXCHANGE_NAME="test_exchange_fanout";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection= RabbitMQConnectionUtil.getConnection();

        final Channel channel=connection.createChannel();

        //声明队列
        channel.queueDeclare(QUEUE_NAME_REV,false,false,false,null);//分发

        //绑定队列到交换机/转发器
        channel.queueBind(QUEUE_NAME_REV,EXCHANGE_NAME,"");

        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");
                System.out.println("[1]Recv msg" + msg);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1]Done");
                    //消息应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //手动消息应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME_REV,autoAck,defaultConsumer);
    }
}

C2消费者

Rev2.class

package com.springrabbitmq.ps;

import com.rabbitmq.client.*;
import com.springrabbitmq.util.RabbitMQConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Title:订阅模式-消费者
 * @author: fly
 * @date: 2020-3-11
 */
public class Rev2 {

    //一个消费者一个队列
    private static final String QUEUE_NAME_REV="test_queue_bind_exchange_sms";
    private static final String EXCHANGE_NAME="test_exchange_fanout";



    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection= RabbitMQConnectionUtil.getConnection();

        final Channel channel=connection.createChannel();

        //声明队列
        channel.queueDeclare(QUEUE_NAME_REV,false,false,false,null);//分发

        //绑定队列到交换机/转发器
        channel.queueBind(QUEUE_NAME_REV,EXCHANGE_NAME,"");

        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");
                System.out.println("[2]Recv msg" + msg);

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2]Done");
                    //消息应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        //手动消息应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME_REV,autoAck,defaultConsumer);
    }
}

注意
刚刚启动Send类发送的消息已经丢失!所以先启动两个消费者,再启动Send生产者,就可以得到:
【RabbitMQ】订阅模式(publish/subscribe)
【RabbitMQ】订阅模式(publish/subscribe)
再去RabbitMQ管理界面查看:发现Rabbit MQ自动解绑!!
【RabbitMQ】订阅模式(publish/subscribe)

相关标签: Java#消息中间件