欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ工作队列

程序员文章站 2022-05-17 08:48:25
...
如果一个生产者对应多个消费者就代表一个工作队列,工作队列最大的特点在于,一个生产者对应多个消费者
队列会自动进行负载均衡处理,每个消费者消费的信息均衡

模型实例
RabbitMQ工作队列

此时候需要建立3个消费者和1个生产者进行测试

(1)生产者

public class MessageProducer {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String QUEUE_NAME="sjw.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //获取连接
        Connection connection=factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel=connection.createChannel();
        //信道指定队列已经队列设置
        //queueDeclare(名字,是否持久化,独占的queue, 不使用时是否自动删除,其他参数)
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        //在开始前获取一下当前时间,方便统计消息全部进入队列所需的时间
        long start=System.currentTimeMillis();
        for (int i=0;i<1000;i++){
            String message="sjw"+i;
            //basicPublish(exchange,队列名称,属性,参数.getbyte())
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        }
        //结束时间
        long end=System.currentTimeMillis();
        //输出所需时间
        System.out.println("进入队列总共耗时:"+(end-start));
        //关闭信道
        channel.close();
        //关闭连接
        connection.close();
    }
}

(2)消费者

public class MessageConsumerA {
    //RabbitMQ服务所在地址
    public final static String HOST="192.168.74.142";
    //RabbitMQ端口
    public final static int PORT=5672;
    //RabbitMQ登陆用户名
    public final static String USERNAME="sjw";
    //RabbitMQ登陆密码
    public final static String PASSWORD="123";
    //队列名称
    public final static String QUEUE_NAME="sjw.queue";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        //获取连接
        Connection connection = factory.newConnection();
        //获取信道,可以有多个信道
        Channel channel = connection.createChannel();
        //信道设置,必须与要对应接收的队列设置一模一样,有差别则无法接收你想要的信道
        channel.queueDeclare(QUEUE_NAME, true, false, true, null);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //信道交给consumer进行内容接收处理
        channel.basicConsume(QUEUE_NAME,consumer);
        while (true) {  //消费者程序运行开着 如果生产者新增了数据会自动获取
            // nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[消费者A:]"+"[消息]" + message);
        }

    }
}

启动ABC三个消费者程序,最后启动生产者,然后查看消费者程序控制台
(1)消费者A
RabbitMQ工作队列
(2)消费者B
RabbitMQ工作队列
(3)消费者C
RabbitMQ工作队列

发现工作队列可以进行负载均衡消费

相关标签: RabbitMQ