RabbitMQ 入门篇之——五种工作模式
目录
2.1 Round-robin dispatching(轮询调度)
rabbitmq的官网中介绍的工作模式有七种,这里我们只介绍五种
我们这里简单介绍下前面五种:
导入依赖:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
工具类:
package com.cjian;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtils {
public static Connection getConnection() {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置参数
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("vhost_cjian");
connectionFactory.setUsername("cjian");
connectionFactory.setPassword("111111");
//创建连接
Connection connection = null;
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return connection;
}
}
1.简单队列
一个生产者一个消费者
package com.cjian.rabbitmq.simple;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,如果没有则会去创建该队列
* durable:是否持久化,当mq重启后,数据还在
* exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列 一般设置为false
* autoDelete:当没有消费者时是否自动删除
* arguments:参数信息
*/
channel.queueDeclare("simpleQueueTest", true, false, false, null);
//发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称,简单模式下会使用默认的交换机
* routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
* props:配置名信息
* body:发送的消息的字节数组
*/
String msg = "Helle RabbitMq~";
channel.basicPublish("", "simpleQueueTest", null, msg.getBytes());
//释放资源
connection.close();
channel.close();
}
}
启动后管理台:
消费者:
package com.cjian.rabbitmq.simple;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("simpleQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("simpleQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
输出:
consumerTag:amq.ctag-89i7XZ4piHud6uco4aCg9A
Exchange:
RoutingKey:simpleQueueTest
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:Helle RabbitMq~
2.工作队列
一个生产者,多个消费者,具有‘负载均衡’的功能
2.1 Round-robin dispatching(轮询调度)
这也是工作队列默认的方式,多个消费者消费消息按序来,与消费者的消费速度无关
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,如果没有则会去创建该队列
* durable:是否持久化,当mq重启后,数据还在
* exclusive:①是否独占,只能有一个消费者监听这个队列②当connection关闭时,是否删除队列 一般设置为false
* autoDelete:当没有消费者时是否自动删除
* arguments:参数信息
*/
channel.queueDeclare("WorkQueueTest", true, false, false, null);
//发送消息
/**
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称,简单模式下会使用默认的交换机
* routingKey:路由名称,如果使用默认的交换机,则路由名称应该和队列名称一样
* props:配置名信息
* body:发送的消息的字节数组
*/
for (int i = 0; i < 10; i++) {
String msg = "Helle RabbitMq~ "+i;
channel.basicPublish("", "WorkQueueTest", null, msg.getBytes());
}
//释放资源
connection.close();
channel.close();
}
}
生产者启动后:
消费者1:
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("WorkQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
System.out.println("消费者2睡眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("WorkQueueTest",true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
启动两个消费者:
输出:
2.2 Fair dispatch(公平调度)
公平调度可以确保rabbitmq消费者的可靠性:多个消费者中个别消费者宕机了,依然可以让消息可以得到消费
消费者的可靠性属于rabbitmq的高级特性,后面细说,这里先简单说下公平调度的实现
要实现公平调度,需要利用rabbitmq的ack机制,上面的轮询分发,autoack我们设置的是true,公平调度我这:
1.我们需要把autoack改成false
2.指定消费者每次从队列中获取的消息数量:basicQos
3.手动返回ack
完整的demo:
消费者3:模拟消费慢的情况
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer3 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
try {
System.out.println("消费者2睡眠1s");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("body:"+new String(body));
//手动返回ack,表示消费完了
//basicAck(long deliveryTag, boolean multiple)
//deliveryTag:该消息的index; multiple:是否批量true:将一次性ack所有小于deliveryTag的消息;确认收到消息。
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
//每次从队列中消费一条消息
channel.basicQos(1);
channel.basicConsume("WorkQueueTest",false,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者4:正常消费
package com.cjian.rabbitmq.workqueue;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer4 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("WorkQueueTest", true, false, false, null);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
//手动返回ack,表示消费完了
//basicAck(long deliveryTag, boolean multiple)
//deliveryTag:该消息的index;
//multiple:是否批量true:将一次性ack所有小于deliveryTag的消息;确认收到消息。
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
//每次从队列中消费一条消息,直到它处理完了并且返回了前一个消息的通知标志(acknowledged),显式调用basicAck
channel.basicQos(1);
channel.basicConsume("WorkQueueTest",false,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
输出结果:
本来打算模拟消费者3消费消息的时候出现了异常,验证一下消费者消费消息的可靠性,但是在验证的过程中发现了一个有趣的现象,也重现了上家公司的一个问题,而且可能篇幅较长,涉及到的东西也有点多了,放到下篇博文吧
3.交换机-fanout(pub/sub)模式
每个消费者都能获取到同样的消息
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//创建队列
String queueName1 = "exchange_fanout_queue1";
String queueName2 = "exchange_fanout_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
channel.queueBind(queueName1,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");
//发送消息
String msg = "Hello rabbitmq~";
channel.basicPublish(exchangeName, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动生产者后:
消费者1:
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_fanout_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.pubsub;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_fanout_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
输出:
4.交换机-direct模式
每个队列只能消费指定格式(routingkey)的消息
生产者发送一条 路由键为 info的消息
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//创建队列
String queueName1 = "exchange_direct_queue1";
String queueName2 = "exchange_direct_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
//消费路由键为error 的消息
channel.queueBind(queueName1,exchangeName,"error");
//消费路由键为info、error、waring 的消息
channel.queueBind(queueName2,exchangeName,"info");
channel.queueBind(queueName2,exchangeName,"error");
channel.queueBind(queueName2,exchangeName,"waring");
//发送消息
String msg = "Hello rabbitmq-info";
channel.basicPublish(exchangeName, "info", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动生产者后:
消费者1:
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_direct_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2:
package com.cjian.rabbitmq.direct;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_direct_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
控制台输出:
发送一条error的:
控制台输出:
5.交换机-topic模式
类似于direct,匹配功能更加强大,具体细节可查看代码中的注释
消费者
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitmqUtils.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机的类型
* DIRECT("direct"),定向
* FANOUT("fanout"),扇形(广播),发送到消息到每一个与之绑定的队列
* TOPIC("topic"),通配符
* HEADERS("headers");参数匹配,用得少
*
* durable:是否持久化
* autoDelete:自动删除
* internal:内部使用 一般false
* arguments:参数
*/
String exchangeName = "exchange_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//创建队列
String queueName1 = "exchange_topic_queue1";
String queueName2 = "exchange_topic_queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange交换机名称
* routingKey:路由键(如果交换机的类型为fanout,routingKey设置为空)
*/
//#:匹配0或多个单词 ,*:匹配一个单词
channel.queueBind(queueName1,exchangeName,"#.error");
channel.queueBind(queueName2,exchangeName,"order.*");
channel.queueBind(queueName2,exchangeName,"*.*");
//发送消息
String msg = "Hello rabbitmq-order.error1";
channel.basicPublish(exchangeName, "order.error1", null, msg.getBytes());
channel.close();
connection.close();
}
}
启动后:
消费者1: 消费以error结尾的消息
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName1 = "exchange_topic_queue1";
channel.basicConsume(queueName1,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
消费者2: 消费order开头的或者任意两个单词的
package com.cjian.rabbitmq.topic;
import com.cjian.RabbitmqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqUtils.getConnection();
Channel channel = connection.createChannel();
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
/**
* 回调方法,当收到消息后,会执行该方法
* @param consumerTag 标识
* @param envelope 获取一些信息:交换机,路由key等
* @param properties 配置信息
* @param body 数据
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
String queueName2 = "exchange_topic_queue2";
channel.basicConsume(queueName2,true,defaultConsumer);
//消费者因为需要一直监听,所以不需要关闭资源
}
}
粘过来好看点:
控制台:
后面的就不一一验证了,重点也不在这,后面分析rabbitmq的一些高级特性
本文地址:https://blog.csdn.net/cj_eryue/article/details/112850634