RabbitMQ 基础知识总结
- Hello World 模型
在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区
生产者
public static void main(String[] args) throws IOException, TimeoutException { //创建工厂 ConnectionFactory con=new ConnectionFactory(); //设置连接那个主机 con.setHost("47.115.44.94"); //设置端口 con.setPort(5672); //设置连接虚拟主机 con.setVirtualHost("sem"); //设置虚拟主机的用户名和密码 con.setUsername("sem"); con.setPassword("123"); //获取连接对象 Connection connection=con.newConnection(); //Connection connection= RabbitmqConn.getConnection();//连接对象封装 //获取连接通道 Channel channel=connection.createChannel(); //通道绑定对应消息队列 // 参数1:队列客称 如果队列不存在自动创建 // 参数2:用来定义队列特性是否夏持久化true持久化队列 false不持久化 // 参数数3: exclusive是否独占队列 true独古队列 false 不独占 // 参数数4: autoDelete:是否在消质完成后自动制除认列 true自动副除 false不自动删除 // 参数数5:部外附加参数 channel.queueDeclare("hello",false,false,false,null); //发布消息 //参数1:交换机名称 参数2:队列名称 参数3:传递消息额外设置(例如设置:MessageProperties.PERSISTENT_TEXT_PLAIN 持久化队列的消息) // 参数4:消息的具体内容 channel.basicPublish("","hello", null,"第一个".getBytes()); channel.close(); connection.close(); }
消费者
public static void main(String[] args) throws IOException, TimeoutException { //创建工厂 ConnectionFactory con=new ConnectionFactory(); //设置连接那个主机 con.setHost("47.115.44.94"); //设置端口 con.setPort(5672); //设置连接虚拟主机 con.setVirtualHost("sem"); //设置虚拟主机的用户名和密码 con.setUsername("sem"); con.setPassword("123"); //获取连接对象 Connection connection=con.newConnection(); // Connection connection= RabbitmqConn.getConnection(); 连接对象封装 //获取连接通道 Channel channel=connection.createChannel(); //通道绑定对应消息队列 // 参数1:队列客称 如果队列不存在自动创建 // 参数2:用来定义队列特性是否夏持久化true持久化队列 false不持久化 // 参数数3: exclusive是否独占队列 true独古队列 false 不独占 // 参数数4: autoDelete:是否在消质完成后自动制除认列 true自动副除 false不自动删除 // 参数数5:部外附加参数 channel.queueDeclare("hello",false,false,false,null); //消费消息 //参数1:消费那个 队列的消息队列名称 //参数2:开启消息的直动确认机 //参数3:消费时的回调调接口 channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("---------->"+new String(body)); } }); }
连接对象封装
public class RabbitmqConn { private static ConnectionFactory connectionFactory; static { //创建工厂 connectionFactory=new ConnectionFactory(); //设置连接那个主机 connectionFactory.setHost("47.115.44.94"); //设置端口 connectionFactory.setPort(5672); //设置连接虚拟主机 connectionFactory.setVirtualHost("sem"); //设置虚拟主机的用户名和密码 connectionFactory.setUsername("sem"); connectionFactory.setPassword("123"); } public static Connection getConnection() throws IOException, TimeoutException { //获取连接对象 return connectionFactory.newConnection(); } }
2. Work queues 模型
在下图中,“P”是我们的生产者,“C1”,“C2”是我们的两个消费者,。中间的盒子是一个队列——RabbitMQ代表使用者保存的消息缓冲区;
没有设置这两句代码channel.basicQos(1)和channel.basicAck(envelope.getDeliveryTag(),false)之前,消费者消费消息是平均消费的,无论某个消费者消费的消息有多慢,也是平均消费,但是这种方式有时候不能够满足业务需求,我们需要的是能者多劳,那个消费者消费的快就消费多点,这样大大提高效了。
生产者:
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //连接通道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("work",false,false,false,null); for (int i= 0; i< 10; i++) { channel.basicPublish("","work",null,(i+"您好呀").getBytes()); } //关闭连接 channel.close(); connection.close(); }
消费者1
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConn.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1);//每一次只能消费一个消息 channel.queueDeclare("work",false,false,false,null); //第二个参数 channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1--------->"+new String(body)); try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); } //参数1:确认队列中那个具体消息 参数2:是否开启3个消意同时确实 channel.basicAck(envelope.getDeliveryTag(),false); } }); }
消费者2
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConn.getConnection();//连接对象 Channel channel = connection.createChannel();//连接通道 channel.basicQos(1);//每一次只能消费一个消息 //创建队列 channel.queueDeclare("work",false,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2--------->"+new String(body)); //参数1:确认队列中那个具体消息 参数2:是否开启3个消意同时确实 channel.basicAck(envelope.getDeliveryTag(),false); } }); }
3.Publish/Subscribe 模型(又称广播模型)
在下图中,“P”是我们的生产者,“X”代表转换机,“C1”,“C2”是我们的两个消费者。中间的盒子是队列
在广播模式下:
消息发送流程是这样的:
1.可以有多个消费者
2.每个消费者有自己的queue(队列)
3.每个队列都要排定到Echange(交换机)
4.生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
5.交换机把消息发送给绑定过的所有队
6.列队列的消费者都能率到消息,实现一条消息被多个消费者消费
生产者
public static void main(String[] args) throws IOException, TimeoutException { //获取连接对象 Connection connection = RabbitmqConn.getConnection(); //创建连接通道 Channel channel=connection.createChannel(); //指定交换机,参数1:交换机名称,参数2 :交换机类型 ,fanout 广播类型 channel.exchangeDeclare("logs","fanout"); //发送消息 channel.basicPublish("logs","",null,"logs out to for".getBytes()); channel.close(); connection.close(); }
消费者(你可以模拟多个消费者测试,这里就不一一写了)
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //创建连接通道 Channel channel = connection.createChannel(); //通道绑定交换机 channel.exchangeDeclare("logs","fanout"); //临时通道 String queueName = channel.queueDeclare().getQueue(); //绑定交换机与队列 channel.queueBind(queueName,"logs",""); channel.basicConsume(queueName,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费1---》"+ new String(body)); } }); }
4.Routing 模型
在fanout模式中,一条消息,会被所有订购的队列都消费,但是,在 某些 场景下我们希望不同的消息被不同的消息队列消费这时就要用到Direct类型的Exchange;
在Direct模型下:
1.队列与交换机的绑定,不能是任意讲定了,而是要指定一个Routingkey (路由key),
2.消息的发送方在向Echange发送消息时,也必须指定消息的Routingkey
3.Exchang不再把消息交给每一个排定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息;
生产者
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //创建连接通道 Channel channel = connection.createChannel(); //交换机 channel.exchangeDeclare("hell_direct","direct"); //发送消息 String routingkey="error"; channel.basicPublish("hell_direct",routingkey,null,"这是direst".getBytes()); channel.close(); connection.close(); }
消费者1
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //创建通道连接 Channel channel = connection.createChannel(); //连接的交换机 channel.exchangeDeclare("hell_direct","direct"); //临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与临时队列连接 channel.queueBind(queue,"hell_direct","error"); //消费消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1--->"+new String(body)); } }); }
消费者2
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //创建通道连接 Channel channel = connection.createChannel(); //连接的交换机 channel.exchangeDeclare("hell_direct","direct"); //临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与临时队列连接 channel.queueBind(queue,"hell_direct","error"); channel.queueBind(queue,"hell_direct","info"); channel.queueBind(queue,"hell_direct","warning"); //消费消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者 2--->"+new String(body)); } }); }
输出结果:
都可以消费者1和消费者2都可以消费消息,因为生产者rountingkey设置为error,
如果生产者rountingkey设置为infor,那么消费者1不能消费消息,而消费者可以消费消息
5.topic 模型
Topic类型的Exchange与Direct相比,都是可以根据Routingkey把消息路由到不同的队列,
只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配特“*”和“*”
这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以"."分割,例如: stes.insert
*(星号)只能代替一个单词。
#(井号)可以替换零个或多个单词。
生产者
public static void main(String[] args) throws IOException, TimeoutException { //创建连接对象 Connection connection = RabbitmqConn.getConnection(); //创建连接通道 Channel channel = connection.createChannel(); //交换机 channel.exchangeDeclare("topics","topic"); //发送消息 String routingkey="user.save.hello"; channel.basicPublish("topics",routingkey,null,"这是topic".getBytes()); channel.close(); connection.close(); }
消费者1
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConn.getConnection(); //创建通道连接 Channel channel = connection.createChannel(); //连接的交换机 channel.exchangeDeclare("topics","topic"); //临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与临时队列连接 channel.queueBind(queue,"topics","user.*"); //消费消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("topic--->"+new String(body)); } }); }
消费者2
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitmqConn.getConnection(); //创建通道连接 Channel channel = connection.createChannel(); //连接的交换机 channel.exchangeDeclare("topics","topic"); //临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与临时队列连接 channel.queueBind(queue,"topics","user.#"); //消费消息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("topic--->"+new String(body)); } }); }
输出结果:
消费者1 不能消费消息(因为*只能代替一个单词),而消费者2可以消费消息(因为#可以替换零个或多个单词)
本文地址:https://blog.csdn.net/weixin_44544678/article/details/109578831