消息中间件MQ之RabbitMQ
消息中间件MQ之RabbitMQ
MQ
-
Message Queue(消息队列),是在消息的的传输过程中保存消息的容器,多用于分布式系统之间进行通信。
-
优势:
- 应用解耦:提高系统容错性和可维护性
- 异步提速:提升用户体验和系统吞吐量
- 削峰填谷:提高系统稳定性
-
劣势:
- 系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响
- 系统复杂度提高:以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用
- 一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,BCD系统可能处理失败
RabbitMQ简介
-
是一个基于AMQP协议实现的一款消息中间件
-
AMQP:Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制
-
基础架构图
-
RabbitMQ 中的相关概念:
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接
- Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ的7种工作模式与JMS
-
简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing路由模式、Topics 主题模式、RPC 远程调用模式、Publisher Confirms
-
JMS:即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件的API,是 JavaEE 规范中的一种,类比JDBC
-
导入依赖:
<dependencies> <!--rabbitmq java 客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> </dependencies>
RabbitMQ之简单模式
-
producer
public class ProducerHello { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.33.129");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/it");//虚拟机 默认值/ factory.setUsername("gmx");//用户名 默认 guest factory.setPassword("gmx");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello",true,false,false,null); /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数: 1. exchange:交换机名称。简单模式下交换机会使用默认的 "" 2. routingKey:路由名称 3. props:配置信息 4. body:发送消息数据 */ String body = "hello rabbitmq~~~"; //6. 发送消息 channel.basicPublish("","hello",null,body.getBytes()); //7.释放资源 channel.close(); connection.close(); } }
-
consumer
public class ConsumerHello { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.33.129");//ip 默认值 localhost factory.setPort(5672); //端口 默认值 5672 factory.setVirtualHost("/it");//虚拟机 默认值/ factory.setUsername("gmx");//用户名 默认 guest factory.setPassword("gmx");//密码 默认值 guest //3. 创建连接 Connection Connection connection = factory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); //5. 创建队列Queue /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数: 1. queue:队列名称 2. durable:是否持久化,当mq重启之后,还在 3. exclusive: * 是否独占。只能有一个消费者监听这队列 * 当Connection关闭时,是否删除队列 4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉 5. arguments:参数。 */ //如果没有一个名字叫hello的队列,则会创建该队列,如果有则不会创建 channel.queueDeclare("hello",true,false,false,null); /* basicConsume(String queue, boolean autoAck, Consumer callback) 参数: 1. queue:队列名称 2. autoAck:是否自动确认 3. callback:回调对象 */ // 接收消息 Consumer consumer = new DefaultConsumer(channel){ /* 回调方法,当收到消息后,会自动执行该方法 1. consumerTag:标识 2. envelope:获取一些信息,交换机,路由key... 3. properties:配置信息 4. body:数据 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; //监听消息队列 channel.basicConsume("hello",true,consumer); //关闭资源?不要 } }
RabbitMQ之work queues模式
- 代码同上,需创建两个consumer来同时接收消息(默认是轮询机制)
RabbitMQ之Publish/Subscribe 发布与订阅模式
-
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
-
producer
public class Producer_PubSub { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置链接信息 factory.setHost("192.168.33.129"); factory.setPort(5672); factory.setVirtualHost("/it"); factory.setUsername("gmx"); factory.setPassword("gmx"); //创建连接 Connection connection = factory.newConnection(); //创建channel Channel channel = connection.createChannel(); //创建交换机 String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); //创建队列 channel.queueDeclare("fanout_queue1",true,false,false,null); channel.queueDeclare("fanout_queue2",true,false,false,null); //绑定队列和交换机 channel.queueBind("fanout_queue1",exchangeName,""); channel.queueBind("fanout_queue2",exchangeName,""); //发送消息 String body = "log......."; channel.basicPublish(exchangeName,"",null,body.getBytes()); //释放消息 channel.close(); connection.close(); } }
-
consumer 1 接受处理fanout_queue1的消息
public class Counstumer_PubSub1 { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //2. 设置参数 connectionFactory.setHost("192.168.33.129");//ip 默认值 localhost connectionFactory.setPort(5672);//端口 默认值 5672 connectionFactory.setVirtualHost("/it");//虚拟机 默认值/ connectionFactory.setUsername("gmx"); connectionFactory.setPassword("gmx"); //3. 创建连接 Connection Connection connection = connectionFactory.newConnection(); //4. 创建Channel Channel channel = connection.createChannel(); String queueName1 = "fanout_queue1"; // 接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println(new String(bytes)); } }; channel.basicConsume(queueName1,true,consumer); } }
-
consumer 2 接受处理fanout_queue2的消息,将fanout_queue1改为fanout_queue2
RabbitMQ之Routing路由模式
-
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
-
producer
public class Producer_Routing { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置链接信息 factory.setHost("192.168.33.129"); factory.setPort(5672); factory.setVirtualHost("/it"); factory.setUsername("gmx"); factory.setPassword("gmx"); //创建连接 Connection connection = factory.newConnection(); //创建channel Channel channel = connection.createChannel(); //创建交换机 String exchangeName = "test_direct"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null); //创建队列 channel.queueDeclare("direct_queue1",true,false,false,null); channel.queueDeclare("direct_queue2",true,false,false,null); //绑定队列和交换机 channel.queueBind("direct_queue1",exchangeName,"error"); channel.queueBind("direct_queue2",exchangeName,"error"); channel.queueBind("direct_queue2",exchangeName,"info"); channel.queueBind("direct_queue2",exchangeName,"warning"); //发送消息 String body = "error......."; String body1 = "info......."; String body2 = "warning......."; channel.basicPublish(exchangeName,"error",null,body.getBytes()); channel.basicPublish(exchangeName,"info",null,body1.getBytes()); channel.basicPublish(exchangeName,"warning",null,body2.getBytes()); //释放消息 channel.close(); connection.close(); } }
-
consumer 通过队列名字获取值
RabbitMQ之Topics 主题模式
-
在配置routing key 的时候可以使用通配符
-
*表示一个词,#表示n个词,以.号为分隔符,如*.haha表示匹配a.haha、不匹配a.b.haha
-
producer
public class Producer_Topic { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置链接信息 factory.setHost("192.168.33.129"); factory.setPort(5672); factory.setVirtualHost("/it"); factory.setUsername("gmx"); factory.setPassword("gmx"); //创建连接 Connection connection = factory.newConnection(); //创建channel Channel channel = connection.createChannel(); //创建交换机 String exchangeName = "test_TOPIC"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null); //创建队列 channel.queueDeclare("TOPIC_queue1",true,false,false,null); channel.queueDeclare("TOPIC_queue2",true,false,false,null); //绑定队列和交换机 channel.queueBind("TOPIC_queue1",exchangeName,"order.*"); channel.queueBind("TOPIC_queue2",exchangeName,"#.error"); channel.queueBind("TOPIC_queue2",exchangeName,"#.info.*"); //发送消息 String body = "error......."; String body1 = "order....... "; channel.basicPublish(exchangeName,"error",null,body.getBytes()); // channel.basicPublish(exchangeName,".error",null,body.getBytes()); channel.basicPublish(exchangeName,"order.hah",null,body1.getBytes()); //释放消息 channel.close(); connection.close(); } }