springboot整合rabbitmq的示例代码
概述
- rabbitmq是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理。
- 它现实了amqp协议,并且遵循mozilla public license开源协议,它支持多种语言,可以方便的和spring集成。
- 消息队列使用消息将应用程序连接起来,这些消息通过像rabbitmq这样的消息代理服务器在应用程序之间路由。
基本概念
broker
用来处理数据的消息队列服务器实体
vhost
由rabbitmq服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。
productor
产生用于消息通信的数据
channel
消息通道,在amqp中可以建立多个channel,每个channel代表一个会话任务。
exchange
direct
转发消息到routing-key指定的队列
fanout
fanout
转发消息到所有绑定的队列,类似于一种广播发送的方式。
topic
topic
按照规则转发消息,这种规则多为模式匹配,也显得更加灵活
queue
queue
- 队列是rabbitmq的内部对象,存储消息
- 以动态的增加消费者,队列将接受到的消息以轮询(round-robin)的方式均匀的分配给多个消费者。
binding
表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。
consumer
监听消息队列来进行消息数据的读取
springboot下三种exchange模式(fanout,direct,topic)实现
pom.xml中引用spring-boot-starter-amqp
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
增加rabbitmq配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
direct
direct模式一般情况下只需要定义queue 使用自带交换机(defaultexchange)无需绑定交换机
@configuration public class rabbitp2pconfigure { public static final string queue_name = "p2p-queue"; @bean public queue queue() { return new queue(queue_name, true); } }
@runwith(springrunner.class) @springboottest(classes = bootcoretestapplication.class) @slf4j public class rabbittest { @autowired private amqptemplate amqptemplate; /** * 发送 */ @test public void sendlazy() throws interruptedexception { city city = new city(234556666l, "direct_name", "direct_code"); amqptemplate.convertandsend(rabbitlazyconfigure.queue_name, city); } /** * 领取 */ @test public void receive() throws interruptedexception { object obj = amqptemplate.receiveandconvert(rabbitlazyconfigure.queue_name); assert.notnull(obj, ""); log.debug(obj.tostring()); } }
适用场景:点对点
fanout
fanout则模式需要将多个queue绑定在同一个交换机上
@configuration public class rabbitfanoutconfigure { public static final string exchange_name = "fanout-exchange"; public static final string fanout_a = "fanout.a"; public static final string fanout_b = "fanout.b"; public static final string fanout_c = "fanout.c"; @bean public queue amessage() { return new queue(fanout_a); } @bean public queue bmessage() { return new queue(fanout_b); } @bean public queue cmessage() { return new queue(fanout_c); } @bean public fanoutexchange fanoutexchange() { return new fanoutexchange(exchange_name); } @bean public binding bindingexchangea(queue amessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(amessage).to(fanoutexchange); } @bean public binding bindingexchangeb(queue bmessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(bmessage).to(fanoutexchange); } @bean public binding bindingexchangec(queue cmessage, fanoutexchange fanoutexchange) { return bindingbuilder.bind(cmessage).to(fanoutexchange); } }
发送者
@slf4j public class sender { @autowired private amqptemplate rabbittemplate; public void sendfanout(object message) { log.debug("begin send fanout message<" + message + ">"); rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name, "", message); } }
我们可以通过@rabbitlistener监听多个queue来进行消费
@slf4j @rabbitlistener(queues = { rabbitfanoutconfigure.fanout_a, rabbitfanoutconfigure.fanout_b, rabbitfanoutconfigure.fanout_c }) public class receiver { @rabbithandler public void receivemessage(string message) { log.debug("received <" + message + ">"); } }
适用场景
- 大规模多用户在线(mmo)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户
topic
这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”,exchange会将消息转发到所有关注主题能与routekey模糊匹配的队列。
在进行绑定时,要提供一个该队列关心的主题,如“topic.# (“#”表示0个或若干个关键字,“*”表示一个关键字。 )
@configuration public class rabbittopicconfigure { public static final string exchange_name = "topic-exchange"; public static final string topic = "topic"; public static final string topic_a = "topic.a"; public static final string topic_b = "topic.b"; @bean public queue queuetopic() { return new queue(rabbittopicconfigure.topic); } @bean public queue queuetopica() { return new queue(rabbittopicconfigure.topic_a); } @bean public queue queuetopicb() { return new queue(rabbittopicconfigure.topic_b); } @bean public topicexchange exchange() { topicexchange topicexchange = new topicexchange(exchange_name); topicexchange.setdelayed(true); return new topicexchange(exchange_name); } @bean public binding bindingexchangetopic(queue queuetopic, topicexchange exchange) { return bindingbuilder.bind(queuetopic).to(exchange).with(rabbittopicconfigure.topic); } @bean public binding bindingexchangetopics(queue queuetopica, topicexchange exchange) { return bindingbuilder.bind(queuetopica).to(exchange).with("topic.#"); } }
同时去监听三个queue
@slf4j @rabbitlistener(queues = { rabbittopicconfigure.topic, rabbittopicconfigure.topic_a, rabbittopicconfigure.topic_b }) public class receiver { @rabbithandler public void receivemessage(string message) { log.debug("received <" + message + ">"); } }
通过测试我们可以发现
@runwith(springrunner.class) @springboottest(classes = bootcoretestapplication.class) public class rabbittest { @autowired private amqptemplate rabbittemplate; @test public void sendall() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, "topic.test", "send all"); } @test public void sendtopic() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "send topic"); } @test public void sendtopica() { rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "send topica"); } }
适用场景
- 分发有关于特定地理位置的数据,例如销售点
- 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
- 股票价格更新(以及其他类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
- 云端的不同种类服务的协调
- 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
延迟队列
延迟消费:
- 如用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
- 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
延迟重试:
- 如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
- 如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。
设置交换机延迟属性为true
@configuration public class rabbitlazyconfigure { public static final string queue_name = "lazy-queue-t"; public static final string exchange_name = "lazy-exchange-t"; @bean public queue queue() { return new queue(queue_name, true); } @bean public directexchange defaultexchange() { directexchange directexchange = new directexchange(exchange_name, true, false); directexchange.setdelayed(true); return directexchange; } @bean public binding binding() { return bindingbuilder.bind(queue()).to(defaultexchange()).with(queue_name); } }
发送时设置延迟时间即可
@slf4j public class sender { @autowired private amqptemplate rabbittemplate; public void sendlazy(object msg) { log.debug("begin send lazy message<" + msg + ">"); rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name, rabbitlazyconfigure.queue_name, msg, message -> { message.getmessageproperties().setheader("x-delay", 10000); return message; } ); } }
结束
各种使用案例请直接查看
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: java编程两种树形菜单结构的转换代码