RabbitMQ与spring结合-fanout消息
程序员文章站
2022-05-17 08:33:55
...
RabbitMQ与spring结合-fanout消息
https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
spring官网
<!-- Spring AMQP RabbitMQ support (spring-rabbit), version 2.1.6.RELEASE, matching the reference documentation linked above. -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
项目结构
env.properties中存着配置
# mq config
# RabbitMQ connection settings. The Spring XML connection factory reads them
# through the ${mq.host}, ${mq.port}, ${mq.username}, ${mq.password} and
# ${mq.vhost} placeholders.
# NOTE(review): these look like local demo credentials - do not reuse them outside a local setup.
mq.host=127.0.0.1
mq.username=root
mq.password=root123
mq.port=5672
mq.vhost=/
pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.ghgcn</groupId>
    <artifactId>spring-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Java level used for the compiler source/target and for the tools.jar dependency version -->
        <version.jdk>1.8</version.jdk>
    </properties>

    <dependencies>
        <!-- JDK tools.jar, resolved from the local JDK installation (system scope) -->
        <dependency>
            <groupId>com.sun</groupId>
            <artifactId>tools</artifactId>
            <version>${version.jdk}</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
            <scope>provided</scope>
        </dependency>

        <!-- This is the only dependency needed for the RabbitMQ integration -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
    </dependencies>

    <!-- Each profile sets the "env" property, which selects the filter file used in the build section; "dev" is active by default -->
    <profiles>
        <profile>
            <id>dev</id>
            <properties>
                <env>dev</env>
            </properties>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
        </profile>
        <profile>
            <id>test</id>
            <properties>
                <env>test</env>
            </properties>
        </profile>
        <profile>
            <id>prod</id>
            <properties>
                <env>prod</env>
            </properties>
        </profile>
        <profile>
            <id>prep</id>
            <properties>
                <env>prep</env>
            </properties>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>${version.jdk}</source>
                    <target>${version.jdk}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>

        <!-- Property file for the active environment; its values replace ${...} tokens in the filtered resources below -->
        <filters>
            <filter>src/env/${env}.properties</filter>
        </filters>

        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*</include>
                </includes>
                <filtering>true</filtering>
            </resource>
        </resources>
    </build>
</project>
使用xml配置-fanout
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring AMQP wiring for the fanout example: one connection factory, an admin,
    a template, one durable fanout exchange bound to two durable queues, and a
    listener container with one manually-acknowledging listener per queue.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd">

    <!-- Connection factory; the ${mq.*} values come from the environment properties
         (presumably substituted by Maven resource filtering - verify against the build) -->
    <rabbit:connection-factory
            id="mqConnectionFactory"
            host="${mq.host}"
            port="${mq.port}"
            username="${mq.username}"
            password="${mq.password}"
            virtual-host="${mq.vhost}"
            publisher-confirms="true"
            channel-cache-size="50" />

    <!-- Admin -->
    <rabbit:admin
            id="mqadmin"
            connection-factory="mqConnectionFactory" />

    <!-- Template used by the producer to publish messages -->
    <rabbit:template
            id="amqpTemplate"
            connection-factory="mqConnectionFactory" />

    <!-- Fanout exchange declaration -->
    <rabbit:fanout-exchange
            name="fanout_spring_exchange"
            durable="true">
        <!-- Queues bound to the exchange -->
        <rabbit:bindings>
            <rabbit:binding queue="fanout_spring_queue" />
            <rabbit:binding queue="fanout_spring_queue1" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- The two fanout queues -->
    <rabbit:queue
            name="fanout_spring_queue"
            durable="true"
            auto-delete="false"
            exclusive="false"
            declared-by="mqadmin" />
    <rabbit:queue
            name="fanout_spring_queue1"
            durable="true"
            auto-delete="false"
            exclusive="false"
            declared-by="mqadmin" />

    <!-- Message listener beans, one per queue -->
    <bean id="fanoutMessageListener"
          class="com.ghgcn.springmq.fanout.listener.FanoutMessageListener" />
    <bean id="fanoutMessageListener1"
          class="com.ghgcn.springmq.fanout.listener.FanoutMessageListener1" />

    <!-- Listener container: attaches each listener to its queue; acknowledgements are sent manually by the listeners -->
    <rabbit:listener-container
            connection-factory="mqConnectionFactory"
            acknowledge="manual">
        <rabbit:listener
                ref="fanoutMessageListener"
                queues="fanout_spring_queue" />
        <rabbit:listener
                ref="fanoutMessageListener1"
                queues="fanout_spring_queue1" />
    </rabbit:listener-container>
</beans>
消费者1-2
package com.ghgcn.springmq.fanout.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
 * Message listener that logs every delivery it receives and then acknowledges it manually.
 *
 * <p>In the accompanying Spring XML this class is registered as {@code fanoutMessageListener}
 * and attached to the queue {@code fanout_spring_queue}.
 */
@Slf4j
public class FanoutMessageListener implements ChannelAwareMessageListener {

    /**
     * Logs the delivered message and the channel it arrived on, then acknowledges the delivery.
     *
     * @param message the delivered message; its delivery tag is used for the acknowledgement
     * @param channel the channel the message was delivered on; the acknowledgement is sent through it
     * @throws Exception if sending the acknowledgement fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info(" 接收消息 FanoutMessageListener-message {} ", message);
        log.info(" 接收消息 FanoutMessageListener-channel {} ", channel);

        // Manual consumer acknowledgement for this single delivery (multiple = false).
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        final boolean multiple = false;
        channel.basicAck(deliveryTag, multiple);
    }
}
package com.ghgcn.springmq.fanout.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
/**
 * Second message listener of the fanout example: logs each delivery, then acknowledges it manually.
 *
 * <p>In the accompanying Spring XML this class is registered as {@code fanoutMessageListener1}
 * and attached to the queue {@code fanout_spring_queue1}.
 */
@Slf4j
public class FanoutMessageListener1 implements ChannelAwareMessageListener {

    /**
     * Logs the delivered message and its channel, then confirms consumption of that delivery.
     *
     * @param message the delivered message; supplies the delivery tag to acknowledge
     * @param channel the channel used to send the acknowledgement
     * @throws Exception if the acknowledgement cannot be sent
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        log.info(" 接收消息 FanoutMessageListener1-message {} ", message);
        log.info(" 接收消息 FanoutMessageListener1-channel {} ", channel);

        // Consumption confirmation: acknowledge only this delivery tag, not earlier ones.
        final long tag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(tag, /* multiple = */ false);
    }
}
生产者
package com.ghgcn.springmq.fanout;
import java.util.Date;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.rabbitmq.client.MessageProperties;
/**
 * Command-line producer for the fanout example: publishes one plain-text message to the
 * exchange {@code fanout_spring_exchange}.
 */
public class FanoutProducer1 {

    /** Name of the fanout exchange the message is published to. */
    private static final String EXCHANGE = "fanout_spring_exchange";

    /**
     * Loads {@code fanout.xml} from the classpath, builds a text message containing the current
     * date and publishes it with an empty routing key.
     *
     * @param args ignored
     */
    @SuppressWarnings("resource")
    public static void main(String[] args) {
        // The context is deliberately left open: the same XML also starts the listener
        // containers, so closing it right after publishing could stop the consumers before
        // they receive the message.
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("fanout.xml");
        AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
        System.out.println("fanout message");

        // Build the message. Encode explicitly as UTF-8 instead of relying on the platform
        // default charset, and declare that encoding on the message so consumers can decode it.
        byte[] msgBytes = ("fanout message test " + new Date())
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = MessageBuilder.withBody(msgBytes)
                .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
                .setContentEncoding(java.nio.charset.StandardCharsets.UTF_8.name())
                .build();

        // Send the message; a fanout exchange does not use the routing key, so it is left empty.
        amqpTemplate.convertAndSend(EXCHANGE, "", message);
        System.out.println("===发送结束===");
    }
}
运行 生产者
fanout message
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,3)
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,3), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608] to map, size now 1
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1239759990 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,3), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608]
10:58:06.805 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [fanout_spring_exchange], routingKey = []
===发送结束===
10:58:06.809 [pool-1-thread-5] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-mJQgpSuitWkBUqRNlsl0bA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-mJQgpSuitWkBUqRNlsl0bA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608], acknowledgeMode=MANUAL local queue size=0
10:58:06.809 [pool-1-thread-6] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-iIv7FMOwGCdcPsTeMmSSxA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-iIv7FMOwGCdcPsTeMmSSxA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608], acknowledgeMode=MANUAL local queue size=0
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-mJQgpSuitWkBUqRNlsl0bA, consumerQueue=fanout_spring_queue])
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-iIv7FMOwGCdcPsTeMmSSxA, consumerQueue=fanout_spring_queue1])
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener - 接收消息 FanoutMessageListener-message (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-mJQgpSuitWkBUqRNlsl0bA, consumerQueue=fanout_spring_queue])
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener1 - 接收消息 FanoutMessageListener1-message (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-iIv7FMOwGCdcPsTeMmSSxA, consumerQueue=fanout_spring_queue1])
10:58:06.810 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener - 接收消息 FanoutMessageListener-channel Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608]
10:58:06.810 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener1 - 接收消息 FanoutMessageListener1-channel Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://root@127.0.0.1:5672/, localPort= 51608]
10:58:06.847 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://root@127.0.0.1:5672/,3) PC:Ack:1:false
上一篇: centos7下fastdfs安装教程
推荐阅读
-
SpringBoot与spring security的结合的示例
-
SpringBoot与rabbitmq的结合的示例
-
深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议
-
Spring Boot2.X整合消息中间件RabbitMQ原理简浅探析
-
RabbitMQ与.net core(四) 消息的优先级 与 死信队列
-
RabbitMQ与.net core(三) fanout类型Exchange 与 消息的过期时间 与 队列的存活时间
-
RabbitMQ与Spring AMQP
-
消息队列RabbitMQ之Spring-AMQP
-
spring amqp rabbitmq 学习(二) 接收消息
-
spring amqp rabbitmq 学习(一) 发送消息