RabbitMQ与spring结合-direct消息
程序员文章站
2022-05-17 08:48:01
...
RabbitMQ与spring结合-direct消息
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
以上为 Spring AMQP 官方参考文档(2.1.6.RELEASE)
<!-- Spring AMQP support for RabbitMQ -->
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.1.6.RELEASE</version>
</dependency>
项目结构
env.properties 中存放 RabbitMQ 的连接配置(主机、端口、用户名、密码、虚拟主机),构建时通过 pom 中的 filter(src/env/${env}.properties)替换 direct.xml 里的占位符
# RabbitMQ connection settings.
# The keys below are referenced as mq.* placeholders by the connection factory in direct.xml.
# Broker host.
mq.host=127.0.0.1
# Login user name.
mq.username=root
# Login password.
mq.password=root123
# Broker port.
mq.port=5672
# Virtual host to connect to.
mq.vhost=/
pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.ghgcn</groupId>
  <artifactId>spring-mq</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- JDK level used for the compiler source/target and the tools.jar version -->
    <version.jdk>1.8</version.jdk>
  </properties>

  <dependencies>
    <!--
      JDK tools.jar as a system-scoped dependency.
      Maven exposes environment variables only under the "env." prefix,
      so the JDK location is read from env.JAVA_HOME.
    -->
    <dependency>
      <groupId>com.sun</groupId>
      <artifactId>tools</artifactId>
      <version>${version.jdk}</version>
      <scope>system</scope>
      <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>

    <!-- Logging -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <version>1.2.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.8</version>
      <scope>provided</scope>
    </dependency>

    <!-- Spring AMQP: the only dependency that has to be added for the RabbitMQ integration -->
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.1.6.RELEASE</version>
    </dependency>
  </dependencies>

  <!-- Each profile sets the "env" property, which selects the filter file used by the build below -->
  <profiles>
    <profile>
      <id>dev</id>
      <properties>
        <env>dev</env>
      </properties>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
    </profile>
    <profile>
      <id>test</id>
      <properties>
        <env>test</env>
      </properties>
    </profile>
    <profile>
      <id>prod</id>
      <properties>
        <env>prod</env>
      </properties>
    </profile>
    <profile>
      <id>prep</id>
      <properties>
        <env>prep</env>
      </properties>
    </profile>
  </profiles>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.2</version>
        <configuration>
          <source>${version.jdk}</source>
          <target>${version.jdk}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
    </plugins>

    <!-- Property file for the active environment; its values replace placeholders in filtered resources -->
    <filters>
      <filter>src/env/${env}.properties</filter>
    </filters>

    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <includes>
          <include>**/*</include>
        </includes>
        <!-- Filtering is enabled for every file under src/main/resources -->
        <filtering>true</filtering>
      </resource>
    </resources>
  </build>
</project>
配置direct.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd">

  <!-- Connection factory; connection settings come from the mq.* properties -->
  <rabbit:connection-factory id="mqConnectionFactory"
                             host="${mq.host}"
                             port="${mq.port}"
                             username="${mq.username}"
                             password="${mq.password}"
                             virtual-host="${mq.vhost}"
                             publisher-confirms="true"
                             channel-cache-size="50" />

  <!-- Admin bean bound to the connection factory -->
  <rabbit:admin id="mqadmin" connection-factory="mqConnectionFactory" />

  <!-- Template used by the producer to publish messages -->
  <rabbit:template id="amqpTemplate" connection-factory="mqConnectionFactory" />

  <!-- Exchange declaration -->
  <rabbit:direct-exchange name="direct_spring_exchange" durable="true">
    <rabbit:bindings>
      <!-- Bind the queue with a routing key; for a direct exchange the routing key must match exactly -->
      <rabbit:binding queue="direct_spring_queue" key="direct.rount.key" />
    </rabbit:bindings>
  </rabbit:direct-exchange>

  <!-- Queue declaration -->
  <rabbit:queue name="direct_spring_queue"
                durable="true"
                exclusive="false"
                auto-delete="false"
                declared-by="mqadmin" />

  <!-- Listener bean that handles the consumed messages -->
  <bean id="directListener" class="com.ghgcn.springmq.direct.listener.DirectListener" />

  <!-- Listener container with manual acknowledgement; the listener acknowledges each delivery itself -->
  <rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="manual">
    <rabbit:listener queues="direct_spring_queue" ref="directListener" />
  </rabbit:listener-container>
</beans>
消息监听
package com.ghgcn.springmq.direct.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
 * Listener for messages delivered from the direct exchange's queue.
 *
 * <p>Each delivery is logged (message id and delivering channel) and then
 * acknowledged explicitly on the channel.
 *
 * @author Administrator
 */
@Slf4j
public class DirectListener implements ChannelAwareMessageListener {

    /**
     * Logs the received message id and the channel, then acknowledges the delivery.
     *
     * @param message the delivered message
     * @param channel the channel the message arrived on; used to send the acknowledgement
     * @throws Exception propagated from the acknowledgement call
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        final String messageId = message.getMessageProperties().getMessageId();
        log.info("DirectListener 接收 -message {} ", messageId);
        log.info("DirectListener 接收 -channel {} ", channel);

        // Acknowledge this single delivery only (multiple = false).
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }
}
发送消息
package com.ghgcn.springmq.direct;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.UUID;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.rabbitmq.client.MessageProperties;
/**
 * Demo producer that publishes a single plain-text message to the direct exchange.
 */
public class DirectProducer {

    /** Name of the direct exchange the message is published to. */
    private static final String EXCHANGE = "direct_spring_exchange";

    /** Routing key used when publishing. */
    private static final String ROUTING_KEY = "direct.rount.key";

    /**
     * Loads the Spring context from {@code direct.xml}, builds one text message and publishes it.
     *
     * @param args command-line arguments; not used
     */
    // The context is deliberately left open: direct.xml also defines the listener
    // container, which has to keep running to consume the published message.
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("direct.xml");
        AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
        System.out.println("============direct message==============");

        // Encode with an explicit charset so the payload bytes do not depend on the platform default.
        byte[] msgBytes = ("direct message test " + new Date()).getBytes(StandardCharsets.UTF_8);
        Message message = MessageBuilder.withBody(msgBytes)
                .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
                .setMessageId(UUID.randomUUID().toString())
                .build();

        // The payload is already a fully built Message, so it is sent as-is without conversion.
        amqpTemplate.send(EXCHANGE, ROUTING_KEY, message);
        System.out.println("===发送结束===");
    }
}
结果
============direct message==============
11:13:55.033 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2)
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838] to map, size now 1
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1793799654 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838]
11:13:55.034 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'direct message test Fri May 31 11:13:55 CST 2019' MessageProperties [headers={}, messageId=0d425fbe-0726-4787-ad59-2ab59e260a88, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [direct_spring_exchange], routingKey = [direct.rount.key]
===发送结束===
11:13:55.039 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-JrSWWfVMe1lLqh8fHZkLPA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-JrSWWfVMe1lLqh8fHZkLPA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838], acknowledgeMode=MANUAL local queue size=0
11:13:55.039 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'direct message test Fri May 31 11:13:55 CST 2019' MessageProperties [headers={}, messageId=0d425fbe-0726-4787-ad59-2ab59e260a88, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct_spring_exchange, receivedRoutingKey=direct.rount.key, deliveryTag=1, consumerTag=amq.ctag-JrSWWfVMe1lLqh8fHZkLPA, consumerQueue=direct_spring_queue])
11:13:55.039 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.direct.listener.DirectListener - DirectListener 接收 -message 0d425fbe-0726-4787-ad59-2ab59e260a88
11:13:55.040 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.direct.listener.DirectListener - DirectListener 接收 -channel Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 52838]
11:13:55.068 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2) PC:Ack:1:false
推荐阅读
-
Spring Boot与RabbitMQ结合实现延迟队列的示例
-
mybatis-4 mybatis与spring结合使用及原理解析
-
Spring Boot RabbitMQ 延迟消息实现完整版示例
-
详解在spring boot中消息推送系统设计与实现
-
SpringBoot与spring security的结合的示例
-
SpringBoot与rabbitmq的结合的示例
-
深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议
-
Spring Boot2.X整合消息中间件RabbitMQ原理简浅探析
-
RabbitMQ与.net core(四) 消息的优先级 与 死信队列
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间