欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RabbitMQ 入门篇之——五种工作模式

程序员文章站 2022-04-09 23:37:02
目录1.简单队列2.工作模式3.交换机-fanout(pub/sub)模式4.交换机-direct模式5.交换机-topic模式rabbitmq的官网中介绍的工作模式有七种,这里我们只介绍五种我们这里简单介绍下前面五种:导入依赖: com.rabbi...

目录

1.简单队列

2.工作队列

2.1 Round-robin dispatching(轮询调度)

2.2 Fair dispatch(公平调度)

3.交换机-fanout(pub/sub)模式

4.交换机-direct模式

5.交换机-topic模式



rabbitmq的官网中介绍的工作模式有七种,这里我们只介绍五种

我们这里简单介绍下前面五种:

导入依赖:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

工具类:

package com.cjian;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitmqUtils {

    public static Connection getConnection() {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置参数
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("vhost_cjian");
        connectionFactory.setUsername("cjian");
        connectionFactory.setPassword("111111");
        //创建连接
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return connection;
    }

}

 

1.简单队列

一个生产者一个消费者

RabbitMQ 入门篇之——五种工作模式

package com.cjian.rabbitmq.simple;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,如果没有则会去创建该队列
         * durable:是否持久化,当mq重启后,数据还在
         * exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列  一般设置为false
         * autoDelete:当没有消费者时是否自动删除
         * arguments:参数信息
         */

        channel.queueDeclare("simpleQueueTest", true, false, false, null);
        //发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机名称,简单模式下会使用默认的交换机
         * routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
         * props:配置名信息
         * body:发送的消息的字节数组
         */
        String msg = "Helle RabbitMq~";
        channel.basicPublish("", "simpleQueueTest", null, msg.getBytes());

        //释放资源
        connection.close();
        channel.close();
    }
}

启动后管理台:

RabbitMQ 入门篇之——五种工作模式

消费者:

package com.cjian.rabbitmq.simple;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("simpleQueueTest", true, false, false, null);
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("simpleQueueTest",true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

RabbitMQ 入门篇之——五种工作模式

输出:

consumerTag:amq.ctag-89i7XZ4piHud6uco4aCg9A
Exchange:
RoutingKey:simpleQueueTest
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:Helle RabbitMq~

2.工作队列

一个生产者,多个消费者,具有‘负载均衡’的功能

2.1 Round-robin dispatching(轮询调度)

这也是工作队列默认的方式,多个消费者消费消息按序来,与消费者的消费速度无关

RabbitMQ 入门篇之——五种工作模式

package com.cjian.rabbitmq.workqueue;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列名称,如果没有则会去创建该队列
         * durable:是否持久化,当mq重启后,数据还在
         * exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列  一般设置为false
         * autoDelete:当没有消费者时是否自动删除
         * arguments:参数信息
         */

        channel.queueDeclare("WorkQueueTest", true, false, false, null);
        //发送消息
        /**
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机名称,简单模式下会使用默认的交换机
         * routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
         * props:配置名信息
         * body:发送的消息的字节数组
         */
        for (int i = 0; i < 10; i++) {
            String msg = "Helle RabbitMq~ "+i;
            channel.basicPublish("", "WorkQueueTest", null, msg.getBytes());
        }
        //释放资源
        connection.close();
        channel.close();
    }
}

生产者启动后:

RabbitMQ 入门篇之——五种工作模式

消费者1:

package com.cjian.rabbitmq.workqueue;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueueTest", true, false, false, null);
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("WorkQueueTest",true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

消费者2: 

package com.cjian.rabbitmq.workqueue;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueueTest", true, false, false, null);
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
 try {
                    System.out.println("消费者2睡眠1s");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("body:"+new String(body));
            }
        };
        channel.basicConsume("WorkQueueTest",true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

启动两个消费者:

RabbitMQ 入门篇之——五种工作模式

输出:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

 

2.2 Fair dispatch(公平调度)

公平调度可以确保rabbitmq消费者的可靠性:多个消费者中个别消费者宕机了,依然可以让消息可以得到消费

消费者的可靠性属于rabbitmq的高级特性,后面细说,这里先简单说下公平调度的实现

要实现公平调度,需要利用rabbitmq的ack机制,上面的轮询分发,autoack我们设置的是true,公平调度我这:

1.我们需要把autoack改成false

RabbitMQ 入门篇之——五种工作模式

 

2.指定消费者每次从队列中获取的消息数量:basicQos

3.手动返回ack

RabbitMQ 入门篇之——五种工作模式

完整的demo:

消费者3:模拟消费慢的情况

package com.cjian.rabbitmq.workqueue;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;


public class Consumer3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueueTest", true, false, false, null);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                try {
                    System.out.println("消费者2睡眠1s");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("body:"+new String(body));

                //手动返回ack,表示消费完了
                //basicAck(long deliveryTag, boolean multiple)
                //deliveryTag:该消息的index; multiple:是否批量true:将一次性ack所有小于deliveryTag的消息;确认收到消息。
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        //每次从队列中消费一条消息
        channel.basicQos(1);
        channel.basicConsume("WorkQueueTest",false,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

消费者4:正常消费

package com.cjian.rabbitmq.workqueue;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer4 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueueTest", true, false, false, null);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
                //手动返回ack,表示消费完了
                //basicAck(long deliveryTag, boolean multiple)
                //deliveryTag:该消息的index;
                //multiple:是否批量true:将一次性ack所有小于deliveryTag的消息;确认收到消息。
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        //每次从队列中消费一条消息,直到它处理完了并且返回了前一个消息的通知标志(acknowledged),显式调用basicAck
        channel.basicQos(1);
        channel.basicConsume("WorkQueueTest",false,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

 输出结果:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

 

本来打算模拟消费者3消费消息的时候出现了异常,验证一下消费者消费消息的可靠性,但是在验证的过程中发现了一个有趣的现象,也重现了上家公司的一个问题,而且可能篇幅较长,涉及到的东西也有点多了,放到下篇博文吧

 

3.交换机-fanout(pub/sub)模式

每个消费者都能获取到同样的消息

RabbitMQ 入门篇之——五种工作模式

package com.cjian.rabbitmq.pubsub;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        //创建Channel
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机的类型
         *     DIRECT("direct"),定向
         *     FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
         *     TOPIC("topic"),通配符
         *     HEADERS("headers");参数匹配,用得少
         *
         * durable:是否持久化
         * autoDelete:自动删除
         * internal:内部使用  一般false
         * arguments:参数
         */
        String exchangeName = "exchange_fanout";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //创建队列
        String queueName1 = "exchange_fanout_queue1";
        String queueName2 = "exchange_fanout_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange交换机名称
         * routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
         */
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");

        //发送消息
        String msg = "Hello rabbitmq~";
        channel.basicPublish(exchangeName, "", null, msg.getBytes());

        channel.close();
        connection.close();


    }
}

启动生产者后:

RabbitMQ 入门篇之——五种工作模式 RabbitMQ 入门篇之——五种工作模式

消费者1:

package com.cjian.rabbitmq.pubsub;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName1 = "exchange_fanout_queue1";
        channel.basicConsume(queueName1,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

消费者2: 

package com.cjian.rabbitmq.pubsub;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName2 = "exchange_fanout_queue2";
        channel.basicConsume(queueName2,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

RabbitMQ 入门篇之——五种工作模式

输出:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

4.交换机-direct模式

每个队列只能消费指定格式(routingkey)的消息

RabbitMQ 入门篇之——五种工作模式

生产者发送一条 路由键为 info的消息

package com.cjian.rabbitmq.direct;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        //创建Channel
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机的类型
         *     DIRECT("direct"),定向
         *     FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
         *     TOPIC("topic"),通配符
         *     HEADERS("headers");参数匹配,用得少
         *
         * durable:是否持久化
         * autoDelete:自动删除
         * internal:内部使用  一般false
         * arguments:参数
         */
        String exchangeName = "exchange_direct";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //创建队列
        String queueName1 = "exchange_direct_queue1";
        String queueName2 = "exchange_direct_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange交换机名称
         * routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
         */
        //消费路由键为error 的消息
        channel.queueBind(queueName1,exchangeName,"error");
        //消费路由键为info、error、waring 的消息
        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"error");
        channel.queueBind(queueName2,exchangeName,"waring");

        //发送消息
        String msg = "Hello rabbitmq-info";
        channel.basicPublish(exchangeName, "info", null, msg.getBytes());

        channel.close();
        connection.close();


    }
}

启动生产者后:

RabbitMQ 入门篇之——五种工作模式 RabbitMQ 入门篇之——五种工作模式

消费者1: 

package com.cjian.rabbitmq.direct;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName1 = "exchange_direct_queue1";
        channel.basicConsume(queueName1,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

消费者2: 

package com.cjian.rabbitmq.direct;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName2 = "exchange_direct_queue2";
        channel.basicConsume(queueName2,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

RabbitMQ 入门篇之——五种工作模式

RabbitMQ 入门篇之——五种工作模式

控制台输出:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

发送一条error的:

RabbitMQ 入门篇之——五种工作模式

控制台输出:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

5.交换机-topic模式

类似于direct,匹配功能更加强大,具体细节可查看代码中的注释

RabbitMQ 入门篇之——五种工作模式

消费者 

package com.cjian.rabbitmq.topic;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitmqUtils.getConnection();
        //创建Channel
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * exchange:交换机名称
         * type:交换机的类型
         *     DIRECT("direct"),定向
         *     FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
         *     TOPIC("topic"),通配符
         *     HEADERS("headers");参数匹配,用得少
         *
         * durable:是否持久化
         * autoDelete:自动删除
         * internal:内部使用  一般false
         * arguments:参数
         */
        String exchangeName = "exchange_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //创建队列
        String queueName1 = "exchange_topic_queue1";
        String queueName2 = "exchange_topic_queue2";
        channel.queueDeclare(queueName1,true,false,false,null);
        channel.queueDeclare(queueName2,true,false,false,null);
        //绑定队列和交换机
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * queue:队列名称
         * exchange交换机名称
         * routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
         */
        //#:匹配0或多个单词  ,*:匹配一个单词
        channel.queueBind(queueName1,exchangeName,"#.error");

        channel.queueBind(queueName2,exchangeName,"order.*");
        channel.queueBind(queueName2,exchangeName,"*.*");

        //发送消息
        String msg = "Hello rabbitmq-order.error1";
        channel.basicPublish(exchangeName, "order.error1", null, msg.getBytes());

        channel.close();
        connection.close();


    }
}

启动后:

RabbitMQ 入门篇之——五种工作模式 RabbitMQ 入门篇之——五种工作模式

消费者1: 消费以error结尾的消息

package com.cjian.rabbitmq.topic;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName1 = "exchange_topic_queue1";
        channel.basicConsume(queueName1,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

消费者2: 消费order开头的或者任意两个单词的

package com.cjian.rabbitmq.topic;

import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class Consumer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqUtils.getConnection();
        Channel channel = connection.createChannel();
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue:队列名称
         * autoAck:是否自动确认
         * callback:回调对象
         */
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 回调方法,当收到消息后,会执行该方法
             * @param consumerTag  标识
             * @param envelope  获取一些信息:交换机,路由key等
             * @param properties 配置信息
             * @param body  数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        };
        String queueName2 = "exchange_topic_queue2";
        channel.basicConsume(queueName2,true,defaultConsumer);

        //消费者因为需要一直监听,所以不需要关闭资源
    }
}

RabbitMQ 入门篇之——五种工作模式

粘过来好看点:

RabbitMQ 入门篇之——五种工作模式

控制台:

RabbitMQ 入门篇之——五种工作模式RabbitMQ 入门篇之——五种工作模式

 

后面的就不一一验证了,重点也不在这,后面分析rabbitmq的一些高级特性

本文地址:https://blog.csdn.net/cj_eryue/article/details/112850634