《RabbitMQ系列教程-第六章-SpringBoot整合RabbitMQ》
程序员文章站
2022-06-24 10:34:46
第六章 SpringBoot整合RabbitMQ6.1 搭建消息生产者6.1.1 pom.xml:
教程说明
- 本系列教程目录大纲:《RabbitMQ系列教程-目录大纲》
- 本系列教程配套代码:https://gitee.com/Horizon1024/rabbitmt.git(码云地址)
第六章 SpringBoot整合RabbitMQ
6.1 搭建消息生产者
6.1.1 pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lscl</groupId>
<artifactId>05_springboot_rabbitmq_producer</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- springboot 工程-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
<!-- rabbitmq场景启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 测试环境场景启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
6.1.2 application.yml:
spring:
rabbitmq:
host: 192.168.40.139
port: 5672
username: lscl
password: admin
virtual-host: /lscl
6.1.3 引导类:
package com.lscl.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
}
6.1.4 配置消息队列:
6.1.4.1 Work模式:
SpringBoot
中提供有两种构建Queue
的方式,一种是通过new Queue()
的方式,另一种采用QueueBuilder
构建器构建一个Queue
;
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class Config_01_Work {
/**
* 定义一个队列
* @return
*/
@Bean("bootWorkQueue")
public Queue bootWorkQueue() {
/*
第一种方式:
durable():代表需要持久化
exclusive(): 代表该队列独占(只允许有一个consumer监听)
autoDelete(): 代表需要自动删除(没有consumer自动删除)
withArgument(): 队列的其他参数
*/
// return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
/*
第二种方式:
通过new Queue对象来创建队列
参数1: 队列名称
参数2: 是否持久化(默认:true)
参数3: 是否独占(默认:false)
参数4: 是否自动删除(默认:false)
参数5: 队列的其他参数
*/
return new Queue("boot_work_queue", true, false, false,null);
}
}
6.1.4.2 Pub/Sub模式:
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Pub/Sub模式(交换机类型为Fanout)
*/
@Configuration
public class Config_03_Fanout {
@Bean("bootFanoutQueue1")
public Queue bootFanoutQueue1(){
return QueueBuilder.durable("boot_fanout_queue1").build();
}
@Bean("bootFanoutQueue2")
public Queue bootFanoutQueue2(){
return QueueBuilder.durable("boot_fanout_queue2").build();
}
// fanout类型交换机
@Bean("bootFanoutExchange")
public Exchange bootFanoutExchange(){
return ExchangeBuilder.fanoutExchange("boot_fanout_exchange").durable(true).build();
}
/**
* 交换机与队列进行绑定
*/
@Bean
public Binding bindFanout1(){
Queue bootFanoutQueue1 = bootFanoutQueue1();
Exchange bootFanoutExchange = bootFanoutExchange();
// fanout类型交换机routing key为 ""
return BindingBuilder.bind(bootFanoutQueue1).to(bootFanoutExchange).with("").noargs();
}
/**
* 交换机与队列进行绑定
* @return
*/
@Bean
public Binding bindFanout2(){
Queue bootFanoutQueue2 = bootFanoutQueue2();
Exchange bootFanoutExchange = bootFanoutExchange();
return BindingBuilder.bind(bootFanoutQueue2).to(bootFanoutExchange).with("").noargs();
}
}
6.1.4.3 Routing模式:
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* routing模式(交换机类型为Direct)
*/
@Configuration
public class Config_02_Direct {
/**
* 准备两个队列boot_direct_queue1、boot_direct_queue2
* @return
*/
@Bean("bootDirectQueue1")
public Queue bootDirectQueue1(){
return QueueBuilder.durable("boot_direct_queue1").build();
}
@Bean("bootDirectQueue2")
public Queue bootDirectQueue2(){
return QueueBuilder.durable("boot_direct_queue2").build();
}
// direct类型交换机
@Bean("bootDirectExchange")
public Exchange bootDirectExchange(){
/*
第一种方式: 通过ExchangeBuilder构建交换机
durable: 是否持久化
autoDelete: 是否自动删除
withArgument: 交换机其他参数
*/
// return ExchangeBuilder.directExchange("boot_direct_exchange").durable(true).autoDelete().withArgument("key","val").build();
/*
第二种方式:
参数1: 是否持久化(默认false)
参数2: 是否自动删除(默认false)
参数3: 其他参数
*/
return new DirectExchange("boot_direct_exchange",true,false,null);
}
/**
* 交换机与队列进行绑定
*/
@Bean
public Binding bindDirect1(){
Queue bootDirectQueue1 = bootDirectQueue1();
Exchange bootDirectExchange = bootDirectExchange();
/*
第一种方式:
bind(Queue): 需要绑定的queue
to(Exchange): 需要绑定到哪个交换机
with(String): routing key
noargs(): 进行构建
*/
// return BindingBuilder.bind(bootDirectQueue1).to(bootDirectExchange).with("article").noargs();
/*
第一种方式:
参数1: 绑定的队列
参数2: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
参数3: 哪个交换机需要绑定
参数4: routing key
参数5: 其他参数
*/
return new Binding("boot_direct_queue1", Binding.DestinationType.QUEUE,"boot_direct_exchange","red",null);
}
/**
* 交换机与队列进行绑定
* @return
*/
@Bean
public Binding bindDirect2(){
Queue bootDirectQueue2 = bootDirectQueue2();
Exchange bootDirectExchange = bootDirectExchange();
return BindingBuilder.bind(bootDirectQueue2).to(bootDirectExchange).with("green").noargs();
}
}
6.1.4.4 Topics模式
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Topics模式(交换机类型为Topic)
*/
@Configuration
public class Config_04_Topic {
@Bean("bootTopicQueue1")
public Queue bootTopicQueue1(){
return QueueBuilder.durable("boot_topic_queue1").build();
}
@Bean("bootTopicQueue2")
public Queue bootTopicQueue2(){
return QueueBuilder.durable("boot_topic_queue2").build();
}
// topic类型交换机
@Bean("bootTopicExchange")
public Exchange bootTopicExchange(){
return ExchangeBuilder.topicExchange("boot_topic_exchange").durable(true).build();
}
/**
* 交换机与队列进行绑定
*/
@Bean
public Binding bindTopic1(){
Queue bootTopicQueue1 = bootTopicQueue1();
Exchange bootTopicExchange = bootTopicExchange();
return BindingBuilder.bind(bootTopicQueue1).to(bootTopicExchange).with("red.#").noargs();
}
/**
* 交换机与队列进行绑定
* @return
*/
@Bean
public Binding bindTopic2(){
Queue bootTopicQueue2 = bootTopicQueue2();
Exchange bootTopicExchange = bootTopicExchange();
return BindingBuilder.bind(bootTopicQueue2).to(bootTopicExchange).with("green.*").noargs();
}
}
6.1.5 测试类:
package com.lscl.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* work 模式
* @throws Exception
*/
@Test
public void testWork() throws Exception{
rabbitTemplate.convertAndSend("boot_work_queue","boot work...");
}
/**
* Routing 模式(交换机类型为Direct)
* @throws Exception
*/
@Test
public void testDirect() throws Exception{
rabbitTemplate.convertAndSend("boot_direct_exchange","red","boot direct red...");
rabbitTemplate.convertAndSend("boot_direct_exchange","green","boot direct green...");
}
/**
* Pub/Sub 模式(交换机类型为Fanout)
* @throws Exception
*/
@Test
public void testFanout() throws Exception{
rabbitTemplate.convertAndSend("boot_fanout_exchange","","boot fanout....");
}
/**
* Topics模式(交换机类型为Topic)
* @throws Exception
*/
@Test
public void tesTopic() throws Exception{
rabbitTemplate.convertAndSend("boot_topic_exchange","red.xxx.xxx","boot topic red.# ...");
rabbitTemplate.convertAndSend("boot_topic_exchange","green.xxx","boot topic green.* ....");
}
}
6.2 搭建消息消费者
6.2.1 pom.xml:
和生产者依赖一致
6.2.2 application.yml:
和生产者一致
6.2.3 引导类:
package com.lscl.rabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
6.2.4 监听器:
6.2.4.1 Work 监听:
package com.lscl.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Listener_01_Work {
/**
* 监听boot_work_queue队列的消息
* @param message
*/
@RabbitListener(queues = "boot_work_queue")
public void receive(Message message){
System.out.println("boot_work_queue: "+new String(message.getBody()));
}
}
6.2.4.2 Direct 监听:
package com.lscl.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Listener_02_Direct {
@RabbitListener(queues = "boot_direct_queue1")
public void boot_direct_queue1(Message message){
System.out.println("boot_direct_queue1: "+new String(message.getBody()));
}
@RabbitListener(queues = "boot_direct_queue2")
public void boot_direct_queue2(Message message){
System.out.println("boot_direct_queue2: "+new String(message.getBody()));
}
}
6.2.4.3 其他监听
其他几个监听对象的代码都是一致的,只要把监听的队列名换了即可
6.3 配置Header类型
6.3.1 生产者
6.3.1.1 声明交换机绑定Queue
package com.lscl.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* header模式
*/
@Configuration
public class Config_05_Header {
// 声明一个queue
@Bean("bootHeaderQueue")
public Queue bootHeaderQueue(){
return QueueBuilder.durable("boot_header_queue").build();
}
// header类型交换机
@Bean("bootHeaderExchange")
public Exchange bootHeaderExchange(){
return ExchangeBuilder.headersExchange("boot_header_exchange").durable(true).build();
}
// 将exchange和queue进行绑定
@Bean
public Binding bindHeader(){
Queue queue = bootHeaderQueue();
Exchange exchange = bootHeaderExchange();
Map<String, Object> headers = new HashMap<>();
/*
all:Producer必须匹配所有的键值对
any:只要Producer匹配任意一个键值对即可
*/
headers.put("x-match", "any");
headers.put("key1", "147");
headers.put("key2", "258");
headers.put("key3", "369");
// routing key 为空
return BindingBuilder.bind(queue).to(exchange).with("").and(headers);
}
}
6.3.1.2 测试类:
@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;
/**
* header模式
*
* @throws Exception
*/
@Test
public void tesHeader() throws Exception {
// 准备header参数
Map<String, Object> headers = new HashMap<>();
headers.put("key1", "147");
headers.put("key2", "258");
headers.put("key3", "369");
// 使用的是rabbitMessagingTemplate 而不是 rabbitTemplate
rabbitMessagingTemplate.convertAndSend("boot_header_exchange", "", "boot header....", headers);
}
6.3.2 消费者
6.3.2.1 监听器
package com.lscl.rabbitmq.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Listener_05_Header {
@RabbitListener(queues = "boot_header_queue")
public void boot_topic_queue1(Message message) {
System.out.println("boot_header_queue: " + new String(message.getBody()));
}
}
本文地址:https://blog.csdn.net/Bb15070047748/article/details/112187452
下一篇: Java构建JDBC应用程序的实例操作
推荐阅读
-
SpringBoot整合RabbitMQ之主题交换机模式
-
微服务技术系列教程(46)-SpringBoot整合MongoDB(文章评论案例)
-
SpringBoot整合RabbitMQ实战教程 RabbitMQ
-
SpringBoot整合RabbitMQ(二):定制MessageConverter
-
Springboot整合RabbitMQ(六):远程过程调用(RPC)
-
SpringBoot ~ 整合AMQP(RabbitMQ)
-
SpringBoot整合rabbitMQ,spring-boot-starter-amqp 的使用
-
【RabbitMQ】基本使用:Spring AMQP配置使用及SpringBoot整合
-
springboot整合rabbitmq DEMO
-
springboot整合rabbitmq 五种模式及死信队列的demo代码