欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMq Consumer和Producer端使用

程序员文章站 2022-05-23 07:55:46
...

一、Producer端
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "hello_exchange";
        String routingKey = "hello_routingKey";
        channel.exchangeDeclare(exchangeName, "direct", true); //定义exchange
        channel.queueBind(QUEUE_NAME, exchangeName, routingKey); //通过routingKey把queue和exchange绑定

        byte[] messageBodyBytes = "Hello, world!".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); //发布消息

    流控
    RabbitMQ 服务端有自我保护机制——流控,流控分两种:
    1. 全局流控
        当内存/磁盘使用量达到流控阀值,会触发消息发送全局流控。发生流控后该实例所有队列都无法投递消息只能消费消息;
    2. 队列流控
        队列由于生产速率远远超过消费速率,同时造成该队列消息堆积严重会发生队列级别流控。发生流控后当前队列将无法投递消息,只能消费消息。
        当堆积的消息数量下降后,队列流控状态会消失,消息投递将恢复正常。
        所以用户当收到流控报警后要及时查看实例消费者消费能力,通过增加消费者,或者清理可丢弃消息数据等措施尽快减少消息堆积,同时应用需要对因为流控消息投递失败的情况进行处理,消息先放在其他介质或者等待重试等机制避免消息因为投递失败后消息丢失。

二、Consumer端
  • Push模式:客户端被动收数据,Broker会把message轮流发给不同的Consumer,客户端接受消息不可控。即使没有ack,也会继续轮流发送。客户端默认1000大小的队列VariableLinkedBlockingQueue收取消息,然后Runtime.getRuntime().availableProcessors() * 2大小的固定大小ThreadPoolExecutor消费消息。
  • Pull模式:客户端主动收数据,客户端接受消息可控。无论有没有ack,还是会继续收到message。
  • 相同点:都是长链接

    Push模式
        即Consumer订阅queue,当有message发布到queue时,会马上push到Consumer。
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [y] Received '" + message + "'");
                this.getChannel().basicAck(envelope.getDeliveryTag(), false); //手动ack
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);  //第二个参数设置是否自动ack

    Pull模式
        即Consumer主动去queue拉取message。
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        while (true) {
            GetResponse response = channel.basicGet(QUEUE_NAME, false);   //第二个参数设置是否自动ack
            if (response == null) {
                // No message retrieved.
            } else {
                byte[] body = response.getBody();
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false); // acknowledge receipt of the message
            }
        }