springboot 整合 RabbitMQ实现消息队列
程序员文章站
2022-06-28 17:09:59
springboot 整合 RabbitMQ消息队列作为分布式系统中重要的组件,可以有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。可应用在业务解耦,消息异步分发(提高应用响应速度)。本文主要讲述springboot部署rabbitmq的一些简单消息发送。依赖包 org.springframewo...
springboot 整合 RabbitMQ
消息队列作为分布式系统中重要的组件,可以有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。可应用在业务解耦,消息异步分发(提高应用响应速度)。本文主要讲述springboot部署rabbitmq的一些简单消息发送。
依赖包
<!-- 消息队列 rabbitmq 依赖包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- 消息队列 rabbitmq 依赖包 end -->
配置
写在application.properties文件
## ==============rabbitmq配置========= 后台 http://ip地址:15672/ spring.rabbitmq.host=ip地址
spring.rabbitmq.port= 5672 spring.rabbitmq.username=root
spring.rabbitmq.password=123456789
rabbitmq配置
消息名称
package com.sise.demo1.demo.common.config.amqp; /**
* author zxq
* date 2020/8/6 23:27
* 消息名称
*/ public class MQField { public static final String HELLO_STRING_QUEUE = "stringQueue"; public static final String HELLO_USER_QUEUE = "userQueue"; public static final String MY_WORKER_QUEUE = "workerQueue"; public static final String MY_FANOUT_A_QUEUE = "fanoutAQueue"; public static final String MY_FANOUT_B_QUEUE = "fanoutBQueue"; public static final String MY_FANOUT_EXCHANGE = "fanoutQueue"; public static final String MY_TOPIC_A_QUEUE = "topicAQueue"; public static final String MY_TOPIC_B_QUEUE = "topicBQueue"; public static final String MY_TOPIC_EXCHANGE = "topicExchange"; }
配置类
package com.sise.demo1.demo.common.config.amqp; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /**
* author zxq
* date 2020/8/6 23:25
* RabbitMQ消息队列配置类
*/ @Configuration public class RabbitMQConfig { /**
* 声明接收字符串的队列 Hello 默认
* @return
*/ /**
* 声明接收字符串的队列 Hello 默认
*
* @return
*/ @Bean public Queue stringQueue() { //boolean isDurable = true;//是否持久化 //boolean isExclusive = false; //仅创建者可以使用的私有队列,断开后自动删除 //boolean isAutoDelete = false; //当所有消费客户端连接断开后,是否自动删除队列 //Queue queue = new Queue(MQField.HELLO_STRING_QUEUE, isDurable, isExclusive, isAutoDelete); //return queue; //return new Queue(MQField.HELLO_STRING_QUEUE); //默认支持持久化 return QueueBuilder.durable(MQField.HELLO_STRING_QUEUE) //.exclusive() //.autoDelete() .build(); } /**
* 声明接收user对象的队列 Hello 支持持久化
*
* @return
*/ @Bean public Queue userQueue() { return QueueBuilder.durable(MQField.HELLO_USER_QUEUE).build(); } /**
* 声明WorkQueue队列 competing consumers pattern,多个消费者不会重复消费队列的相同消息
*
* @return
*/ @Bean public Queue workQueue() { return QueueBuilder.durable(MQField.MY_WORKER_QUEUE).build(); } /**
* 消息队列中最常见的模式:发布订阅模式
* <p>
* 声明发布订阅模式队列 Publish/Subscribe
* <p>
* exchange类型包括:direct, topic, headers 和 fanout
**/ /*fanout(广播)队列相关声明开始*/ @Bean public Queue fanOutAQueue() { return QueueBuilder.durable(MQField.MY_FANOUT_A_QUEUE).build(); } @Bean public Queue fanOutBQueue() { return QueueBuilder.durable(MQField.MY_FANOUT_B_QUEUE).build(); } @Bean FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange(MQField.MY_FANOUT_EXCHANGE).build(); //return new FanoutExchange(MQField.MY_FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue fanOutAQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutAQueue).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue fanOutBQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutBQueue).to(fanoutExchange); } /*fanout队列相关声明结束*/ /*topic队列相关声明开始*/ @Bean public Queue topicAQueue() { return QueueBuilder.durable(MQField.MY_TOPIC_A_QUEUE).build(); } @Bean public Queue topicBQueue() { return QueueBuilder.durable(MQField.MY_TOPIC_B_QUEUE).build(); } @Bean TopicExchange topicExchange() { return (TopicExchange) ExchangeBuilder.topicExchange(MQField.MY_TOPIC_EXCHANGE).build(); }
业务层
消息生产者
package com.sise.demo1.demo.model.service.amqpService.object; import com.sise.demo1.demo.common.config.amqp.MQField; import com.sise.demo1.demo.model.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /**
* author zxq
* date 2020/8/7 20:47
*/ @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public boolean send(User user){ boolean isOk = false; rabbitTemplate.convertAndSend(MQField.HELLO_USER_QUEUE,user); isOk = true; System.out.println("Sender发送对象:"+isOk); return isOk; } }
//消息消费者
package com.sise.demo1.demo.model.service.amqpService.object; import com.sise.demo1.demo.common.config.amqp.MQField; import com.sise.demo1.demo.model.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /**
* author zxq
* date 2020/8/7 20:46
*/ @Component public class Receive { @RabbitListener(queues = MQField.HELLO_USER_QUEUE) @RabbitHandler public void process(User user){ try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println("Receiver接收到的对象是 => " + user.toString()); } }
控制层
package com.sise.demo1.demo.controller.amqp; import com.sise.demo1.demo.common.ResultVO; import com.sise.demo1.demo.model.entity.User; import com.sise.demo1.demo.model.service.IUserService; import com.sise.demo1.demo.model.service.amqpService.object.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.util.StringUtils; /**
* author zxq
* date 2020/8/6 23:49
*/ @RestController @RequestMapping(value = "/test") public class Test { @Autowired private IUserService userService; @Autowired private Sender senderObject; @RequestMapping(value = "/object") public ResultVO object_index(@RequestParam(value = "userId", required = false) Integer userId){ if(StringUtils.isEmpty(userId)){ return ResultVO.getError("缺少参数 userId"); } User user = userService.get(userId); boolean bool = senderObject.send(user); return ResultVO.getSuccessWithData("发送:"+bool,user); } }
运行截图
此处已经简单地实现了一端发消息,一端进行了接收消息。
如有错误,或有更好的想法,请联系我,感激不尽!
本文地址:https://blog.csdn.net/weixin_41908336/article/details/107885311
推荐阅读
-
在PHP中如何使用RabbitMQ来实现消息的订阅和发布?
-
Springboot2 之 Spring Data Redis 实现消息队列——发布/订阅模式
-
PHP+memcache实现消息队列案例分享_PHP
-
【Springboot+Redis】Springboot+Redis实现消息队列(生产者/消费者、发布订阅模式)
-
redis实现消息队列(发布/订阅模式)
-
php Memcache 中实现消息队列_PHP
-
SpringBoot 2.0 整合sharding-jdbc中间件实现数据分库分表
-
redis用list做消息队列的实现示例
-
C#利用RabbitMQ实现消息订阅与发布
-
实现基于SpringBoot+Maven+Mysql+Redis+RabbitMQ 的高并发秒杀系统(限时秒杀)