RabbitMQ的5种模式,并使用java进行模拟操作
程序员文章站
2022-03-15 20:59:26
2. RabbitMQ的5种工作模式首先把官网介绍放在这里:https://www.rabbitmq.com/getstarted.html然后就开始记录一下学习到的吧!多多指教2.1 简单模式就是将一个生产者绑定到一个消息队列,然后消费者从这个消息队列中取消息进行消费在生产者与队列之间,使用的有默认的交换机流程:生产者:新建Connection工厂 ,需要设置一些属性(ConnectionFactory)使用Connection工厂获取Connection对象使用Connect...
2. RabbitMQ的5种工作模式
首先把官网介绍放在这里:https://www.rabbitmq.com/getstarted.html
然后就开始记录一下学习到的吧!多多指教
2.1 简单模式
- 就是将一个生产者绑定到一个消息队列,然后消费者从这个消息队列中取消息进行消费
- 在生产者与队列之间,使用的有默认的交换机
流程:
生产者:
- 新建Connection工厂 ,需要设置一些属性(ConnectionFactory)
- 使用Connection工厂获取Connection对象
- 使用Connection对象获取Channel(信道)对象
- 通过Channel声明一个消息队列
- 发送消息
- 关闭连接资源
代码实现
public class HelloProduct {
public static void main(String[] args) throws Exception{
//1.得到ConnectionFactory
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("localhost");//默认就是localhost 可以不加该代码
connectionFactory.setPort(5672);//默认就是5672
// 这里说明一下,这个路径,还有账户密码,是自己提前建好的
connectionFactory.setVirtualHost("/lh");//默认是 /
connectionFactory.setUsername("ldh");//默认guest
connectionFactory.setPassword("ldh"); //默认guest
//2. 获取Connection
Connection connection=connectionFactory.newConnection();
//3.获取Channel
Channel channel = connection.createChannel();
//4.创建队列
/**
* queue(队列的名称),
* durable(是否持久化),
* exclusive (是否是独享的,表示被该信道独享),
* autoDelete (是否自动删除 如果没有消费者哪个该队列就会自动删除),
* Map<String, Object> arguments: 队列的参数设置
*/
channel.queueDeclare("hello-queue",false,false,false,null);
// 5.发送消息,这里使用四个参数的方法
//exchange交换机的名称, 如果你使用的为Hello模式 那么交换机的名称为”“
//routingKey:路由key 如果你使用的为Hello模式 那么路由key必须为队列的名称
//props: 消息的属性信息 null
//body: 表示消息内容
String msg = "今天第一天学习RabbitMQ所以速度很慢:"+i;
channel.basicPublish("", "hello-queue", null, msg.getBytes());
// 关闭资源
channel.close();
connection.close();
}
}
消费者:
- 新建Connection工厂 ,需要设置一些属性(ConnectionFactory)
- 使用Connection工厂获取Connection对象
- 使用Connection对象获取Channel(信道)对象
- 获取信道中的消息并通过回调函数进行处理
代码实现
public class HelloConsumer {
public static void main(String[] args) throws Exception {
//1.得到ConnectionFactory
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setVirtualHost("/lh");//默认 /
connectionFactory.setUsername("ldh");//默认guest
connectionFactory.setPassword("ldh"); //默认guest
//2. 获取Connection
Connection connection=connectionFactory.newConnection();
//3.获取Channel
Channel channel = connection.createChannel();
//4. 消费消息
//String queue, Consumer callback
//queue:队列的名称
//callback: 回调函数。
//boolean autoAck:是否自动回复
Consumer consumer=new DefaultConsumer(channel){
//当获取到消息后触发该方法的执行
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
// 接收消息
channel.basicConsume("hello-queue",true,consumer);
}
}
1.2 工作模式
跟上边的模式相差不大,只是增加了一个消费者。
只需要将上边的消费者代码复制一份即可,这里不在多说了。
1.3 广播模式(fanout)
这个模式和简单模式有一些不同的地方:
- 这个模式需指定使用 fanout 类型的交换机
除此之外其他大同小异,这里写一下流程和代码实现
流程:
生产者:
- 新建ConnctionFactory
- 通过ConnectionFactory获取Connection
- 通过Connection对象获取Channel
- 通过Channel对象定义交换机,指定类型为fanout
- 通过Channel对象定义消息队列
- 通过Channel对象对交换机和队列进行绑定
- 发送消息
- 关闭资源
代码
public class PublishProduct {
public static void main(String[] args) throws Exception {
ConnectionFactory factory=new ConnectionFactory();
factory.setUsername("ldh");
factory.setPassword("ldh");
factory.setVirtualHost("/lh");
Connection connection=factory.newConnection();
Channel channel = connection.createChannel();
//创建交换机
/**
* String exchange: 交换机的名称
* BuiltinExchangeType type: 交换机的类型
* fanout:广播模式:只要是绑定到交换机上的队列都可以收到消息
* direct:路由模式: 只要符合指定路由key的才可以收到消息
* topic:主题模式; 只要符合指定路由key的才可以收到消息
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* Map<String, Object> arguments 参数
*/
channel.exchangeDeclare("fanout-exchange", BuiltinExchangeType.FANOUT,false,true,null);
// 定义消息队列
channel.queueDeclare("fanout_queue1",false,false,false,null);
channel.queueDeclare("fanout_queue2",false,false,false,null);
//队列和交互机进行绑定
channel.queueBind("fanout_queue1","fanout-exchange","");
channel.queueBind("fanout_queue2","fanout-exchange","");
for(int i=0;i<10;i++) {
channel.basicPublish("fanout-exchange", "", null, (i+"hello pulish exchange").getBytes());
}
// 关闭资源
channel.close();
connection.close();
}
}
消费者只需要改变一下绑定的队列即可,其他不需要改变。这里不在给出代码。
1.4 路由模式 (routing)
通过图可以看出,这个模式相较于广播模式而言,在交换机和队列消息的发送上多了一些控制。广播模式中,交换机发送一条消息,所有的绑定的队列都可以接收到。而在这种模式下,交换机发送一条消息,只有满足条件的队列,交换机才会发送。
在代码层面,相较于广播模式,发生了一些改变:
- 需要指定交换机模式为 direct 模式
- 在交换机和队列绑定时需要指定 routingkey,不能在是 空字符串了
同一个队列要绑定多个条件时,需要多次绑定 - 发送消息的时候需要带上 routingkey。
这里只列出不同的代码:
// 通过 Channel 对象声明交换机,类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,false,true,null);
// 声明队列
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
// 队列和交换机进行绑定
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"debug");
// 发送消息
channel.basicPublish(EXCHANGE_NAME, "info", null, (i+"hello pulish exchange").getBytes());
1.5 通配符模式(topics)
从图中可以看出,通配符模式与路由模式十分相似,不同的地方在于:
交换机与队列的 “路由key” 形式发生了变化。 使用方法与路由模式大同小异* 和 # 的区别,我在这里没有感受到有什么不同, 都是表示匹配0个或多个字符
与路由模式的不同:
- 需要指定交换机模式为 topic 模式
- 在于队列进行绑定的时候需要指定 routingKey 的规则
- 在发送消息时,需要指定 routingKey
核心代码:
// 4. 声明使用的交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC);
// 5. 新建队列
channel.queueDeclare(queueName1, false, false, false, null);
channel.queueDeclare(queueName2, false, false, false, null);
// 6. 绑定交换机和队列
channel.queueBind(queueName1, exchangeName, "*.orange");
channel.queueBind(queueName2, exchangeName, "*.*.rabbite");
channel.queueBind(queueName2, exchangeName, "lazy.#");
// 7. 发送消息给队列
String msg = "这是通过topic模式发送的消息";
channel.basicPublish(exchangeName, "abc.orange", null, msg.getBytes());
记录结束!
本文地址:https://blog.csdn.net/weixin_43852058/article/details/110251496