初识RabbitMQ
好久没有更新技术博客了,今天正好有点时间,自学了一下rabbitmq,现在就将我所学到的东西分享给大家,让你们也能同我一起进步,在程序员的技术道路上,一直前进下去。
首先学习技术,我觉得还是能在官网去学习,这样才能学到最权威,最新的技术。毕竟是初学者,如果就一味的通过百度去学习技术,有可能会遇到不断的坑,只看别人写的博客,对于初学者来说,又不知对否,只能一味的接纳,最终会遇到不可预料的坑,都不知道如何下手,所以我建议大家能去官网学习,虽然说官网都是英文的,对于一些英语能力不好的同学,可能看的会有点头疼,但是只要自己坚持下去,一直看英文文档,遇到不懂的词,可以查一下,这样日积月累,达到从量变到质变的过程,最终再看什么英文文档,都不在话下了。好了,接下来,进入正题。
一、安装rabbitmq
链接: https://pan.baidu.com/s/1zsnla1ib0o05aehnjijxga 提取码: 2sph
可以在这里下载安装文档
二、6种队列模式
rabbitmq的官网地址是http://www.rabbitmq.com,进入官网教程(rabbitmq tutorials),看到有6个模式:
1、简单的队列模式
这个图是官网上面的图,p是生产者,是发送消息的一方,c是消费者,是接收消息的一方,可以把rabbitmq和邮局类比,当你想要寄信的时候,你会把信放在邮箱里,然后等待邮差把信收走,然后送到想要寄给的那个人,唯一不同点就是邮局是处理纸质信件,然而rabbitmq是处理二进制数据。有三个术语,生产者(producer)是发送消息的一方,队列(queue)相当于邮箱,存储消息,消费者(consumer)是接收消息的一方,注意:生产者、消费者、队列可以不再同一台机器上,可以分布在不同的机器,一个应用既可以是生产者,也可以是消费者。
接下来开始用代码实现简单队列模式:
使用maven项目完成这个例子,先在pom.xml文件里添加rabbitmq的依赖
<dependency>
<groupid>com.rabbitmq</groupid>
<artifactid>amqp-client</artifactid>
<version>3.4.4</version>
</dependency>
①建一个工具类rabbitmqconnection,方便连接rabbitmq
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
/**
* 连接rabbitmq
*/
public class rabbitmqconnection {
private static connection connection;
static {
connectionfactory connectionfactory = new connectionfactory();
try{
/**
* rabbitmq服务器
*/
connectionfactory.sethost("127.0.0.1");
/**
* 虚拟主机名
*/
connectionfactory.setvirtualhost("/testrabbitmq");
/**
* 用户名
*/
connectionfactory.setusername("renruibin");
/**
* 密码
*/
connectionfactory.setpassword("111111");
/**
* 默认端口
*/
connectionfactory.setport(5672);
connection = connectionfactory.newconnection();
} catch (exception e){
e.printstacktrace();
}
}
/**
* 获取连接
* @return
*/
public static connection getconnection(){
return connection;
}
}
②建立生产者,发送消息
/**
* 生产者
*/
public class sendmessage {
/**
* 定义队列名称
*/
private static final string queue_name = "simple_queue";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "第"+new random().nextint()+"条消息";
channel.basicpublish("",queue_name,null,message.getbytes());
system.out.println("发送"+message+"成功");
}
}
执行这段程序之后,会发送一条消息到rabbitmq服务器的队列里,可以访问localhost:15672查看
目前有一个连接
目前有一个管道
目前有一个队列,而且有一条消息等待被消费
③建立消费者,去消费这条消息
public class receivemessage {
/**
* 定义队列名称
*/
private static final string queue_name = "simple_queue";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,true,consumer);
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("收到"+new string(delivery.getbody())+"成功");
}
}
执行这段程序之后,会消费simple_queue队列里的一条消息,看一下控制台的情况
可以看到消息已经被消费了,simple_queue队列里的消息已经空了
总结:简单队列模式就是生产者发送一条消息到队列里,消费者从队列里获取消息,不会经过交换机,也没有路由规则,这种模式是p2p的,也就是点对点,一个生产者只能对应一个消费者,不支持一个生产者对应多个消费者。
2、工作模式
顾名思义:工作模式的意思是这个模式跟工作一样,员工分工合作,每人干的活都是一样多的,没有竞争,就相当于多个消费者都是获取的消息是不一样的,所有消费者消费的消息之和为总的消息,与kafka的consumer group很类似,也是同一个组内的消费者获取的消息互斥,之和为总的消息。
①建立生产者,去生产一批消息
/**
* 生产者
*/
public class sendmessage {
/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
string message = "第"+i+"条消息";
channel.basicpublish("",queue_name,messageproperties.persistent_text_plain,message.getbytes());
system.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
②建立消费者1,睡眠10毫秒,模拟工作能力快
public class receivemessage1 {
/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,睡眠1000毫秒,模拟工作能力差
public class receivemessage2 {
/**
* 定义队列名称
*/
private static final string queue_name = "work_queue";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,true,false,false,null);
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(1000);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
3、发布订阅模式
此种模式是生产者把消息发送到交换机(x)上,不用管是发送那哪个队列上了,消费者得把自己消费的那个队列与生产者的交换机进行绑定,通过一定的路由规则,才能把消息获取到。
①建立生产者,发送消息
/**
* 生产者(发布订阅模式)
*/
public class sendmessage {
/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"fanout");
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key,简单队列模式里,队列的名字就是路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
for(int i=0;i<100;i++){
string message = "第"+i+"条消息";
channel.basicpublish(exchange_name,"",null,message.getbytes());
system.out.println("发送"+message+"成功");
}
channel.close();
connection.close();
}
}
执行这段程序以后,会发送100条消息,但是该交换机没有任何队列绑定,所以消息是会丢失的
②建立消费者1
public class receivemessage1 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "ps_queue_1";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2public class receivemessage2 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "ps_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "ps_queue_2";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
/**
* 6、开始消费数据
*/
while (true){
thread.sleep(10);
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
先把两个消费者启动,然后再启动一次生产者,可以看到两个消费者消费了同样的消息。消费者这块的代码与之前的不同点就是,增加了队列的绑定 channel.queuebind(queue_name,exchange_name,"");
消费者1消费的消息:
消费者1收到第0条消息成功
消费者1收到第1条消息成功
消费者1收到第2条消息成功
消费者1收到第3条消息成功
消费者1收到第4条消息成功
消费者1收到第5条消息成功
消费者1收到第6条消息成功
消费者1收到第7条消息成功
消费者1收到第8条消息成功
消费者1收到第9条消息成功
消费者1收到第10条消息成功
消费者2消费的消息:
消费者2收到第0条消息成功
消费者2收到第1条消息成功
消费者2收到第2条消息成功
消费者2收到第3条消息成功
消费者2收到第4条消息成功
消费者2收到第5条消息成功
消费者2收到第6条消息成功
消费者2收到第7条消息成功
消费者2收到第8条消息成功
消费者2收到第9条消息成功
消费者2收到第10条消息成功
可以在管控台看到多了一个交换机
看到多了两个队列
4、路由模式
这种模式是路由模式,有一定的路由规则,只有符合这个路由规则的队列,才可以消费消息,可以达到消费者自主选择想要消费的消息,不用一味的获取生产者发送的所有消息。
①建立生产者,发送一条消息
/**
* 生产者(路由模式)
*/
public class sendmessage {
/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"direct");
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "warn日志";
channel.basicpublish(exchange_name,"warn",null,message.getbytes());
system.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行完这段程序,会建立一个direct类型的交换机,并且指定的路由key是warn,在管控台可以看到多了一个交换机
此时还没有队列绑定,所以发送的消息是会丢失的
②建立一个消费者1,消费error的消息
public class receivemessage1 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "direct_queue_1";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"error");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,消费info和warn的消息
public class receivemessage2 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "direct_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "direct_queue_2";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"info");
channel.queuebind(queue_name,exchange_name,"warn");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者2收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
重新执行生产者,生产一条warn的消息
生产者:发送warn日志成功
消费者1:
消费者2:消费者2收到warn日志成功
重新执行生产者,生产一条info的消息
生产者:发送info日志成功
消费者1:
消费者2:消费者2收到info日志成功
重新执行生产者,生产一条error的消息
生产者:发送error日志成功
消费者1:消费者1收到error日志成功
消费者2:
5、主题模式
此种模式是之前两种模式的一种升级,消费者可以很*的获取到自己想要的消息
- * (star) can substitute for exactly one word. 一个
- # (hash) can substitute for zero or more words. 0个或多个
如果把绑定key设为#,此种模式就相当于fanout,如果在绑定key里没有使用*和#,此种模式就相当于direct,所以topic模式非常的灵活
①建立生产者,发送消息
/**
* 生产者(路由模式)
*/
public class sendmessage {
/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明交换机,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:交换机名称
* 第二个参数:交换机类型 fanout direct headers topic
*/
channel.exchangedeclare(exchange_name,"topic");
/**
* 4、发送消息
* 第一个参数:交换机名字
* 第二个参数:路由key
* 第三个参数:可以添加额外的配置
* 第四个参数:二进制的消息数据
*/
string message = "quick.orange.rabbit";
channel.basicpublish(exchange_name,"quick.orange.rabbit",null,message.getbytes());
system.out.println("发送"+message+"成功");
channel.close();
connection.close();
}
}
执行这段程序,会建立一个topic类型的交换机,在管控台可以看到多了一个交换机
但是此时没有队列绑定,所以消息会丢失。
②建立消费者1,路由key为*.orange.*
public class receivemessage1 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "topic_queue_1";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"*.orange.*");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
③建立消费者2,路由key是*.*.rabbit,lazy.#
public class receivemessage2 {
/**
* 定义交换机名称
*/
private static final string exchange_name = "topic_exchange";
/**
* 定义队列名称
*/
private static final string queue_name = "topic_queue_2";
public static void main(string[] args) throws exception{
/**
* 1、获取连接,相当于数据库连接
*/
connection connection = rabbitmqconnection.getconnection();
/**
* 2、创建通道,相当于statement
*/
channel channel = connection.createchannel();
/**
* 3、声明队列,如果没有,就创建(declaring a queue is idempotent - it will only be created if it doesn't exist already. )
* 第一个参数:队列名称
* 第二个参数:是否持久化
* 第三个参数:是否排外(设置了排外为true的队列只可以在本次的连接中被访问,新建的连接不可以访问)
* 第四个参数:是否自动删除(当最后一个连接断开时,就会删除)、第五个参数是一些配置
*/
channel.queuedeclare(queue_name,false,false,false,null);
/**
* 4、队列和交换机绑定
* 第一个参数:队列名称
* 第二个参数:交换机名称
* 第三个参数:路由key
*/
channel.queuebind(queue_name,exchange_name,"*.*.rabbit");
channel.queuebind(queue_name,exchange_name,"lazy.#");
/**
* 4、定义队列的消费者
*/
queueingconsumer consumer = new queueingconsumer(channel);
/**
* 5、监听队列,设置成自动确认接收成功
* 第一个参数:队列名称
* 第二个参数:是否自动ack
* 第三个参数:消费者
*/
channel.basicconsume(queue_name,false,consumer);
/**
* 避免不公平,实现多劳多得
* 意思是每次获取一条消息,确认以后,才可以再次获取消息
*/
channel.basicqos(1);
while (true){
/**
* 6、开始消费数据
*/
queueingconsumer.delivery delivery = consumer.nextdelivery();
system.out.println("消费者1收到"+new string(delivery.getbody())+"成功");
/**
* 7、确认消息已被消费
* 第一个参数:deliverytag
* 第二个参数:multiple
*/
channel.basicack(delivery.getenvelope().getdeliverytag(),false);
}
}
}
重新执行生产者发送消息,生产一条quick.orange.rabbit消息
生产者:发送quick.orange.rabbit成功
消费者1:消费者1收到quick.orange.rabbit成功
消费者2:消费者2收到quick.orange.rabbit成功
重新执行生产者发送消息,生产一条lazy.orange.elephant消息
生产者:发送lazy.orange.elephant成功
消费者1:消费者1收到lazy.orange.elephant成功
消费者2:消费者2收到lazy.orange.elephant成功
重新执行生产者发送消息,生产一条quick.orange.fox消息
生产者:发送quick.orange.fox成功
消费者1:消费者1收到quick.orange.fox成功
消费者2:
重新执行生产者发送消息,生产一条lazy.pink.rabbit消息
生产者:发送lazy.pink.rabbit成功
消费者1:
消费者2:消费者2收到lazy.pink.rabbit成功
重新执行生产者发送消息,生产一条quick.brown.fox消息
生产者:发送quick.brown.fox成功
消费者1:
消费者2:
6、rpc模式
这种模式不常用,因为有专业的技术,去实现,所以就没有必要去学习这个模式了。过几天再研究一下rabbitmq的集群模式,再来跟大家分享。
下一篇: JavaScript 系列博客(五)