欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 Maven写rabbitMQ教程02

程序员文章站 2023-11-10 12:17:34
上次我们已经说过了rabbitMQ的三种模式:①简单模式Hello World②工作队列模式Work Queue③发布/订阅模式Publish/Subscribe有意向的可以回顾下:Maven写ribbitMQ教程01废话不多说,直入正题接下来我们再来学习两种模式一丶Routing路由工作模式路由模式:每个消费者监听自己的队列,并且设置routingkey。生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。代码:providucer声明e...

上次我们已经说过了rabbitMQ的三种模式:

①简单模式Hello World

②工作队列模式Work Queue

③发布/订阅模式Publish/Subscribe

有意向的可以回顾下:
Maven写rabbitMQ教程01


废话不多说,直入正题
接下来我们再来学习两种模式


一丶Routing路由工作模式

荐
                                                        Maven写rabbitMQ教程02

路由模式:

  1. 每个消费者监听自己的队列,并且设置routingkey。
  2. 生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

代码:

providucer

  • 声明exchange_routing_inform交换机。
  • 声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
  • 发送消息时需要指定routingkey
public class providucer01 {
    //    private static final String QUEUE = "helloworld";
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String QUEUE_SMS = "queue_sms";
    //选择交换机类型为direct,后面同上
    private static final String EXCHANGEE_DIRECT = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");//设置
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
        //因为最后需要关闭连接所以定义到外面
        Connection connection = null;
        Channel channel = null;
        try {
            //通过connectionfactory获取connection
            connection = connectionFactory.newConnection();
            //获取Exchange通道
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             *参数明细
             *1、交换机名称
             *2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGEE_DIRECT, BuiltinExchangeType.DIRECT);

            /**
             *声明队列,如果Rabbit中没有此队列将自动创建
             *param1:队列名称
             *param2:是否持久化
             *param3:队列是否独占此连接
             *param4:队列不再使用时是否自动删除此队列
             *param5:队列参数
             */
            channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             *参数明细
             *1、队列名称
             *2、交换机名称
             *3、路由key
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGEE_DIRECT,QUEUE_EMAIL);
            channel.queueBind(QUEUE_SMS,EXCHANGEE_DIRECT,QUEUE_SMS);
            //自定义消息

            /**
             *消息发布方法
             *param1:Exchange的名称,如果没有指定,则使用Default Exchange
             *param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             *param3:消息包含的属性
             *param4:消息体
             */
            for (int i = 0; i < 5; i++) {
                String massage = "helloMQ!第"+i+"次";
                //我在这里判断单数发给sms,双数发给email,比较容易发现区别
                if(i%2 == 0) {
                    channel.basicPublish(EXCHANGEE_DIRECT, QUEUE_EMAIL, null, massage.getBytes("utf-8"));
                }else{
                channel.basicPublish(EXCHANGEE_DIRECT, QUEUE_SMS, null, massage.getBytes("utf-8"));}
                System.out.println("send massage: ----" + massage);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {

            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

customer:

public class customer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    //选择路由模式direct
    private static final String EXCHANGEE_DIRECT = "exchange_direct";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        //获取connection
        Connection connection = factory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         *参数明细
         *1、交换机名称
         *2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGEE_DIRECT,BuiltinExchangeType.DIRECT);
        //声明队列
        channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *消费者接收消息调用此方法
             *@param consumerTag 消费者的标签,在channel.basicConsume()去指定
             *@param  envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             *@param properties
             *@param body
             *@throws IOException
             **/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf-8");
                System.out.println("receive message:" + msg);
            }
        };
        /**
         *监听队列String queue, boolean autoAck,Consumer callback
         *参数明细
         *1、队列名称
         *2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置  为false则需要手动回复
         *3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_EMAIL, false, consumer);
    }
}

customer02跟01代码一样,只需要在声明队列时改掉队列名称

//把customer01的属性改为这个
private static final String QUEUE_SMS = "queue_sms";

//声明队列时也要改下
//声明队列
        channel.queueDeclare(QUEUE_SMS, true, false, false, null);

改完之后就可以执行了
荐
                                                        Maven写rabbitMQ教程02荐
                                                        Maven写rabbitMQ教程02荐
                                                        Maven写rabbitMQ教程02
可以看到我们的提供者只发送了一次消息,两个消费者都通过Key接收到了自己需要的消息。

打开RabbitMQ的管理界面,观察交换机绑定情况:
荐
                                                        Maven写rabbitMQ教程02
使用生产者发送若干条消息,交换机根据routingkey转发消息到指定的队列。


二丶Topics通配符工作模式

荐
                                                        Maven写rabbitMQ教程02
Topics路由模式:
1、每个消费者监听自己的队列,并且设置带通配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

代码

providucer:

public class providucer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String QUEUE_SMS = "queue_sms";
    private static final String EXCHANGEE_TOPIC = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");//设置
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务
        //因为最后需要关闭连接所以定义到外面
        Connection connection = null;
        Channel channel = null;
        try {
            //通过connectionfactory获取connection
            connection = connectionFactory.newConnection();
            //获取Exchange通道
            channel = connection.createChannel();
            //声明交换机 String exchange, BuiltinExchangeType type
            /**
             *参数明细
             *1、交换机名称
             *2、交换机类型,fanout、topic、direct、headers
             */
            channel.exchangeDeclare(EXCHANGEE_TOPIC, BuiltinExchangeType.TOPIC);

            /**
             *声明队列,如果Rabbit中没有此队列将自动创建
             *param1:队列名称
             *param2:是否持久化
             *param3:队列是否独占此连接
             *param4:队列不再使用时是否自动删除此队列
             *param5:队列参数
             */
            channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_SMS, true, false, false, null);
            //交换机和队列绑定String queue, String exchange, String routingKey
            /**
             *参数明细
             *1、队列名称
             *2、交换机名称
             *3、路由key
             channel.queueBind(QUEUE_EMAIL, EXCHANGEE_TOPIC, QUEUE_EMAIL);
             channel.queueBind(QUEUE_SMS, EXCHANGEE_TOPIC, QUEUE_SMS);*/
            //自定义消息

            /**
             *消息发布方法
             *param1:Exchange的名称,如果没有指定,则使用Default Exchange
             *param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
             *param3:消息包含的属性
             *param4:消息体
             */
            for (int i = 0; i < 5; i++) {
                String massage = "helloMQ!第" + i + "次";
                if (i % 2 == 0 && i < 4) {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.email", null, massage.getBytes("utf-8"));
                } else if (i % 2 != 0) {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.sms", null, massage.getBytes("utf-8"));
                } else {
                    channel.basicPublish(EXCHANGEE_TOPIC, "inform.email.sms", null, massage.getBytes("utf-8"));
                }
                System.out.println("send massage: ----" + massage);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {

            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

先更改交换机类型为TOPIC,然后判断消息发送;

customer:

public class customer01 {
    private static final String QUEUE_EMAIL = "queue_email";
    private static final String EXCHANGEE_TOPIC = "exchange_topic";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建connectionfactory
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在服务器的ip和端口
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        //获取connection
        Connection connection = factory.newConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //声明交换机 String exchange, BuiltinExchangeType type
        /**
         *参数明细
         *1、交换机名称
         *2、交换机类型,fanout、topic、direct、headers
         */
        channel.exchangeDeclare(EXCHANGEE_TOPIC, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);
        //绑定队列
        /**
         *参数明细
         *1、队列名称
         *2、交换机名称
         *3、路由key*/
        channel.queueBind(QUEUE_EMAIL, EXCHANGEE_TOPIC, "inform.#.email.#");
        //定义消费方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             *消费者接收消息调用此方法
             *@param consumerTag 消费者的标签,在channel.basicConsume()去指定
             *@param  envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             *@param properties
             *@param body
             *@throws IOException
             **/
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body, "utf-8");
                System.out.println("receive message:" + msg);
            }
        };
        /**
         *监听队列String queue, boolean autoAck,Consumer callback
         *参数明细
         *1、队列名称
         *2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置  为false则需要手动回复
         *3、消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_EMAIL, false, consumer);
    }
}

customer02同上,只需要修改队列即可;
执行两个customer,后执行providucer;
观察输出结果
providucer:
荐
                                                        Maven写rabbitMQ教程02
customer:

荐
                                                        Maven写rabbitMQ教程02
荐
                                                        Maven写rabbitMQ教程02
发现两个customer把各自对应的消息都接收了,并全部接收第四次的消息,说明通配符生效,执行成功!

观察交换机:
荐
                                                        Maven写rabbitMQ教程02
发现Topic模式更多加强大,它可以实现Routing、publish/subscirbe模式的功能。

本文地址:https://blog.csdn.net/qq_47387336/article/details/107336616