用Docker 构建RabbitMQ shovel(动态)
程序员文章站
2022-07-13 15:03:33
...
<div class="iteye-blog-content-contain" style="font-size: 14px">
在有了构建RabbitMQ 静态Shovel的经验之后,构建动态Shovel就显得容易多了
首先和构建静态Shovel一样,先运行两个RabbitMQ节点rabbitmq_a和rabbitmq_b(rabbitmq_a上需要已启用rabbitmq_shovel插件),然后给rabbitmq_a设置shovel:
# Declare a dynamic shovel named "my-shovel" on the rabbitmq_a container: the source is queue "my-queue" at 172.17.0.2, the destination is queue "another-queue" at 172.17.0.3 (vhost "/" is URL-encoded as %2f), and ack-mode is "on-confirm". The JSON double quotes are backslash-escaped because the whole command is wrapped in a double-quoted bash -c string.
docker exec rabbitmq_a bash -c "rabbitmqctl set_parameter shovel my-shovel '{\"src-uri\": \"amqp://guest:guest@172.17.0.2:5672/%2f\", \"src-queue\": \"my-queue\", \"dest-uri\": \"amqp://guest:guest@172.17.0.3:5672/%2f\", \"dest-queue\": \"another-queue\", \"ack-mode\": \"on-confirm\"}'"
注意:JSON参数中的双引号位于bash -c的双引号字符串内部,因此需要用转义符"\"对其进行转义
Consumer类
public static void main( String[] args ) throws IOException { init(); boolean autoAck = false; channel.basicConsume("another-queue", autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); LOGGER.info("routingKey is:" + routingKey); LOGGER.info("contentType is:" + contentType); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) LOGGER.info("content is:" + new String(body,"UTF-8")); channel.basicAck(deliveryTag, false); } }); } public static void init() { factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); factory.setHost("172.17.0.3"); factory.setPort(5672); try { connection = factory.newConnection(); channel = connection.createChannel(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
Producer
/**
 * Publishes three text/plain test messages with routing key "my-queue" through
 * the exchange named "" (the AMQP default exchange, which routes by queue name).
 *
 * @throws IOException if publishing on the channel fails
 */
@Test
public void direct() throws IOException {
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
            .contentType("text/plain")
            .build();
    for (int i = 0; i < 3; i++) {
        // Encode explicitly as UTF-8 so the bytes match what the consumer decodes,
        // independent of the platform default charset.
        byte[] messageBodyBytes = ("Dynamic Shovel Test! No." + i)
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        channel.basicPublish("", "my-queue", properties, messageBodyBytes);
    }
}
</div>
上一篇: RabbitMQ知多少
下一篇: spring+websocket的使用