rabbitMq 初步
rabbitmq的工作原理
它的基本结构
组成部分说明如下:
broker:消息队列服务进程,此进程包括两个部分:exchange和queue。
exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
producer:消息生产者,即生产方客户端,生产方客户端将消息发送到mq。
consumer:消息消费者,即消费方客户端,接收mq转发的消息。
maven举例配置
<dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp‐client</artifactid> <version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring‐boot‐starter‐logging</artifactid> </dependency>
生产者举例demo
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class rabbitmqconfig { public static final string queue_inform_email = "queue_inform_email"; public static final string queue_inform_sms = "queue_inform_sms"; public static final string exchange_topics_inform="exchange_topics_inform"; public static final string routingkey_email="inform.#.email.#"; public static final string routingkey_sms="inform.#.sms.#"; //声明交换机 @bean(exchange_topics_inform) public exchange exchange_topics_inform(){ //durable(true) 持久化,mq重启之后交换机还在 return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build(); } //声明queue_inform_email队列 @bean(queue_inform_email) public queue queue_inform_email(){ return new queue(queue_inform_email); } //声明queue_inform_sms队列 @bean(queue_inform_sms) public queue queue_inform_sms(){ return new queue(queue_inform_sms); } //routingkey_email队列绑定交换机,指定routingkey @bean public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue, @qualifier(exchange_topics_inform) exchange exchange){ return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs(); } //routingkey_sms队列绑定交换机,指定routingkey @bean public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue, @qualifier(exchange_topics_inform) exchange exchange){ return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs(); } }
消费者举例demo
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class rabbitmqconfig { public static final string queue_inform_email = "queue_inform_email"; public static final string queue_inform_sms = "queue_inform_sms"; public static final string exchange_topics_inform="exchange_topics_inform"; public static final string routingkey_email="inform.#.email.#"; public static final string routingkey_sms="inform.#.sms.#"; //声明交换机 @bean(exchange_topics_inform) public exchange exchange_topics_inform(){ //durable(true) 持久化,mq重启之后交换机还在 return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build(); } //声明queue_inform_email队列 @bean(queue_inform_email) public queue queue_inform_email(){ return new queue(queue_inform_email); } //声明queue_inform_sms队列 @bean(queue_inform_sms) public queue queue_inform_sms(){ return new queue(queue_inform_sms); } //routingkey_email队列绑定交换机,指定routingkey @bean public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue, @qualifier(exchange_topics_inform) exchange exchange){ return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs(); } //routingkey_sms队列绑定交换机,指定routingkey @bean public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue, @qualifier(exchange_topics_inform) exchange exchange){ return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs(); } }
工作模式
rabbitmq有以下几种工作模式 :
1、work queues
2、publish/subscribe
3、routing
4、topics
5、header
6、rpc
work queues
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、使用入门程序,启动多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
publish/subscribe 发布订阅模式
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
routin
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
这是一种非常灵活的模式,经常被用到
topics
路由模式:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
header模式
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配
队列。
案例:
根据用户的通知设置去通知用户,设置接收email的用户只接收email,设置接收sms的用户只接收sms,设置两种
通知类型都接收的则两种通知都有效。
生产者demo:
map<string, object> headers_email = new hashtable<string, object>(); headers_email.put("inform_type", "email"); map<string, object> headers_sms = new hashtable<string, object>(); headers_sms.put("inform_type", "sms"); channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email); channel.queuebind(queue_inform_sms,exchange_headers_inform,"",headers_sms);
通知demo :
string message = "email inform to user"+i; map<string,object> headers = new hashtable<string, object>(); headers.put("inform_type", "email");//匹配email通知消费者绑定的header //headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header amqp.basicproperties.builder properties = new amqp.basicproperties.builder(); properties.headers(headers); //email通知 channel.basicpublish(exchange_headers_inform, "", properties.build(), message.getbytes());
发送邮件消费者 :
channel.exchangedeclare(exchange_headers_inform, builtinexchangetype.headers); map<string, object> headers_email = new hashtable<string, object>(); headers_email.put("inform_email", "email"); //交换机和队列绑定 channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email); //指定消费队列 channel.basicconsume(queue_inform_email, true, consumer);
rpc
rpc即客户端远程调用服务端的方法 ,使用mq可以实现rpc的异步调用,基于direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向rpc请求队列发送rpc调用消息,同时监听rpc响应队列。
2、服务端监听rpc请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将rpc方法 的结果发送到rpc响应队列
4、客户端(rpc调用方)监听rpc响应队列,接收到rpc调用结果。