欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

初识RabbitMQ

程序员文章站 2022-03-24 12:21:05
好久没有更新技术博客了,今天正好有点时间,自学了一下RabbitMQ,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。 首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断 ......

  好久没有更新技术博客了,今天正好有点时间,自学了一下rabbitmq,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。

  首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断的坑,只看别人写的博客,对于初学者来说,又不知对否,只能一味的接纳,最终会遇到不可预料的坑,都不知道如何下手,所以我建议大家能去官网学习,虽然说官网都是英文的,对于一些英语能力不好的同学,可能看的会有点头疼,但是只要自己坚持下去,一直看英文文档,遇到不懂的词,可以查一下,这样日积月累,达到从量变到质变的过程,最终再看什么英文文档,都不在话下了。好了,接下来,进入正题。

一、安装rabbitmq

链接: https://pan.baidu.com/s/1zsnla1ib0o05aehnjijxga 提取码: 2sph 

可以在这里下载安装文档

二、6种队列模式

rabbitmq的官网地址是http://www.rabbitmq.com,进入官网教程(rabbitmq tutorials),看到有6个模式:

1、简单的队列模式

初识RabbitMQ

这个图是官网上面的图,p是生产者,是发送消息的一方,c是消费者,是接收消息的一方,可以把rabbitmq和邮局类比,当你想要寄信的时候,你会把信放在邮箱里,然后等待邮差把信收走,然后送到想要寄给的那个人,唯一不同点就是邮局是处理纸质信件,然而rabbitmq是处理二进制数据。有三个术语,生产者(producer)是发送消息的一方,队列(queue)相当于邮箱,存储消息,消费者(consumer)是接收消息的一方,注意:生产者、消费者、队列可以不再同一台机器上,可以分布在不同的机器,一个应用既可以是生产者,也可以是消费者。

接下来开始用代码实现简单队列模式:

使用maven项目完成这个例子,先在pom.xml文件里添加rabbitmq的依赖

<dependency>
<groupid>com.rabbitmq</groupid>
<artifactid>amqp-client</artifactid>
<version>3.4.4</version>
</dependency>

①建一个工具类rabbitmqconnection,方便连接rabbitmq
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;

/**
* 连接rabbitmq
*/
public class rabbitmqconnection {

private static connection connection;

static {
connectionfactory connectionfactory = new connectionfactory();
try{
/**
* rabbitmq服务器
*/
connectionfactory.sethost("127.0.0.1");
/**
* 虚拟主机名
*/
connectionfactory.setvirtualhost("/testrabbitmq");
/**
* 用户名
*/
connectionfactory.setusername("renruibin");
/**
* 密码
*/
connectionfactory.setpassword("111111");
/**
* 默认端口
*/
connectionfactory.setport(5672);
connection = connectionfactory.newconnection();
} catch (exception e){
e.printstacktrace();
}
}

/**
* 获取连接
* @return
*/
public static connection getconnection(){
return connection;
}
}
②建立生产者,发送消息
/**
* 生产者
*/
public class sendmessage {

/**
* 定义队列名称
*/
private static final string queue_name = "simple_queue";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "第"+new random().nextint()+"条消息";
channel.basicpublish("",queue_name,null,message.getbytes());
system.out.println("发送"+message+"成功");
}
}
执行这段程序之后,会发送一条消息到rabbitmq服务器的队列里,可以访问localhost:15672查看
目前有一个连接

初识RabbitMQ

目前有一个管道

初识RabbitMQ

目前有一个队列,而且有一条消息等待被消费

初识RabbitMQ

③建立消费者,去消费这条消息

public class receivemessage {

/**
* 定义队列名称
*/
private static final string queue_name = "simple_queue";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,true,consumer);
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("收到"+new string(delivery.getbody())+"成功");
}
}
执行这段程序之后,会消费simple_queue队列里的一条消息,看一下控制台的情况

初识RabbitMQ

可以看到消息已经被消费了,simple_queue队列里的消息已经空了

总结:简单队列模式就是生产者发送一条消息到队列里,消费者从队列里获取消息,不会经过交换机,也没有路由规则,这种模式是p2p的,也就是点对点,一个生产者只能对应一个消费者,不支持一个生产者对应多个消费者。

2、工作模式

顾名思义:工作模式的意思是这个模式跟工作一样,员工分工合作,每人干的活都是一样多的,没有竞争,就相当于多个消费者都是获取的消息是不一样的,所有消费者消费的消息之和为总的消息,与kafka的consumer group很类似,也是同一个组内的消费者获取的消息互斥,之和为总的消息。

初识RabbitMQ

①建立生产者,去生产一批消息

/**
* 生产者
*/
public class sendmessage {

/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
string message = "第"+i+"条消息";
channel.basicpublish("",queue_name,messageproperties.persistent_text_plain,message.getbytes());
system.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
②建立消费者1,睡眠10毫秒,模拟工作能力快
public class receivemessage1 {

/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,睡眠1000毫秒,模拟工作能力差
public class receivemessage2 {

/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(1000);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
3、发布订阅模式

初识RabbitMQ

此种模式是生产者把消息发送到交换机(x)上,不用管是发送那哪个队列上了,消费者得把自己消费的那个队列与生产者的交换机进行绑定,通过一定的路由规则,才能把消息获取到。

①建立生产者,发送消息

/**
* 生产者(发布订阅模式)
*/
public class sendmessage {

/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"fanout");
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
string message = "第"+i+"条消息";
channel.basicpublish(exchange_name,"",null,message.getbytes());
system.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
执行这段程序以后,会发送100条消息,但是该交换机没有任何队列绑定,所以消息是会丢失的
②建立消费者1
public class receivemessage1 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "ps_queue_1";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2public class receivemessage2 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "ps_queue_2";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
先把两个消费者启动,然后再启动一次生产者,可以看到两个消费者消费了同样的消息。消费者这块的代码与之前的不同点就是,增加了队列的绑定 channel.queuebind(queue_name,exchange_name,"");
消费者1消费的消息:
                        

消费者1收到第0条消息成功
消费者1收到第1条消息成功
消费者1收到第2条消息成功
消费者1收到第3条消息成功
消费者1收到第4条消息成功
消费者1收到第5条消息成功
消费者1收到第6条消息成功
消费者1收到第7条消息成功
消费者1收到第8条消息成功
消费者1收到第9条消息成功
消费者1收到第10条消息成功

  

 消费者2消费的消息:

消费者2收到第0条消息成功
消费者2收到第1条消息成功
消费者2收到第2条消息成功
消费者2收到第3条消息成功
消费者2收到第4条消息成功
消费者2收到第5条消息成功
消费者2收到第6条消息成功
消费者2收到第7条消息成功
消费者2收到第8条消息成功
消费者2收到第9条消息成功
消费者2收到第10条消息成功

可以在管控台看到多了一个交换机

初识RabbitMQ

看到多了两个队列

初识RabbitMQ

4、路由模式

初识RabbitMQ

这种模式是路由模式,有一定的路由规则,只有符合这个路由规则的队列,才可以消费消息,可以达到消费者自主选择想要消费的消息,不用一味的获取生产者发送的所有消息。

①建立生产者,发送一条消息

/**
* 生产者(路由模式)
*/
public class sendmessage {

/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"direct");

/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "warn日志";
channel.basicpublish(exchange_name,"warn",null,message.getbytes());
system.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行完这段程序,会建立一个direct类型的交换机,并且指定的路由key是warn,在管控台可以看到多了一个交换机

初识RabbitMQ

此时还没有队列绑定,所以发送的消息是会丢失的

②建立一个消费者1,消费error的消息

public class receivemessage1 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "direct_queue_1";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"error");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,消费info和warn的消息
public class receivemessage2 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "direct_queue_2";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"info");
channel.queuebind(queue_name,exchange_name,"warn");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
重新执行生产者,生产一条warn的消息
生产者:发送warn日志成功
消费者1:
消费者2:消费者2收到warn日志成功
重新执行生产者,生产一条info的消息
生产者:发送info日志成功
消费者1:
消费者2:消费者2收到info日志成功

重新执行生产者,生产一条error的消息
生产者:发送error日志成功
消费者1:消费者1收到error日志成功
消费者2:
5、主题模式

初识RabbitMQ

 此种模式是之前两种模式的一种升级,消费者可以很*的获取到自己想要的消息

  • * (star) can substitute for exactly one word.      一个
  • # (hash) can substitute for zero or more words. 0个或多个

 初识RabbitMQ

如果把绑定key设为#,此种模式就相当于fanout,如果在绑定key里没有使用*和#,此种模式就相当于direct,所以topic模式非常的灵活

①建立生产者,发送消息

/**
* 生产者(路由模式)
*/
public class sendmessage {

/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"topic");

/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "quick.orange.rabbit";
channel.basicpublish(exchange_name,"quick.orange.rabbit",null,message.getbytes());
system.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行这段程序,会建立一个topic类型的交换机,在管控台可以看到多了一个交换机

初识RabbitMQ但是此时没有队列绑定,所以消息会丢失。

②建立消费者1,路由key为*.orange.*

public class receivemessage1 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "topic_queue_1";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"*.orange.*");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,路由key是*.*.rabbit,lazy.#
public class receivemessage2 {

/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "topic_queue_2";

public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"*.*.rabbit");
channel.queuebind(queue_name,exchange_name,"lazy.#");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
重新执行生产者发送消息,生产一条quick.orange.rabbit消息
生产者:发送quick.orange.rabbit成功
消费者1:消费者1收到quick.orange.rabbit成功
消费者2:消费者2收到quick.orange.rabbit成功

重新执行生产者发送消息,生产一条lazy.orange.elephant消息
生产者:发送lazy.orange.elephant成功
消费者1:消费者1收到lazy.orange.elephant成功
消费者2:消费者2收到lazy.orange.elephant成功

重新执行生产者发送消息,生产一条quick.orange.fox消息
生产者:发送quick.orange.fox成功
消费者1:消费者1收到quick.orange.fox成功
消费者2:

重新执行生产者发送消息,生产一条lazy.pink.rabbit消息
生产者:发送lazy.pink.rabbit成功
消费者1:
消费者2:消费者2收到lazy.pink.rabbit成功

重新执行生产者发送消息,生产一条quick.brown.fox消息
生产者:发送quick.brown.fox成功
消费者1:
消费者2:
6、rpc模式

初识RabbitMQ

这种模式不常用,因为有专业的技术,去实现,所以就没有必要去学习这个模式了。过几天再研究一下rabbitmq的集群模式,再来跟大家分享。