欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitMq 初步

程序员文章站 2022-07-11 08:35:21
RabbitMQ的工作原理 它的基本结构 组成部分说明如下: Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。 Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。 Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的 ......

rabbitmq的工作原理

它的基本结构

rabbitMq 初步

组成部分说明如下:

broker:消息队列服务进程,此进程包括两个部分:exchange和queue。

exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。

queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

producer:消息生产者,即生产方客户端,生产方客户端将消息发送到mq。

consumer:消息消费者,即消费方客户端,接收mq转发的消息。

 

maven举例配置

<dependency>
<groupid>com.rabbitmq</groupid>
<artifactid>amqp‐client</artifactid>
<version>4.0.3</version><!‐‐此版本与spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring‐boot‐starter‐logging</artifactid>
</dependency>

生产者举例demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqconfig {
    public static final string queue_inform_email = "queue_inform_email";
    public static final string queue_inform_sms = "queue_inform_sms";
    public static final string exchange_topics_inform="exchange_topics_inform";
    public static final string routingkey_email="inform.#.email.#";
    public static final string routingkey_sms="inform.#.sms.#";

    //声明交换机
    @bean(exchange_topics_inform)
    public exchange exchange_topics_inform(){
        //durable(true) 持久化,mq重启之后交换机还在
        return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build();
    }

    //声明queue_inform_email队列
    @bean(queue_inform_email)
    public queue queue_inform_email(){
        return new queue(queue_inform_email);
    }
    //声明queue_inform_sms队列
    @bean(queue_inform_sms)
    public queue queue_inform_sms(){
        return new queue(queue_inform_sms);
    }

    //routingkey_email队列绑定交换机,指定routingkey
    @bean
    public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs();
    }
    //routingkey_sms队列绑定交换机,指定routingkey
    @bean
    public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs();
    }
}

 

消费者举例demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;

@configuration
public class rabbitmqconfig {
    public static final string queue_inform_email = "queue_inform_email";
    public static final string queue_inform_sms = "queue_inform_sms";
    public static final string exchange_topics_inform="exchange_topics_inform";
    public static final string routingkey_email="inform.#.email.#";
    public static final string routingkey_sms="inform.#.sms.#";

    //声明交换机
    @bean(exchange_topics_inform)
    public exchange exchange_topics_inform(){
        //durable(true) 持久化,mq重启之后交换机还在
        return exchangebuilder.topicexchange(exchange_topics_inform).durable(true).build();
    }

    //声明queue_inform_email队列
    @bean(queue_inform_email)
    public queue queue_inform_email(){
        return new queue(queue_inform_email);
    }
    //声明queue_inform_sms队列
    @bean(queue_inform_sms)
    public queue queue_inform_sms(){
        return new queue(queue_inform_sms);
    }

    //routingkey_email队列绑定交换机,指定routingkey
    @bean
    public binding binding_queue_inform_email(@qualifier(queue_inform_email) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_email).noargs();
    }
    //routingkey_sms队列绑定交换机,指定routingkey
    @bean
    public binding binding_routingkey_sms(@qualifier(queue_inform_sms) queue queue,
                                              @qualifier(exchange_topics_inform) exchange exchange){
        return bindingbuilder.bind(queue).to(exchange).with(routingkey_sms).noargs();
    }
}

工作模式 

rabbitmq有以下几种工作模式 :

1、work queues

2、publish/subscribe

3、routing

4、topics

5、header

6、rpc

 

work queues

rabbitMq 初步

work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

测试:

1、使用入门程序,启动多个消费者。

2、生产者发送多个消息。

结果:

1、一条消息只会被一个消费者接收;

2、rabbit采用轮询的方式将消息是平均发送给消费者的;

3、消费者在处理完某条消息后,才会收到下一条消息。

 

publish/subscribe 发布订阅模式

rabbitMq 初步

发布订阅模式:

1、每个消费者监听自己的队列。

2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收

到消息

 

routin

rabbitMq 初步

路由模式:

1、每个消费者监听自己的队列,并且设置routingkey。

2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。

这是一种非常灵活的模式,经常被用到

 

topics

 

 rabbitMq 初步

路由模式:

1、每个消费者监听自己的队列,并且设置带统配符的routingkey。

2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

 

header模式

header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配

队列。

案例:

根据用户的通知设置去通知用户,设置接收email的用户只接收email,设置接收sms的用户只接收sms,设置两种

通知类型都接收的则两种通知都有效。

 

生产者demo:

 

map<string, object> headers_email = new hashtable<string, object>();
headers_email.put("inform_type", "email");
map<string, object> headers_sms = new hashtable<string, object>();
headers_sms.put("inform_type", "sms");
channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email);
channel.queuebind(queue_inform_sms,exchange_headers_inform,"",headers_sms);

通知demo :

string message = "email inform to user"+i;
map<string,object> headers = new hashtable<string, object>();
headers.put("inform_type", "email");//匹配email通知消费者绑定的header
//headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
amqp.basicproperties.builder properties = new amqp.basicproperties.builder();
properties.headers(headers);
//email通知
channel.basicpublish(exchange_headers_inform, "", properties.build(), message.getbytes());

发送邮件消费者 : 

channel.exchangedeclare(exchange_headers_inform, builtinexchangetype.headers);
map<string, object> headers_email = new hashtable<string, object>();
headers_email.put("inform_email", "email");
//交换机和队列绑定
channel.queuebind(queue_inform_email,exchange_headers_inform,"",headers_email);
//指定消费队列
channel.basicconsume(queue_inform_email, true, consumer);

 

rpc

rabbitMq 初步

 

 

rpc即客户端远程调用服务端的方法 ,使用mq可以实现rpc的异步调用,基于direct交换机实现,流程如下:

1、客户端即是生产者就是消费者,向rpc请求队列发送rpc调用消息,同时监听rpc响应队列。

2、服务端监听rpc请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果

3、服务端将rpc方法 的结果发送到rpc响应队列

4、客户端(rpc调用方)监听rpc响应队列,接收到rpc调用结果。