Building a RabbitMQ Shovel (Dynamic) with Docker

程序员文章站 2022-07-13 15:03:33

<div class="iteye-blog-content-contain" style="font-size: 14px">

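With the experience of building a static RabbitMQ Shovel behind us, building a dynamic Shovel is much easier.

First, exactly as for the static Shovel, start two RabbitMQ nodes, rabbitmq_a and rabbitmq_b, and then configure the shovel on rabbitmq_a: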

docker exec rabbitmq_a bash -c "rabbitmqctl set_parameter shovel my-shovel '{\"src-uri\": \"amqp://guest:guest@172.17.0.2:5672/%2f\", \"src-queue\": \"my-queue\", \"dest-uri\": \"amqp://guest:guest@172.17.0.3:5672/%2f\", \"dest-queue\": \"another-queue\", \"ack-mode\": \"on-confirm\"}'"

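Note that every double quote inside the JSON must be escaped with a backslash ("\"), because the whole rabbitmqctl command is itself wrapped in double quotes for bash -c.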
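The escaping is easy to get wrong by hand, so here is a minimal sketch of generating the same command string programmatically. The class and method names (`ShovelCommandSketch`, `amqpUri`, `toJson`, `escapeForDoubleQuotes`, `buildCommand`) are made up for this illustration and are not part of RabbitMQ or its Java client. It assumes Java 10 or later and simple values that need no JSON escaping of their own. It also shows where the `%2f` in the URIs comes from: it is the percent-encoded default virtual host "/" (the sketch produces the equivalent uppercase form `%2F`).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, for illustration only.
class ShovelCommandSketch {

    // Builds an AMQP URI. The virtual host is percent-encoded, so "/" becomes "%2F".
    // URLEncoder does form encoding, which is good enough for simple vhost names.
    static String amqpUri(String user, String password, String host, int port, String vhost) {
        String encodedVhost = URLEncoder.encode(vhost, StandardCharsets.UTF_8);
        return "amqp://" + user + ":" + password + "@" + host + ":" + port + "/" + encodedVhost;
    }

    // Serializes a flat map as a JSON object.
    // Assumption: keys and values contain no characters that need JSON escaping.
    static String toJson(Map<String, String> fields) {
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            joiner.add("\"" + e.getKey() + "\": \"" + e.getValue() + "\"");
        }
        return joiner.toString();
    }

    // Escapes backslashes and double quotes so the text can sit inside a
    // double-quoted shell argument. "$" and backticks are not handled here.
    static String escapeForDoubleQuotes(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Assembles the docker exec command in the same shape as the one shown above.
    static String buildCommand(String container, String shovelName, Map<String, String> definition) {
        String json = escapeForDoubleQuotes(toJson(definition));
        return "docker exec " + container + " bash -c \"rabbitmqctl set_parameter shovel "
                + shovelName + " '" + json + "'\"";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the keys in insertion order.
        Map<String, String> definition = new LinkedHashMap<>();
        definition.put("src-uri", amqpUri("guest", "guest", "172.17.0.2", 5672, "/"));
        definition.put("src-queue", "my-queue");
        definition.put("dest-uri", amqpUri("guest", "guest", "172.17.0.3", 5672, "/"));
        definition.put("dest-queue", "another-queue");
        definition.put("ack-mode", "on-confirm");
        System.out.println(buildCommand("rabbitmq_a", "my-shovel", definition));
    }
}
```

Running `main` prints a command of the same form as the one above, which can be pasted into a terminal. Keeping the definition in a map also makes it easy to change queue names or hosts without touching the escaping.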

Consumer class

    // factory, connection, channel and LOGGER are static fields of the enclosing class
    // (their declarations are not shown in this listing).
    public static void main(String[] args) throws IOException {
        init();
        // Manual acknowledgement: each message is acked only after it has been handled.
        boolean autoAck = false;
        // Consume from the shovel's destination queue on rabbitmq_b.
        channel.basicConsume("another-queue", autoAck, "myConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException
                    {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        LOGGER.info("routingKey is:" + routingKey);
                        LOGGER.info("contentType is:" + contentType);
                        long deliveryTag = envelope.getDeliveryTag();
                        // (process the message components here ...)
                        LOGGER.info("content is:" + new String(body, "UTF-8"));
                        // Acknowledge this single message (multiple = false).
                        channel.basicAck(deliveryTag, false);
                    }
                });
    }

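    // Connects to rabbitmq_b (172.17.0.3), the destination node of the shovel.
    public static void init() {
        factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        factory.setHost("172.17.0.3");
        factory.setPort(5672);
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException | TimeoutException e) {
            // Fail fast: if the connection fails, channel stays null and
            // main() would otherwise hit a NullPointerException.
            throw new IllegalStateException("Could not connect to RabbitMQ", e);
        }
    }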

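Producer

    // channel is presumably connected to rabbitmq_a (172.17.0.2), the source node of the shovel.
    @Test
    public void direct() throws IOException {
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().contentType("text/plain").build();
        byte[] messageBodyBytes;
        for (int i = 0; i < 3; i++) {
            // Encode explicitly as UTF-8 to match the consumer's decoding.
            messageBodyBytes = ("Dynamic Shovel Test! No." + i).getBytes("UTF-8");
            // Publish through the default exchange, so the routing key is the queue name.
            channel.basicPublish("", "my-queue", properties, messageBodyBytes);
        }
    }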

 

</div>