欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【RabbitMQ】——队列模式(1)

程序员文章站 2022-05-04 15:07:52
...

        MQ:Message Queue,消息队列,是系统和系统之间的通信方法。

        RabbitMQ是MQ的一种实现,较为常用的还有ActiveMQ,Kafka(分布式发布订阅消息系统)等。本篇博客是RabbitMQ的入门,简单的了解一些RabbitMQ的5中队列,分别为:简单队列、work模式、订阅模式、路由模式、通配符模式。

 

环境:windows上安装RabbitMQ


工具类:用来获取连接的,下面所有队列的中获取连接时使用,具体实现如下

public class ConnectionUtil {
 
    publicstatic Connection getConnection() throws Exception {
       //定义连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       //设置服务地址
       factory.setHost("localhost");
       //端口
       factory.setPort(5672);
       //设置账号信息,用户名、密码、vhost
       factory.setVirtualHost("/test");
       factory.setUsername("test");
       factory.setPassword("test");
       // 通过工程获取连接
       Connection connection = factory.newConnection();
       return connection;
    }
 
}

简单队列:

 【RabbitMQ】——队列模式(1)

        

        P:消息生成者

        C:消息消费者

        红色:消息队列

 

  •         最简单的模式,生产者将消息发送到队列,消费者从队列中获取消息。
  •         一条消息只能被消费一次,消费完毕后消息被删除。

  

生产者代码:

public class Send {
 
   private final static String QUEUE_NAME = "test_queue";
 
   public static void main(String[] argv) throws Exception {
       // 获取到连接以及mq通道,ConnectionUtil为封装的工具类
       Connection connection = ConnectionUtil.getConnection();
       // 从连接中创建通道
       Channel channel = connection.createChannel();
 
       // 声明(创建)队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
        // 消息内容
       String message = "Hello World!";
       channel.basicPublish("", QUEUE_NAME, null,message.getBytes());
       System.out.println(" [x] Sent '" + message + "'");
 
       //关闭通道和连接
       channel.close();
       connection.close();
    }
}


启动完毕后从服务上查询消息队列,如下:

【RabbitMQ】——队列模式(1)

        有一个名为test_queue的队列,消息数量为1。

 

消费者代码:

public class Recv {
 
   private final static String QUEUE_NAME = "test_queue";
 
   public static void main(String[] argv) throws Exception {
 
       // 获取到连接以及mq通道
       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
 
       // 声明队列,要创建的队列已经存在则不再创建
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
       // 定义队列的消费者
       QueueingConsumer consumer = new QueueingConsumer(channel);
       // 监听队列
       channel.basicConsume(QUEUE_NAME, true, consumer);
 
       // 获取消息
       while (true) {
           QueueingConsumer.Delivery delivery = consumer.nextDelivery();
           String message = new String(delivery.getBody());
           System.out.println(" [x] Received '" + message +"'");
       }
    }
}

work模式:

【RabbitMQ】——队列模式(1)

        P:生产者

        C1、C2:消费者

        红色:消息队列

 

  •         一条消息只能被一个消费者消费

 

生产者代码:

public class Send {
 
   private final static String QUEUE_NAME = "test_queue_work";
 
   public static void main(String[] argv) throws Exception {
       // 获取到连接以及mq通道
       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
 
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
       //发送50条消息
       for (int i = 0; i < 50; i++) {
           // 消息内容
           String message = "" + i;
           channel.basicPublish("", QUEUE_NAME, null,message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
 
           Thread.sleep(i * 10);
       }
 
       channel.close();
        connection.close();
    }
}

 

消费者1:获取一条消息休眠10ms后,返回确认状态

public class Recv {
 
   private final static String QUEUE_NAME = "test_queue_work";
 
   public static void main(String[] argv) throws Exception {
 
       // 获取到连接以及mq通道
       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
 
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
       // 同一时刻服务器只会发一条消息给消费者
       //channel.basicQos(1);
 
       // 定义队列的消费者
       QueueingConsumer consumer = new QueueingConsumer(channel);
       // 监听队列,手动返回完成
       channel.basicConsume(QUEUE_NAME, false, consumer);
 
       // 获取消息
       while (true) {
           QueueingConsumer.Delivery delivery = consumer.nextDelivery();
           String message = new String(delivery.getBody());
           System.out.println(" [x] Received '" + message +"'");
           //休眠
           Thread.sleep(10);
           // 返回确认状态
           channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
       }
    }
}


消费者2:获取一条消息休眠1000ms后,返回确认状态

public class Recv2 {
 
   private final static String QUEUE_NAME = "test_queue_work";
 
   public static void main(String[] argv) throws Exception {
 
       // 获取到连接以及mq通道
       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
 
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
       // 同一时刻服务器只会发一条消息给消费者
       //channel.basicQos(1);
 
       // 定义队列的消费者
       QueueingConsumer consumer = new QueueingConsumer(channel);
       // 监听队列,手动返回完成状态
       channel.basicConsume(QUEUE_NAME, false, consumer);
 
       // 获取消息
       while (true) {
           QueueingConsumer.Delivery delivery = consumer.nextDelivery();
           String message = new String(delivery.getBody());
           System.out.println(" [x] Received '" + message +"'");
           // 休眠1秒
           Thread.sleep(1000);
 
           channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
       }
    }
}


运行结果:

         消费者1和消费者2获得的消息数量相同

         但是这样是不合理的,消费者2获得消息后休眠的时间长,所以它得到的消息数量应该少用消费者1。

 

work模式——能者多劳

         将上面两个消费者中的注解代码 channel.basicQos(1) 解注,这就意味着同一时刻服务器只发一条消息给消费者,消费者返回确认状态后获得新的消息,这样消费者2获得的消息数量就少于消费者1.

 

 

消息确认模式:

自动确认:

         消费者从队列中获取消息,无论消费者获取到消息后是否成功消费,服务端都会认为消息已经被成功消费。

         如下代码中,消费者不需要向服务器反馈

        【RabbitMQ】——队列模式(1)

 

手动确认:

         消费者从队列获得消息,服务器会将该消息标记为不可用状态,等待消费者反馈。如果消费者一直没有反馈消息将一直处于不可用状态。

手动模式消费者需要向服务端反馈状态。

        【RabbitMQ】——队列模式(1)

 

小结:

         文章中的两种MQ模式组成包括生产者、消费者、队列,还有在代码中体现出来的Connection连接、Channel通道。MQ的使用和hibernate、mysql的流程及其相像。首先获得连接,通过连接获得通道,通过通道完成消息队列的声明、消息的发布,对消息的监听等系列工作。