欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RabbitMQ与spring结合-fanout消息

程序员文章站 2022-05-17 08:33:55
...

RabbitMQ与spring结合-fanout消息

https://docs.spring.io/spring-amqp/docs/2.1.6.RELEASE/reference/html/#_introduction
spring官网

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
  <version>2.1.6.RELEASE</version>
</dependency>

项目结构

RabbitMQ与spring结合-fanout消息

env.properties中存着配置

#mq config
mq.host=127.0.0.1
mq.username=root
mq.password=root123
mq.port=5672
mq.vhost=/

pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.ghgcn</groupId>
	<artifactId>spring-mq</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<version.jdk>1.8</version.jdk>
	</properties>

	<dependencies>

		<dependency>
			<groupId>com.sun</groupId>
			<artifactId>tools</artifactId>
			<version>${version.jdk}</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		<!--日志-->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-core</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.2.3</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.8</version>
			<scope>provided</scope>
		</dependency>
		<!--只需要集成这个-->
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit</artifactId>
			<version>2.1.6.RELEASE</version>
		</dependency>

	</dependencies>
	<profiles>
		<profile>
			<id>dev</id>
			<properties>
				<env>dev</env>
			</properties>
			<activation>
				<activeByDefault>true</activeByDefault>
			</activation>
		</profile>
		<profile>
			<id>test</id>
			<properties>
				<env>test</env>
			</properties>
		</profile>
		<profile>
			<id>prod</id>
			<properties>
				<env>prod</env>
			</properties>
		</profile>
		<profile>
			<id>prep</id>
			<properties>
				<env>prep</env>
			</properties>
		</profile>
	</profiles>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>${version.jdk}</source>
					<target>${version.jdk}</target>
					<encoding>${project.build.sourceEncoding}</encoding>
				</configuration>
			</plugin>
		</plugins>
		<filters>
			<filter>src/env/${env}.properties</filter>
		</filters>
		<resources>
			<resource>
				<directory>src/main/resources</directory>
				<includes>
					<include>**/*</include>
				</includes>
				<filtering>true</filtering>
			</resource>
		</resources>
	</build>
</project>

使用xml配置-fanout

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-2.1.xsd">


	<!-- 工厂 -->
	<rabbit:connection-factory
		id="mqConnectionFactory" host="${mq.host}" port="${mq.port}"
		username="${mq.username}" password="${mq.password}"
		virtual-host="${mq.vhost}" publisher-confirms="true"
		channel-cache-size="50" />


<!--管理  -->
	<rabbit:admin id="mqadmin"
		connection-factory="mqConnectionFactory" />
<!-- mq模版 -->
	<rabbit:template id="amqpTemplate"
		connection-factory="mqConnectionFactory" />

	<!--声明fanout交换器 -->
	<rabbit:fanout-exchange
		name="fanout_spring_exchange" durable="true">
		<!--绑定队列  -->
		<rabbit:bindings>
			<rabbit:binding queue="fanout_spring_queue"/>
			<rabbit:binding queue="fanout_spring_queue1"/>
		</rabbit:bindings>
	</rabbit:fanout-exchange>
	<!-- 声明fanout队列  2个-->
	<rabbit:queue name="fanout_spring_queue" durable="true" 
		auto-delete="false" exclusive="false" declared-by="mqadmin" />
		
	<rabbit:queue name="fanout_spring_queue1" durable="true"
		auto-delete="false" exclusive="false" declared-by="mqadmin" />
	<!-- 绑定关系 -->
	<bean id="fanoutMessageListener"
		class="com.ghgcn.springmq.fanout.listener.FanoutMessageListener" />
	<bean id="fanoutMessageListener1"
		class="com.ghgcn.springmq.fanout.listener.FanoutMessageListener1" />
		
	<!-- 绑定 -->	
	<rabbit:listener-container 
		connection-factory="mqConnectionFactory" acknowledge="manual" >
		<rabbit:listener ref="fanoutMessageListener"
			queues="fanout_spring_queue" />
		<rabbit:listener ref="fanoutMessageListener1"
			queues="fanout_spring_queue1"  />
	</rabbit:listener-container>


</beans>

消费者1-2

package com.ghgcn.springmq.fanout.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FanoutMessageListener implements ChannelAwareMessageListener {

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {

		log.info(" 接收消息  FanoutMessageListener-message  {} ",message);
		log.info(" 接收消息  FanoutMessageListener-channel  {} ",channel);
		
		//消费确认
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}

}

package com.ghgcn.springmq.fanout.listener;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FanoutMessageListener1 implements ChannelAwareMessageListener {

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {

		log.info(" 接收消息 FanoutMessageListener1-message  {} ",message);
		log.info(" 接收消息  FanoutMessageListener1-channel  {} ",channel);
		
		//消费确认
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
	}

}




生产者

package com.ghgcn.springmq.fanout;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.rabbitmq.client.MessageProperties;

public class FanoutProducer1 {

	
	private static String exchange="fanout_spring_exchange";
	@SuppressWarnings("resource")
	public static void main(String[] args) {

		//amqpTemplate
		ApplicationContext applicationContext = new ClassPathXmlApplicationContext("fanout.xml");
		
		
		AmqpTemplate amqpTemplate = applicationContext.getBean("amqpTemplate", AmqpTemplate.class);
		
		System.out.println("fanout message");
		//构建消息
		byte [] msgBytes = ("fanout message test "+ new Date()).getBytes();
		Message messager = MessageBuilder.withBody(msgBytes)
        .setContentType(MessageProperties.TEXT_PLAIN.getContentType())
        .build();
		//发送消息
		amqpTemplate.convertAndSend(exchange, "", messager);
		
		System.out.println("===发送结束===");
		
		
		
		
	}

}


运行 生产者

fanout message
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Creating cached Rabbit Channel from PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,3)
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - Added listener aaa@qq.com
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,3), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608] to map, size now 1
10:58:06.804 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$98/1239759990 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,3), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608]
10:58:06.805 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [fanout_spring_exchange], routingKey = []
===发送结束===
10:58:06.809 [pool-1-thread-5] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-mJQgpSuitWkBUqRNlsl0bA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-mJQgpSuitWkBUqRNlsl0bA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608], acknowledgeMode=MANUAL local queue size=0
10:58:06.809 [pool-1-thread-6] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-iIv7FMOwGCdcPsTeMmSSxA' with deliveryTag: '1' in aaa@qq.com: tags=[[amq.ctag-iIv7FMOwGCdcPsTeMmSSxA]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608], acknowledgeMode=MANUAL local queue size=0
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-mJQgpSuitWkBUqRNlsl0bA, consumerQueue=fanout_spring_queue])
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-iIv7FMOwGCdcPsTeMmSSxA, consumerQueue=fanout_spring_queue1])
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener -  接收消息  FanoutMessageListener-message  (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-mJQgpSuitWkBUqRNlsl0bA, consumerQueue=fanout_spring_queue]) 
10:58:06.809 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener1 -  接收消息 FanoutMessageListener1-message  (Body:'fanout message test Fri May 31 10:58:06 CST 2019' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanout_spring_exchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-iIv7FMOwGCdcPsTeMmSSxA, consumerQueue=fanout_spring_queue1]) 
10:58:06.810 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener -  接收消息  FanoutMessageListener-channel  Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,1), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608] 
10:58:06.810 [org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#1-1] INFO com.ghgcn.springmq.fanout.listener.FanoutMessageListener1 -  接收消息  FanoutMessageListener1-channel  Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,2), conn: aaa@qq.com Shared Rabbit Connection: aaa@qq.com [delegate=amqp://aaa@qq.com:5672/, localPort= 51608] 
10:58:06.847 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://aaa@qq.com:5672/,3) PC:Ack:1:false

github:https://github.com/ln0491/rabbitmq-spring-demo

相关标签: RabbitMQ