rabbitMQ模式
消息生产者p将消息放入队列
消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除
(隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复)
应用场景:聊天室
案例:
1>.首先准备依赖
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-amqp</artifactid>
</dependency>
2>.写一个test类
public class simpletest {
//模拟生产者将消息放入队列
@test
public void send() throws exception{
/*1 创建连接工厂
* 2 配置共创config
* 3 获取连接
* 4获取信道
* 5 从信道声明queue
* 6 发送消息
* 7 释放资源
*/
connectionfactory factory=new connectionfactory();
factory.sethost("106.23.34.56");
factory.setport(5672);
factory.setvirtualhost("/tb");
factory.setusername("admin");
factory.setpassword("123456");
//从工厂获取连接
connection conn=factory.newconnection();
//从连接获取信道
channel chan=conn.createchannel();
//利用channel声明第一个队列
chan.queuedeclare("simple", false, false, false, null);
//queue string类型,表示声明的queue对列的名字
//durable boolean类型,表示是否持久化
//exclusive boolean类型:当前声明的queue是否专注;true当前连接创建的
//任何channle都可以连接这个queue,false,新的channel不可使用
//autodelete boolean类型:在最后连接使用完成后,是否删除队列,false
//arguments map类型,其他声明参数
//发送消息
string msg="helloworld,nihaoa";
chan.basicpublish("", "simple", null, msg.getbytes());
//exchange string类型,交换机名称,简单模式使用默认交换""
//routingkey string类型,当前的消息绑定的routingkey,简单模式下,与队列同名即可
//props basicproperties类型,消息的属性字段对象,例如basicproperties
//可以设置一个deliverymode的值0 持久化,1 表示不持久化,durable配合使用
//body byte[] :消息字符串的byte数组
}
//模拟消费端
@test
public void receive() throws exception{
connectionfactory factory=new connectionfactory();
factory.sethost("106.23.34.56");
factory.setport(5672);
factory.setvirtualhost("/tb");
factory.setusername("admin");
factory.setpassword("123456");
//从工厂获取连接
connection conn=factory.newconnection();//从连接获取信道channel chan=conn.createchannel();chan.queuedeclare("simple", false, false, false, null);//创建一个消费者queueingconsumer consumer= new queueingconsumer(chan);chan.basicconsume("simple", consumer);//监听队列while(true){//获取下一个delivery,delivery从队列获取消息delivery delivery = consumer.nextdelivery();string msg=new string(delivery.getbody());system.out.println(msg);}}}
2.work模式
生产者将消息放入队列
多个消费者同时监听同一个队列,消息如何被消费?
c1,c2共同争抢当前消息队列的内容,谁先拿到消息,谁来负责消费
应用场景:红包;大型项目中的资源调度过程(直接由最空闲的系统争抢到资源处理任务)
案例:
1>首先写一个工具类
public class connectionutil {
public static connection getconn(){
try{
connectionfactory factory=new connectionfactory();
factory.sethost("106.33.44.179");
factory.setport(5672);
factory.setvirtualhost("/tb");
factory.setusername("admin");
factory.setpassword("123456");
//从工厂获取连接
connection conn=factory.newconnection();
return conn;
}catch(exception e){
system.out.println(e.getmessage());
return null;
}
}
}
2>写test类
public class worktest {
@test
public void send() throws exception{
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("work", false, false, false, null);
for(int i=0;i<100;i++){
string msg="1712,hello:"+i+"message";
chan.basicpublish("", "work", null, msg.getbytes());
system.out.println("第"+i+"条信息已经发送");
}
chan.close();
conn.close();
}
@test
public void receive1() throws exception{
//获取连接,获取信道
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
chan.queuedeclare("work", false, false, false, null);
//同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//绑定队列和消费者的关系
//queue
//autoack:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
//完成消息消费后进行回执确认,channel.ack,channel.nack
//callback
//chan.basicconsume(queue, autoack, callback)
chan.basicconsume("work", false, consumer);
//监听
while(true){
delivery delivery=consumer.nextdelivery();
byte[] result = delivery.getbody();
string msg=new string(result);
system.out.println("接受到:"+msg);
thread.sleep(50);
//返回服务器,回执
chan.basicack(delivery.getenvelope().getdeliverytag(), false);
}
}
@test
public void receive2() throws exception{
//获取连接,获取信道
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
chan.queuedeclare("work", false, false, false, null);
//同一时刻服务器只发送一条消息给同一消费者,消费者空闲,才发送一条
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//绑定队列和消费者的关系
//queue
//autoack:消息被消费后,是否自动确认回执,如果false,不自动需要手动在
//完成消息消费后进行回执确认,channel.ack,channel.nack
//callback
//chan.basicconsume(queue, autoack, callback)
chan.basicconsume("work", false, consumer);
//监听
while(true){
delivery delivery=consumer.nextdelivery();
byte[] result = delivery.getbody();
string msg=new string(result);
system.out.println("接受到:"+msg);
thread.sleep(150);
//返回服务器,回执
chan.basicack(delivery.getenvelope().getdeliverytag(), false);
}
}
}
3 publish/fanout发布订阅
生产者将消息交给交换机
有交换机根据发布订阅的模式设定将消息同步到所有的绑定队列中;
后端的消费者都能拿到消息
应用场景:邮件群发,群聊天,广告
案例:
public class fanouttest {
//交换机,有类型,发布订阅:fanout
//路由模式:direct
//主题模式:topic
@test
public void send() throws exception {
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明交换机
//参数意义,1 交换机名称,2 类型:fanout,direct,topic
chan.exchangedeclare("fanoutex", "fanout");
//发送消息
for(int i=0;i<100;i++){
string msg="1712 hello:"+i+"msg";
chan.basicpublish("fanoutex", "", null, msg.getbytes());
system.out.println("第"+i+"条信息已经发送");
}
}
@test
public void receiv01() throws exception{
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//生命队列
chan.queuedeclare("fanout01", false, false, false, null);
//声明交换机
chan.exchangedeclare("fanoutex", "fanout");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("fanout01", "fanoutex", "");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("fanout01",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("一号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
@test
public void receiv02() throws exception{
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//生命队列
chan.queuedeclare("fanout02", false, false, false, null);
//声明交换机
chan.exchangedeclare("fanoutex", "fanout");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("fanout02", "fanoutex", "");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("fanout02",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("二号消费者接收到"+new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().getdeliverytag(), false);
}
}
}
4 routing路由模式
生产者发送消息到交换机,同时绑定一个路由key,交换机根据路由key对下游绑定的队列进行路
由key的判断,满足路由key的队列才会接收到消息,消费者消费消息
应用场景: 项目中的error报错
案例:
public class routingtopictest {
@test
public void routingsend() throws exception{
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明交换机
//参数意义,1 交换机名称,2 类型:fanout,direct,topic
chan.exchangedeclare("directex", "direct");
//发送消息
string msg="路由模式的消息";
chan.basicpublish("directex", "jt1713",
null, msg.getbytes());
}
@test
public void routingrec01() throws exception{
system.out.println("一号消费者等待接收消息");
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("direct01", false, false, false, null);
//声明交换机
chan.exchangedeclare("directex", "direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("direct01", "directex", "jt1712");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("direct01",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("一号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
@test
public void routingrec02() throws exception{
system.out.println("二号消费者等待接收消息");
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("direct02", false, false, false, null);
//声明交换机
chan.exchangedeclare("directex", "direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("direct02", "directex", "jt1711");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("direct02",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("二号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
}
5 topic主题模式
*号代表单个词语
#代表多个词语
其他的内容与routing路由模式一致
案例:
public class routingtopictest {
@test
public void routingrec02() throws exception{
system.out.println("二号消费者等待接收消息");
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("direct02", false, false, false, null);
//声明交换机
chan.exchangedeclare("directex", "direct");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("direct02", "directex", "jt1711");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("direct02",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("二号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
@test
public void topicsend() throws exception{
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明交换机
//参数意义,1 交换机名称,2 类型:fanout,direct,topic
chan.exchangedeclare("topicex", "topic");
//发送消息
string msg="主题模式的消息";
chan.basicpublish("topicex", "jt1712.add.update",
null, msg.getbytes());
}
@test
public void topicrec01() throws exception{
system.out.println("一号消费者等待接收消息");
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("topic01", false, false, false, null);
//声明交换机
chan.exchangedeclare("topicex", "topic");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("topic01", "topicex", "jt1712");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("topic01",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("一号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
@test
public void topicrec02() throws exception{
system.out.println("二号消费者等待接收消息");
//获取连接
connection conn = connectionutil.getconn();
channel chan = conn.createchannel();
//声明队列
chan.queuedeclare("topic02", false, false, false, null);
//声明交换机
chan.exchangedeclare("topicex", "topic");
//绑定队列到交换机
//参数 1 队列名称,2 交换机名称 3 路由key
chan.queuebind("topic02", "topicex", "jt1712.#");
chan.basicqos(1);
//定义消费者
queueingconsumer consumer=new queueingconsumer(chan);
//消费者与队列绑定
chan.basicconsume("topic02",false, consumer);
while(true){
delivery delivery= consumer.nextdelivery();
system.out.println("二号消费者接收到"+
new string(delivery.getbody()));
chan.basicack(delivery.getenvelope().
getdeliverytag(), false);
}
}
}