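Building a RabbitMQ shovel (static) with Docker
A shovel provides remote communication and message replication between RabbitMQ brokers, and it is suitable for use across a WAN.
1. Start two RabbitMQ nodes with Docker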
docker run -d --name=rabbitmqa -p 5772:5672 -p 15772:15672 -e RABBITMQ_NODENAME=rabbitmqa -h rabbitmqa rabbitmq:3.6.9-management
docker run -d --name=rabbitmqb -p 5773:5672 -p 15773:15672 -e RABBITMQ_NODENAME=rabbitmqb -h rabbitmqb rabbitmq:3.6.9-management
2. Enable the shovel plugins
docker exec rabbitmqa bash -c "rabbitmq-plugins enable rabbitmq_shovel"
docker exec rabbitmqa bash -c "rabbitmq-plugins enable rabbitmq_shovel_management"
docker exec rabbitmqb bash -c "rabbitmq-plugins enable rabbitmq_shovel"
docker exec rabbitmqb bash -c "rabbitmq-plugins enable rabbitmq_shovel_management"
3. Get the IP addresses of the two RabbitMQ nodes
docker inspect --format='{{ .NetworkSettings.IPAddress }}' rabbitmqa
docker inspect --format='{{ .NetworkSettings.IPAddress }}' rabbitmqb
4. Edit the rabbitmq.config file (provided as an attachment)
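For readers without the attachment, a static shovel definition in the Erlang-term format used by RabbitMQ 3.6 might look roughly like the sketch below. It is an illustration, not the attached file. The shovel name orders_shovel, the source queue name backup_orders and both IP addresses are placeholders, and the guest credentials, ack mode and reconnect delay are assumptions. The exchange, destination queue and routing key match the names used by the consumer and producer in steps 7 and 8.

```erlang
%% rabbitmq.config for the source node (rabbitmqa). Illustrative sketch only.
%% 172.17.0.2 and 172.17.0.3 are placeholder IPs: substitute the values from step 3.
%% Containers reach each other on the internal AMQP port 5672, not on the
%% host-mapped ports 5772/5773.
%% Note: guest is limited to loopback connections by default in RabbitMQ, so this
%% assumes the brokers accept guest from the container network (or that another
%% user is substituted).
[
  {rabbitmq_shovel, [
    {shovels, [
      {orders_shovel, [                          % hypothetical shovel name
        {sources, [
          {broker, "amqp://guest:guest@172.17.0.2:5672"},
          {declarations, [
            {'exchange.declare', [{exchange, <<"incoming_orders">>},
                                  {type, <<"direct">>},
                                  durable]},
            {'queue.declare',    [{queue, <<"backup_orders">>},   % assumed name
                                  durable]},
            {'queue.bind',       [{exchange, <<"incoming_orders">>},
                                  {queue, <<"backup_orders">>},
                                  {routing_key, <<"warehouse">>}]}
          ]}
        ]},
        {destinations, [
          {broker, "amqp://guest:guest@172.17.0.3:5672"},
          {declarations, [
            {'exchange.declare', [{exchange, <<"incoming_orders">>},
                                  {type, <<"direct">>},
                                  durable]},
            {'queue.declare',    [{queue, <<"warehouse_carpinteria">>},
                                  durable]},
            {'queue.bind',       [{exchange, <<"incoming_orders">>},
                                  {queue, <<"warehouse_carpinteria">>},
                                  {routing_key, <<"warehouse">>}]}
          ]}
        ]},
        %% Queue on the source broker that the shovel consumes from.
        {queue, <<"backup_orders">>},
        {ack_mode, on_confirm},
        {publish_properties, [{delivery_mode, 2}]},
        %% Where the shovel republishes messages on the destination broker.
        {publish_fields, [{exchange, <<"incoming_orders">>},
                          {routing_key, <<"warehouse">>}]},
        {reconnect_delay, 5}
      ]}
    ]}
  ]}
].
```

The exchange is declared durable on both sides so that it agrees with the durable exchangeDeclare call made by the producer. A mismatch in durability would cause the declaration to be rejected.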
5. Replace the rabbitmq.config file inside the Docker container
Only the configuration file on the source node (rabbitmqa) needs to be replaced.
docker cp /home/robbie/Downloads/rabbitmq.config rabbitmqa:/etc/rabbitmq/rabbitmq.config
6. Restart the Docker containers
docker restart rabbitmqa
docker restart rabbitmqb
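After the restart, the RabbitMQ management UI of each node should show that the queues and exchanges declared in the shovel configuration have been created automatically.
7. Create a consumer on the destination RabbitMQ node (rabbitmqb)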
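import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connect to rabbitmqb through its host-mapped AMQP port (5773).
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5773);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // autoAck is true, so the broker treats messages as acknowledged on delivery.
        channel.basicConsume("warehouse_carpinteria", true, "order_processor", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Message is: " + new String(body, StandardCharsets.UTF_8));
                // If autoAck is set to false, acknowledge manually instead:
                // channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}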
8. Create a producer on the source RabbitMQ node (rabbitmqa)
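import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ProducerTest {
    @Test
    public void producer() throws IOException, TimeoutException {
        // Connect to rabbitmqa through its host-mapped AMQP port (5772).
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5772);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Durable direct exchange; this must match the declaration in the shovel configuration.
        channel.exchangeDeclare("incoming_orders", "direct", true);
        channel.basicPublish("incoming_orders", "warehouse", null,
                "RabbitMQ shovel test...".getBytes(StandardCharsets.UTF_8));

        channel.close();
        connection.close();
    }
}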
Run the consumer first, then run the producer. For more background, see the book RabbitMQ in Action.