RabbitMQ 从入门到精通 (一)
目录
1. 初识rabbitmq
rabbitmq 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,rabbitmq是使用 erlang语言来编写的,并且rabbitmq是基于amqp协议的
rabbitmq的优点:
- 开源、性能优秀、稳定性保障
- 提供可靠性消息投递模式(confirm)、返回模式(return)
- 与springamqp完美的整合、api丰富
- 集群模式丰富,表达式配置,ha模式,镜像队列模型
- 保证数据不丢失的前提下做到高可靠性、可用性
rabbitmq的整体架构:
rabbitmq的消息流转:
2. amqp
amqp全称: advanced message queuing protocol
amqp翻译: 高级消息队列协议
amqp定义: 是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
amqp核心概念:
- server:又称broker,接受客户端的连接,实现amqp实体服务
- connection:连接,应用程序与broker的网络连接
- channel:网络信道,几乎所有的操作都在channel中进行,channel是进行消息读写的通道。客户端可建立多个channel,每个channel代表一个会话任务
- message:消息,服务器和应用程序之间传送的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;body则是消息体的内容
- virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。同一个virtual host里面不能有相同名称的exchange或queue
- exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- binding:exchange和queue之间的虚拟连接,binding中可以包含routing key
- routing key:一个路由规则,虚拟机可用它确定如何路由一个特定消息
- queue:也称为message queue,消息队列,保存消息并将它们转发给消费者
3.rabbitmq的极速入门
后台启动: ./rabbitmq start &
关闭: ./rabbitmqctl stop
节点状态: ./rabbitmqctl status
管控台:
rabbitmq生产消费快速入门:
环境: springboot+jdk1.7+rabbitmq3.6.5 (maven依赖配置)
<parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>1.5.9.release</version> </parent> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp-client</artifactid> <version>3.6.5</version> </dependency> </dependencies>
public class procuder { public static void main(string[] args) throws exception { //1.创建一个connectionfactory 并进行配置 connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.通过连接工厂创建连接 connection connection = connectionfactory.newconnection(); //3.通过connection 创建一个 channel channel channel = connection.createchannel(); /** * basicpublish(string exchange, string routingkey, basicproperties props, byte[] body) * exchange:指定交换机 不指定 则默认 (amqp default交换机) 通过routingkey进行匹配 * props 消息属性 * body 消息体 */ //4.通过channel发送数据 for(int i = 0; i < 5; i++){ system.out.println("生产消息:" + i); string msg = "hello rabbitmq" + i; channel.basicpublish("", "test", null, msg.getbytes()); } //5.记得关闭相关的连接 channel.close(); connection.close(); } }
public class consumer { public static void main(string[] args) throws exception{ //1.创建一个connectionfactory 并进行配置 connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.通过连接工厂创建连接 connection connection = connectionfactory.newconnection(); //3.通过connection 创建一个 channel channel channel = connection.createchannel(); //4. 声明创建一个队列 string queuename = "test"; /** * durable 是否持久化 * exclusive 独占的 相当于加了一把锁 */ channel.queuedeclare(queuename,true,false,false,null); //5.创建消费者 queueingconsumer queueingconsumer = new queueingconsumer(channel); //6.设置channel /** * ack: 当一条消息从生产端发到消费端,消费端接收到消息后会马上回送一个ack信息给broker,告诉它这条消息收到了 * autoack: * true 自动签收 当消费者一收到消息就表示消费者收到了消息,消费者收到了消息就会立即从队列中删除。 * false 手动签收 当消费者收到消息在合适的时候来显示的进行确认,说我已经接收到了该消息了,rabbitmq可以从队列中删除该消息了 * */ channel.basicconsume(queuename, true, queueingconsumer); //7.获取消息 while(true){ delivery delivery = queueingconsumer.nextdelivery(); string msg = new string(delivery.getbody()); system.err.println("消费端:" + msg); //envelope envelope = delivery.getenvelope(); } } }
4. exchange(交换机)详解
exchange: 接收消息,并根据路由键转发消息所绑定的队列
交换机属性:
- name: 交换机名称
- type: 交换机类型 diect、topic、fanout、headers
- durability: 是否需要持久化,true为持久化
- autodelete: 当最后一个绑定到exchange的队列删除后,自动删除该exchange
- internal: 当前exchange是否用于rabbitmq内部使用,默认为false (百分之99的情况默认为false 除非对erlang语言较了解,做一些扩展)
- arguments: 扩展参数, 用于扩展amqp协议可自定化使用
4.1 direct exchange
所有发送到direct exchange的消息被转发到routekey指定的queue
注意:direct模式可以使用rabbitmq自带的exchange: default exchange,所以不需要将exchange进行任何绑定(binding)操作,消息传递时,routingkey必须完全匹配才会被队列接收,否则该消息会被抛弃
public class producerdirectexchange { public static void main(string[] args) throws exception { //1.创建connectionfactory connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); //2.创建connection connection connection = connectionfactory.newconnection(); //3.创建channel channel channel = connection.createchannel(); //4.声明 string exchangename = "test_direct_exchange"; string routingkey = "test.direct"; //5.发送 string msg = "hello world rabbitmq4 direct exchange message"; channel.basicpublish(exchangename, routingkey, null, msg.getbytes()); } }
public class consumerdirectexchange { public static void main(string[] args) throws exception{ connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); connectionfactory.setautomaticrecoveryenabled(true); connectionfactory.setnetworkrecoveryinterval(3000); connection connection = connectionfactory.newconnection(); channel channel = connection.createchannel(); //声明 string exchangename = "test_direct_exchange"; string exchangetype = "direct"; string queuename = "test_direct_queue"; string routingkey = "test.direct"; //表示声明了一个交换机 channel.exchangedeclare(exchangename, exchangetype,true,false,false,null); //表示声明了一个队列 channel.queuedeclare(queuename,false,false,false,null); //建立一个绑定关系 channel.queuebind(queuename, exchangename, routingkey); //durable 是否持久化消息 queueingconsumer consumer = new queueingconsumer(channel); //参数:队列名称,是否自动ack,consumer channel.basicconsume(queuename, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 delivery delivery = consumer.nextdelivery(); string msg = new string(delivery.getbody()); system.out.println("收到消息:" + msg); } } }
4.2 topic exchange
所有发送到topic exchange的消息被转发到所有关心routekey中指定topic的queue上
exchange将routekey和某topic进行模糊匹配,此时队列需要绑定一个topic
注意:可以使用通配符进行匹配
符号 # 匹配一个或多个词
符号 * 匹配不多不少一个词
例如: "log.#" 能够匹配到 “log.info.oa”
"log.*" 只会匹配到 "log.err"
public class producertopicexchange { public static void main(string[] args) throws exception { //1.创建connectionfactory connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.创建connection connection connection = connectionfactory.newconnection(); //3.创建channel channel channel = connection.createchannel(); //4.声明 string exchangename = "test_topic_exchange"; string routingkey1 = "user.save"; string routingkey2 = "user.update"; string routingkey3 = "user.delete.abc"; //5.发送 string msg = "hello world rabbitmq4 direct exchange message"; channel.basicpublish(exchangename, routingkey1, null, msg.getbytes()); channel.basicpublish(exchangename, routingkey2, null, msg.getbytes()); channel.basicpublish(exchangename, routingkey3, null, msg.getbytes()); } }
public class consumertopicexchange { public static void main(string[] args) throws exception{ connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); connectionfactory.setautomaticrecoveryenabled(true); connectionfactory.setnetworkrecoveryinterval(3000); connection connection = connectionfactory.newconnection(); channel channel = connection.createchannel(); //声明 string exchangename = "test_topic_exchange"; string exchangetype = "topic"; string queuename = "test_topic_queue"; string routingkey = "user.#"; //表示声明了一个交换机 channel.exchangedeclare(exchangename, exchangetype,true,false,false,null); //表示声明了一个队列 channel.queuedeclare(queuename,false,false,false,null); //建立一个绑定关系 channel.queuebind(queuename, exchangename, routingkey); //durable 是否持久化消息 queueingconsumer consumer = new queueingconsumer(channel); //参数:队列名称,是否自动ack,consumer channel.basicconsume(queuename, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 delivery delivery = consumer.nextdelivery(); string msg = new string(delivery.getbody()); system.out.println("收到消息:" + msg); } } }
4.3 fanout exchange
不处理路由键,只需要简单的将队列绑定到交换机上
发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
所以fanout交换机转发消息是最快的
public class producerfanoutexchange { public static void main(string[] args) throws exception { //1.创建connectionfactory connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.创建connection connection connection = connectionfactory.newconnection(); //3.创建channel channel channel = connection.createchannel(); //4.声明 string exchangename = "test_fanout_exchange"; //5.发送 for(int i = 0; i < 10 ; i++){ string msg = "hello world rabbitmq4 direct exchange message"; channel.basicpublish(exchangename, "", null, msg.getbytes()); } channel.close(); connection.close(); } }
public class consumerfanoutexchange { public static void main(string[] args) throws exception{ connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); connectionfactory.setautomaticrecoveryenabled(true); connectionfactory.setnetworkrecoveryinterval(3000); connection connection = connectionfactory.newconnection(); channel channel = connection.createchannel(); //声明 string exchangename = "test_fanout_exchange"; string exchangetype = "fanout"; string queuename = "test_topic_queue"; //无需指定路由key string routingkey = ""; //表示声明了一个交换机 channel.exchangedeclare(exchangename, exchangetype,true,false,false,null); //表示声明了一个队列 channel.queuedeclare(queuename,false,false,false,null); //建立一个绑定关系 channel.queuebind(queuename, exchangename, routingkey); //durable 是否持久化消息 queueingconsumer consumer = new queueingconsumer(channel); //参数:队列名称,是否自动ack,consumer channel.basicconsume(queuename, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 delivery delivery = consumer.nextdelivery(); string msg = new string(delivery.getbody()); system.out.println("收到消息:" + msg); } } }
5. message 消息
服务器与应用程序之间传递的数据,本质上就是一段数据,由properties和body组成
常用属性:delivery mode、headers (自定义属性)
其他属性:content_type、content_encoding、priority、expiration
消息的properties属性用法示例:
public class procuder { public static void main(string[] args) throws exception { //1.创建一个connectionfactory 并进行配置 connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.通过连接工厂创建连接 connection connection = connectionfactory.newconnection(); //3.通过connection 创建一个 channel channel channel = connection.createchannel(); map<string,object> headers = new hashmap<>(); headers.put("my1", "111"); headers.put("my2", "222"); //10秒不消费 消息过期移除消息队列 amqp.basicproperties properties = new amqp.basicproperties().builder() .deliverymode(2) .contentencoding("utf-8") .expiration("10000") .headers(headers) .build(); //4.通过channel发送数据 for(int i = 0; i < 5; i++){ system.out.println("生产消息:" + i); string msg = "hello rabbitmq" + i; channel.basicpublish("", "test", properties, msg.getbytes()); } //5.记得关闭相关的连接 channel.close(); connection.close(); } }
public class consumer { public static void main(string[] args) throws exception{ //1.创建一个connectionfactory 并进行配置 connectionfactory connectionfactory = new connectionfactory(); connectionfactory.sethost("192.168.244.11"); connectionfactory.setport(5672); connectionfactory.setvirtualhost("/"); connectionfactory.sethandshaketimeout(20000); //2.通过连接工厂创建连接 connection connection = connectionfactory.newconnection(); //3.通过connection 创建一个 channel channel channel = connection.createchannel(); //4. 声明创建一个队列 string queuename = "test"; channel.queuedeclare(queuename,true,false,false,null); //5.创建消费者 queueingconsumer queueingconsumer = new queueingconsumer(channel); //6.设置channel channel.basicconsume(queuename, true, queueingconsumer); //7.获取消息 while(true){ delivery delivery = queueingconsumer.nextdelivery(); string msg = new string(delivery.getbody()); system.err.println("消费端:" + msg); map<string, object> headers = delivery.getproperties().getheaders(); system.err.println("headers value:" + headers.get("my1")); } } }
下一篇: scss混合(mixins)使用