RabbitMQ基础个人总结篇
程序员文章站
2022-05-07 12:08:48
...
RabbitMQ个人快速总结
简介:
1.RabbitMQ:是一个消息中间件同样的产品还有kafka,activeMQ等,它是实现了AMQP应用层协议的框架,它是用ErLang写的,用于系统与系统之间的通信,应用场景有[实现商品数据的同步].
2.RabbitMQ:支持多种语言开发java,python,c,c++,.net,node.js等.
支持多种环境linux,windows,MacOX.
搭建RabbitMQ环境
windows下的环境搭建:
-
安装ErLang的环境;[相当于Java开发安装的jdk];-------------------->[一路下一步]
注意:{这里建议安装位置在C盘下,其他盘下有可能停不了}[推荐使用默认的路径. 系统的用户名必须为英文. 计算机名必须是英文] {如果出现错误,请点击忽略按钮} -
安装RabbitMQ;双击打开,必选两项RabbitMQ Service,Start Menu.—
–>next{选择默认的路径C盘}–>install—>finsh完成
注意:找到MQ的黑窗口输入:rabbitmq-plugins enable rabbitmq_management上网页输入127.0.0.1:15672输入账号密码guest/guest登陆,成功登陆,就表示安装成功.
一.[简单队列模式]生产者消费者代码
send生产者[代码]`.
// 生产者存入MQ的类
public class Send {
//声明消息队列名称
private final static String QUEUE_NAME="test_name";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接
Connection connection=ConnectionUtil.getConnection();
//创建channal通道
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息内容
String message="wyd";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("生产者:" + message);
//关闭通道和连接
channel.close();
connection.close();
}
}
Recv消费者[代码]`.
//消费者消费的类
public class Recv {
//声明消息队列名称
private final static String QUEUE_NAME="test_name";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接
Connection connection=ConnectionUtil.getConnection();
//创建channal通道
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义队列的消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consumer.nextDelivery();
String message=new String(delivery.getBody);
System.out.println("消费者:"+message);
}
}
}
二.[Work模式一]生产者消费者代码
send生产者[代码]`.
// 生产者存入MQ的类
public class Send {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_work";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for(int i=0;i<100;i++){
//消息内容
String message=""+i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("生产者:"+message);
Thread.sleep(i * 10);
}
//关闭通道,连接
channel.close();
connection.close();
Recv1消费者[代码]`
//消费者消费的类
public class Recv1 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_work";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("消费者1:"+message);
//休眠
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
Recv2消费者[代码]`
//消费者消费的类
public class Recv2 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_work";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("消费者2:"+message);
//休眠1s
Thread.sleep(1000);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
测试work工作模式结果:
消费者1.2获取的消息数量是一样的,一个奇数一个偶数.
消费者1,2获取的内容是不同的,同一个消息只能被一个消费者获取.
这样不合理:消费者1要比消费者2获取的消息多才对.
[Work模式二"能者多劳"]生产者消费者代码
只需要把"work模式一"的:
解开注释
//同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
测试结果:
消费者1比消费者2获取的消息更多.
消息的确认模式
模式一:自动确认
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consumer.nextDelivery();
String message=new String(delivery.getBody);
System.out.println("消费者:"+message);
}
模式二:手动确认
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("消费者2:"+message);
//休眠1s
Thread.sleep(1000);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
三.[订阅模式]
send生产者[代码]`.
// 生产者存入MQ的类
public class Send {
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_fanout";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明exchange
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//消息内容
String message="商品已被更新,id=10001";
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
System.out.println("后台系统:"+message);
channel.close();
connection.close();
Recv1消费者[代码]`.
//消费者消费的类
public class Recv1 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_ps_1";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_fanout";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("前台系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
Recv2消费者[代码]`.
//消费者消费的类
public class Recv2 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_ps_2";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_fanout";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("搜索系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
四[路由模式]
send生产者[代码]`.
// 生产者存入MQ的类
public class Send {
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_direct";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//消息内容
//String message="商品更新,id=10002";
//channel.basicPublish(EXCHANGE_NAME,"update",null,message.getBytes());
//String message="商品删除,id=10002";
//channel.basicPublish(EXCHANGE_NAME,"delete",null,message.getBytes());
String message="商品新增,id=10003";
channel.basicPublish(EXCHANGE_NAME,"insert",null,message.getBytes());
System.out.println("后台系统:"+message);
channel.close();
connection.close();
Recv1消费者[代码]`.
//消费者消费的类
public class Recv1 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_direct_1";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_direct";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("前台系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
Recv2消费者[代码]`.
//消费者消费的类
public class Recv2 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_direct_2";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_direct";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("搜索系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
五[通配符模式]
send生产者[代码]`.
// 生产者存入MQ的类
public class Send {
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_topic";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明exchange
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//消息内容
//String message="商品更新,id=10002";
//channel.basicPublish(EXCHANGE_NAME,"item.update",null,message.getBytes());
//String message="商品删除,id=10002";
//channel.basicPublish(EXCHANGE_NAME,"item.delete",null,message.getBytes());
String message="商品新增,id=10003";
channel.basicPublish(EXCHANGE_NAME,"item.insert",null,message.getBytes());
System.out.println("后台系统:"+message);
channel.close();
connection.close();
Recv1消费者[代码]`.
//消费者消费的类
public class Recv1 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_topic_1";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_topic";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.delete");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("前台系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
Recv2消费者[代码]`.
//消费者消费的类
public class Recv2 {
//声明消息队列名称
private final static String QUEUE_NAME="test_queue_topic_2";
//声明消息队列名称
private final static String EXCHANGE_NAME="test_exchange_topic";
//方法
public static void main(String[] args)throws Exception{
//获取MQ连接,channal通道
Connection connection=ConnectionUtil.getConnection();
Channel channel=connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.#");
//channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.#");
//channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.#");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//定义队列消费者
QueueingConsumer consumer=new QueueingConsumer(channel);
//监听队列,手动返回完成状态
channel.basicConsume(QUEUE_NAME,false,consumer);
//获取消息
while(true){
QueueingConsumer.Delivery delivery=consummer.nextDelivery();
String message=new String(delivery.getBody());
System.out.println("搜索系统:"+message);
Thread.sleep(10);
//返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
关于Spring–Rabbit,持久化,非持久化,后台系统发送到交换机,消费者接收消息等后续会补上
尽请关注!~
上一篇: C# Http请求模拟登录
下一篇: PHP-模拟请求和操作响应