Springboot 配置RabbitMQ文档的方法步骤
程序员文章站
2024-02-25 21:44:45
简介
rabbitmq是实现amqp(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:...
简介
rabbitmq是实现amqp(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗
概念:
- 生产者 消息的产生方,负责将消息推送到消息队列
- 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
- 队列 消息的寄存器,负责存放生产者发送的消息
- 交换机 负责根据一定规则分发生产者产生的消息
- 绑定 完成交换机和队列之间的绑定
模式:
- direct:直连模式,用于实例间的任务分发
- topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
- headers:适用规则复杂的分发,用headers里的参数表达规则
- fanout:分发给所有绑定到该exchange上的队列,忽略routing key
springboot集成rabbitmq
一、引入maven依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> <version>1.5.2.release</version> </dependency>
二、配置application.properties
# rabbitmq spring.rabbitmq.host = dev-mq.a.pa.com spring.rabbitmq.port = 5672 spring.rabbitmq.username = admin spring.rabbitmq.password = admin spring.rabbitmq.virtualhost = /message-test/
三、编写amqpconfiguration配置文件
package message.test.configuration; import org.springframework.amqp.core.acknowledgemode; import org.springframework.amqp.core.amqptemplate; import org.springframework.amqp.core.binding; import org.springframework.amqp.core.bindingbuilder; import org.springframework.amqp.core.directexchange; import org.springframework.amqp.core.queue; import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory; import org.springframework.amqp.rabbit.connection.cachingconnectionfactory; import org.springframework.amqp.rabbit.connection.connectionfactory; import org.springframework.amqp.rabbit.core.rabbittemplate; import org.springframework.beans.factory.annotation.autowired; import org.springframework.beans.factory.annotation.qualifier; import org.springframework.boot.autoconfigure.amqp.rabbitproperties; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; @configuration public class amqpconfiguration { /** * 消息编码 */ public static final string message_encoding = "utf-8"; public static final string exchange_issue = "exchange_message_issue"; public static final string queue_issue_user = "queue_message_issue_user"; public static final string queue_issue_all_user = "queue_message_issue_all_user"; public static final string queue_issue_all_device = "queue_message_issue_all_device"; public static final string queue_issue_city = "queue_message_issue_city"; public static final string routing_key_issue_user = "routing_key_message_issue_user"; public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user"; public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device"; public static final string routing_key_issue_city = "routing_key_message_issue_city"; public static final string exchange_push = "exchange_message_push"; public static final string queue_push_result = "queue_message_push_result"; @autowired private rabbitproperties rabbitproperties; @bean public queue issueuserqueue() { return new queue(queue_issue_user); } @bean public queue issuealluserqueue() { return new queue(queue_issue_all_user); } @bean public queue issuealldevicequeue() { return new queue(queue_issue_all_device); } @bean public queue issuecityqueue() { return new queue(queue_issue_city); } @bean public queue pushresultqueue() { return new queue(queue_push_result); } @bean public directexchange issueexchange() { return new directexchange(exchange_issue); } @bean public directexchange pushexchange() { // 参数1:队列 // 参数2:是否持久化 // 参数3:是否自动删除 return new directexchange(exchange_push, true, true); } @bean public binding issueuserqueuebinding(@qualifier("issueuserqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user); } @bean public binding issuealluserqueuebinding(@qualifier("issuealluserqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user); } @bean public binding issuealldevicequeuebinding(@qualifier("issuealldevicequeue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device); } @bean public binding issuecityqueuebinding(@qualifier("issuecityqueue") queue queue, @qualifier("issueexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city); } @bean public binding pushresultqueuebinding(@qualifier("pushresultqueue") queue queue, @qualifier("pushexchange") directexchange exchange) { return bindingbuilder.bind(queue).to(exchange).withqueuename(); } @bean public connectionfactory defaultconnectionfactory() { cachingconnectionfactory connectionfactory = new cachingconnectionfactory(); connectionfactory.sethost(rabbitproperties.gethost()); connectionfactory.setport(rabbitproperties.getport()); connectionfactory.setusername(rabbitproperties.getusername()); connectionfactory.setpassword(rabbitproperties.getpassword()); connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost()); return connectionfactory; } @bean public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory( @qualifier("defaultconnectionfactory") connectionfactory connectionfactory) { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); factory.setconnectionfactory(connectionfactory); factory.setacknowledgemode(acknowledgemode.manual); return factory; } @bean public amqptemplate rabbittemplate(@qualifier("defaultconnectionfactory") connectionfactory connectionfactory) { return new rabbittemplate(connectionfactory); } }
三、编写生产者
body = json.tojsonstring(issuemessage).getbytes(amqpconfiguration.message_encoding); rabbittemplate.convertandsend(amqpconfiguration.exchange_issue, amqpconfiguration.routing_key_issue_user, body);
四、编写消费者
@rabbitlistener(queues = amqpconfiguration.queue_push_result) public void handlepushresult(@payload byte[] data, channel channel, @header(amqpheaders.delivery_tag) long deliverytag) { }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: centos下mysql主从复制设置详解