欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitMQ模式

程序员文章站 2023-10-28 17:29:10
1.hello 1.hello 1.hello 1.hello 消息生产者p将消息放入队列 消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除(隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复) 应用场景:聊天室 案例: 1>.首先准备依赖 < ......

rabbitMQ模式 消息生产者p将消息放入队列

消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除
(隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复)

应用场景:聊天室 

案例:

1>.首先准备依赖

<dependency>  
     <groupid>org.springframework.boot</groupid>  
     <artifactid>spring-boot-starter-amqp</artifactid>  
</dependency>

 

2>.写一个test类

 

public class simpletest {  
   //模拟生产者将消息放入队列  
   @test  
   public void send() throws exception{  
       /*1 创建连接工厂 
        * 2 配置共创config 
        * 3 获取连接 
        * 4获取信道 
        * 5 从信道声明queue 
        * 6 发送消息 
        * 7 释放资源 
        */  
       connectionfactory factory=new connectionfactory();  
       factory.sethost("106.23.34.56");  
       factory.setport(5672);  
       factory.setvirtualhost("/tb");  
       factory.setusername("admin");  
       factory.setpassword("123456");  
       //从工厂获取连接  
       connection conn=factory.newconnection();  
       //从连接获取信道  
       channel chan=conn.createchannel();  
       //利用channel声明第一个队列  
       chan.queuedeclare("simple", false, false, false, null);  
       //queue string类型,表示声明的queue对列的名字  
       //durable boolean类型,表示是否持久化  
       //exclusive boolean类型:当前声明的queue是否专注;true当前连接创建的  
       //任何channle都可以连接这个queue,false,新的channel不可使用  
       //autodelete boolean类型:在最后连接使用完成后,是否删除队列,false  
       //arguments map类型,其他声明参数  
       //发送消息  
       string msg="helloworld,nihaoa";  
       chan.basicpublish("", "simple", null, msg.getbytes());  
       //exchange string类型,交换机名称,简单模式使用默认交换""  
       //routingkey string类型,当前的消息绑定的routingkey,简单模式下,与队列同名即可  
       //props basicproperties类型,消息的属性字段对象,例如basicproperties  
       //可以设置一个deliverymode的值0 持久化,1 表示不持久化,durable配合使用  
       //body byte[] :消息字符串的byte数组  
   }  
   //模拟消费端  
   @test  
   public void receive() throws exception{

 

connectionfactory factory=new connectionfactory();  
factory.sethost("106.23.34.56");  
factory.setport(5672);  
factory.setvirtualhost("/tb");  
factory.setusername("admin");  
factory.setpassword("123456");  
//从工厂获取连接

connection conn=factory.newconnection();//从连接获取信道channel chan=conn.createchannel();chan.queuedeclare("simple", false, false, false, null);//创建一个消费者queueingconsumer consumer= new queueingconsumer(chan);chan.basicconsume("simple", consumer);//监听队列while(true){//获取下一个delivery,delivery从队列获取消息delivery delivery = consumer.nextdelivery();string msg=new string(delivery.getbody());system.out.println(msg);}}}

2.work模式

rabbitMQ模式

生产者将消息放入队列
多个消费者同时监听同一个队列,消息如何被消费?
c1,c2共同争抢当前消息队列的内容,谁先拿到消息,谁来负责消费
应用场景:红包;大型项目中的资源调度过程(直接由最空闲的系统争抢到资源处理任务) 

案例:

1>首先写一个工具类

public class connectionutil {
 
 public static connection getconn(){
   try{
     connectionfactory factory=new connectionfactory();
     factory.sethost("106.33.44.179");
     factory.setport(5672);
     factory.setvirtualhost("/tb");
     factory.setusername("admin");
     factory.setpassword("123456");
   
     //从工厂获取连接
     connection conn=factory.newconnection();
     return conn;
   }catch(exception e){
     system.out.println(e.getmessage());
     return null;
   }
   
 }
}

 

2>写test类

public class worktest {
 @test
 public void send() throws exception{
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("work", false, false, false, null);
   for(int i=0;i<100;i++){
     string msg="1712,hello:"+i+"message";
     chan.basicpublish("", "work", null, msg.getbytes());
     system.out.println("第"+i+"条信息已经发送");
   }
   chan.close();
   conn.close();
 }
 @test
 public void receive1() throws exception{
   //获取连接,获取信道
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   chan.queuedeclare("work", false, false, false, null);
   //同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //绑定队列和消费者的关系
   //queue
   //autoack:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
   //完成消息消费后进行回执确认,channel.ack,channel.nack
   //callback
   //chan.basicconsume(queue, autoack, callback)
   chan.basicconsume("work", false, consumer);
   //监听
   while(true){
     delivery delivery=consumer.nextdelivery();
     byte[] result = delivery.getbody();
     string msg=new string(result);
     system.out.println("接受到:"+msg);
     thread.sleep(50);
     //返回服务器,回执
     chan.basicack(delivery.getenvelope().getdeliverytag(), false);
   }  
 }
 @test
 public void receive2() throws exception{
   //获取连接,获取信道
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   chan.queuedeclare("work", false, false, false, null);
   //同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //绑定队列和消费者的关系
   //queue
   //autoack:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
   //完成消息消费后进行回执确认,channel.ack,channel.nack
   //callback
   //chan.basicconsume(queue, autoack, callback)
   chan.basicconsume("work", false, consumer);
   //监听
   while(true){
     delivery delivery=consumer.nextdelivery();
     byte[] result = delivery.getbody();
     string msg=new string(result);
     system.out.println("接受到:"+msg);
     thread.sleep(150);
     //返回服务器,回执
     chan.basicack(delivery.getenvelope().getdeliverytag(), false);
   }
 }
 
}

3 publish/fanout发布订阅

rabbitMQ模式
生产者将消息交给交换机
有交换机根据发布订阅的模式设定将消息同步到所有的绑定队列中;
后端的消费者都能拿到消息

应用场景:邮件群发,群聊天,广告

案例:

public class fanouttest {
 //交换机,有类型,发布订阅:fanout
 //路由模式:direct
 //主题模式:topic
 @test
 public void send() throws exception {
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangedeclare("fanoutex", "fanout");
   //发送消息
   for(int i=0;i<100;i++){
     string msg="1712 hello:"+i+"msg";
     chan.basicpublish("fanoutex", "", null, msg.getbytes());
     system.out.println("第"+i+"条信息已经发送");
   }
 }
 
 @test
 public void receiv01() throws exception{
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //生命队列
   chan.queuedeclare("fanout01", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("fanoutex", "fanout");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("fanout01", "fanoutex", "");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("fanout01",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("一号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
 @test
 public void receiv02() throws exception{
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //生命队列
   chan.queuedeclare("fanout02", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("fanoutex", "fanout");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("fanout02", "fanoutex", "");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("fanout02",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("二号消费者接收到"+new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().getdeliverytag(), false);
   }
 }
}

4 routing路由模式

rabbitMQ模式

 

生产者发送消息到交换机,同时绑定一个路由key,交换机根据路由key对下游绑定的队列进行路
由key的判断,满足路由key的队列才会接收到消息,消费者消费消息

应用场景: 项目中的error报错

案例:

public class routingtopictest {
 
 @test
 public void routingsend() throws exception{
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangedeclare("directex", "direct");
   //发送消息
   string msg="路由模式的消息";
   chan.basicpublish("directex", "jt1713", 
       null, msg.getbytes());
 }
 @test
 public void routingrec01() throws exception{
   system.out.println("一号消费者等待接收消息");
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("direct01", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("directex", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("direct01", "directex", "jt1712");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("direct01",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("一号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
 @test
 public void routingrec02() throws exception{
   system.out.println("二号消费者等待接收消息");
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("direct02", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("directex", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("direct02", "directex", "jt1711");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("direct02",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("二号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
}

5 topic主题模式

rabbitMQ模式

*号代表单个词语
#代表多个词语

其他的内容与routing路由模式一致

案例:

public class routingtopictest {
 
 
 @test
 public void routingrec02() throws exception{
   system.out.println("二号消费者等待接收消息");
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("direct02", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("directex", "direct");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("direct02", "directex", "jt1711");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("direct02",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("二号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
 
 @test
 public void topicsend() throws exception{
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明交换机
   //参数意义,1 交换机名称,2 类型:fanout,direct,topic
   chan.exchangedeclare("topicex", "topic");
   //发送消息
   string msg="主题模式的消息";
   chan.basicpublish("topicex", "jt1712.add.update", 
       null, msg.getbytes());
 }
 @test
 public void topicrec01() throws exception{
   system.out.println("一号消费者等待接收消息");
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("topic01", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("topicex", "topic");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("topic01", "topicex", "jt1712");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("topic01",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("一号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
 @test
 public void topicrec02() throws exception{
   system.out.println("二号消费者等待接收消息");
   //获取连接
   connection conn = connectionutil.getconn();
   channel chan = conn.createchannel();
   //声明队列
   chan.queuedeclare("topic02", false, false, false, null);
   //声明交换机
   chan.exchangedeclare("topicex", "topic");
   //绑定队列到交换机
   //参数 1 队列名称,2 交换机名称 3 路由key
   chan.queuebind("topic02", "topicex", "jt1712.#");
   chan.basicqos(1);
   //定义消费者
   queueingconsumer consumer=new queueingconsumer(chan);
   //消费者与队列绑定
   chan.basicconsume("topic02",false, consumer);
   while(true){
     delivery delivery= consumer.nextdelivery();
     system.out.println("二号消费者接收到"+
     new string(delivery.getbody()));
     chan.basicack(delivery.getenvelope().
         getdeliverytag(), false);
   }
 }
}